This chapter provides instructions for using entry processors and aggregators to perform data grid processing across a cluster. These data grid features perform in a similar manner to other map-reduce patterns and allow the processing of large amounts of data at very low latencies.
Coherence provides the ideal infrastructure for building data grid services and the client and server-based applications that use a data grid. At a basic level, Coherence can manage large amounts of data across a large number of servers in a grid; it can provide close to zero latency access for that data; it supports parallel queries across that data in a map-reduce manner; and it supports integration with database and EIS systems that act as the system of record for that data.
Coherence can execute an agent against an entry in any map of data managed by the data grid:
map.invoke(key, agent);
In the case of partitioned data, the agent executes on the grid node that owns the data. The queuing, concurrency management, agent execution, data access by the agent, and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (at most a few hundred bytes) than to handle distributed concurrency control, coherency and data updates.
For request and response processing, the agent returns a result:
Object oResult = map.invoke(key, agent);
Coherence, as a data grid, determines the location to execute the agent based on the configuration for the data topology. It moves the agent to the determined location, executes the agent (automatically handling concurrency control for the item while executing the agent), backs up the modifications (if any), and returns a result.
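The following is a minimal single-JVM sketch of what invoke does on the owning node. It locks the key, passes the agent a mutable view of the entry, writes back any change, and returns the agent's result. It does not use the Coherence API. MiniEntry, MiniEntryProcessor, and LocalGridNode are hypothetical stand-ins for InvocableMap.Entry, InvocableMap.EntryProcessor, and the storage node, and backups are not modeled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for InvocableMap.Entry: a mutable view of one entry.
interface MiniEntry<K, V> {
    K getKey();
    V getValue();
    void setValue(V value);
}

// Hypothetical stand-in for the single-entry side of InvocableMap.EntryProcessor.
interface MiniEntryProcessor<K, V, R> {
    R process(MiniEntry<K, V> entry);
}

// Simulates the grid node that owns the data: the agent runs here, next to it.
final class LocalGridNode<K, V> {
    private final Map<K, V> storage = new ConcurrentHashMap<>();
    private final Map<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    void put(K key, V value) { storage.put(key, value); }

    V get(K key) { return storage.get(key); }

    <R> R invoke(K key, MiniEntryProcessor<K, V, R> agent) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock(); // implicit per-key concurrency control while the agent runs
        try {
            WorkingEntry entry = new WorkingEntry(key, storage.get(key));
            R result = agent.process(entry);
            if (entry.dirty) { // apply a modification only if the agent made one
                if (entry.value == null) {
                    storage.remove(key);
                } else {
                    storage.put(key, entry.value);
                }
            }
            return result;
        } finally {
            lock.unlock();
        }
    }

    // Entry view handed to the agent; records whether setValue was called.
    private final class WorkingEntry implements MiniEntry<K, V> {
        private final K key;
        private V value;
        private boolean dirty;

        WorkingEntry(K key, V value) { this.key = key; this.value = value; }
        public K getKey() { return key; }
        public V getValue() { return value; }
        public void setValue(V value) { this.value = value; this.dirty = true; }
    }
}
```

In the real grid, the lock and the data live on the same member. As a result, only the serialized agent and its result cross the network.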
Coherence provides map-reduce functionality which allows agents to be executed in parallel against a collection of entries across all nodes in the grid. Parallel execution allows large amounts of data to be processed by balancing the work across the grid. The invokeAll method is used as follows:
map.invokeAll(collectionKeys, agent);
For request and response processing, the agent returns one result for each key processed:
Map mapResults = map.invokeAll(collectionKeys, agent);
Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology. It then moves the agent to the determined locations, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backs up the modifications (if any), and returns the coalesced results. See "Performing Data Grid Aggregation" for instructions on performing aggregation against a result set.
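The following sketch models invokeAll(collectionKeys, agent) in plain Java. It assumes a simple hash-modulo routing rule and one task per partition group. Keys are grouped by owning partition, each group is processed in parallel, and the partial result maps are coalesced. PartitionedModel is a hypothetical class, and Coherence's actual partitioning, threading, and backup behavior are not reproduced here. To keep the example short, the agent is modeled as a read-only BiFunction.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical model of a partitioned map; not the Coherence API.
final class PartitionedModel<K, V> {
    private final List<Map<K, V>> partitions = new ArrayList<>();

    PartitionedModel(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ConcurrentHashMap<>());
        }
    }

    // Simplified routing rule (assumption): hash modulo partition count.
    int partitionOf(K key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    void put(K key, V value) { partitions.get(partitionOf(key)).put(key, value); }

    // Models invokeAll(keys, agent): group keys by partition, process each
    // group in parallel, then coalesce the partial result maps into one.
    <R> Map<K, R> invokeAll(Collection<K> keys, BiFunction<K, V, R> agent) {
        Map<Integer, List<K>> byPartition = new HashMap<>();
        for (K key : keys) {
            byPartition.computeIfAbsent(partitionOf(key), p -> new ArrayList<>()).add(key);
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, byPartition.size()));
        try {
            List<Future<Map<K, R>>> pending = new ArrayList<>();
            for (Map.Entry<Integer, List<K>> group : byPartition.entrySet()) {
                Map<K, V> data = partitions.get(group.getKey());
                pending.add(pool.submit(() -> {
                    Map<K, R> partial = new HashMap<>();
                    for (K key : group.getValue()) {
                        partial.put(key, agent.apply(key, data.get(key)));
                    }
                    return partial;
                }));
            }
            Map<K, R> results = new HashMap<>();
            for (Future<Map<K, R>> f : pending) {
                results.putAll(f.get()); // coalesce partial results
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```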
Coherence supports the ability to query across the entire data grid. For details on creating queries, see Chapter 22, "Querying Data In a Cache." For example, in a trading system it is possible to query for all open Order objects for a particular trader:
Example 24-1 Querying Across a Data Grid
NamedCache mapTrades = CacheFactory.getCache("trades");
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = mapTrades.keySet(filter);
By combining this feature with the use of parallel executions in the data grid, Coherence provides the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agent against the entries:
map.invokeAll(filter, agent);
For request and response processing, the agent returns one result for each key processed:
Map mapResults = map.invokeAll(filter, agent);
Coherence combines parallel query and parallel execution to achieve query-based agent invocation against a data grid.
Passing an instance of AlwaysFilter (or null) to the invokeAll method causes the passed agent to be executed against all entries in the InvocableMap:
map.invokeAll((Filter) null, agent);
As with the other types of agent invocation, request and response processing is supported:
Map mapResults = map.invokeAll((Filter) null, agent);
An application can process all the data spread across a particular map in the data grid with a single line of code.
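The filter-based variants can be modeled in the same way. In this hypothetical sketch, a java.util.function.Predicate plays the role of the Filter. A null filter selects every entry, as AlwaysFilter or null does above, and a parallel stream processes the matching entries. SimpleTrade and FilteredInvoker are illustrative names, not Coherence classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical domain object loosely mirroring the trading example.
final class SimpleTrade {
    final String trader;
    final boolean open;
    final double price;

    SimpleTrade(String trader, boolean open, double price) {
        this.trader = trader;
        this.open = open;
        this.price = price;
    }
}

final class FilteredInvoker {
    private FilteredInvoker() {}

    // Models invokeAll(filter, agent): select matching entries in parallel and
    // run the agent against each one. A null filter selects every entry.
    static <K, V, R> Map<K, R> invokeAll(Map<K, V> data, Predicate<V> filter, Function<V, R> agent) {
        Predicate<V> selector = (filter != null) ? filter : v -> true;
        Map<K, R> results = Collections.synchronizedMap(new HashMap<>());
        data.entrySet().parallelStream()
            .filter(e -> selector.test(e.getValue()))
            .forEach(e -> results.put(e.getKey(), agent.apply(e.getValue())));
        return results;
    }
}
```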
An agent implements the EntryProcessor interface, typically by extending the AbstractProcessor class.
Coherence includes several ready-made agents, such as:
AbstractProcessor - an abstract base class for building an EntryProcessor
ExtractorProcessor - extracts and returns a value (such as a property value) from an object stored in an InvocableMap
CompositeProcessor - bundles a collection of EntryProcessor objects that are invoked sequentially against the same entry
ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the entry-to-process evaluates to true
PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator
NumberIncrementor - pre- or post-increments any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, and BigDecimal
NumberMultiplier - multiplies any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, and BigDecimal, and returns either the previous or new value
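NumberIncrementor and NumberMultiplier differ from a plain setter mainly in which value they return. The sketch below shows the pre-increment versus post-increment contract on a Map<String, Long>. IncrementSketch and its postIncrement flag are hypothetical. The real classes operate on a property of the cached object through a manipulator, not on a raw map value.

```java
import java.util.Map;

// Hypothetical model of the pre-/post-increment contract; not the Coherence class.
final class IncrementSketch {
    private IncrementSketch() {}

    // Adds delta to the Long stored under key (a missing value counts as 0).
    // postIncrement == true returns the value before the update (like i++);
    // postIncrement == false returns the value after the update (like ++i).
    static long increment(Map<String, Long> map, String key, long delta, boolean postIncrement) {
        long before = map.getOrDefault(key, 0L);
        long after = before + delta;
        map.put(key, after);
        return postIncrement ? before : after;
    }
}
```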
The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods: process and processAll. The AbstractProcessor class provides the default implementation of the processAll method. When processing multiple keys, a single EntryProcessor object is re-used for all the keys and its state should not be mutated.
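The following sketch shows why the processor's state must not change. It assumes a default processAll that calls process once per entry on the same instance, collects each result under the entry's key, and removes each entry from the set after processing it. All types here (SketchEntry, SketchSimpleEntry, SketchAbstractProcessor, AddProcessor) are hypothetical and only approximate AbstractProcessor.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for InvocableMap.Entry.
interface SketchEntry {
    Object getKey();
    Object getValue();
    void setValue(Object value);
}

final class SketchSimpleEntry implements SketchEntry {
    private final Object key;
    private Object value;

    SketchSimpleEntry(Object key, Object value) { this.key = key; this.value = value; }
    public Object getKey() { return key; }
    public Object getValue() { return value; }
    public void setValue(Object value) { this.value = value; }
}

// Hypothetical counterpart of a processor base class with a default processAll.
abstract class SketchAbstractProcessor {
    abstract Object process(SketchEntry entry);

    // One processor instance handles every entry, so process() must not rely
    // on mutable instance state. Each entry is removed from the set after it
    // is processed, so the set shrinks as the work completes.
    Map<Object, Object> processAll(Set<SketchEntry> entries) {
        Map<Object, Object> results = new LinkedHashMap<>();
        for (Iterator<SketchEntry> it = entries.iterator(); it.hasNext(); ) {
            SketchEntry entry = it.next();
            results.put(entry.getKey(), process(entry));
            it.remove();
        }
        return results;
    }
}

// Stateless processor: its only field is final and read-only.
final class AddProcessor extends SketchAbstractProcessor {
    private final int delta;

    AddProcessor(int delta) { this.delta = delta; }

    @Override
    Object process(SketchEntry entry) {
        int updated = (Integer) entry.getValue() + delta;
        entry.setValue(updated);
        return updated;
    }
}
```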
Note:
If the processAll call throws an exception, changes are made to the underlying Map only for entries that were removed from the setEntries. Changes made to the remaining entries are not processed.
The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible.
Entry processors can update cache entries in multiple caches within a single process or processAll operation. The caches must be managed by the same service and the entries must be located in the same partition. For details about ensuring that entries are in the same partition, see "Specifying Data Affinity".
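Co-location is what makes multi-cache updates possible. The sketch below is a hypothetical illustration of the idea behind key association. It routes a key by its associated key when one is present, so a trader's entry and all of that trader's trades map to the same partition. AssociatedKey, TradeKey, and AffinityRouter are illustrative names. The hash-modulo rule is a simplification, not Coherence's partitioning algorithm.

```java
import java.util.Objects;

// Hypothetical key-association contract (the same idea as a key that names its "parent").
interface AssociatedKey {
    Object getAssociatedKey();
}

// Key of a trade entry, associated with its trader so related entries co-locate.
final class TradeKey implements AssociatedKey {
    private final long tradeId;
    private final String traderId;

    TradeKey(long tradeId, String traderId) {
        this.tradeId = tradeId;
        this.traderId = traderId;
    }

    @Override public Object getAssociatedKey() { return traderId; }

    @Override public boolean equals(Object o) {
        if (!(o instanceof TradeKey)) return false;
        TradeKey other = (TradeKey) o;
        return tradeId == other.tradeId && traderId.equals(other.traderId);
    }

    @Override public int hashCode() { return Objects.hash(tradeId, traderId); }
}

final class AffinityRouter {
    private final int partitionCount;

    AffinityRouter(int partitionCount) { this.partitionCount = partitionCount; }

    // Routes on the associated key when one is provided, otherwise on the key itself.
    int partitionOf(Object key) {
        Object routingKey = (key instanceof AssociatedKey)
            ? ((AssociatedKey) key).getAssociatedKey()
            : key;
        return Math.floorMod(routingKey.hashCode(), partitionCount);
    }
}
```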
The process and processAll operations are performed in a transaction-like manner that uses implicit locks when accessing, inserting, updating, modifying, or removing cache entries. If an exception is thrown during the processing of the entries, the entries are rolled back, leaving all underlying values unchanged. The processAll operation is atomic with respect to all entries within a single partition (or member if no service threads are configured) rather than the individual entry or entire request.
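One way to model the per-partition guarantee is to stage new values and commit them only if every update in the partition succeeds. In this hypothetical sketch (PartitionTxSketch is not a Coherence class, and locking is omitted), a failure in one partition leaves that partition untouched. It does not undo changes already committed in another partition. This matches atomicity per partition rather than per request.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of partition-scoped atomicity; not Coherence internals.
final class PartitionTxSketch {
    private PartitionTxSketch() {}

    // Applies update to each key of one partition. New values are staged first
    // and written back only if every update succeeds, so an exception leaves
    // this partition's stored values unchanged (a "rollback").
    static <K, V> void processPartition(Map<K, V> partition, Collection<K> keys, UnaryOperator<V> update) {
        Map<K, V> staged = new HashMap<>();
        for (K key : keys) {
            staged.put(key, update.apply(partition.get(key))); // may throw
        }
        partition.putAll(staged); // commit all changes for this partition at once
    }
}
```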
Note:
The implicit lock may create a deadlock if entries are locked in conflicting orders on different threads. The application is responsible for ensuring that cache entries are accessed (locked) in a deadlock-free manner. If a deadlock is detected, an exception is thrown, but the underlying service is not stopped.
The com.tangosol.net.BackingMapContext API is used to process entries in multiple caches and provides a way to directly access entries in a cache's backing map. The backing map is the actual Map implementation where entries are stored (as opposed to the logical representation of a cache typically used by an application). Entries in a backing map are stored in binary format and therefore require an application to handle the serialized form of an entry. For details on backing maps, see Chapter 14, "Implementing Storage and Backing Maps."
The com.tangosol.util.BinaryEntry API provides easy access to a backing map context and is typically used by an application. The following sample code demonstrates how to update entries in two different caches within the process method of an entry processor using the BinaryEntry API.
public Object process(Entry entry) {
    BinaryEntry binEntry = (BinaryEntry) entry;
    Binary binKey = binEntry.getBinaryKey();
    Trade trade = (Trade) binEntry.getValue();

    // update a Trade object in cache1
    // (factor is assumed to be a field of this entry processor)
    trade.setPrice(trade.getPrice() + factor);
    binEntry.setValue(trade);

    // update a Trade object in cache2
    BackingMapManagerContext ctx = binEntry.getContext();
    BinaryEntry binEntry2 = (BinaryEntry)
        ctx.getBackingMapContext("cache2").getBackingMapEntry(binKey);
    Trade trade2 = (Trade) binEntry2.getValue();
    trade2.setPrice(trade2.getPrice() + factor);
    binEntry2.setValue(trade2);

    return null;
}
Note:
The getBackingMapEntry method may only be called within the context of an entry processor invocation. Any changes made to the entry are persisted with the same lifecycle as those made by the enclosing invocation. The returned entry is only valid for the duration of the enclosing invocation, and multiple calls to this method within the same invocation context return the same entry object.
The processAll method of the AbstractProcessor class returns a map of results to a client application. The map contains the keys and values for every entry that was processed. Most often, the entry processor returns results that the client uses. However, there may be situations where some results are not usable by the client. More importantly, there may be situations where the processor must evaluate all the entries in a cache; in that case, the returned map contains every key in the cache. In both situations, the agent should be designed to ignore results that are not wanted.
Designing an agent that only returns wanted results is a good pattern and best practice, because it:
Makes the client's memory footprint independent of the size of the cache.
Avoids transferring all affected keys to the client, which could result in an OutOfMemoryError exception on the client if the cache is too large.
Avoids deserialization costs in the client for all keys.
Avoids transferring the map and all keys through a proxy node (for Extend clients).
To ignore entry processor results, override the processor's processAll method to return an empty Map or null. The following example demonstrates a simple entry processor agent that always returns null after processing an entry. The example is not very realistic but does show how to override the processAll method.
public static class Agent implements InvocableMap.EntryProcessor {
    private static final long serialVersionUID = 1L;

    @Override
    public Object process(Entry entry) {
        return null;
    }

    @Override
    public Map processAll(Set set) {
        for (Entry entry : (Set<Entry>) set) {
            process(entry);
        }
        return null;
    }
}
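Between returning every result and returning none, an agent can keep only the results the client needs. The hypothetical helper below is not part of the Coherence API. It runs a per-entry function over a map and returns only the non-null results. The size of the returned map then depends on the number of interesting entries, not on the size of the cache.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not the Coherence API: processes every entry but keeps
// only the results the client needs (here, the non-null ones).
final class SelectiveResults {
    private SelectiveResults() {}

    static <K, V, R> Map<K, R> processAll(Map<K, V> entries, Function<V, R> process) {
        Map<K, R> wanted = new HashMap<>();
        for (Map.Entry<K, V> entry : entries.entrySet()) {
            R result = process.apply(entry.getValue());
            if (result != null) {
                wanted.put(entry.getKey(), result); // unwanted results are never returned
            }
        }
        return wanted;
    }
}
```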
Entry processors can be invoked asynchronously using the AsynchronousProcessor class. The class implements the standard Java Future interface and also includes a Coherence-specific flow control mechanism to guard against excessive backlogs.
The AsynchronousProcessor class is used to wrap an entry processor implementation. For example:
UpdaterProcessor up = new UpdaterProcessor(null, value);
AsynchronousProcessor ap = new AsynchronousProcessor(up);
cache.invokeAll(filter, ap);
ap.getResult();
The above example invokes the underlying entry processor and uses automatic flow control (as defined by the underlying service's flow control logic) and a default unit-of-order ID (assigned to the calling thread's hashCode, because a thread's requests are naturally expected to execute in order). Ordering is guaranteed for each partition even during failover. Additional constructors are available to manually control request flow and assign the unit-of-order ID as required.
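The unit-of-order idea can be sketched with standard java.util.concurrent types. Requests that share an ID go to the same single-threaded lane, so they complete in submission order, while requests with different IDs can run concurrently. OrderedAsyncInvoker is a hypothetical class. It returns CompletableFuture instead of implementing Future directly, and it does not model flow control or failover.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical model of unit-of-order dispatch; not Coherence's AsynchronousProcessor.
final class OrderedAsyncInvoker implements AutoCloseable {
    private final ExecutorService[] lanes;

    OrderedAsyncInvoker(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "order-lane");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
        }
    }

    // Requests sharing a unit-of-order ID go to the same single-threaded lane,
    // so they run in submission order; different IDs may run concurrently.
    <R> CompletableFuture<R> submit(int unitOfOrderId, Supplier<R> task) {
        ExecutorService lane = lanes[Math.floorMod(unitOfOrderId, lanes.length)];
        return CompletableFuture.supplyAsync(task, lane);
    }

    // Default unit-of-order ID: the calling thread's hashCode.
    <R> CompletableFuture<R> submit(Supplier<R> task) {
        return submit(Thread.currentThread().hashCode(), task);
    }

    @Override
    public void close() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
    }
}
```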
For advanced use cases, the AsynchronousProcessor class can be extended to define custom asynchronous functionality. The following example extends the AsynchronousProcessor class and overrides the onResult, onComplete, and onException methods. Refer to the Java API Reference for Oracle Coherence for details on the AsynchronousProcessor class and its methods.
Note:
Overriding implementations of the onComplete, onResult, and onException methods must be non-blocking and short-lived, because these methods are called on the client's service thread and block the processing of responses for other threads.
AsynchronousProcessor processor = new AsynchronousProcessor(null) {
    public synchronized void onResult(Entry entry) {
        super.onResult(entry);
        // process the result
    }

    public void onComplete() {
        super.onComplete();
        if (m_eReason == null) {
            // process the result
        } else {
            // process the (potentially partial) failure
        }
    }

    public void onException(Throwable eReason) {
        super.onException(eReason);
        // process the observed exception
    }
};
In addition to scalar agents, the InvocableMap interface also supports entry aggregators that perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data.
Example 24-2 Aggregation in the InvocableMap API
/**
 * Perform an aggregating operation against the entries specified by the
 * passed keys.
 *
 * @param collKeys  the Collection of keys that specify the entries within
 *                  this Map to aggregate across
 * @param agent     the EntryAggregator that is used to aggregate across
 *                  the specified entries of this Map
 *
 * @return the result of the aggregation
 */
public Object aggregate(Collection collKeys, EntryAggregator agent);

/**
 * Perform an aggregating operation against the set of entries that are
 * selected by the given Filter.
 * <p/>
 * <b>Note:</b> calling this method on partitioned caches requires a
 * Coherence Enterprise Edition (or higher) license.
 *
 * @param filter  the Filter that is used to select entries within this
 *                Map to aggregate across
 * @param agent   the EntryAggregator that is used to aggregate across
 *                the selected entries of this Map
 *
 * @return the result of the aggregation
 */
public Object aggregate(Filter filter, EntryAggregator agent);
A simple EntryAggregator processes a set of InvocableMap.Entry objects to achieve a result:
Example 24-3 EntryAggregator API
/**
 * An EntryAggregator represents processing that can be directed to occur
 * against some subset of the entries in an InvocableMap, resulting in an
 * aggregated result. Common examples of aggregation include functions
 * such as min(), max() and avg(). However, the concept of aggregation
 * applies to any process that must evaluate a group of entries to
 * come up with a single answer.
 */
public interface EntryAggregator extends Serializable {
    /**
     * Process a set of InvocableMap Entry objects to produce an
     * aggregated result.
     *
     * @param setEntries  a Set of read-only InvocableMap Entry objects to
     *                    aggregate
     *
     * @return the aggregated result from processing the entries
     */
    public Object aggregate(Set setEntries);
}
For efficient execution in a Data Grid, an aggregation process must be designed to operate in a parallel manner.
Example 24-4 ParallelAwareAggregator API for running Aggregation in Parallel
/**
 * A ParallelAwareAggregator is an advanced extension to EntryAggregator
 * that is explicitly capable of being run in parallel, for example in a
 * distributed environment.
 */
public interface ParallelAwareAggregator extends EntryAggregator {
    /**
     * Get an aggregator that can take the place of this aggregator in
     * situations in which the InvocableMap can aggregate in parallel.
     *
     * @return the aggregator that is run in parallel
     */
    public EntryAggregator getParallelAggregator();

    /**
     * Aggregate the results of the parallel aggregations.
     *
     * @return the aggregation of the parallel aggregation results
     */
    public Object aggregateResults(Collection collResults);
}
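The split between getParallelAggregator and aggregateResults matters for functions such as average. Averaging the per-partition averages gives the wrong answer when partitions hold different numbers of entries. The sketch below uses hypothetical, simplified interfaces that work on plain Double values instead of InvocableMap.Entry objects. Each partition returns a {sum, count} pair, and the final step divides once.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical shapes echoing EntryAggregator and ParallelAwareAggregator,
// simplified to work on plain Double values instead of map entries.
interface ValueAggregator {
    Object aggregate(Collection<Double> values);
}

interface ParallelValueAggregator extends ValueAggregator {
    ValueAggregator getParallelAggregator();
    Object aggregateResults(Collection<Object> partialResults);
}

// Average: each partition returns {sum, count}; the final step divides once.
final class AverageAggregator implements ParallelValueAggregator {
    @Override
    public ValueAggregator getParallelAggregator() {
        return values -> {
            double sum = 0;
            for (double v : values) {
                sum += v;
            }
            return new double[] {sum, values.size()};
        };
    }

    @Override
    public Object aggregateResults(Collection<Object> partialResults) {
        double sum = 0;
        long count = 0;
        for (Object partial : partialResults) {
            double[] sumAndCount = (double[]) partial;
            sum += sumAndCount[0];
            count += (long) sumAndCount[1];
        }
        return count == 0 ? null : sum / count;
    }

    // Serial path: treat all values as a single "partition".
    @Override
    public Object aggregate(Collection<Double> values) {
        return aggregateResults(Collections.singletonList(getParallelAggregator().aggregate(values)));
    }
}

final class ParallelDriver {
    private ParallelDriver() {}

    // Runs the parallel aggregator against each partition concurrently, then
    // combines the partial results in one final step.
    static Object run(ParallelValueAggregator aggregator, List<List<Double>> partitions) {
        ValueAggregator partial = aggregator.getParallelAggregator();
        List<Object> partials = partitions.parallelStream()
            .map(partial::aggregate)
            .collect(Collectors.toList());
        return aggregator.aggregateResults(partials);
    }
}
```

With partitions {1, 2, 3} and {10}, this approach yields 16 / 4 = 4.0. Averaging the two partition averages would yield 6.0.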
Coherence comes with all of the natural aggregation functions, such as count, distinct values, minimum, maximum, sum, and average.
Note:
All aggregators that come with Coherence are parallel-aware.
See the com.tangosol.util.aggregator package for a list of Coherence aggregators. To implement your own aggregator, see the AbstractAggregator abstract base class.
Coherence provides an Invocation Service which allows execution of single-pass agents (called Invocable objects) anywhere within the grid. The agents can be executed on a particular node in the grid, in parallel on a particular set of nodes in the grid, or in parallel on all nodes of the grid.
An invocation service is configured using the <invocation-scheme> element in the cache configuration file. Using the name of the service, the application can easily obtain a reference to the service:
InvocationService service = (InvocationService) CacheFactory.getService("MyService");
Agents are simply runnable classes that are part of the application. An example of a simple agent is one designed to request a GC from the JVM:
Example 24-5 Simple Agent to Request Garbage Collection
/**
 * Agent that issues a garbage collection.
 */
public class GCAgent extends AbstractInvocable {
    public void run() {
        System.gc();
    }
}
Executing that agent across the entire cluster takes one line of code:
service.execute(new GCAgent(), null, null);
Here is an example of an agent that supports a grid-wide request/response model:
Example 24-6 Agent to Support a Grid-Wide Request and Response Model
/**
 * Agent that determines how much free memory a grid node has.
 */
public class FreeMemAgent extends AbstractInvocable {
    public void run() {
        Runtime runtime = Runtime.getRuntime();
        // freeMemory() and totalMemory() return long values
        long cbFree = runtime.freeMemory();
        long cbTotal = runtime.totalMemory();
        setResult(new long[] {cbFree, cbTotal});
    }
}
Executing that agent across the entire grid and retrieving all of its results still takes only one line of code:
Map map = service.query(new FreeMemAgent(), null);
While it is easy to do a grid-wide request/response, it takes a bit more code to print the results:
Example 24-7 Printing the Results from a Grid-Wide Request or Response
Iterator iter = map.entrySet().iterator();
while (iter.hasNext()) {
    Map.Entry entry = (Map.Entry) iter.next();
    Member member = (Member) entry.getKey();
    long[] anInfo = (long[]) entry.getValue();
    if (anInfo != null) { // null if the member died
        System.out.println("Member " + member + " has "
            + anInfo[0] + " bytes free out of "
            + anInfo[1] + " bytes total");
    }
}
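To try the request/response pattern outside a cluster, the query call can be approximated in one JVM. In this hypothetical sketch, each "member" is just a name, and the agent runs once per member on a thread pool. A member whose agent throws maps to null, which mirrors the null check in Example 24-7. MiniInvocationService is not a Coherence class, and every simulated member reports the same JVM's memory.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical single-JVM model of query(): one agent run per member,
// results collected per member; not the Coherence InvocationService.
final class MiniInvocationService {
    private final List<String> members;

    MiniInvocationService(List<String> members) {
        this.members = new ArrayList<>(members);
    }

    <R> Map<String, R> query(Function<String, R> agent) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, members.size()));
        try {
            Map<String, Future<R>> pending = new LinkedHashMap<>();
            for (String member : members) {
                pending.put(member, pool.submit(() -> agent.apply(member)));
            }
            Map<String, R> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<R>> e : pending.entrySet()) {
                R value;
                try {
                    value = e.getValue().get();
                } catch (ExecutionException ex) {
                    value = null; // the agent failed on this member
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    value = null;
                }
                results.put(e.getKey(), value);
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```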
The agent operations can be stateful, which means that their invocation state is serialized and transmitted to the grid nodes on which the agent is to be run.
Example 24-8 Stateful Agent Operations
/**
 * Agent that carries some state with it.
 */
public class StatefulAgent extends AbstractInvocable {
    public StatefulAgent(String sKey) {
        m_sKey = sKey;
    }

    public void run() {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
    }

    private String m_sKey;
}
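Because agent state travels in serialized form, any field set in the constructor must survive serialization. The sketch below uses standard Java serialization for illustration. Coherence can also be configured with other serializers, such as POF. KeyedAgent and AgentTransport are hypothetical names that stand in for a stateful Invocable and the transport between nodes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a stateful Invocable: a serializable Runnable.
final class KeyedAgent implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;
    private final String key; // state captured at construction time
    private String result;    // set when the agent runs

    KeyedAgent(String key) { this.key = key; }

    @Override
    public void run() {
        result = "processed " + key; // work that depends on the carried state
    }

    String getKey() { return key; }
    String getResult() { return result; }
}

final class AgentTransport {
    private AgentTransport() {}

    // Serializes and deserializes an agent, roughly what happens when it is
    // shipped to another node.
    static <T extends Serializable> T roundTrip(T agent) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(agent);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```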
Coherence provides a grid-enabled implementation of the CommonJ Work Manager. Using a Work Manager, an application can submit a collection of work that must be executed. The Work Manager distributes that work in such a way that it is executed in parallel, typically across the grid. In other words, if there are ten work items submitted and ten servers in the grid, then each server likely processes one work item. Further, the distribution of work items across the grid can be tailored, so that certain servers (for example, one that acts as a gateway to a particular mainframe service) are the first choice to run certain work items, for the sake of efficiency and locality of data.
The application can then wait for the work to be completed, and can provide a timeout for how long it can wait. The API for this purpose is quite powerful, allowing an application to wait for the first work item to complete, or for a specified set of the work items to complete. By combining methods from this API, it is possible to do things like "Here are 10 items to execute; for these 7 unimportant items, wait no more than 5 seconds, and for these 3 important items, wait no more than 30 seconds".
Example 24-9 Using a Work Manager
Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i) {
    WorkItem item = manager.schedule(aWork[i]);
    if (i < 3) {
        // the first three work items are the important ones
        collBigItems.add(item);
    }
    collAllItems.add(item);
}

// waitForAll returns true only if every item completed within the timeout
boolean fAllDone = manager.waitForAll(collAllItems, 5000L);
if (!fAllDone) {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
}
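The same policy, waiting briefly for everything and then longer for the important items, can be expressed with standard java.util.concurrent futures. WorkWaiter.waitForAll is a hypothetical helper that mimics the boolean result of the Work Manager's waitForAll. It is not the CommonJ API itself.

```java
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// java.util.concurrent analogue of waitForAll(items, timeout); not the CommonJ API.
final class WorkWaiter {
    private WorkWaiter() {}

    // Waits until every item is done or the timeout elapses. Returns true only
    // if all items finished (successfully or with an exception) in time.
    static boolean waitForAll(Collection<? extends Future<?>> items, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (Future<?> item : items) {
            try {
                item.get(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                return false;
            } catch (ExecutionException e) {
                // a failed item still counts as finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

With the timeouts from the text, the calls would be waitForAll(all, 5000), followed by waitForAll(important, 25000) only if the first call returns false.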