29 Processing Data In a Cache

You can use entry processors and aggregators to perform data grid processing across a cluster. These data grid features perform in a similar manner to other map-reduce patterns and allow the processing of large amounts of data at very low latencies.


Overview of Processing Data In a Cache

Coherence provides the ideal infrastructure for building data grid services and the client and server-based applications that use a data grid. At a basic level, Coherence can manage large amounts of data across a large number of servers in a grid; it can provide near-zero-latency access to that data; it supports parallel queries across that data in a map-reduce manner; and it supports integration with database and EIS systems that act as the system of record for that data.


Performing Targeted Processing

Coherence provides the ability to execute an agent against an entry in any map of data managed by a data grid:

map.invoke(key, agent);

In the case of partitioned data, the agent executes on the grid node that owns the data. The queuing, concurrency management, agent execution, data access by the agent, and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (at most a few hundred bytes) than to handle distributed concurrency control, coherency and data updates.

For request and response processing, the agent returns a result:

Object oResult = map.invoke(key, agent);

Coherence, as a data grid, determines the location to execute the agent based on the configuration for the data topology. It moves the agent to the determined location, executes the agent (automatically handling concurrency control for the item while executing the agent), backs up the modifications (if any), and returns a result.
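To make the invoke semantics concrete without a cluster, the following plain-Java sketch models map.invoke(key, agent) inside a single JVM. It assumes only the JDK: the class LocalInvoke and its Entry type are hypothetical names, not Coherence classes, and ConcurrentHashMap.compute stands in for the per-entry concurrency control that the grid provides on the owning node.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical single-JVM model of map.invoke(key, agent); this is NOT the Coherence API.
class LocalInvoke {

    // Minimal mutable view of one entry, loosely modeled on InvocableMap.Entry.
    static final class Entry<K, V> {
        private final K key;
        private V value;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K getKey() { return key; }
        V getValue() { return value; }
        void setValue(V value) { this.value = value; }
        void remove() { this.value = null; }   // null means "no mapping" in this model
    }

    // ConcurrentHashMap.compute runs the function atomically for the key, so
    // concurrent invocations against the same key never interleave.
    static <K, V, R> R invoke(ConcurrentHashMap<K, V> map, K key,
                              Function<Entry<K, V>, R> agent) {
        Object[] result = new Object[1];
        map.compute(key, (k, current) -> {
            Entry<K, V> entry = new Entry<>(k, current);
            result[0] = agent.apply(entry);    // the agent's return value is the result
            return entry.getValue();           // returning null removes the mapping
        });
        @SuppressWarnings("unchecked")
        R r = (R) result[0];
        return r;
    }
}
```

Because compute runs atomically for a key, many threads can invoke an incrementing agent against the same key without losing updates, which is the property the grid provides for an entry while an agent executes.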

Performing Parallel Processing

Coherence provides map-reduce functionality which allows agents to be executed in parallel against a collection of entries across all nodes in the grid. Parallel execution allows large amounts of data to be processed by balancing the work across the grid. The invokeAll method is used as follows:

map.invokeAll(collectionKeys, agent);

For request and response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(collectionKeys, agent);

Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology. It then moves the agent to the determined locations, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backs up the modifications (if any), and returns the coalesced results. See Performing Data Grid Aggregation.
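A similar single-JVM sketch, again using only the JDK and a hypothetical LocalInvokeAll class, models invokeAll over a collection of keys: each key is updated atomically, keys are processed in parallel, and one result per key is returned in a map. In this simplified model the agent returns the new value, which doubles as the result; a real entry processor can return any result.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical single-JVM model of map.invokeAll(collectionKeys, agent); NOT the Coherence API.
class LocalInvokeAll {

    // The agent maps (key, current value) to the new value; null removes the entry.
    // Each key is updated atomically, keys are processed in parallel, and one
    // result (here, the new value) is collected per key.
    static <K, V> Map<K, V> invokeAll(ConcurrentHashMap<K, V> map,
                                      Collection<K> keys,
                                      BiFunction<K, V, V> agent) {
        Map<K, V> results = Collections.synchronizedMap(new HashMap<>());
        keys.parallelStream().forEach(key -> results.put(key, map.compute(key, agent)));
        return results;
    }
}
```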

Performing Query-Based Processing

Coherence supports the ability to query across the entire data grid. See Querying Data In a Cache. For example, in a trading system it is possible to query for all open Order objects for a particular trader:

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = map.keySet(filter);

By combining this feature with the use of parallel executions in the data grid, Coherence provides the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agent against the entries:

map.invokeAll(filter, agent);

For request and response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(filter, agent);

Coherence combines parallel query and parallel execution to achieve query-based agent invocation against a data grid.

Performing Data-Grid-Wide Processing

Passing an instance of AlwaysFilter (or null) to the invokeAll method causes the passed agent to be executed against all entries in the InvocableMap:

map.invokeAll((Filter) null, agent);

As with the other types of agent invocation, request and response processing is supported:

Map mapResults = map.invokeAll((Filter) null, agent);

An application can process all the data spread across a particular map in the data grid with a single line of code.
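The query-based and data-grid-wide forms can be modeled the same way. In this hypothetical LocalQueryInvoke sketch (JDK only, not the Coherence API), a BiPredicate plays the role of the Coherence Filter and a null predicate is treated like AlwaysFilter, so the agent runs against every entry; only entries that match contribute a result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

// Hypothetical single-JVM model of map.invokeAll(filter, agent); NOT the Coherence API.
class LocalQueryInvoke {

    static <K, V> Map<K, V> invokeAll(ConcurrentHashMap<K, V> map,
                                      BiPredicate<K, V> filter,
                                      BiFunction<K, V, V> agent) {
        // A null filter behaves like AlwaysFilter: every entry matches.
        BiPredicate<K, V> effective = filter == null ? (k, v) -> true : filter;
        Map<K, V> results = new HashMap<>();
        // The key-set iterator is weakly consistent, so concurrent updates are safe.
        for (K key : map.keySet()) {
            // Evaluate the filter and run the agent in one atomic step for this key.
            map.computeIfPresent(key, (k, v) -> {
                if (!effective.test(k, v)) {
                    return v;                  // not matched: leave the entry unchanged
                }
                V updated = agent.apply(k, v);
                results.put(k, updated);       // one result per matching entry
                return updated;                // null would remove the entry
            });
        }
        return results;
    }
}
```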

Using Agents for Targeted, Parallel and Query-Based Processing

You can process data in a cache using agents that are commonly referred to as entry processors. Coherence includes many predefined entry processors that can be used to perform many common operations.


Overview of Entry Processor Agents

Agents implement the EntryProcessor interface, typically by extending the AbstractProcessor class. Coherence includes the following predefined EntryProcessor implementations that are included in the com.tangosol.util.processor package:

  • AbstractProcessor - an abstract base class for building an EntryProcessor

  • AsynchronousProcessor - A wrapper class that allows for an asynchronous invocation of an underlying entry processor. See Processing Entries Asynchronously.

  • CompositeProcessor - bundles a collection of EntryProcessor objects that are invoked sequentially against the same entry

  • ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the entry-to-process evaluates to true

  • ConditionalPut - performs an Entry.setValue operation if the specified condition is satisfied

  • ConditionalPutAll - performs an Entry.setValue operation for multiple entries that satisfy the specified condition

  • ConditionalRemove - performs an Entry.remove operation if the specified condition is satisfied

  • ExtractorProcessor - extracts and returns a value (such as a property value) from an object stored in an InvocableMap

  • NumberIncrementor - pre- or post-increments any property of a primitive integral type, and Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal

  • NumberMultiplier - multiplies any property of a primitive integral type, and Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal, and returns either the previous or new value

  • PreloadRequest - performs an Entry.getValue call. No results are reported back to the caller. The processor provides a means to load an entry or a collection of entries into the cache using a cache loader without incurring the cost of sending the value(s) over the network. If the corresponding entry (or entries) already exists in the cache, or if the cache does not have a loader, then invoking this processor has no effect.

  • PriorityProcessor - explicitly controls the scheduling priority and timeouts for execution of EntryProcessor methods.

  • PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator. The NumberIncrementor and NumberMultiplier entry processors extend this processor.

  • UpdaterProcessor - updates an attribute of an object cached in an InvocableMap.

  • VersionedPut - performs an Entry.setValue operation if the version of the specified value matches the version of the current value. For a match, the processor increments the version indicator before the value is updated. Entry values must implement the Versionable interface.

  • VersionedPutAll - performs an Entry.setValue operation only for entries whose versions match the versions of the corresponding current values. For a match, the processor increments the version indicator before each value is updated. Entry values must implement the Versionable interface.

The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods: process and processAll. The AbstractProcessor provides the default implementation of the processAll method. When processing multiple keys, a single EntryProcessor object is re-used for all the keys and its state should not be mutated.

Note:

If the processAll call throws an exception, changes are made to the underlying map only for the entries that were removed from setEntries (the set of entries passed to processAll). Changes that are made to the remaining entries are not committed.
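The rule in the note above is easier to see in a small model. The following plain-JDK sketch (hypothetical class ProcessAllModel, not Coherence code) mimics a processAll loop that removes each entry from setEntries after processing it, and a container that, when processAll throws, commits only the entries that were removed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical plain-JDK model of the processAll exception rule; NOT Coherence code.
class ProcessAllModel {

    // Mutable entry with identity equality, so mutating it never disturbs the Set.
    static final class Cell {
        final String key;
        int value;

        Cell(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // processAll-style loop: process an entry, then remove it from the set to mark it done.
    static void processAll(Set<Cell> setEntries, Consumer<Cell> process) {
        Iterator<Cell> it = setEntries.iterator();
        while (it.hasNext()) {
            process.accept(it.next());
            it.remove();
        }
    }

    // Container side: whether or not processAll throws, commit only the cells
    // that are no longer in setEntries; any exception still propagates.
    static void invokeAll(Map<String, Integer> store, Consumer<Cell> process) {
        List<Cell> cells = new ArrayList<>();
        for (Map.Entry<String, Integer> e : store.entrySet()) {
            cells.add(new Cell(e.getKey(), e.getValue()));
        }
        Set<Cell> setEntries = new LinkedHashSet<>(cells);
        try {
            processAll(setEntries, process);
        } finally {
            for (Cell c : cells) {
                if (!setEntries.contains(c)) {
                    store.put(c.key, c.value);
                }
            }
        }
    }
}
```

Running it over a, b, c, d (in that order) with a processor that multiplies each value by 10 and throws after mutating c leaves a and b updated, while c keeps its original value and d is never processed.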

The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible.

Processing Entries Using Lambda Expressions

Lambda expressions can be used as entry processors and can result in more concise client code that does not require the processor to be serialized or registered in a POF configuration file. The following example creates an entry processor as a lambda expression and uses the entry processor within the invokeAll method:

InvocableMap.EntryProcessor<ContactId, Contact, Void> processor = (entry) ->
   {
      Contact contact = entry.getValue();
      contact.setFirstName(contact.getFirstName().toUpperCase());
      entry.setValue(contact);
      return null;
   };
 
cache.invokeAll(processor);

The following example creates an entry processor as a lambda expression directly within the invokeAll method.

Address addrWork = new Address("201 Newbury St.", "Yoyodyne, Ltd.",
   "Boston", "MA", "02116", "US");

ValueExtractor extractor =
   Lambda.extractor(Contact::getHomeAddress).andThen(Address::getState);
Filter filter = equal(extractor, "MA");

addrWork.setStreet1("200 Newbury St.");

cache.invokeAll(filter, entry ->
   {
   Contact contact = entry.getValue();
   contact.setWorkAddress(addrWork);
   entry.setValue(contact);
   return null;
   });

Note that lambda expressions cannot be nested. For example:

// Not supported: the entry processor lambda below contains a nested lambda.
cache.invokeAll(filter, entry ->
   {
   Runnable r = () -> System.out.println("");
   r.run();
   return null;
   });

About Lambdas in a Distributed Environment

Executing lambda expressions in distributed environments can be problematic due to the static nature of lambdas. Only the metadata that describes the lambda is sent across the wire. Therefore, the same compiled code is required in both the client and server classpath. Any change to an existing lambda expression, or the addition of a new one, on the client requires a redeployment and restart of both the client and the server. In addition, synthetic lambda method names are not stable across class versions. Therefore, all cluster members, including extend clients, must have exactly the same version of a class and must be upgraded at the same time.
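The static behavior described above follows from how Java itself serializes lambdas. The following plain-JDK illustration (hypothetical class LambdaWire, no Coherence involved) serializes a lambda and reads it back: the stream holds a java.lang.invoke.SerializedLambda that names the capturing class and its synthetic implementation method rather than any byte code, so deserialization succeeds only where that same compiled class is available. It is an analogy for static lambdas; the wire format Coherence actually uses is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Plain-JDK illustration (no Coherence): a serializable lambda is written as a
// java.lang.invoke.SerializedLambda, which names the capturing class and the
// synthetic implementation method but carries no byte code.
class LambdaWire {

    // A Function that is also Serializable, so lambdas targeting it can be written.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    static SerFunction<String, String> upper() {
        return s -> s.toUpperCase();
    }

    static byte[] write(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object read(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            // Rebuilding the lambda calls back into LambdaWire's compiled code.
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("capturing class not on classpath", e);
        }
    }
}
```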

To overcome these limitations, a dynamic implementation for lambdas is provided. The dynamic implementation sends both the lambda metadata and the actual byte code to be executed. On the client, the byte code is parsed and a new lambda class is generated from it. On the server, a lambda class is created based on the byte code received from the client and then executed. The dynamic implementation:

  • allows modification of existing (or the introduction of new) behavior without the need for redeployment or server restart.

  • eliminates the issues related to lambda naming instability.

  • allows multiple different versions of a class throughout the cluster.

To ensure that the dynamic implementation works correctly, do not refer to named methods and constructors in a lambda expression, because method and constructor references are always treated as static lambdas. In addition, the dynamic implementation captures only enclosing method arguments and local variables.

Coherence has used the dynamic lambda implementation in distributed environments since release 12.2.1. To let users choose between the convenience and flexibility of dynamic lambdas and the enhanced security of static lambdas, Coherence provides the coherence.lambdas system property, which configures Coherence clients, extend clients, proxies, and servers to use either dynamic or static lambdas. By default, lambdas in a distributed environment remain dynamic. To use static lambdas, all Coherence clients, extend clients, proxies, and servers must be configured with coherence.lambdas set to static. Otherwise, any attempt to send a dynamic lambda from a client configured for dynamic lambdas to a server member configured for static lambdas results in a lambda invocation failure. A Coherence server configured for dynamic lambdas can handle both static and dynamic client lambda invocations.

Using coherence.lambdas System Property to Configure Static Lambdas
You can use the system property coherence.lambdas to configure static lambdas, as shown below:
-Dcoherence.lambdas=static

Dynamic lambdas send byte code between Coherence members in the distributed environment. The static lambda implementation sends only lambda metadata references between members, so the same class files that contain the static lambdas must be on the Java classpath of all members; otherwise, the static lambda metadata reference does not resolve on the receiving member.
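The compatibility rules in this section can be summarized as a small decision function. The sketch below is hypothetical (class LambdaModeModel, JDK only): it reads coherence.lambdas through System.getProperty, treats an unset value or anything other than static as dynamic (an assumption of this sketch), and encodes the documented rule that only a dynamic client sending to a static server fails.

```java
// Hypothetical model of the documented coherence.lambdas rules; Coherence reads
// the property itself, this class only restates the compatibility matrix.
class LambdaModeModel {

    enum Mode { STATIC, DYNAMIC }

    // Assumption of this sketch: unset or any value other than "static" means dynamic.
    static Mode fromProperty(String value) {
        return "static".equals(value) ? Mode.STATIC : Mode.DYNAMIC;
    }

    static Mode current() {
        return fromProperty(System.getProperty("coherence.lambdas"));
    }

    // Only a dynamic lambda sent to a server configured for static lambdas fails.
    // Static lambdas additionally need the defining class on the receiver's classpath.
    static boolean accepted(Mode client, Mode server) {
        return !(client == Mode.DYNAMIC && server == Mode.STATIC);
    }
}
```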

Note:

When the class that defines the serialized lambda is missing from the server's Java classpath, the client that invokes the lambda receives an exception. The following is an example of the printed stack trace:

Failed request execution for DistributedCache service on Member(Id=3,,…)) java.lang.ClassNotFoundException: lambda.AbstractRemoteFunctionTests
…
Caused by: java.lang.RuntimeException: Failed to deserialize static lambda lambda/RemoteTests$lambda$testLambdaInvoke$393ba5d0$1(Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object; due to missing context class lambda.RemoteTests.
at com.tangosol.internal.util.invoke.lambda.StaticLambdaInfo.toSerializedLambda(StaticLambdaInfo.java:430

The above failure is resolved by ensuring that the missing class lambda.RemoteTests is in all server classpaths so that the static lambda reference sent by the invoking client is resolved on the server side.

Processing Entries in Multiple Caches

Entry processors can update cache entries in multiple caches within a single process or processAll operation. The caches must be managed by the same service and the entries must be located in the same partition. See Specifying Data Affinity.

The process and processAll operations are performed in a transaction-like manner that uses implicit locks when accessing, inserting, updating, modifying, or removing cache entries. If an exception is thrown during the processing of the entries, the entries are rolled back leaving all underlying values unchanged. The processAll operation is atomic with respect to all entries within a single partition (or member if no service threads are configured) rather than the individual entry or entire request.
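The all-or-nothing behavior across caches can be sketched with plain maps. This hypothetical TwoCacheTx class (JDK only, not the Coherence API) applies a processor to private copies of two maps and commits both only if the processor returns normally, so an exception leaves both caches unchanged. Copying whole maps is a simplification; the grid locks and rolls back individual entries within a partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical plain-JDK model of all-or-nothing updates across two caches; NOT Coherence code.
class TwoCacheTx {

    static void process(Map<String, Double> cache1, Map<String, Double> cache2,
                        BiConsumer<Map<String, Double>, Map<String, Double>> processor) {
        // Work on private copies so a failure cannot leave a half-applied update.
        Map<String, Double> work1 = new HashMap<>(cache1);
        Map<String, Double> work2 = new HashMap<>(cache2);

        processor.accept(work1, work2);   // an exception here skips the commit below

        // Commit both caches only after the processor completed normally.
        cache1.clear();
        cache1.putAll(work1);
        cache2.clear();
        cache2.putAll(work2);
    }
}
```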

Note:

The implicit lock may create a deadlock if entries are locked in conflicting orders on different threads. The application is responsible for ensuring that cache entries are accessed (locked) in a deadlock-free manner. In the case where a deadlock is detected, an exception is thrown but the underlying service is not stopped.

The com.tangosol.net.BackingMapContext API is used to process entries in multiple caches and provides a way to directly access entries in a cache's backing map. The backing map is the actual Map implementation where entries are stored (as opposed to the logical representation of a cache typically used by an application). Entries in a backing map are stored in binary format and therefore require an application to handle the serialized form of an entry. See Implementing Storage and Backing Maps.

The com.tangosol.util.BinaryEntry API provides easy access to a backing map context and is typically used by an application. The following sample code demonstrates how to update entries in two different caches within the process method of an entry processor using the BinaryEntry API.

public Object process(Entry entry) {
   BinaryEntry binEntry = (BinaryEntry) entry;
   Binary binKey = binEntry.getBinaryKey();
   Trade trade = (Trade) binEntry.getValue();
       
   // Update a Trade object in cache1

   trade.setPrice(trade.getPrice() + factor);
   binEntry.setValue(trade);  
 
   // update a Trade object in cache2

   BackingMapManagerContext ctx = binEntry.getContext();
   BinaryEntry binEntry2 = 
      (BinaryEntry) ctx.getBackingMapContext("cache2").getBackingMapEntry(binKey);
   Trade trade2 = (Trade) binEntry2.getValue();
   trade2.setPrice(trade2.getPrice() + factor);
   binEntry2.setValue(trade2);

   return null;
}

Note:

The getBackingMapEntry method may only be called within the context of an entry processor invocation. Any changes made to the entry are persisted with the same lifecycle as those made by the enclosing invocation. The returned entry is only valid for the duration of the enclosing invocation, and multiple calls to this method within the same invocation context return the same entry object.

Ignoring the Results of an Entry Processor

The processAll method of the AbstractProcessor class returns a map of results to a client application. The map contains the keys and values for every entry that was processed. Most often, the entry processor returns results that the client uses. However, there may be situations where some results are not usable by the client. More importantly, there may be situations where the processor must evaluate all the entries in a cache, in which case the returned map contains every key in the cache. In both situations, the agent should be designed to ignore results that are not wanted.

Designing an agent that only returns wanted results is a good pattern and best practice, because it:

  • Makes the client's memory footprint independent of the size of the cache.

  • Avoids transferring all affected keys to the client which could result in an OutOfMemoryError exception on the client if the cache is too large.

  • Avoids deserialization costs in the client for all keys.

  • Avoids transferring the map and all keys through a proxy node (for Extend clients).

To ignore entry processor results, override the processor's processAll method to return an empty Map or null. The following example demonstrates a simple entry processor agent that always returns null after processing an entry. The example is not very realistic but does show how to override the processAll method.

public static class Agent
   implements InvocableMap.EntryProcessor
   {
   private static final long serialVersionUID = 1L;
   
   @Override
   public Object process(Entry entry)
      {
      return null;
      }
    
   @Override
   public Map processAll(Set set)
      {
      for (Entry entry : (Set<Entry>) set)
         {
         process(entry);
         }
      return null;
      }
   }
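A more realistic variant of the same idea updates every entry but returns only the results the client actually needs. The following plain-JDK sketch uses a hypothetical WantedResultsModel class in place of a real entry processor; for example, it could decrement stock counts across a whole cache and report back only the items that ran out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Hypothetical plain-JDK model: update every entry, but report back only the wanted ones.
class WantedResultsModel {

    static Map<String, Integer> processAll(Map<String, Integer> entries,
                                           IntUnaryOperator update,
                                           IntPredicate wanted) {
        Map<String, Integer> results = new HashMap<>();
        for (Map.Entry<String, Integer> e : entries.entrySet()) {
            int updated = update.applyAsInt(e.getValue());
            e.setValue(updated);                  // every entry is processed...
            if (wanted.test(updated)) {
                results.put(e.getKey(), updated); // ...but only wanted results are returned
            }
        }
        return results;
    }
}
```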

Performing Synthetic Operations

Entry processors can perform synthetic operations on entries. Both the remove and setValue methods for an entry can be declared synthetic by including a true parameter in the method call. For example:

entry.setValue(value, true);

The setValue method in a synthetic operation does not return the previous value. In addition, synthetic operations are not propagated to cache stores or binary entry stores; applications are able to commit changes after all processing is complete.

Applications typically use synthetic operations to prepare a cache and perform operations (state changes) that do not require listeners and cache stores to be notified, especially when doing so may be expensive in terms of network, memory, and CPU usage. Applications also use synthetic operations when pre-warming a cache; applications that load cache data may want to avoid having a cache store called, especially if the entries that are being loaded into the cache are coming from the back-end data source.

Applications can differentiate between client driven (natural) events and cache internal (synthetic) events using the isSynthetic method on the CacheEvent class. See Using Synthetic Events.
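The difference between natural and synthetic updates can be modeled in a few lines. In this hypothetical SyntheticModel class (JDK only, not Coherence code), every update is recorded as an event tagged natural or synthetic, mirroring what isSynthetic reports, but only natural updates are forwarded to the list that stands in for a cache store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-JDK model of natural vs synthetic updates; NOT Coherence code.
class SyntheticModel {
    final Map<String, String> data = new HashMap<>();
    final List<String> storeWrites = new ArrayList<>();   // stands in for a cache store
    final List<String> events = new ArrayList<>();        // stands in for listeners

    void setValue(String key, String value, boolean synthetic) {
        data.put(key, value);
        // Listeners can tell the two kinds apart, as with CacheEvent.isSynthetic.
        events.add(key + (synthetic ? ":synthetic" : ":natural"));
        if (!synthetic) {
            storeWrites.add(key);   // only natural updates reach the store
        }
    }
}
```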

Processing Entries Asynchronously

Entry processors can be invoked asynchronously using the AsynchronousProcessor class. The class implements the standard Java Future interface and also includes a Coherence-specific flow control mechanism to guard against excessive backlogs.

Note:

The AsynchronousProcessor class is used to wrap an entry processor implementation. For example:

UpdaterProcessor up = new UpdaterProcessor(null, value);
AsynchronousProcessor ap = new AsynchronousProcessor(up);

cache.invokeAll(filter, ap);
ap.getResult();

The above example invokes the underlying entry processor and uses automatic flow control (as defined by the underlying service's flow control logic) and a default unit-of-order ID, which is set to the calling thread's hashCode because a thread's requests are naturally expected to execute in order. Ordering is guaranteed for each partition even during failover. Additional constructors are available to manually control request flow and assign the unit-of-order ID as required.
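The unit-of-order idea can be approximated with standard executors. This plain-JDK sketch (hypothetical UnitOfOrderModel class, not AsynchronousProcessor) routes every request with the same unit-of-order ID to one single-threaded lane, so requests from the same caller complete in submission order, and defaults the ID to the calling thread's hashCode. It does not model flow control or partition failover.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical plain-JDK model of unit-of-order; NOT AsynchronousProcessor.
// Requests sharing an ID run on one single-threaded lane, so they complete in
// submission order; requests with different IDs may run concurrently.
class UnitOfOrderModel implements AutoCloseable {
    private final Map<Integer, ExecutorService> lanes = new ConcurrentHashMap<>();

    <R> CompletableFuture<R> submit(int unitOfOrderId, Callable<R> task) {
        ExecutorService lane = lanes.computeIfAbsent(unitOfOrderId,
                id -> Executors.newSingleThreadExecutor());
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, lane);
    }

    // Default ID, as in the text: the calling thread's hashCode.
    <R> CompletableFuture<R> submit(Callable<R> task) {
        return submit(Thread.currentThread().hashCode(), task);
    }

    @Override
    public void close() {
        lanes.values().forEach(ExecutorService::shutdown);   // queued tasks still finish
    }
}
```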

For advanced use cases, the AsynchronousProcessor class can be extended to define custom asynchronous functionality. The following example extends the AsynchronousProcessor class and overrides the onResult, onComplete, and onException methods.

Note:

Overriding implementations of the onComplete, onResult, and onException methods must be non-blocking and short-lived, because this call is made on the service thread of the client and blocks processing for responses on other threads.

AsynchronousProcessor processor = new AsynchronousProcessor(null)
   {

   public synchronized void onResult(Entry entry)
      {
      super.onResult(entry);

      // process the result
      }

   public void onComplete()
      {
      super.onComplete();
            
      if (m_eReason == null)
         {

         // process the result
         }
      else
         {

         // process the (potentially partial) failure
         }
      }

   public void onException(Throwable eReason)
      {
         super.onException(eReason);
         
         // process the observed exception
      }
};

Performing Data Grid Aggregation

The InvocableMap interface supports entry aggregators that perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data. For details, see the aggregate method.

In addition, the EntryAggregator interface can be used to process a set of InvocableMap.Entry objects to achieve an aggregated result.

For efficient execution in a data grid, an aggregation process must be designed to operate in a parallel manner. The StreamingAggregator interface is an advanced extension to the EntryAggregator interface that is explicitly capable of being run in parallel in a distributed environment.

Note:

The ParallelAwareAggregator interface has been deprecated and should no longer be used. Applications should use the StreamingAggregator interface to implement custom aggregators. See Performing Data Grid Aggregation Using Streams.

Coherence includes many natural aggregator functions. The functions include:

  • Count

  • DistinctValues

  • DoubleAverage

  • DoubleMax

  • DoubleMin

  • DoubleSum

  • LongMax

  • LongMin

  • LongSum

See the com.tangosol.util.aggregator package for a list of Coherence aggregators. To implement your own aggregator, see the StreamingAggregator interface.
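Why an aggregator must be designed for parallel execution shows up even in a simple average. The plain-JDK sketch below uses a hypothetical AverageAggregatorModel class: each partition accumulates a partial (sum, count), the partials are combined, and the average is computed once at the end. The method names echo the accumulate, combine, and finalize steps of a streaming aggregator, but the signatures are simplified and are not the StreamingAggregator interface.

```java
import java.util.List;

// Hypothetical plain-JDK model of a parallel-capable average aggregator;
// NOT the StreamingAggregator interface.
class AverageAggregatorModel {
    private long sum;
    private long count;

    // Accumulate one value on the member that owns it.
    void accumulate(int value) {
        sum += value;
        count++;
    }

    // Merge another member's partial result.
    void combine(AverageAggregatorModel partial) {
        sum += partial.sum;
        count += partial.count;
    }

    // Compute the final answer once, on the caller.
    double finalizeResult() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }

    // Simulates the grid: one partial aggregator per partition, then a combine pass.
    static double aggregate(List<List<Integer>> partitions) {
        AverageAggregatorModel result = new AverageAggregatorModel();
        for (List<Integer> partition : partitions) {
            AverageAggregatorModel partial = new AverageAggregatorModel();
            for (int v : partition) {
                partial.accumulate(v);
            }
            result.combine(partial);
        }
        return result.finalizeResult();
    }
}
```

With partitions {10, 20}, {30}, and {40, 50, 60}, the combined result is 35.0, whereas averaging the three partition averages (15, 30, and 50) would give about 31.67. Carrying (sum, count) as the partial result is what makes the combine step correct.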

Performing Data Grid Aggregation Using Streams

Data grid aggregation can be performed using Java streams. The use of streams provides a simplified programming model especially when combined with Java lambda expressions. For example:
ValueExtractor<Person, Integer> ageExtractor = Person::getAge;
double avgAge = cache.stream()
 .mapToInt(entry -> entry.extract(ageExtractor))
 .average()
 .getAsDouble();

When using Coherence filters, pass a filter object as the source of the stream. For example:

ValueExtractor<Person, Integer> ageExtractor = Person::getAge;
OptionalInt max = personCache.stream(filter)
 .mapToInt(entry -> entry.extract(ageExtractor))
 .max();

As an alternative, use the Coherence Stream API extension to specify the extractor when creating a stream and rely on the extension for any optimizations. For example:

OptionalInt max = personCache.stream(filter, Person::getAge)
 .mapToInt(Number::intValue)
 .max();

Note that in this case you must use mapToInt(Number::intValue) to convert Stream<Integer> into IntStream. This conversion can also be done internally. For example:

OptionalInt max = RemoteStream.toIntStream(personCache.stream(filter, Person::getAge)).max();
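For comparison, the same pipeline shapes run against a local java.util.Map with the standard JDK Stream API; Coherence's contribution is executing them across the cluster. This sketch uses a hypothetical Person class and LocalStreamModel helper. Note that IntStream.max and IntStream.average return OptionalInt and OptionalDouble, so any pipeline that ends in max() or average() has to handle an empty result.

```java
import java.util.Map;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.function.Predicate;

// Plain-JDK analogue over a local Map; Person and LocalStreamModel are hypothetical.
class LocalStreamModel {

    static final class Person {
        private final int age;

        Person(int age) {
            this.age = age;
        }

        int getAge() {
            return age;
        }
    }

    // Equivalent of the average-age example, evaluated locally.
    static OptionalDouble averageAge(Map<String, Person> cache) {
        return cache.values().stream().mapToInt(Person::getAge).average();
    }

    // Equivalent of the filtered max example; empty if nothing matches.
    static OptionalInt maxAge(Map<String, Person> cache, Predicate<Person> filter) {
        return cache.values().stream().filter(filter).mapToInt(Person::getAge).max();
    }
}
```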

The Java streams implementation has been extended in Coherence to allow aggregation across the cluster. The API defines a set of aggregators that support streams by implementing the InvocableMap.StreamingAggregator interface. In addition, Coherence includes many useful collectors that can be executed in parallel in a distributed environment. The collectors are called using the RemoteCollectors class. For example:

avgAge = cache.stream()
   .map(Map.Entry::getValue)
   .collect(RemoteCollectors.averagingInt(Contact::getAge));
System.out.println("\nThe average age of all contacts using collect() is: " +
   avgAge);

To define custom aggregators that also support streams, you can extend the CollectorAggregator class.

Performing Node-Based Processing

Coherence provides an invocation service which allows execution of single-pass agents (called invocable objects) anywhere within the grid. The agents can be executed on a particular node in the grid, in parallel on a particular set of nodes in the grid, or in parallel on all nodes of the grid.

An invocation service is configured using the <invocation-scheme> element in the cache configuration file. See invocation-scheme. Using the name of the service, the application can easily obtain a reference to the service:

InvocationService service = (InvocationService) CacheFactory.getService("MyService");

Agents are simply runnable classes that are part of the application. An example of a simple agent is one designed to request a GC from the JVM:

/**
* Agent that issues a garbage collection.
*/
public class GCAgent
        extends AbstractInvocable
    {
    public void run()
        {
        System.gc();
        }
    }

Executing that agent across the entire cluster takes one line of code:

service.execute(new GCAgent(), null, null);

Here is an example of an agent that supports a grid-wide request/response model:

/**
* Agent that determines how much free memory a grid node has.
*/
public class FreeMemAgent
        extends AbstractInvocable
    {
    public void run()
        {
        Runtime runtime = Runtime.getRuntime();
        long cbFree  = runtime.freeMemory();
        long cbTotal = runtime.totalMemory();
        setResult(new long[] {cbFree, cbTotal});
        }
    }

Executing that agent across the entire grid and retrieving all of its results also takes only a single line of code:

Map map = service.query(new FreeMemAgent(), null);

While it is easy to do a grid-wide request/response, it takes a bit more code to print the results:

Iterator iter = map.entrySet().iterator();
while (iter.hasNext())
    {
    Map.Entry entry  = (Map.Entry) iter.next();
    Member    member = (Member) entry.getKey();
    long[]    anInfo = (long[]) entry.getValue();
    if (anInfo != null) // null if member died
        System.out.println("Member " + member + " has "
            + anInfo[0] + " bytes free out of "
            + anInfo[1] + " bytes total");
    }
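The grid-wide request/response pattern can be modeled locally with an ExecutorService. This plain-JDK sketch (hypothetical QueryModel class, not the InvocationService API) runs the same Callable once per simulated member, waits for all of them, and returns a map keyed by member, with null for a member whose agent failed, similar to the null check above. Its freeMem method takes the same measurement as FreeMemAgent; Runtime.freeMemory and totalMemory return long values.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical plain-JDK model of service.query(agent, null); NOT the InvocationService API.
// Each "member" is just a name; the agent runs once per member.
class QueryModel {

    static <R> Map<String, R> query(List<String> members, Callable<R> agent) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, members.size()));
        try {
            Map<String, Future<R>> pending = new LinkedHashMap<>();
            for (String member : members) {
                pending.put(member, pool.submit(agent));
            }
            Map<String, R> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<R>> e : pending.entrySet()) {
                try {
                    results.put(e.getKey(), e.getValue().get());
                } catch (ExecutionException ex) {
                    results.put(e.getKey(), null);   // like a member that died: null result
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(ex);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Same measurement as FreeMemAgent, returned as {free, total}.
    static long[] freeMem() {
        Runtime runtime = Runtime.getRuntime();
        return new long[] {runtime.freeMemory(), runtime.totalMemory()};
    }
}
```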

The agent operations can be stateful, which means that their invocation state is serialized and transmitted to the grid nodes on which the agent is to be run.

/**
* Agent that carries some state with it.
*/
public class StatefulAgent
        extends AbstractInvocable
    {
    public StatefulAgent(String sKey)
        {
        m_sKey = sKey;
        }

    public void run()
        {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
        }

    private String m_sKey;
    }

Using a Work Manager

Coherence provides a grid-enabled implementation of the CommonJ Work Manager. Using a Work Manager, an application can submit a collection of work that must be executed. The Work Manager distributes that work so that it is executed in parallel, typically across the grid. In other words, if ten work items are submitted and there are ten servers in the grid, then each server likely processes one work item. Further, the distribution of work items across the grid can be tailored so that certain servers (for example, one that acts as a gateway to a particular mainframe service) are the first choice to run certain work items, for the sake of efficiency and locality of data.

The application can then wait for the work to be completed, and can provide a timeout for how long it can wait. The API for this purpose is quite powerful, allowing an application to wait for the first work item to complete, or for a specified set of the work items to complete. By combining methods from this API, it is possible to do things like "Here are 10 items to execute; for these 7 unimportant items, wait no more than 5 seconds, and for these 3 important items, wait no more than 30 seconds".

Example 29-1 Using a Work Manager

Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i)
    {
    WorkItem item = manager.schedule(aWork[i]);

    if (i < 3)
        {
        // the first three work items are the important ones
        collBigItems.add(item);
        }

    collAllItems.add(item);
    }

// waitForAll returns true only if every item in the collection completed in time
boolean fAllDone = manager.waitForAll(collAllItems, 5000L);
if (!fAllDone)
    {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
    }
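The 5-second and 30-second waits in Example 29-1 can be reproduced with standard futures when no CommonJ Work Manager is available. The sketch below is hypothetical (class TimedWaitModel, JDK only): its waitForAll helper waits on each Future against one shared deadline and returns true only if all of them completed before the deadline.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical plain-JDK analogue of the Work Manager wait pattern; NOT the CommonJ API.
class TimedWaitModel {

    // Waits up to timeoutMs in total; true only if every future completed in time.
    static boolean waitForAll(List<? extends Future<?>> items, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (Future<?> f : items) {
            long remaining = deadline - System.nanoTime();
            try {
                f.get(Math.max(0L, remaining), TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                return false;
            } catch (ExecutionException e) {
                // a failed item still counts as finished for the purpose of waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Used like the example, a short wait over all items followed by a longer wait over only the important ones lets slow, unimportant work be abandoned while important work gets the full budget.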