Provide a Data Grid

Overview

Coherence provides the ideal infrastructure for building Data Grid services, as well as the client and server-based applications that utilize a Data Grid. At a basic level, Coherence can manage an immense amount of data across a large number of servers in a grid; it can provide close to zero latency access for that data; it supports parallel queries across that data; and it supports integration with database and EIS systems that act as the system of record for that data. For more information on the infrastructure for the Data Grid features in Coherence, refer to the discussion on Data Fabric capabilities. Additionally, Coherence provides a number of services that are ideal for building effective data grids.

All of the Data Grid capabilities described below are features of the Coherence Enterprise Edition.

Targeted Execution

Coherence provides for the ability to execute an agent against an entry in any map of data managed by the Data Grid:

map.invoke(key, agent);

In the case of partitioned data, the agent executes on the grid node that owns the data to execute against. This means that the queueing, concurrency management, agent execution, data access by the agent and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (usually only a few hundred bytes, at most) than to handle distributed concurrency control, coherency and data updates.

For request/response processing, the agent returns a result:

Object oResult = map.invoke(key, agent);

In other words, Coherence as a Data Grid will determine the location to execute the agent based on the configuration for the data topology, move the agent there, execute the agent (automatically handling concurrency control for the item while executing the agent), back up the modifications if any, and return a result.
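
The sequence above can be modelled in plain Java. The following is a minimal sketch, not Coherence code: the class TargetedInvokeSketch, the ownerOf method and the hash-based routing rule are hypothetical stand-ins, and ConcurrentHashMap.compute plays the role of the per-entry concurrency control on the owning node. Backups are omitted, and unlike a real agent (whose result is independent of the stored value) this simplified agent returns the new value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Hypothetical model of a partitioned map; not part of the Coherence API. */
class TargetedInvokeSketch {
    /** One local map per simulated grid node. */
    private final List<ConcurrentHashMap<String, Integer>> nodes = new ArrayList<>();

    TargetedInvokeSketch(int nodeCount) {
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new ConcurrentHashMap<>());
        }
    }

    /** Assumed routing rule: the key's hash selects the owning node. */
    int ownerOf(String key) {
        return Math.floorMod(key.hashCode(), nodes.size());
    }

    /**
     * Runs the agent against the owner's local map. compute() applies the
     * agent atomically for that key, so concurrent invocations on the same
     * key are serialized. The agent receives the current value (null if the
     * entry is absent) and returns the new value.
     */
    Integer invoke(String key, BiFunction<String, Integer, Integer> agent) {
        return nodes.get(ownerOf(key)).compute(key, agent);
    }
}
```

With this model, calling invoke("acct-1", (k, v) -> v == null ? 1 : v + 1) creates the entry on the first call and increments it on later calls, without the caller ever reading or locking the value itself.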

Parallel Execution

Coherence additionally provides for the ability to execute an agent against an entire collection of entries. In a partitioned Data Grid, the execution occurs in parallel, meaning that the more nodes there are in the grid, the more broadly the work is load-balanced across the Data Grid:

map.invokeAll(collectionKeys, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(collectionKeys, agent);

In other words, Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology, moves the agent there, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backs up the modifications if any, and returns the coalesced results.
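
The group-by-owner, execute-in-parallel and coalesce steps can be illustrated with a JDK-only sketch. It is an assumption-laden model rather than Coherence's implementation: ParallelInvokeSketch, its hash-based ownerOf rule and the thread pool standing in for grid nodes are all hypothetical, and backups are not modelled.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

/** Hypothetical model of parallel agent execution; not the Coherence API. */
class ParallelInvokeSketch {
    /** One local map per simulated grid node. */
    private final List<ConcurrentHashMap<String, Integer>> nodes = new ArrayList<>();

    ParallelInvokeSketch(int nodeCount) {
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new ConcurrentHashMap<>());
        }
    }

    private int ownerOf(String key) {
        return Math.floorMod(key.hashCode(), nodes.size());
    }

    void put(String key, int value) {
        nodes.get(ownerOf(key)).put(key, value);
    }

    /** Runs the agent once per key, one task per owning node, and merges the results. */
    Map<String, Integer> invokeAll(Collection<String> keys,
                                   BiFunction<String, Integer, Integer> agent) {
        // Step 1: group the requested keys by the node that owns them.
        Map<Integer, List<String>> keysByOwner = new HashMap<>();
        for (String key : keys) {
            keysByOwner.computeIfAbsent(ownerOf(key), n -> new ArrayList<>()).add(key);
        }
        ExecutorService pool = Executors.newFixedThreadPool(nodes.size());
        try {
            // Step 2: each owner processes its own keys concurrently with the others.
            List<Future<Map<String, Integer>>> partials = new ArrayList<>();
            for (Map.Entry<Integer, List<String>> group : keysByOwner.entrySet()) {
                ConcurrentHashMap<String, Integer> local = nodes.get(group.getKey());
                List<String> localKeys = group.getValue();
                partials.add(pool.submit(() -> {
                    Map<String, Integer> results = new HashMap<>();
                    for (String key : localKeys) {
                        results.put(key, local.compute(key, agent));
                    }
                    return results;
                }));
            }
            // Step 3: coalesce the per-node results into a single map.
            Map<String, Integer> coalesced = new HashMap<>();
            for (Future<Map<String, Integer>> partial : partials) {
                coalesced.putAll(partial.get());
            }
            return coalesced;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("parallel invocation failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller sees one map with one result per key, regardless of how many simulated nodes took part in the work.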

Query-Based Execution

As discussed in the queryable data fabric topic, Coherence supports the ability to query across the entire data grid. For example, in a trading system it is possible to query for all open "Order" objects for a particular trader:

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = map.keySet(filter);

By combining this feature with Parallel Execution in the data grid, Coherence provides for the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agents against the entries:

map.invokeAll(filter, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(filter, agent);

In other words, Coherence combines its Parallel Query and its Parallel Execution together to achieve query-based agent invocation against a Data Grid.
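
As a rough illustration of combining a query with agent execution, the following JDK-only sketch applies an agent only to the values that match a predicate. The names QueryInvokeSketch and Trade are hypothetical, a Predicate stands in for a Coherence Filter, and a parallel stream stands in for the grid nodes. Because the results are collected in a ConcurrentHashMap, the agent in this sketch must not return null.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical model of query-based agent invocation; not the Coherence API. */
class QueryInvokeSketch {
    /** Hypothetical value type used only for this example. */
    static final class Trade {
        final String trader;
        volatile String status;

        Trade(String trader, String status) {
            this.trader = trader;
            this.status = status;
        }
    }

    /**
     * Evaluates the filter against every entry in parallel and runs the
     * agent only against the matching values. computeIfPresent() makes the
     * filter check and the agent call atomic for each key.
     */
    static <K, V, R> Map<K, R> invokeAll(ConcurrentHashMap<K, V> map,
                                         Predicate<V> filter,
                                         Function<V, R> agent) {
        Map<K, R> results = new ConcurrentHashMap<>();
        map.keySet().parallelStream().forEach(key ->
            map.computeIfPresent(key, (k, value) -> {
                if (filter.test(value)) {
                    results.put(k, agent.apply(value));
                }
                return value; // keep the (possibly mutated) value in the map
            }));
        return results;
    }
}
```

For example, an agent that sets the status of every open trade for one trader to "CANCELLED" and returns the previous status yields a result map containing only the keys of the trades that matched the filter.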

Data-Grid-Wide Execution

Passing an instance of AlwaysFilter (or a null) to the invokeAll method will cause the passed agent to be executed against all entries in the InvocableMap:

map.invokeAll((Filter) null, agent);

As with the other types of agent invocation, request/response processing is supported:

Map mapResults = map.invokeAll((Filter) null, agent);

In other words, with a single line of code, an application can process all the data spread across a particular map in the Data Grid.

Agents for Targeted, Parallel and Query-Based Execution

An agent implements the EntryProcessor interface, typically by extending the AbstractProcessor class.

A number of agents are included with Coherence; see the com.tangosol.util.processor package for a list.

The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods:

/**
* An invocable agent that operates against the Entry objects within a
* Map.
*/
public interface EntryProcessor
        extends Serializable
    {
    /**
    * Process a Map Entry.
    *
    * @param entry  the Entry to process
    *
    * @return the result of the processing, if any
    */
    public Object process(Entry entry);

    /**
    * Process a Set of InvocableMap Entry objects. This method is
    * semantically equivalent to:
    * <pre>
    *   Map mapResults = new ListMap();
    *   for (Iterator iter = setEntries.iterator(); iter.hasNext(); )
    *       {
    *       Entry entry = (Entry) iter.next();
    *       mapResults.put(entry.getKey(), process(entry));
    *       }
    *   return mapResults;
    * </pre>
    *
    * @param setEntries  a read-only Set of InvocableMap Entry objects to
    *                    process
    *
    * @return a Map containing the results of the processing, up to one
    *         entry for each InvocableMap Entry that was processed, keyed
    *         by the keys of the Map that were processed, with a
    *         corresponding value being the result of the processing for
    *         each key
    */
    public Map processAll(Set setEntries);
    }

(The AbstractProcessor implements the processAll method as described in the JavaDoc above.)

The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible:

/**
* An InvocableMap Entry contains additional information and exposes
* additional operations that the basic Map Entry does not. It allows
* non-existent entries to be represented, thus allowing their optional
* creation. It allows existent entries to be removed from the Map. It
* supports a number of optimizations that can ultimately be mapped
* through to indexes and other data structures of the underlying Map.
*/
public interface Entry
        extends Map.Entry
    {
    // ----- Map Entry interface ------------------------------------

    /**
    * Return the key corresponding to this entry. The resultant key does
    * not necessarily exist within the containing Map, which is to say
    * that <tt>InvocableMap.this.containsKey(getKey())</tt> could return
    * false. To test for the presence of this key within the Map, use
    * {@link #isPresent}, and to create the entry for the key, use
    * {@link #setValue}.
    *
    * @return the key corresponding to this entry; may be null if the
    *         underlying Map supports null keys
    */
    public Object getKey();

    /**
    * Return the value corresponding to this entry. If the entry does
    * not exist, then the value will be null. To differentiate between
    * a null value and a non-existent entry, use {@link #isPresent}.
    * <p/>
    * <b>Note:</b> any modifications to the value retrieved using this
    * method are not guaranteed to persist unless followed by a
    * {@link #setValue} or {@link #update} call.
    *
    * @return the value corresponding to this entry; may be null if the
    *         value is null or if the Entry does not exist in the Map
    */
    public Object getValue();

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry will be created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    *
    * @param oValue  the new value for this Entry
    *
    * @return the previous value of this Entry, or null if the Entry did
    *         not exist
    */
    public Object setValue(Object oValue);

    // ----- InvocableMap Entry interface ---------------------------

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry will be created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    * <p/>
    * Unlike the other form of {@link #setValue(Object) setValue}, this
    * form does not return the previous value, and as a result may be
    * significantly less expensive (in terms of cost of execution) for
    * certain Map implementations.
    *
    * @param oValue      the new value for this Entry
    * @param fSynthetic  pass true only if the insertion into or
    *                    modification of the Map should be treated as a
    *                    synthetic event
    */
    public void setValue(Object oValue, boolean fSynthetic);

    /**
    * Extract a value out of the Entry's value. Calling this method is
    * semantically equivalent to
    * <tt>extractor.extract(entry.getValue())</tt>, but this method may
    * be significantly less expensive because the resultant value may be
    * obtained from a forward index, for example.
    *
    * @param extractor  a ValueExtractor to apply to the Entry's value
    *
    * @return the extracted value
    */
    public Object extract(ValueExtractor extractor);

    /**
    * Update the Entry's value. Calling this method is semantically
    * equivalent to:
    * <pre>
    *   Object oTarget = entry.getValue();
    *   updater.update(oTarget, oValue);
    *   entry.setValue(oTarget, false);
    * </pre>
    * The benefit of using this method is that it may allow the Entry
    * implementation to significantly optimize the operation, such as
    * for purposes of delta updates and backup maintenance.
    *
    * @param updater  a ValueUpdater used to modify the Entry's value
    */
    public void update(ValueUpdater updater, Object oValue);

    /**
    * Determine if this Entry exists in the Map. If the Entry is not
    * present, it can be created by calling {@link #setValue(Object)}
    * or {@link #setValue(Object, boolean)}. If the Entry is present, it
    * can be destroyed by calling {@link #remove}.
    *
    * @return true iff this Entry is existent in the containing Map
    */
    public boolean isPresent();

    /**
    * Remove this Entry from the Map if it is present in the Map.
    * <p/>
    * This method supports both the operation corresponding to
    * {@link Map#remove} as well as synthetic operations such as
    * eviction. If the containing Map does not differentiate between
    * the two, then this method will always be identical to
    * <tt>InvocableMap.this.remove(getKey())</tt>.
    *
    * @param fSynthetic  pass true only if the removal from the Map
    *                    should be treated as a synthetic event
    */
    public void remove(boolean fSynthetic);
    }
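
To make the entry semantics concrete, here is a small JDK-only sketch of an agent that credits an account and creates the entry if it does not yet exist. EntrySketch and its nested Entry and Processor interfaces are hypothetical, heavily reduced analogues of InvocableMap.Entry and EntryProcessor. They are not the Coherence types, and the sketch ignores synthetic events, extractors and updaters.

```java
import java.util.Map;

/** Hypothetical, reduced model of entry-based processing; not the Coherence API. */
class EntrySketch {
    /** Reduced analogue of InvocableMap.Entry. */
    interface Entry<K, V> {
        K getKey();
        V getValue();
        void setValue(V value);
        boolean isPresent();
        void remove();
    }

    /** Reduced analogue of EntryProcessor. */
    interface Processor<K, V, R> {
        R process(Entry<K, V> entry);
    }

    /** Wraps the key in an Entry view over the map and hands it to the processor. */
    static <K, V, R> R invoke(Map<K, V> map, K key, Processor<K, V, R> processor) {
        Entry<K, V> entry = new Entry<K, V>() {
            public K getKey() { return key; }
            public V getValue() { return map.get(key); }
            public void setValue(V value) { map.put(key, value); }
            public boolean isPresent() { return map.containsKey(key); }
            public void remove() { map.remove(key); }
        };
        return processor.process(entry);
    }

    /**
     * Example agent: adds the amount to the balance, creating the entry if
     * it is absent. Returns the previous balance, or null if there was none.
     */
    static Processor<String, Long, Long> credit(long amount) {
        return entry -> {
            Long previous = entry.isPresent() ? entry.getValue() : null;
            entry.setValue((previous == null ? 0L : previous) + amount);
            return previous;
        };
    }
}
```

The agent never touches the map directly. It works only through the entry, which is what allows a real implementation to optimize reads and writes behind that interface.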

Data Grid Aggregation

While the above agent discussion corresponds to scalar agents, the InvocableMap interface also supports aggregation:

/**
* Perform an aggregating operation against the entries specified by the
* passed keys.
*
* @param collKeys  the Collection of keys that specify the entries within
*                  this Map to aggregate across
* @param agent     the EntryAggregator that is used to aggregate across
*                  the specified entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Collection collKeys, EntryAggregator agent);

/**
* Perform an aggregating operation against the set of entries that are
* selected by the given Filter.
* <p/>
* <b>Note:</b> calling this method on partitioned caches requires a
* Coherence Enterprise Edition license.
*
* @param filter  the Filter that is used to select entries within this
*                Map to aggregate across
* @param agent   the EntryAggregator that is used to aggregate across
*                the selected entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Filter filter, EntryAggregator agent);

A simple EntryAggregator processes a set of InvocableMap.Entry objects to achieve a result:

/**
* An EntryAggregator represents processing that can be directed to occur
* against some subset of the entries in an InvocableMap, resulting in an
* aggregated result. Common examples of aggregation include functions
* such as min(), max() and avg(). However, the concept of aggregation
* applies to any process that needs to evaluate a group of entries to
* come up with a single answer.
*/
public interface EntryAggregator
        extends Serializable
    {
    /**
    * Process a set of InvocableMap Entry objects in order to produce an
    * aggregated result.
    *
    * @param setEntries  a Set of read-only InvocableMap Entry objects to
    *                    aggregate
    *
    * @return the aggregated result from processing the entries
    */
    public Object aggregate(Set setEntries);
    }

For efficient execution in a Data Grid, an aggregation process must be designed to operate in a parallel manner. The ParallelAwareAggregator interface extends EntryAggregator for this purpose:

/**
* A ParallelAwareAggregator is an advanced extension to EntryAggregator
* that is explicitly capable of being run in parallel, for example in a
* distributed environment.
*/
public interface ParallelAwareAggregator
        extends EntryAggregator
    {
    /**
    * Get an aggregator that can take the place of this aggregator in
    * situations in which the InvocableMap can aggregate in parallel.
    *
    * @return the aggregator that will be run in parallel
    */
    public EntryAggregator getParallelAggregator();

    /**
    * Aggregate the results of the parallel aggregations.
    *
    * @return the aggregation of the parallel aggregation results
    */
    public Object aggregateResults(Collection collResults);
    }
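
The two-phase shape of a parallel-aware aggregation is easiest to see with an average, where simply averaging the per-node averages would give the wrong answer. The sketch below is a JDK-only model under stated assumptions: ParallelAverageSketch and Partial are hypothetical names, each inner list stands for the entries held by one node, and a parallel stream stands in for the grid.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical model of a two-phase (parallel-aware) average; not the Coherence API. */
class ParallelAverageSketch {
    /** Partial result produced on one node: a running sum and a count. */
    static final class Partial {
        final double sum;
        final long count;

        Partial(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Phase 1, analogous to the aggregator returned by getParallelAggregator(). */
    static Partial aggregate(List<Double> localValues) {
        double sum = 0.0;
        for (double value : localValues) {
            sum += value;
        }
        return new Partial(sum, localValues.size());
    }

    /** Phase 2, analogous to aggregateResults(): merge the partial results. */
    static Double aggregateResults(List<Partial> partials) {
        double sum = 0.0;
        long count = 0;
        for (Partial partial : partials) {
            sum += partial.sum;
            count += partial.count;
        }
        return count == 0 ? null : sum / count; // null when there is nothing to average
    }

    /** Runs phase 1 on every partition in parallel, then phase 2 on the caller. */
    static Double average(List<List<Double>> partitions) {
        List<Partial> partials = partitions.parallelStream()
                .map(ParallelAverageSketch::aggregate)
                .collect(Collectors.toList());
        return aggregateResults(partials);
    }
}
```

For the partitions [1.0, 2.0, 3.0] and [10.0], the partial results are (6.0, 3) and (10.0, 1), and the merged average is 16.0 / 4 = 4.0. Averaging the two per-partition averages (2.0 and 10.0) would incorrectly give 6.0, which is why the partial result carries both the sum and the count.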

Coherence comes with all of the natural aggregation functions, such as count, sum, average, minimum and maximum.

All aggregators that come with Coherence are parallel-aware.

See the com.tangosol.util.aggregator package for a list of Coherence aggregators. To implement your own aggregator, see the AbstractAggregator abstract base class.

Node-Based Execution

Coherence provides an Invocation Service which allows execution of single-pass agents (called Invocable objects) anywhere within the grid. The agents can be executed on any particular node of the grid, in parallel on any particular set of nodes in the grid, or in parallel on all nodes of the grid.

An invocation service is configured using the invocation-scheme element in the cache configuration file. Using the name of the service, the application can easily obtain a reference to the service:

InvocationService service = CacheFactory.getInvocationService("agents");

Agents are simply runnable classes that are part of the application. The simplest example is an agent that requests a GC from the JVM:

/**
* Agent that issues a garbage collection.
*/
public class GCAgent
        extends AbstractInvocable
    {
    public void run()
        {
        System.gc();
        }
    }

To execute that agent across the entire cluster, it takes one line of code:

service.execute(new GCAgent(), null, null);

Here is an example of an agent that supports a grid-wide request/response model:

/**
* Agent that determines how much free memory a grid node has.
*/
public class FreeMemAgent
        extends AbstractInvocable
    {
    public void run()
        {
        Runtime runtime = Runtime.getRuntime();
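        // freeMemory() and totalMemory() return long; the casts keep the
        // int[] result expected by the caller, but overflow above 2 GB
        int cbFree  = (int) runtime.freeMemory();
        int cbTotal = (int) runtime.totalMemory();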
        setResult(new int[] {cbFree, cbTotal});
        }
    }

To execute that agent across the entire grid and retrieve all the results from it, it still takes only one line of code:

Map map = service.query(new FreeMemAgent(), null);

While it is easy to do a grid-wide request/response, it takes a bit more code to print out the results:

Iterator iter = map.entrySet().iterator(); 
while (iter.hasNext())
    {
    Map.Entry entry  = (Map.Entry) iter.next();
    Member    member = (Member) entry.getKey();
    int[]     anInfo = (int[]) entry.getValue();
    if (anInfo != null) // null if member died
        System.out.println("Member " + member + " has "
            + anInfo[0] + " bytes free out of "
            + anInfo[1] + " bytes total");
    }
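
The shape of a grid-wide request/response (the same task on every member, results keyed by member, null for a member that failed) can be modelled with the JDK alone. In this sketch GridQuerySketch is a hypothetical name, members are represented by plain strings rather than Member objects, and a thread pool stands in for the grid. It illustrates the pattern and is not the Invocation Service implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical model of a grid-wide query; not the Coherence API. */
class GridQuerySketch {
    /** Runs the agent once per member in parallel and collects one result per member. */
    static <R> Map<String, R> query(List<String> members, Function<String, R> agent) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, members.size()));
        try {
            // Submit one task per member so that all members work concurrently.
            Map<String, Future<R>> pending = new LinkedHashMap<>();
            for (String member : members) {
                Callable<R> task = () -> agent.apply(member);
                pending.put(member, pool.submit(task));
            }
            // Collect the results; a member whose task failed maps to null.
            Map<String, R> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<R>> entry : pending.entrySet()) {
                try {
                    results.put(entry.getKey(), entry.getValue().get());
                } catch (ExecutionException failed) {
                    results.put(entry.getKey(), null);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    results.put(entry.getKey(), null);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

As in the printing loop above, a caller of this sketch has to check each value for null before using it.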

The agent operations can be stateful, which means that their invocation state is serialized and transmitted to the grid nodes on which the agent is to be run.

/**
* Agent that carries some state with it.
*/
public class StatefulAgent
        extends AbstractInvocable
    {
    public StatefulAgent(String sKey)
        {
        m_sKey = sKey;
        }

    public void run()
        {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
        }

    private String m_sKey;
    }
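
What "serialized and transmitted" means for the agent's state can be demonstrated with a local round trip. The sketch below uses standard Java serialization purely as a stand-in for whatever serialization the grid uses. StatefulAgentSketch and its ship method are hypothetical, and no network or Coherence class is involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stateful agent; models state travelling with the agent. */
class StatefulAgentSketch implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;

    /** State supplied by the caller; it is serialized along with the agent. */
    private final String key;

    /** Set when the agent runs; stays null on a copy that never ran. */
    private String result;

    StatefulAgentSketch(String key) {
        this.key = key;
    }

    public void run() {
        // The copy that runs still has the key it was constructed with.
        result = "processed " + key;
    }

    String getResult() {
        return result;
    }

    /** Simulates shipping the agent to another node: serialize, then deserialize a copy. */
    static StatefulAgentSketch ship(StatefulAgentSketch agent) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(agent);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (StatefulAgentSketch) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The object that runs is a copy: running the shipped agent leaves the caller's original instance untouched, so anything the caller needs back has to travel in the agent's result.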

Work Manager

Coherence provides a grid-enabled implementation of the IBM and BEA CommonJ Work Manager, which is the basis for JSR-237. Once JSR-237 is complete, Tangosol has committed to support the standardized J2EE API for Work Manager as well.

Using a Work Manager, an application can submit a collection of work that needs to be executed. The Work Manager distributes that work in such a way that it is executed in parallel, typically across the grid. In other words, if there are ten work items submitted and ten servers in the grid, then each server will likely process one work item. Further, the distribution of work items across the grid can be tailored, so that certain servers (e.g. one that acts as a gateway to a particular mainframe service) will be the first choice to run certain work items, for sake of efficiency and locality of data.

The application can then wait for the work to be completed, and can provide a timeout for how long it is willing to wait. The API for this purpose is quite powerful, allowing an application to wait for the first work item to complete, or for a specified set of the work items to complete. By combining methods from this API, it is possible to do things like "Here are 10 items to execute; for these 7 unimportant items, wait no more than 5 seconds, and for these 3 important items, wait no more than 30 seconds":

Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i)
    {
    WorkItem item = manager.schedule(aWork[i]);

    if (i < 3)
        {
        // the first three work items are the important ones
        collBigItems.add(item);
        }

    collAllItems.add(item);
    }

Collection collDone = manager.waitForAll(collAllItems, 5000L);
if (!collDone.containsAll(collBigItems))
    {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
    }
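
For readers more familiar with java.util.concurrent, the same two-tier wait can be sketched with CompletableFuture. This is an analogy only: TieredWaitSketch and its waitForAll helper are hypothetical, they run in a single JVM, and they are not the CommonJ or Coherence Work Manager API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical JDK analogue of waiting on a set of work items with a timeout. */
class TieredWaitSketch {
    /**
     * Waits up to timeoutMillis for all items to finish and returns the
     * items that are done (successfully or not) when the wait ends.
     */
    static <T> List<CompletableFuture<T>> waitForAll(List<CompletableFuture<T>> items,
                                                     long timeoutMillis) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(items.toArray(new CompletableFuture[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // Timed out or an item failed: fall through and report what is done.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<CompletableFuture<T>> done = new ArrayList<>();
        for (CompletableFuture<T> item : items) {
            if (item.isDone()) {
                done.add(item);
            }
        }
        return done;
    }
}
```

Mirroring the example above, a caller would first call waitForAll(allItems, 5000L) and, if the important items are not all in the returned list, call waitForAll(bigItems, 25000L) to spend the rest of the 30 seconds on them.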

Of course, the best descriptions come from real-world production usage:

Tangosol Coherence Work Manager: Feedback from a Major Financial Institution

Our primary use case for the Work Manager is to allow our application to serve coarse-grained service requests using our blade infrastructure in a standards-based way. We often have what appears to be a simple request, like "give me this family's information." In reality, however, this request expands into a large number of requests to several diverse back-end data sources consisting of web services, RDBMS calls, etc. This use case expands into two different but related problems that we are looking to the distributed version of the work manager to solve.

1. How do we take a coarse-grained request that expands into several fine-grained requests and execute them in parallel to avoid blocking the caller for an unreasonable time? In the above example, we may have to make upwards of 100 calls to various places to retrieve the information. Since J2EE has no legal threading model, and since the threading we observed when trying a message-based approach to this was unacceptable, we decided to use the Coherence Work Manager implementation.

2. Given that we want to make many external system calls in parallel while still leveraging low-cost blades, we are hoping that fanning the required work across many dual processor (logically 4-processor because of hyperthreading) machines allows us to scale an inherently vertical scalability problem with horizontal scalability at the hardware level. We think this is reasonable because the cost to marshall the request to a remote Work Manager instance is small compared to the cost to execute the service, which usually involves dozens or hundreds of milliseconds.

For more information on the Work Manager Specification and API, see Timer and Work Manager for Application Servers on the BEA dev2dev web site and JSR 237.

Summary

Coherence provides an extensive set of capabilities that make Data Grid services simple, seamless and seriously scalable. While the data fabric provides an entire unified view of the complete data domain, the Data Grid features enable applications to take advantage of the partitioning of data that Coherence provides in a scale-out environment.