24 Processing Data In a Cache

This chapter provides instructions for using entry processors and aggregators to perform data grid processing across a cluster. These data grid features perform in a similar manner to other map-reduce patterns and allow the processing of large amounts of data at very low latencies.

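This chapter includes the following sections:

  • Overview of Processing Data In a Cache

  • Using Agents for Targeted, Parallel and Query-Based Processing

  • Performing Data Grid Aggregation

  • Performing Node-Based Processing

  • Using a Work Manager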

24.1 Overview of Processing Data In a Cache

Coherence provides the ideal infrastructure for building data grid services and the client and server-based applications that use a data grid. At a basic level, Coherence can manage large amounts of data across a large number of servers in a grid; it can provide close to zero latency access for that data; it supports parallel queries across that data in a map-reduce manner; and it supports integration with database and EIS systems that act as the system of record for that data.

24.1.1 Performing Targeted Processing

Coherence provides for the ability to execute an agent against an entry in any map of data managed by a data grid:

map.invoke(key, agent);

In the case of partitioned data, the agent executes on the grid node that owns the target entry. The queuing, concurrency management, agent execution, data access by the agent, and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (at most a few hundred bytes) than to handle distributed concurrency control, coherency, and data updates.

For request and response processing, the agent returns a result:

Object oResult = map.invoke(key, agent);

Coherence, as a data grid, determines the location to execute the agent based on the configuration for the data topology. It moves the agent to the determined location, executes the agent (automatically handling concurrency control for the item while executing the agent), backs up the modifications (if any), and returns a result.
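The request and response flow described above can be modeled on a single JVM. The following is a minimal sketch that uses only JDK classes and does not use the Coherence API: the invoke helper and the class name are hypothetical, and ConcurrentMap.compute stands in for the per-entry concurrency control that the grid performs on the owning node.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

/** Plain-JDK model of map.invoke(key, agent); not the Coherence API. */
class TargetedInvokeSketch {

    /**
     * Runs the agent against one entry while the map guards that entry,
     * then returns the value the agent produced.
     */
    static <K, V> V invoke(ConcurrentMap<K, V> map, K key, BiFunction<K, V, V> agent) {
        // compute() applies the function atomically for this key
        return map.compute(key, agent);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Integer> positions = new ConcurrentHashMap<>();
        positions.put("ORCL", 100);

        // The agent reads the current value and returns the updated one
        Integer result = invoke(positions, "ORCL", (k, v) -> v == null ? 0 : v + 25);

        System.out.println(result);                  // prints 125
        System.out.println(positions.get("ORCL"));   // prints 125
    }
}
```

The caller never locks the entry itself; the read, the modification, and the returned result all come from one atomic step, which is the property the agent model provides.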

24.1.2 Performing Parallel Processing

Coherence provides map-reduce functionality which allows agents to be executed in parallel against a collection of entries across all nodes in the grid. Parallel execution allows large amounts of data to be processed by balancing the work across the grid. The invokeAll method is used as follows:

map.invokeAll(collectionKeys, agent);

For request and response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(collectionKeys, agent);

Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology. It then moves the agent to the determined locations, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backs up the modifications (if any), and returns the coalesced results. See "Performing Data Grid Aggregation" for instructions on performing aggregation against a result set.
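As a rough illustration of the one-result-per-key contract, the sketch below models invokeAll over a collection of keys with JDK classes only. It is not the Coherence API: the helper name is hypothetical, and a parallel stream stands in for the grid nodes that would each process the keys they own.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

/** Plain-JDK model of map.invokeAll(collectionKeys, agent); not the Coherence API. */
class ParallelInvokeSketch {

    /**
     * Runs the agent against each key in parallel and returns one result per
     * key that still has a value afterwards.
     */
    static <K, V> Map<K, V> invokeAll(ConcurrentMap<K, V> map,
                                      Collection<K> keys,
                                      BiFunction<K, V, V> agent) {
        Map<K, V> results = new ConcurrentHashMap<>();
        keys.parallelStream().forEach(key -> {
            V updated = map.compute(key, agent);   // atomic per key
            if (updated != null) {
                results.put(key, updated);
            }
        });
        return results;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);

        Map<String, Integer> results =
            invokeAll(map, Arrays.asList("a", "c"), (k, v) -> v == null ? null : v * 10);

        System.out.println(new TreeMap<>(results));   // prints {a=10, c=30}
        System.out.println(map.get("b"));             // prints 2 (key was not requested)
    }
}
```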

24.1.3 Performing Query-Based Processing

Coherence supports the ability to query across the entire data grid. For details on creating queries, see Chapter 22, "Querying Data In a Cache." For example, in a trading system it is possible to query for all open Order objects for a particular trader:

Example 24-1 Querying Across a Data Grid

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = map.keySet(filter);

By combining this feature with the use of parallel executions in the data grid, Coherence provides the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agents against the entries:

map.invokeAll(filter, agent);

For request and response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(filter, agent);

Coherence combines parallel query and parallel execution to achieve query-based agent invocation against a data grid.
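The combination of a query and an agent can be sketched without a cluster. The example below is a JDK-only model, not the Coherence API: the Trade class, the invokeAll helper, and the sample data are all hypothetical, a Predicate stands in for the Filter, and a read-only Function stands in for an agent that extracts a value.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

/** Plain-JDK model of map.invokeAll(filter, agent); not the Coherence API. */
class QueryInvokeSketch {

    /** Hypothetical value type with the two properties used by the query. */
    static final class Trade {
        final String trader;
        final boolean open;
        final double price;

        Trade(String trader, boolean open, double price) {
            this.trader = trader;
            this.open = open;
            this.price = price;
        }
    }

    /** Applies a read-only agent to every entry whose value matches the filter. */
    static <K, V, R> Map<K, R> invokeAll(Map<K, V> map,
                                         Predicate<V> filter,
                                         Function<V, R> agent) {
        Map<K, R> results = new ConcurrentHashMap<>();
        map.entrySet().parallelStream()
           .filter(e -> filter.test(e.getValue()))
           .forEach(e -> results.put(e.getKey(), agent.apply(e.getValue())));
        return results;
    }

    public static void main(String[] args) {
        Map<String, Trade> trades = new ConcurrentHashMap<>();
        trades.put("t1", new Trade("alice", true, 10.0));
        trades.put("t2", new Trade("alice", false, 20.0));
        trades.put("t3", new Trade("bob", true, 30.0));

        Predicate<Trade> openForAlice = t -> t.open && t.trader.equals("alice");
        Map<String, Double> prices = invokeAll(trades, openForAlice, t -> t.price);

        System.out.println(new TreeMap<>(prices));   // prints {t1=10.0}
    }
}
```

Only the results for matching entries travel back to the caller; the keys that match the query are never materialized on the client first.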

24.1.4 Performing Data-Grid-Wide Processing

Passing an instance of AlwaysFilter (or null) to the invokeAll method causes the passed agent to be executed against all entries in the InvocableMap:

map.invokeAll((Filter) null, agent);

As with the other types of agent invocation, request and response processing is supported:

Map mapResults = map.invokeAll((Filter) null, agent);

An application can process all the data spread across a particular map in the data grid with a single line of code.

24.2 Using Agents for Targeted, Parallel and Query-Based Processing

An agent implements the EntryProcessor interface, typically by extending the AbstractProcessor class.

Several agents are included with Coherence, including:

  • AbstractProcessor - an abstract base class for building an EntryProcessor

  • ExtractorProcessor - extracts and returns a value (such as a property value) from an object stored in an InvocableMap

  • CompositeProcessor - bundles a collection of EntryProcessor objects that are invoked sequentially against the same entry

  • ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the entry-to-process evaluates to true

  • PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator

  • NumberIncrementor - pre- or post-increments any property of a primitive integral type, and Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal

  • NumberMultiplier - multiplies any property of a primitive integral type, and Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal, and returns either the previous or new value

The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods: process and processAll. The AbstractProcessor provides the default implementation of the processAll method. When processing multiple keys, a single EntryProcessor object is re-used for all the keys and its state should not be mutated.

Note:

If the processAll call throws an exception, changes are only made to the underlying Map for entries that were removed from the setEntries. Changes that are made to the remaining entries are not processed.
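The relationship between process, processAll, and the set of entries can be illustrated with a small model. The sketch below uses only JDK classes and is not the Coherence implementation: the Processor interface and the run helper are hypothetical, and the default processAll simply follows the behavior described in the note, where an entry is removed from the set once it has been processed.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-JDK model of the process/processAll contract; not the Coherence API. */
class ProcessAllSketch {

    /** Hypothetical processor with the same two-method shape. */
    interface Processor<K, V, R> {
        R process(Map.Entry<K, V> entry);

        /** Bulk path: process each entry, then remove it from the set to mark it complete. */
        default Map<K, R> processAll(Set<Map.Entry<K, V>> setEntries) {
            Map<K, R> results = new LinkedHashMap<>();
            for (Iterator<Map.Entry<K, V>> iter = setEntries.iterator(); iter.hasNext(); ) {
                Map.Entry<K, V> entry = iter.next();
                results.put(entry.getKey(), process(entry));
                iter.remove();
            }
            return results;
        }
    }

    /** Runs the processor and returns the keys still left in the set afterwards. */
    static List<String> run(Map<String, Integer> data,
                            Processor<String, Integer, Integer> processor) {
        Set<Map.Entry<String, Integer>> setEntries = new LinkedHashSet<>();
        for (Map.Entry<String, Integer> e : data.entrySet()) {
            setEntries.add(new SimpleImmutableEntry<String, Integer>(e));
        }
        try {
            processor.processAll(setEntries);
        } catch (RuntimeException e) {
            // entries still in the set were never completed
        }
        List<String> remaining = new ArrayList<>();
        for (Map.Entry<String, Integer> e : setEntries) {
            remaining.add(e.getKey());
        }
        return remaining;
    }

    public static void main(String[] args) {
        Map<String, Integer> data = new LinkedHashMap<>();
        data.put("a", 1);
        data.put("b", -1);   // this entry makes the processor fail
        data.put("c", 3);

        List<String> remaining = run(data, e -> {
            if (e.getValue() < 0) {
                throw new IllegalStateException("negative value for " + e.getKey());
            }
            return e.getValue() * 2;
        });

        System.out.println(remaining);   // prints [b, c]
    }
}
```

In this model, "a" was removed before the failure and counts as processed, while "b" and "c" remain in the set and are treated as not processed.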

The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible.

24.2.1 Processing Entries in Multiple Caches

Entry processors can update cache entries in multiple caches within a single process or processAll operation. The caches must be managed by the same service and the entries must be located in the same partition. For details about ensuring that entries are in the same partition, see "Specifying Data Affinity".

The process and processAll operations are performed in a transaction-like manner that uses implicit locks when accessing, inserting, updating, modifying, or removing cache entries. If an exception is thrown during the processing of the entries, the entries are rolled back leaving all underlying values unchanged. The processAll operation is atomic with respect to all entries within a single partition (or member if no service threads are configured) rather than the individual entry or entire request.
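The all-or-nothing outcome described above can be modeled in a few lines. The following sketch is a single-threaded, JDK-only illustration and not how Coherence implements it (Coherence uses implicit locks on the entries): the updateAllOrNothing helper is hypothetical and reproduces only the observable result, where a failure leaves every value unchanged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Plain-JDK model of transaction-like entry updates; not the Coherence API. */
class RollbackSketch {

    /**
     * Applies the update to a staged copy and commits only if it completes
     * normally, so a failure leaves every live value unchanged.
     */
    static <K, V> boolean updateAllOrNothing(Map<K, V> live, Consumer<Map<K, V>> update) {
        Map<K, V> staged = new HashMap<>(live);
        try {
            update.accept(staged);
        } catch (RuntimeException e) {
            return false;               // staged copy is discarded
        }
        live.clear();
        live.putAll(staged);            // commit
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> prices = new HashMap<>();
        prices.put("t1", 100);
        prices.put("t2", 200);

        boolean committed = updateAllOrNothing(prices, m -> {
            m.put("t1", 110);                                      // first change succeeds
            throw new IllegalStateException("t2 update failed");   // second change fails
        });

        System.out.println(committed + " " + prices.get("t1"));   // prints "false 100"
    }
}
```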

Note:

The implicit lock may create a deadlock if entries are locked in conflicting orders on different threads. The application is responsible for ensuring that cache entries are accessed (locked) in a deadlock-free manner. In the case where a deadlock is detected, an exception is thrown but the underlying service is not stopped.
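One common way to keep access deadlock-free is to always lock entries in a single agreed order, such as sorted key order. The sketch below shows that general technique with JDK locks; it is not the Coherence API, and the class, its fields, and the addToAll method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Plain-JDK illustration of ordered lock acquisition; not the Coherence API. */
class OrderedLockingSketch {

    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final Map<String, Integer> values = new ConcurrentHashMap<>();

    /** Adds delta to every key, locking keys in sorted order so callers cannot deadlock. */
    void addToAll(Collection<String> keys, int delta) {
        TreeSet<String> ordered = new TreeSet<>(keys);   // sorted and de-duplicated
        List<ReentrantLock> held = new ArrayList<>();
        try {
            for (String key : ordered) {
                ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
                lock.lock();
                held.add(lock);
            }
            for (String key : ordered) {
                values.merge(key, delta, Integer::sum);
            }
        } finally {
            // release in reverse order of acquisition
            for (int i = held.size() - 1; i >= 0; i--) {
                held.get(i).unlock();
            }
        }
    }

    int get(String key) {
        return values.getOrDefault(key, 0);
    }

    public static void main(String[] args) throws InterruptedException {
        OrderedLockingSketch sketch = new OrderedLockingSketch();

        // The two threads name the keys in opposite orders
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) sketch.addToAll(Arrays.asList("a", "b"), 1);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) sketch.addToAll(Arrays.asList("b", "a"), 1);
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println(sketch.get("a") + " " + sketch.get("b"));   // prints "2000 2000"
    }
}
```

Because both threads acquire the lock for "a" before the lock for "b", neither can hold one lock while waiting for the other in the opposite order.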

The com.tangosol.net.BackingMapContext API is used to process entries in multiple caches and provides a way to directly access entries in a cache's backing map. The backing map is the actual Map implementation where entries are stored (as opposed to the logical representation of a cache typically used by an application). Entries in a backing map are stored in binary format and therefore require an application to handle the serialized form of an entry. For details on backing maps, see Chapter 14, "Implementing Storage and Backing Maps."

The com.tangosol.util.BinaryEntry API provides easy access to a backing map context and is typically used by an application. The following sample code demonstrates how to update entries in two different caches within the process method of an entry processor using the BinaryEntry API.

// Assumes the enclosing entry processor declares a numeric field named factor.
public Object process(Entry entry) {
   BinaryEntry binEntry = (BinaryEntry) entry;
   Binary binKey = binEntry.getBinaryKey();
   Trade trade = (Trade) binEntry.getValue();
       
   // Update a Trade object in cache1

   trade.setPrice(trade.getPrice() + factor);
   binEntry.setValue(trade);  
 
   // update a Trade object in cache2

   BackingMapManagerContext ctx = binEntry.getContext();
   BinaryEntry binEntry2 = 
      (BinaryEntry) ctx.getBackingMapContext("cache2").getBackingMapEntry(binKey);
   Trade trade2 = (Trade) binEntry2.getValue();
   trade2.setPrice(trade2.getPrice() + factor);
   binEntry2.setValue(trade2);

   return null;
}

Note:

The getBackingMapEntry method may only be called within the context of an entry processor invocation. Any changes made to the entry are persisted with the same lifecycle as those made by the enclosing invocation. The returned entry is only valid for the duration of the enclosing invocation, and multiple calls to this method within the same invocation context return the same entry object.

24.2.2 Ignoring the Results of an Entry Processor

The processAll method of the AbstractProcessor class returns a map of results to a client application. The map contains the keys and values for every entry that was processed. Most often, the entry processor returns results that the client uses. However, there may be situations where some results are not usable by the client. More importantly, there may be situations where the processor must evaluate all the entries in a cache, in which case the returned map contains every key in the cache. In both situations, the agent should be designed to ignore results that are not wanted.

Designing an agent that only returns wanted results is a good pattern and best practice, because it:

  • Makes the client's memory footprint independent of the size of the cache.

  • Avoids transferring all affected keys to the client, which could result in an OutOfMemoryError exception on the client if the cache is too large.

  • Avoids deserialization costs in the client for all keys.

  • Avoids transferring the map and all keys through a proxy node (for Extend clients).

To ignore entry processor results, override the processor's processAll method to return an empty Map or null. The following example demonstrates a simple entry processor agent that always returns null after processing an entry. The example is not very realistic but does show how to override the processAll method.

public static class Agent
   implements InvocableMap.EntryProcessor
   {
   private static final long serialVersionUID = 1L;
   
   @Override
   public Object process(Entry entry)
      {
      return null;
      }
    
   @Override
   public Map processAll(Set set)
      {
      for (Entry entry : (Set<Entry>) set)
         {
         process(entry);
         }
      return null;
      }
   }

24.3 Performing Data Grid Aggregation

In addition to scalar agents, the InvocableMap interface also supports entry aggregators that perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data.

Example 24-2 Aggregation in the InvocableMap API

/**
* Perform an aggregating operation against the entries specified by the
* passed keys.
*
* @param collKeys  the Collection of keys that specify the entries within
*                  this Map to aggregate across
* @param agent     the EntryAggregator that is used to aggregate across
*                  the specified entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Collection collKeys, EntryAggregator agent);

/**
* Perform an aggregating operation against the set of entries that are
* selected by the given Filter.
* <p/>
* <b>Note:</b> calling this method on partitioned caches requires a
* Coherence Enterprise Edition (or higher) license.
*
* @param filter  the Filter that is used to select entries within this
*                Map to aggregate across
* @param agent   the EntryAggregator that is used to aggregate across
*                the selected entries of this Map
*
* @return the result of the aggregation
*/
public Object aggregate(Filter filter, EntryAggregator agent);

A simple EntryAggregator processes a set of InvocableMap.Entry objects to achieve a result:

Example 24-3 EntryAggregator API

/**
* An EntryAggregator represents processing that can be directed to occur
* against some subset of the entries in an InvocableMap, resulting in an
* aggregated result. Common examples of aggregation include functions
* such as min(), max() and avg(). However, the concept of aggregation
* applies to any process that must evaluate a group of entries to
* come up with a single answer.
*/
public interface EntryAggregator
        extends Serializable
    {
    /**
    * Process a set of InvocableMap Entry objects to produce an
    * aggregated result.
    *
    * @param setEntries  a Set of read-only InvocableMap Entry objects to
    *                    aggregate
    *
    * @return the aggregated result from processing the entries
    */
    public Object aggregate(Set setEntries);
    }

For efficient execution in a data grid, an aggregation process must be designed to operate in a parallel manner.

Example 24-4 ParallelAwareAggregator API for running Aggregation in Parallel

/**
* A ParallelAwareAggregator is an advanced extension to EntryAggregator
* that is explicitly capable of being run in parallel, for example in a
* distributed environment.
*/
public interface ParallelAwareAggregator
        extends EntryAggregator
    {
    /**
    * Get an aggregator that can take the place of this aggregator in
    * situations in which the InvocableMap can aggregate in parallel.
    *
    * @return the aggregator that is run in parallel
    */
    public EntryAggregator getParallelAggregator();

    /**
    * Aggregate the results of the parallel aggregations.
    *
    * @return the aggregation of the parallel aggregation results
    */
    public Object aggregateResults(Collection collResults);
    }
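The two-stage shape of a parallel-aware aggregator (aggregate on each node, then combine the partial results) matters because some functions cannot be combined naively. The sketch below works through an average using only JDK classes; it is not the Coherence API, and the Partial class and both method bodies are hypothetical. Each partition returns a sum and a count, and the final stage combines them.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/** Plain-JDK model of two-stage parallel aggregation; not the Coherence API. */
class ParallelAverageSketch {

    /** Partial result computed independently for one partition. */
    static final class Partial {
        final double sum;
        final long count;

        Partial(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Stage 1: aggregate the values held by a single partition. */
    static Partial aggregate(Collection<Double> values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return new Partial(sum, values.size());
    }

    /** Stage 2: combine the partial results into the final average. */
    static double aggregateResults(Collection<Partial> partials) {
        double sum = 0.0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        // Two partitions of unequal size
        List<List<Double>> partitions = Arrays.asList(
            Arrays.asList(1.0, 2.0, 3.0),
            Arrays.asList(10.0));

        List<Partial> partials = partitions.parallelStream()
            .map(ParallelAverageSketch::aggregate)
            .collect(Collectors.toList());

        System.out.println(aggregateResults(partials));   // prints 4.0
    }
}
```

Averaging the two per-partition averages (2.0 and 10.0) would give 6.0, which is wrong; carrying the sum and count through the first stage is what makes the combined result correct.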

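Coherence includes aggregators for the common aggregation functions in the com.tangosol.util.aggregator package, including:

  • Count

  • DistinctValues

  • DoubleAverage

  • DoubleMax

  • DoubleMin

  • DoubleSum

  • LongMax

  • LongMin

  • LongSum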

24.4 Performing Node-Based Processing

Coherence provides an Invocation Service which allows execution of single-pass agents (called Invocable objects) anywhere within the grid. The agents can be executed on any particular node of the grid, in parallel on any particular set of nodes in the grid, or in parallel on all nodes of the grid.

An invocation service is configured using the <invocation-scheme> element in the cache configuration file. Using the name of the service, the application can easily obtain a reference to the service:

InvocationService service = (InvocationService) CacheFactory.getService("MyService");

Agents are simply runnable classes that are part of the application. An example of a simple agent is one designed to request a GC from the JVM:

Example 24-5 Simple Agent to Request Garbage Collection

/**
* Agent that issues a garbage collection.
*/
public class GCAgent
        extends AbstractInvocable
    {
    public void run()
        {
        System.gc();
        }
    }

To execute that agent across the entire cluster, it takes one line of code:

service.execute(new GCAgent(), null, null);

Here is an example of an agent that supports a grid-wide request/response model:

Example 24-6 Agent to Support a Grid-Wide Request and Response Model

/**
* Agent that determines how much free memory a grid node has.
*/
public class FreeMemAgent
        extends AbstractInvocable
    {
    public void run()
        {
        Runtime runtime = Runtime.getRuntime();
        long cbFree  = runtime.freeMemory();
        long cbTotal = runtime.totalMemory();
        setResult(new long[] {cbFree, cbTotal});
        }
    }

To execute that agent across the entire grid and retrieve all the results from it, it still takes only one line of code:

Map map = service.query(new FreeMemAgent(), null);

While it is easy to do a grid-wide request/response, it takes a bit more code to print the results:

Example 24-7 Printing the Results from a Grid-Wide Request or Response

Iterator iter = map.entrySet().iterator();
while (iter.hasNext())
    {
    Map.Entry entry  = (Map.Entry) iter.next();
    Member    member = (Member) entry.getKey();
    long[]    anInfo = (long[]) entry.getValue();
    if (anInfo != null) // null if member died
        System.out.println("Member " + member + " has "
            + anInfo[0] + " bytes free out of "
            + anInfo[1] + " bytes total");
    }

The agent operations can be stateful, which means that their invocation state is serialized and transmitted to the grid nodes on which the agent is to be run.

Example 24-8 Stateful Agent Operations

/**
* Agent that carries some state with it.
*/
public class StatefulAgent
        extends AbstractInvocable
    {
    public StatefulAgent(String sKey)
        {
        m_sKey = sKey;
        }

    public void run()
        {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
        }

    private String m_sKey;
    }
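What "serialized and transmitted" means for a stateful agent can be seen with standard Java serialization. The sketch below is a JDK-only model and not the Coherence Invocation Service: the KeyAgent class and the roundTrip helper are hypothetical, and an in-memory byte array stands in for the network hop to another grid node.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Plain-JDK model of shipping a stateful agent; not the Coherence API. */
class StatefulAgentSketch {

    /** Hypothetical agent whose constructor argument travels with it. */
    static final class KeyAgent implements Runnable, Serializable {
        private static final long serialVersionUID = 1L;
        private final String key;

        KeyAgent(String key) {
            this.key = key;
        }

        String getKey() {
            return key;
        }

        @Override
        public void run() {
            System.out.println("running with key " + key);
        }
    }

    /** Serializes the agent and rebuilds it, as a remote node would. */
    static KeyAgent roundTrip(KeyAgent agent) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(agent);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (KeyAgent) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization failed", e);
        }
    }

    public static void main(String[] args) {
        KeyAgent sent = new KeyAgent("order-42");
        KeyAgent received = roundTrip(sent);

        received.run();                          // prints "running with key order-42"
        System.out.println(received != sent);    // prints true: a copy ran, not the original
    }
}
```

Because each node runs its own copy, any state the agent changes while running is not visible to the caller's instance; results must be returned explicitly.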

24.5 Using a Work Manager

Coherence provides a grid-enabled implementation of the CommonJ Work Manager. Using a Work Manager, an application can submit a collection of work that must be executed. The Work Manager distributes that work in such a way that it is executed in parallel, typically across the grid. In other words, if there are ten work items submitted and ten servers in the grid, then each server likely processes one work item. Further, the distribution of work items across the grid can be tailored, so that certain servers (for example, one that acts as a gateway to a particular mainframe service) are the first choice to run certain work items, for the sake of efficiency and locality of data.

The application can then wait for the work to be completed, and can provide a timeout for how long it can wait. The API for this purpose is quite powerful, allowing an application to wait for the first work item to complete, or for a specified set of the work items to complete. By combining methods from this API, it is possible to do things like "Here are 10 items to execute; for these 7 unimportant items, wait no more than 5 seconds, and for these 3 important items, wait no more than 30 seconds".

Example 24-9 Using a Work Manager

Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i)
    {
    WorkItem item = manager.schedule(aWork[i]);

    if (i < 3)
        {
        // the first three work items are the important ones
        collBigItems.add(item);
        }

    collAllItems.add(item);
    }

Collection collDone = manager.waitForAll(collAllItems, 5000L);
if (!collDone.containsAll(collBigItems))
    {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
    }
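The tiered-timeout pattern in the example above can also be tried on a single JVM. The sketch below uses java.util.concurrent as a stand-in for the CommonJ Work Manager, so it is not the Coherence or CommonJ API: the waitForAll helper is hypothetical, and the task durations and timeouts are scaled down to milliseconds purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Plain-JDK model of waiting on work items with tiered timeouts. */
class TieredWaitSketch {

    /** Waits until a shared deadline and returns how many items completed by then. */
    static int waitForAll(List<Future<String>> items, long deadlineNanos) {
        int done = 0;
        for (Future<String> item : items) {
            long remaining = Math.max(deadlineNanos - System.nanoTime(), 0L);
            try {
                item.get(remaining, TimeUnit.NANOSECONDS);
                done++;
            } catch (TimeoutException e) {
                // still running at the deadline
            } catch (ExecutionException e) {
                done++;   // completed, but with a failure
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return done;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> bigItems = new ArrayList<>();
        List<Future<String>> allItems = new ArrayList<>();

        for (int i = 0; i < 4; i++) {
            // the first two items are the slow, important ones
            final long millis = i < 2 ? 300L : 10L;
            Future<String> item = pool.submit(() -> {
                Thread.sleep(millis);
                return "done";
            });
            if (i < 2) {
                bigItems.add(item);
            }
            allItems.add(item);
        }

        long start = System.nanoTime();
        // short window for everything, then a longer window for the important items
        int quick = waitForAll(allItems, start + TimeUnit.MILLISECONDS.toNanos(100));
        int big = waitForAll(bigItems, start + TimeUnit.SECONDS.toNanos(2));

        System.out.println(quick + " items finished in the short window, "
            + big + " important items finished overall");
        pool.shutdown();
    }
}
```

Measuring both waits against one start time mirrors the original example, where the second wait covers only the remainder of the longer allowance.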