2 Provide a Data Grid

Coherence provides the ideal infrastructure for building Data Grid services, and the client and server-based applications that use a Data Grid. At a basic level, Coherence can manage an immense amount of data across a large number of servers in a grid; it can provide close to zero latency access for that data; it supports parallel queries across that data; and it supports integration with database and EIS systems that act as the system of record for that data. Additionally, Coherence provides several services that are ideal for building effective data grids.

For more information on the infrastructure for the Data Grid features in Coherence, see Chapter 3, "Provide a Queryable Data Fabric".

Note:

All of the Data Grid capabilities described in the following sections are features of Coherence Enterprise Edition and higher.

2.1 Targeted Execution

Coherence provides for the ability to execute an agent against an entry in any map of data managed by the Data Grid:

map.invoke(key, agent);

In the case of partitioned data, the agent executes on the grid node that owns the data to execute against. This means that the queuing, concurrency management, agent execution, data access by the agent and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (usually only a few hundred bytes, at most) than to handle distributed concurrency control, coherency and data updates.

For request/response processing, the agent returns a result:

Object oResult = map.invoke(key, agent);

In other words, Coherence as a Data Grid will determine the location to execute the agent based on the configuration for the data topology, move the agent there, execute the agent (automatically handling concurrency control for the item while executing the agent), back up the modifications if any, and return a result.

2.2 Parallel Execution

Coherence additionally provides for the ability to execute an agent against an entire collection of entries. In a partitioned Data Grid, the execution occurs in parallel, meaning that the more nodes that are in the grid, the broader the work is load-balanced across the Data Grid:

map.invokeAll(collectionKeys, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(collectionKeys, agent);

In other words, Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology, moves the agent there, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backing up the modifications if any, and returning the coalesced results.

2.3 Query-Based Execution

As discussed in Chapter 3, "Provide a Queryable Data Fabric", Coherence supports the ability to query across the entire data grid. For example, in a trading system it is possible to query for all open Order objects for a particular trader:

Example 2-1 Querying Across a Data Grid

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = mapTrades.keySet(filter);

By combining this feature with Parallel Execution in the data grid, Coherence provides for the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agents against the entries:

map.invokeAll(filter, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(filter, agent);

In other words, Coherence combines its Parallel Query and its Parallel Execution together to achieve query-based agent invocation against a Data Grid.

2.4 Data-Grid-Wide Execution

Passing an instance of AlwaysFilter (or a null) to the invokeAll method will cause the passed agent to be executed against all entries in the InvocableMap:

map.invokeAll((Filter) null, agent);

As with the other types of agent invocation, request/response processing is supported:

Map mapResults = map.invokeAll((Filter) null, agent);

In other words, with a single line of code, an application can process all the data spread across a particular map in the Data Grid.

2.5 Agents for Targeted, Parallel and Query-Based Execution

An agent implements the EntryProcessor interface, typically by extending the AbstractProcessor class.

Several agents are included with Coherence, including:

  • AbstractProcessor - an abstract base class for building an EntryProcessor

  • ExtractorProcessor - extracts and returns a specific value (such as a property value) from an object stored in an InvocableMap

  • CompositeProcessor - bundles together a collection of EntryProcessor objects that are invoked sequentially against the same Entry

  • ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the Entry-to-process evaluates to true

  • PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator

  • NumberIncrementor - pre- or post-increments any property of a primitive integral type, and Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal

  • NumberMultiplier - multiplies any property of a primitive integral type, and Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal, and returns either the previous or new value

The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods:

Example 2-2 Methods in the EntryProcessor Interface

/**
* An invocable agent that operates against the Entry objects within a
* Map.
*/
public interface EntryProcessor
        extends Serializable
    {
    /**
    * Process a Map Entry.
    *
    * @param entry  the Entry to process
    *
    * @return the result of the processing, if any
    */
    public Object process(Entry entry);

    /**
    * Process a Set of InvocableMap Entry objects. This method is
    * semantically equivalent to:
    * <pre>
    *   Map mapResults = new ListMap();
    *   for (Iterator iter = setEntries.iterator(); iter.hasNext(); )
    *       {
    *       Entry entry = (Entry) iter.next();
    *       mapResults.put(entry.getKey(), process(entry));
    *       }
    *   return mapResults;
    * </pre>
    *
    * @param setEntries  a read-only Set of InvocableMap Entry objects to
    *                    process
    *
    * @return a Map containing the results of the processing, up to one
    *         entry for each InvocableMap Entry that was processed, keyed
    *         by the keys of the Map that were processed, with a
    *         corresponding value being the result of the processing for
    *         each key
    */
    public Map processAll(Set setEntries);
    }

(The AbstractProcessor implements the processAll method as described in the previous example.)

The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible:

Example 2-3 InvocableMap.Entry API

/**
* An InvocableMap Entry contains additional information and exposes
* additional operations that the basic Map Entry does not. It allows
* non-existent entries to be represented, thus allowing their optional
* creation. It allows existent entries to be removed from the Map. It
* supports several optimizations that can ultimately be mapped
* through to indexes and other data structures of the underlying Map.
*/
public interface Entry
        extends Map.Entry
    {
    // ----- Map Entry interface ------------------------------------

    /**
    * Return the key corresponding to this entry. The resultant key does
    * not necessarily exist within the containing Map, which is to say
    * that <tt>InvocableMap.this.containsKey(getKey)</tt> could return
    * false. To test for the presence of this key within the Map, use
    * {@link #isPresent}, and to create the entry for the key, use
    * {@link #setValue}.
     *
    * @return the key corresponding to this entry; may be null if the
    *         underlying Map supports null keys
    */
    public Object getKey();

    /**
    * Return the value corresponding to this entry. If the entry does
    * not exist, then the value will be null. To differentiate between
    * a null value and a non-existent entry, use {@link #isPresent}.
    * <p/>
    * <b>Note:</b> any modifications to the value retrieved using this
    * method are not guaranteed to persist unless followed by a
    * {@link #setValue} or {@link #update} call.
    *
    * @return the value corresponding to this entry; may be null if the
    *         value is null or if the Entry does not exist in the Map
    */
    public Object getValue();

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry will be created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    *
    * @param oValue  the new value for this Entry
    *
    * @return the previous value of this Entry, or null if the Entry did
    *         not exist
    */
    public Object setValue(Object oValue);

    // ----- InvocableMap Entry interface ---------------------------

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry will be created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    * <p/>
    * Unlike the other form of {@link #setValue(Object) setValue}, this
    * form does not return the previous value, and consequently may be
    * significantly less expensive (in terms of cost of execution) for
    * certain Map implementations.
    *
    * @param oValue      the new value for this Entry
    * @param fSynthetic  pass true only if the insertion into or
    *                    modification of the Map should be treated as a
    *                    synthetic event
    */
    public void setValue(Object oValue, boolean fSynthetic);

    /**
    * Extract a value out of the Entry's value. Calling this method is
    * semantically equivalent to
    * <tt>extractor.extract(entry.getValue())</tt>, but this method may
    * be significantly less expensive because the resultant value may be
    * obtained from a forward index, for example.
    *
    * @param extractor  a ValueExtractor to apply to the Entry's value
    *
    * @return the extracted value
    */
    public Object extract(ValueExtractor extractor);

    /**
    * Update the Entry's value. Calling this method is semantically
    * equivalent to:
    * <pre>
    *   Object oTarget = entry.getValue();
    *   updater.update(oTarget, oValue);
    *   entry.setValue(oTarget, false);
    * </pre>
    * The benefit of using this method is that it may allow the Entry
    * implementation to significantly optimize the operation, such as
    * for purposes of delta updates and backup maintenance.
    *
    * @param updater  a ValueUpdater used to modify the Entry's value
    */
    public void update(ValueUpdater updater, Object oValue);

    /**
    * Determine if this Entry exists in the Map. If the Entry is not
    * present, it can be created by calling {@link #setValue} or
    * {@link #setValue}. If the Entry is present, it can be destroyed by
    * calling {@link #remove}.
    *
    * @return true iff this Entry is existent in the containing Map
    */
    public boolean isPresent();

    /**
    * Remove this Entry from the Map if it is present in the Map.
    * <p/>
    * This method supports both the operation corresponding to
    * {@link Map#remove} and synthetic operations such as
    * eviction. If the containing Map does not differentiate between
    * the two, then this method will always be identical to
    * <tt>InvocableMap.this.remove(getKey())</tt>.
    *
    * @param fSynthetic  pass true only if the removal from the Map
    *                    should be treated as a synthetic event
    */
    public void remove(boolean fSynthetic);
    }

2.6 Data Grid Aggregation

While the agent discussion in the previous section corresponds to scalar agents, the InvocableMap interface also supports aggregation:

Example 2-4 Aggregation in the InvocableMap API

/**
* Perform an aggregating operation against the entries specified by the
* passed keys.
*
* @param collKeys  the Collection of keys that specify the entries within
*                  this Map to aggregate across
* @param agent     the EntryAggregator that is used to aggregate across
*                  the specified entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Collection collKeys, EntryAggregator agent);

/**
* Perform an aggregating operation against the set of entries that are
* selected by the given Filter.
* <p/>
* <b>Note:</b> calling this method on partitioned caches requires a
* Coherence Enterprise Edition (or higher) license.
*
* @param filter  the Filter that is used to select entries within this
*                Map to aggregate across
* @param agent   the EntryAggregator that is used to aggregate across
*                the selected entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Filter filter, EntryAggregator agent);

A simple EntryAggregator processes a set of InvocableMap.Entry objects to achieve a result:

Example 2-5 EntryAggregator API

/**
* An EntryAggregator represents processing that can be directed to occur
* against some subset of the entries in an InvocableMap, resulting in a
* aggregated result. Common examples of aggregation include functions
* such as min(), max() and avg(). However, the concept of aggregation
* applies to any process that must evaluate a group of entries to
* come up with a single answer.
*/
public interface EntryAggregator
        extends Serializable
    {
    /**
    * Process a set of InvocableMap Entry objects to produce an
    * aggregated result.
    *
    * @param setEntries  a Set of read-only InvocableMap Entry objects to
    *                    aggregate
    *
    * @return the aggregated result from processing the entries
    */
    public Object aggregate(Set setEntries);
    }

For efficient execution in a Data Grid, an aggregation process must be designed to operate in a parallel manner. The

Example 2-6 ParallelAwareAggregator API for running Aggregation in Parallel

/**
* A ParallelAwareAggregator is an advanced extension to EntryAggregator
* that is explicitly capable of being run in parallel, for example in a
* distributed environment.
*/
public interface ParallelAwareAggregator
        extends EntryAggregator
    {
    /**
    * Get an aggregator that can take the place of this aggregator in
    * situations in which the InvocableMap can aggregate in parallel.
    *
    * @return the aggregator that will be run in parallel
    */
    public EntryAggregator getParallelAggregator();

    /**
    * Aggregate the results of the parallel aggregations.
    *
    * @return the aggregation of the parallel aggregation results
    */
    public Object aggregateResults(Collection collResults);
    }

Coherence comes with all of the natural aggregation functions, including:

2.7 Node-Based Execution

Coherence provides an Invocation Service which allows execution of single-pass agents (called Invocable objects) anywhere within the grid. The agents can be executed on any particular node of the grid, in parallel on any particular set of nodes in the grid, or in parallel on all nodes of the grid.

An invocation service is configured using the <invocation-scheme> element in the cache configuration file. Using the name of the service, the application can easily obtain a reference to the service:

InvocationService service = CacheFactory.getInvocationService("agents");

Agents are simply runnable classes that are part of the application. The simplest example is a simple agent that is designed to request a GC from the JVM:

Example 2-7 Simple Agent to Request Garbage Collection

/**
* Agent that issues a garbage collection.
*/
public class GCAgent
        extends AbstractInvocable
    {
    public void run()
        {
        System.gc();
        }
    }

To execute that agent across the entire cluster, it takes one line of code:

service.execute(new GCAgent(), null, null);

Here is an example of an agent that supports a grid-wide request/response model:

Example 2-8 Agent to Support a Grid-Wide Request and Response Model

/**
* Agent that determines how much free memory a grid node has.
*/
public class FreeMemAgent
        extends AbstractInvocable
    {
    public void run()
        {
        Runtime runtime = Runtime.getRuntime();
        int cbFree  = runtime.freeMemory();
        int cbTotal = runtime.totalMemory();
        setResult(new int[] {cbFree, cbTotal});
        }
    }

To execute that agent across the entire grid and retrieve all the results from it, it still takes only one line of code:

Map map = service.query(new FreeMemAgent(), null);

While it is easy to do a grid-wide request/response, it takes a bit more code to print out the results:

Example 2-9 Printing the Results from a Grid-Wide Request or Response

Iterator iter = map.entrySet().iterator();
while (iter.hasNext())
    {
    Map.Entry entry  = (Map.Entry) iter.next();
    Member    member = (Member) entry.getKey();
    int[]     anInfo = (int[]) entry.getValue();
    if (anInfo != null) // nullif member died
        System.out.println("Member " + member + " has "
            + anInfo[0] + " bytes free out of "
            + anInfo[1] + " bytes total");
    }

The agent operations can be stateful, which means that their invocation state is serialized and transmitted to the grid nodes on which the agent is to be run.

Example 2-10 Stateful Agent Operations

/**
* Agent that carries some state with it.
*/
public class StatefulAgent
        extends AbstractInvocable
    {
    public StatefulAgent(String sKey)
        {
        m_sKey = sKey;
        }

    public void run()
        {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
        }

    private String m_sKey;
    }

2.8 Work Manager

Coherence provides a grid-enabled implementation of the IBM and BEA CommonJ Work Manager, which is the basis for JSR-237. Once JSR-237 is complete, Oracle has committed to support the standardized Java EE API for Work Manager as well.

Using a Work Manager, an application can submit a collection of work that must be executed. The Work Manager distributes that work in such a way that it is executed in parallel, typically across the grid. In other words, if there are ten work items submitted and ten servers in the grid, then each server will likely process one work item. Further, the distribution of work items across the grid can be tailored, so that certain servers (for example, one that acts as a gateway to a particular mainframe service) will be the first choice to run certain work items, for sake of efficiency and locality of data.

The application can then wait for the work to be completed, and can provide a timeout for how long it is willing to wait. The API for this purpose is quite powerful, allowing an application to wait for the first work item to complete, or for a specified set of the work items to complete. By combining methods from this API, it is possible to do things like "Here are 10 items to execute; for these 7 unimportant items, wait no more than 5 seconds, and for these 3 important items, wait no more than 30 seconds".

Example 2-11 Using a Work Manager

Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i)
    {
    WorkItem item = manager.schedule(aWork[i]);

    if (i < 3)
        {
        // the first three work items are the important ones
        collBigItems.add(item);
        }

    collAllItems.add(item);
    }

Collection collDone = manager.waitForAll(collAllItems, 5000L);
if (!collDone.containsAll(collBigItems))
    {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
    }

2.8.1 Oracle Coherence Work Manager: Feedback from a Major Financial Institution

Our primary use case for the Work Manager is to allow our application to serve coarse-grained service requests using our blade infrastructure in a standards-based way. We often have what appears to be a simple request, like "give me this family's information." In reality, however, this request expands into a large number of requests to several diverse back-end data sources consisting of web services, RDMBS calls, and so on. This use case expands into two different but related problems that we are looking to the distributed version of the work manager to solve.

  • How do we take a coarse-grained request that expands into several fine-grained requests and execute them in parallel to avoid blocking the caller for an unreasonable time? In the previous example, we may have to make upwards of 100 calls to various places to retrieve the information. Since Java EE has no legal threading model, and since the threading we observed when trying a message-based approach to this was unacceptable, we decided to use the Coherence Work Manager implementation.

  • Given that we want to make many external system calls in parallel while still leveraging low-cost blades, we are hoping that fanning the required work across many dual processor (logically 4-processor because of hyperthreading) machines allows us to scale an inherently vertical scalability problem with horizontal scalability at the hardware level. We think this is reasonable because the cost to marshall the request to a remote Work Manager instance is small compared to the cost to execute the service, which usually involves dozens or hundreds of milliseconds.

2.9 Summary

Coherence provides an extensive set of capabilities that make Data Grid services simple, seamless and seriously scalable. While the data fabric provides an entire unified view of the complete data domain, the Data Grid features enable applications to take advantage of the partitioning of data that Coherence provides in a scale-out environment.