Coherence provides the ideal infrastructure for building data grid services and the client and server-based applications that use a data grid. At a basic level, Coherence can manage large amounts of data across a large number of servers in a grid; it can provide close to zero latency access for that data; it supports parallel queries across that data in a map-reduce manner; and it supports integration with database and EIS systems that act as the system of record for that data.
Coherence provides for the ability to execute an agent against an entry in any map of data managed by a data grid:
map.invoke(key, agent);
In the case of partitioned data, the agent executes on the grid node that owns the data. The queuing, concurrency management, agent execution, data access by the agent, and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (at most a few hundred bytes) than to handle distributed concurrency control, coherency and data updates.
For request and response processing, the agent returns a result:
Object oResult = map.invoke(key, agent);
Coherence, as a data grid, determines the location to execute the agent based on the configuration for the data topology. It moves the agent to the determined location, executes the agent (automatically handling concurrency control for the item while executing the agent), backs up the modifications (if any), and returns a result.
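As a rough single-JVM analogy (not Coherence API), the sketch below uses `ConcurrentHashMap.compute` to show the same idea: the caller hands a function to the map, and the map applies it atomically to the entry where the data lives. The class `LocalInvocableMap` and its methods are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical single-JVM stand-in for a grid map; not a Coherence class.
class LocalInvocableMap {
    private final ConcurrentHashMap<String, Integer> data = new ConcurrentHashMap<>();

    void put(String key, int value) {
        data.put(key, value);
    }

    // Applies the agent to the entry in place and returns the new value.
    // compute() runs the function atomically for that key, which stands in
    // for the per-entry concurrency control that the owning grid node provides.
    // A missing entry is treated as 0 in this sketch.
    Integer invoke(String key, UnaryOperator<Integer> agent) {
        return data.compute(key, (k, v) -> agent.apply(v == null ? 0 : v));
    }
}
```

A caller would write something like `map.invoke("a", v -> v + 1)`; only the function travels to the data, and the caller never takes an explicit lock.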
Coherence provides map-reduce functionality which allows agents to be executed in parallel against a collection of entries across all nodes in the grid. Parallel execution allows large amounts of data to be processed by balancing the work across the grid. The invokeAll method is used as follows:
map.invokeAll(collectionKeys, agent);
For request and response processing, the agent returns one result for each key processed:
Map mapResults = map.invokeAll(collectionKeys, agent);
Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology. It then moves the agent to the determined locations, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backs up the modifications (if any), and returns the coalesced results. See "Performing Data Grid Aggregation" for instructions on performing aggregation against a result set.
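The flow described above (locate the owner of each key, run the agent there in parallel, and coalesce the per-key results) can be modeled in plain Java. This is only a sketch under simplifying assumptions: partitions are in-process maps, ownership is derived from the key's hash, and `PartitionedModel` is a hypothetical class, not part of Coherence.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical model of a partitioned map; not a Coherence class.
class PartitionedModel {
    private final List<ConcurrentHashMap<String, Integer>> partitions = new ArrayList<>();

    PartitionedModel(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ConcurrentHashMap<>());
        }
    }

    // In this model the owning partition is derived from the key's hash.
    private ConcurrentHashMap<String, Integer> owner(String key) {
        return partitions.get(Math.floorMod(key.hashCode(), partitions.size()));
    }

    void put(String key, int value) {
        owner(key).put(key, value);
    }

    Integer get(String key) {
        return owner(key).get(key);
    }

    // Runs the agent against each key on its owning partition, in parallel,
    // and coalesces the per-key results into a single map.
    // The agent must return a non-null value in this sketch.
    Map<String, Integer> invokeAll(Collection<String> keys, Function<Integer, Integer> agent) {
        return keys.parallelStream()
                .distinct()
                .collect(Collectors.toConcurrentMap(
                        key -> key,
                        key -> owner(key).compute(key, (k, v) -> agent.apply(v == null ? 0 : v))));
    }
}
```

The per-key `compute` call plays the role of the automatic concurrency control, and the collected map corresponds to the coalesced results returned to the caller.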
Coherence supports the ability to query across the entire data grid. For details on creating queries, see Querying Data In a Cache. For example, in a trading system it is possible to query for all open Order objects for a particular trader:
NamedCache map = CacheFactory.getCache("trades");
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = map.keySet(filter);
By combining this feature with the use of parallel executions in the data grid, Coherence provides the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agent against the entries:
map.invokeAll(filter, agent);
For request and response processing, the agent returns one result for each key processed:
Map mapResults = map.invokeAll(filter, agent);
Coherence combines parallel query and parallel execution to achieve query-based agent invocation against a data grid.
Passing an instance of AlwaysFilter (or null) to the invokeAll method causes the passed agent to be executed against all entries in the InvocableMap:
map.invokeAll((Filter) null, agent);
As with the other types of agent invocation, request and response processing is supported:
Map mapResults = map.invokeAll((Filter) null, agent);
An application can process all the data spread across a particular map in the data grid with a single line of code.
Agents implement the EntryProcessor interface, typically by extending the AbstractProcessor class. Coherence includes many predefined entry processors that can be used to perform many common operations. The processors are defined in the com.tangosol.util.processor package.
Coherence includes the following EntryProcessor implementations:
AbstractProcessor - an abstract base class for building an EntryProcessor
AsynchronousProcessor - a wrapper class that allows for an asynchronous invocation of an underlying entry processor. For details, see "Processing Entries Asynchronously".
CompositeProcessor - bundles a collection of EntryProcessor objects that are invoked sequentially against the same entry
ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the entry-to-process evaluates to true
ConditionalPut - performs an Entry.setValue operation if the specified condition is satisfied
ConditionalPutAll - performs an Entry.setValue operation for multiple entries that satisfy the specified condition
ConditionalRemove - performs an Entry.remove operation if the specified condition is satisfied
ExtractorProcessor - extracts and returns a value (such as a property value) from an object stored in an InvocableMap
NumberIncrementor - pre- or post-increments any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, and BigDecimal
NumberMultiplier - multiplies any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, and BigDecimal, and returns either the previous or new value
PreloadRequest - performs an Entry.getValue call. No results are reported back to the caller. The processor provides a means to load an entry or a collection of entries into the cache using a cache loader without incurring the cost of sending the value(s) over the network. If the corresponding entry (or entries) already exists in the cache, or if the cache does not have a loader, then invoking this processor has no effect.
PriorityProcessor - explicitly controls the scheduling priority and timeouts for execution of EntryProcessor methods.
PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator. The NumberIncrementor and NumberMultiplier entry processors extend this processor.
UpdaterProcessor - updates an attribute of an object cached in an InvocableMap.
VersionedPut - performs an Entry.setValue operation if the version of the specified value matches the version of the current value. For a match, the processor increments the version indicator before the value is updated. Entry values must implement the Versionable interface.
VersionedPutAll - performs an Entry.setValue operation only for entries whose versions match the versions of the corresponding current values. For a match, the processor increments the version indicator before each value is updated. Entry values must implement the Versionable interface.
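The version check described for VersionedPut is an optimistic-update pattern. The following plain-Java sketch models that behavior only; it does not use the Coherence classes, and `VersionedStore`, `Versioned`, and `versionedPut` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of an optimistic, version-checked put; not Coherence code.
class VersionedStore {
    // A value paired with its version indicator.
    static final class Versioned {
        final String value;
        final int version;

        Versioned(String value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final ConcurrentHashMap<String, Versioned> data = new ConcurrentHashMap<>();

    void insert(String key, String value) {
        data.put(key, new Versioned(value, 1));
    }

    Versioned get(String key) {
        return data.get(key);
    }

    // Replaces the stored value only if expectedVersion matches the stored
    // version; on a match the stored version is incremented.
    // Returns true if the update was applied.
    boolean versionedPut(String key, String newValue, int expectedVersion) {
        boolean[] applied = {false};
        data.computeIfPresent(key, (k, current) -> {
            if (current.version != expectedVersion) {
                return current; // stale update: keep the stored value
            }
            applied[0] = true;
            return new Versioned(newValue, current.version + 1);
        });
        return applied[0];
    }
}
```

A second writer that still holds the old version number is rejected instead of silently overwriting the first writer's change.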
The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods: process and processAll. The AbstractProcessor provides the default implementation of the processAll method. When processing multiple keys, a single EntryProcessor object is re-used for all the keys and its state should not be mutated.
Note:
If the processAll call throws an exception, changes are only made to the underlying Map for entries that were removed from the setEntries. Changes that are made to the remaining entries are not processed.
The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible.
Lambda expressions can be used as entry processors and can result in more concise client code that does not require the processor to be serialized or registered in a POF configuration file. The following example creates an entry processor as a lambda expression and uses the entry processor within the invokeAll method:
InvocableMap.EntryProcessor<ContactId, Contact, Void> processor = (entry) -> {
    Contact contact = entry.getValue();
    contact.setFirstName(contact.getFirstName().toUpperCase());
    entry.setValue(contact);
    return null;
};
cache.invokeAll(processor);
The following example creates an entry processor as a lambda expression directly within the invokeAll method.
Address addrWork = new Address("201 Newbury St.", "Yoyodyne, Ltd.", "Boston", "MA", "02116", "US");
ValueExtractor extractor = Lambda.extractor(Contact::getHomeAddress).andThen(Address::getState);
Filter filter = equal(extractor, "MA");
addrWork.setStreet1("200 Newbury St.");
cache.invokeAll(filter, entry -> {
    Contact contact = entry.getValue();
    contact.setWorkAddress(addrWork);
    entry.setValue(contact);
    return null;
});
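Note that lambda expressions cannot be nested. For example, the following entry processor is not supported because its body defines a second lambda expression:
cache.invokeAll(filter, entry -> {
    Runnable r = () -> System.out.println("");
    r.run();
    return null;
});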
Understanding Lambdas in a Distributed Environment
Executing lambda expressions in distributed environments can be problematic due to the static nature of lambdas. Only the metadata that describes the lambda is sent across the wire. Therefore, the same compiled code is required in both the client and server classpath. Any changes or additions of new lambda expressions on the client require a redeployment and restart of both the client and the server. In addition, synthetic lambda method names are not stable across class versions. Therefore, all cluster members, including Extend clients, must have exactly the same version of a class and must be upgraded at the same time.
To overcome these limitations, a dynamic implementation for lambdas is provided. The dynamic implementation sends both the lambda metadata and the actual byte code to be executed. On the client, the byte code is parsed and a new lambda class is generated from it. On the server, a lambda class is created from the byte code received from the client and then executed. The dynamic implementation:
allows modification of existing (or the introduction of new) behavior without the need for redeployment or server restart
eliminates the issues related to lambda naming instability
allows multiple different versions of a class throughout the cluster
To ensure that the dynamic implementation works correctly, make sure not to refer to named methods and constructors in a lambda expression, because method and constructor references are always treated as static lambdas. In addition, the dynamic implementation only captures enclosing method arguments and local variables.
Entry processors can update cache entries in multiple caches within a single process or processAll operation. The caches must be managed by the same service and the entries must be located in the same partition. For details about ensuring that entries are in the same partition, see "Specifying Data Affinity".
The process and processAll operations are performed in a transaction-like manner that uses implicit locks when accessing, inserting, updating, modifying, or removing cache entries. If an exception is thrown during the processing of the entries, the entries are rolled back leaving all underlying values unchanged. The processAll operation is atomic with respect to all entries within a single partition (or member if no service threads are configured) rather than the individual entry or entire request.
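The all-or-nothing behavior within a partition can be pictured with a small plain-Java model. It is a sketch only: the real implementation relies on implicit locks inside the service, whereas this hypothetical `AtomicBatch` class simply works on a scratch copy and commits it when every invocation succeeds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of transaction-like batch processing; not Coherence code.
class AtomicBatch {
    // Applies the processor to every key against a scratch copy of the
    // partition. If any invocation throws, the exception propagates before
    // the commit step, so the partition keeps its original values.
    static void processAll(Map<String, Integer> partition,
                           Iterable<String> keys,
                           BiFunction<String, Integer, Integer> processor) {
        Map<String, Integer> scratch = new HashMap<>(partition);
        for (String key : keys) {
            scratch.put(key, processor.apply(key, scratch.get(key)));
        }
        // Commit: reached only when every entry was processed successfully.
        partition.putAll(scratch);
    }
}
```

In this model, a failure on the last key leaves the earlier keys unchanged as well, which mirrors the rollback described above.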
Note:
The implicit lock may create a deadlock if entries are locked in conflicting orders on different threads. The application is responsible for ensuring that cache entries are accessed (locked) in a deadlock-free manner. In the case where a deadlock is detected, an exception is thrown but the underlying service is not stopped.
The com.tangosol.net.BackingMapContext API is used to process entries in multiple caches and provides a way to directly access entries in a cache's backing map. The backing map is the actual Map implementation where entries are stored (as opposed to the logical representation of a cache typically used by an application). Entries in a backing map are stored in binary format and therefore require an application to handle the serialized form of an entry. For details on backing maps, see Implementing Storage and Backing Maps.
The com.tangosol.util.BinaryEntry API provides easy access to a backing map context and is typically used by an application. The following sample code demonstrates how to update entries in two different caches within the process method of an entry processor using the BinaryEntry API.
public Object process(Entry entry) {
    BinaryEntry binEntry = (BinaryEntry) entry;
    Binary binKey = binEntry.getBinaryKey();
    Trade trade = (Trade) binEntry.getValue();
    // Update a Trade object in cache1
    // (factor is not defined in this excerpt; presumably a field of the processor)
    trade.setPrice(trade.getPrice() + factor);
    binEntry.setValue(trade);
    // Update a Trade object in cache2
    BackingMapManagerContext ctx = binEntry.getContext();
    BinaryEntry binEntry2 = (BinaryEntry) ctx.getBackingMapContext("cache2").getBackingMapEntry(binKey);
    Trade trade2 = (Trade) binEntry2.getValue();
    trade2.setPrice(trade2.getPrice() + factor);
    binEntry2.setValue(trade2);
    return null;
}
Note:
The getBackingMapEntry method may only be called within the context of an entry processor invocation. Any changes made to the entry are persisted with the same lifecycle as those made by the enclosing invocation. The returned entry is only valid for the duration of the enclosing invocation, and multiple calls to this method within the same invocation context return the same entry object.
The processAll method of the AbstractProcessor class returns a map of results to a client application. The map contains the keys and values for every entry that was processed. Most often, the entry processor returns results that the client uses. However, there may be situations where some results are not usable by the client. More importantly, there may be situations where the processor must evaluate all the entries in a cache; in which case, the return map contains every key in the cache. In both situations, the agent should be designed to ignore results that are not wanted.
Designing an agent that only returns wanted results is a good pattern and best practice, because it:
Makes the client's memory footprint independent of the size of the cache.
Avoids transferring all affected keys to the client which could result in an OutOfMemoryError exception on the client if the cache is too large.
Avoids deserialization costs in the client for all keys.
Avoids transferring the map and all keys through a proxy node (for Extend clients).
To ignore entry processor results, override the processor's processAll method to return an empty Map or null. The following example demonstrates a simple entry processor agent that always returns null after processing an entry. The example is not very realistic but does show how to override the processAll method.
public static class Agent implements InvocableMap.EntryProcessor {
    private static final long serialVersionUID = 1L;
    @Override
    public Object process(Entry entry) {
        return null;
    }
    @Override
    public Map processAll(Set set) {
        for (Entry entry : (Set<Entry>) set) {
            process(entry);
        }
        // Return null so that no per-key results are sent back to the client
        return null;
    }
}
Entry processors can perform synthetic operations on entries. Both the remove and setValue methods for an entry can be declared synthetic by including a true parameter in the method call. For example:
entry.setValue(value, true)
The setValue method in a synthetic operation does not return the previous value. In addition, synthetic operations are not propagated to cache stores or binary entry stores; applications are able to commit changes after all processing is complete.
Applications typically use synthetic operations to prepare a cache and perform operations (state changes) that do not require listeners and cache stores to be notified, especially when doing so may be expensive in terms of network, memory, and CPU usage. Applications also use synthetic operations when pre-warming a cache; applications that load cache data may want to avoid having a cache store called, especially if the entries that are being loaded into the cache are coming from the back-end data source.
Applications can differentiate between client driven (natural) events and cache internal (synthetic) events using the isSynthetic method on the CacheEvent class. For details, see "Using Synthetic Events".
Entry processors can be invoked asynchronously using the AsynchronousProcessor class. The class implements the standard Java Future interface and also includes a Coherence-specific flow control mechanism to guard against excessive backlogs.
Note:
Entries can also be processed asynchronously using the AsyncNamedCache<K, V> interface. For details, see "Performing NamedCache Operations Asynchronously."
The AsynchronousProcessor class is used to wrap an entry processor implementation. For example:
UpdaterProcessor up = new UpdaterProcessor(null, value);
AsynchronousProcessor ap = new AsynchronousProcessor(up);
cache.invokeAll(filter, ap);
ap.getResult();
The above example invokes the underlying entry processor and uses automatic flow control (as defined by the underlying service's flow control logic) and a default unit-of-order ID (set to the calling thread's hashCode, because a thread's requests are naturally expected to execute in order). Ordering is guaranteed for each partition even during failover. Additional constructors are available to manually control request flow and assign the unit-of-order ID as required.
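The Future-style contract (start the invocation, continue working, and collect the result later) can be sketched with the standard `CompletableFuture` class. This is an analogy in plain Java rather than the Coherence implementation; it does not model flow control or unit-of-order, and `AsyncInvokeModel` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical model of an asynchronous invocation; not Coherence code.
class AsyncInvokeModel {
    // Starts the update on another thread and returns immediately.
    // The returned future completes with the new value of the entry.
    // A missing entry is treated as 0 in this sketch.
    static CompletableFuture<Integer> invokeAsync(ConcurrentHashMap<String, Integer> map,
                                                  String key,
                                                  UnaryOperator<Integer> agent) {
        return CompletableFuture.supplyAsync(
                () -> map.compute(key, (k, v) -> agent.apply(v == null ? 0 : v)));
    }
}
```

The caller can block with `join()` when the result is needed, or register a non-blocking callback with `thenAccept`.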
For advanced use cases, the AsynchronousProcessor class can be extended to define custom asynchronous functionality. The following example extends the AsynchronousProcessor class and overrides the onResult, onComplete and onException methods. Refer to the Java API Reference for Oracle Coherence for details on the AsynchronousProcessor class and its methods.
Note:
Overriding implementations of the onComplete, onResult, and onException methods must be non-blocking and short-lived, because these calls are made on the service thread of the client and block the processing of responses on other threads.
AsynchronousProcessor processor = new AsynchronousProcessor(null) {
    public synchronized void onResult(Entry entry) {
        super.onResult(entry);
        // process the result
    }
    public void onComplete() {
        super.onComplete();
        if (m_eReason == null) {
            // process the result
        } else {
            // process the (potentially partial) failure
        }
    }
    public void onException(Throwable eReason) {
        super.onException(eReason);
        // process the observed exception
    }
};
In addition to scalar agents, the InvocableMap interface also supports entry aggregators that perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data. For details, see the aggregate method.
In addition, the EntryAggregator interface can be used to process a set of InvocableMap.Entry objects to achieve an aggregated result.
For efficient execution in a data grid, an aggregation process must be designed to operate in a parallel manner. The StreamingAggregator interface is an advanced extension to the EntryAggregator interface that is explicitly capable of being run in parallel in a distributed environment.
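To illustrate what "designed to operate in a parallel manner" means in practice, the sketch below computes an average in three steps: accumulate a partial result per partition, combine the partials, and finalize. It is a plain-Java model and does not implement the StreamingAggregator interface; `AverageAggregator` and `Partial` are hypothetical names.

```java
import java.util.List;

// Hypothetical model of a parallel-capable aggregation; not Coherence code.
class AverageAggregator {
    // Partial result produced independently for one partition.
    static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        // Merges two partial results; associative, so merge order is irrelevant.
        Partial combine(Partial other) {
            return new Partial(sum + other.sum, count + other.count);
        }
    }

    // Accumulate step: runs locally against the values of one partition.
    static Partial accumulate(List<Integer> values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return new Partial(sum, values.size());
    }

    // Combines the per-partition partials and finalizes the average.
    // Returns 0.0 when there are no values at all.
    static double aggregate(List<List<Integer>> partitions) {
        Partial total = partitions.parallelStream()
                .map(AverageAggregator::accumulate)
                .reduce(new Partial(0, 0), Partial::combine);
        return total.count == 0 ? 0.0 : (double) total.sum / total.count;
    }
}
```

Carrying the sum and the count separately is the design point: averaging the per-partition averages would give a wrong answer whenever partitions hold different numbers of entries.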
Note:
The ParallelAwareAggregator interface has been deprecated and should no longer be used. Applications should use the StreamingAggregator interface to implement custom aggregators. For details, see Performing Data Grid Aggregation Using Streams.
Coherence includes many natural aggregator functions. See the com.tangosol.util.aggregator package for a list of Coherence aggregators. To implement your own aggregator, see the StreamingAggregator interface.
Data grid aggregation can be performed using Java streams. The use of streams provides a simplified programming model especially when combined with Java lambda expressions. For example:
ValueExtractor<Person, Integer> ageExtractor = Person::getAge;
double avgAge = cache.stream()
        .mapToInt(entry -> entry.extract(ageExtractor))
        .average()
        .getAsDouble();
When using Coherence filters, pass a filter object as the source of the stream. For example:
ValueExtractor<Person, Integer> ageExtractor = Person::getAge;
int max = personCache.stream(filter)
        .mapToInt(entry -> entry.extract(ageExtractor))
        .max()
        .getAsInt();
As an alternative, use the Coherence Stream API extension to specify the extractor when creating a stream and rely on the extension for any optimizations. For example:
int max = personCache.stream(filter, Person::getAge)
        .mapToInt(Number::intValue)
        .max()
        .getAsInt();
Note that in this case you must use mapToInt(Number::intValue) to convert Stream<Integer> into IntStream. This conversion can also be done internally. For example:
int max = RemoteStream.toIntStream(personCache.stream(filter, Person::getAge))
        .max()
        .getAsInt();
The Java streams implementation has been extended in Coherence to allow aggregation across the cluster. The API defines a set of aggregators that support streams by implementing the InvocableMap.StreamingAggregator interface. In addition, Coherence includes many useful collectors that can be executed in parallel in a distributed environment. The collectors are called using the RemoteCollectors class. For example:
avgAge = cache.stream()
        .map(Map.Entry::getValue)
        .collect(RemoteCollectors.averagingInt(Contact::getAge));
System.out.println("\nThe average age of all contacts using collect() is: " + avgAge);
To define custom aggregators that also support streams, you can extend the CollectorAggregator class. For details on this class and the supported collectors, see the com.tangosol.internal.util.stream package in Java API Reference for Oracle Coherence.
Coherence provides an Invocation Service which allows execution of single-pass agents (called Invocable objects) anywhere within the grid. The agents can be executed on a particular node in the grid, in parallel on a particular set of nodes in the grid, or in parallel on all nodes of the grid.
An invocation service is configured using the <invocation-scheme> element in the cache configuration file. Using the name of the service, the application can easily obtain a reference to the service:
InvocationService service = (InvocationService) CacheFactory.getService("MyService");
Agents are simply runnable classes that are part of the application. An example of a simple agent is one designed to request a GC from the JVM:
/**
 * Agent that issues a garbage collection.
 */
public class GCAgent extends AbstractInvocable {
    public void run() {
        System.gc();
    }
}
To execute that agent across the entire cluster, it takes one line of code:
service.execute(new GCAgent(), null, null);
Here is an example of an agent that supports a grid-wide request/response model:
/**
 * Agent that determines how much free memory a grid node has.
 */
public class FreeMemAgent extends AbstractInvocable {
    public void run() {
        Runtime runtime = Runtime.getRuntime();
        // freeMemory() and totalMemory() return long values
        long cbFree = runtime.freeMemory();
        long cbTotal = runtime.totalMemory();
        setResult(new long[] {cbFree, cbTotal});
    }
}
To execute that agent across the entire grid and retrieve all the results from it, it still takes only one line of code:
Map map = service.query(new FreeMemAgent(), null);
While it is easy to do a grid-wide request/response, it takes a bit more code to print the results:
Iterator iter = map.entrySet().iterator();
while (iter.hasNext()) {
    Map.Entry entry = (Map.Entry) iter.next();
    Member member = (Member) entry.getKey();
    long[] anInfo = (long[]) entry.getValue();
    if (anInfo != null) { // null if member died
        System.out.println("Member " + member + " has " + anInfo[0]
                + " bytes free out of " + anInfo[1] + " bytes total");
    }
}
The agent operations can be stateful, which means that their invocation state is serialized and transmitted to the grid nodes on which the agent is to be run.
/**
 * Agent that carries some state with it.
 */
public class StatefulAgent extends AbstractInvocable {
    public StatefulAgent(String sKey) {
        m_sKey = sKey;
    }
    public void run() {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
    }
    private String m_sKey;
}
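What "serialized and transmitted" means for agent state can be demonstrated without a grid. The sketch below uses standard Java serialization as an assumption (a Coherence deployment may be configured with a different serializer) and round-trips a task through a byte array before running it. `KeyedTask` and `Wire` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stateful task; stands in for an agent that carries a key.
class KeyedTask implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;

    private final String key;        // state that travels with the task
    private transient String result; // produced where the task runs; not sent

    KeyedTask(String key) {
        this.key = key;
    }

    @Override
    public void run() {
        result = "processed " + key;
    }

    String getResult() {
        return result;
    }
}

// Simulates sending a task over the wire by serializing and deserializing it.
class Wire {
    static KeyedTask roundTrip(KeyedTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (KeyedTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

The copy that runs is a different object from the one the caller constructed, so any state the agent needs must be part of its serialized form.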
Coherence provides a grid-enabled implementation of the CommonJ Work Manager. Using a Work Manager, an application can submit a collection of work that must be executed. The Work Manager distributes that work in such a way that it is executed in parallel, typically across the grid. In other words, if there are ten work items submitted and ten servers in the grid, then each server likely processes one work item. Further, the distribution of work items across the grid can be tailored, so that certain servers (for example, one that acts as a gateway to a particular mainframe service) are the first choice to run certain work items, for the sake of efficiency and locality of data.
The application can then wait for the work to be completed, and can provide a timeout for how long it can wait. The API for this purpose is quite powerful, allowing an application to wait for the first work item to complete, or for a specified set of the work items to complete. By combining methods from this API, it is possible to do things like "Here are 10 items to execute; for these 7 unimportant items, wait no more than 5 seconds, and for these 3 important items, wait no more than 30 seconds".
Example 24-1 Using a Work Manager
Work[] aWork = ...
Collection collBigItems = new ArrayList();
Collection collAllItems = new ArrayList();
for (int i = 0, c = aWork.length; i < c; ++i) {
    WorkItem item = manager.schedule(aWork[i]);
    if (i < 3) {
        // the first three work items are the important ones
        collBigItems.add(item);
    }
    collAllItems.add(item);
}
Collection collDone = manager.waitForAll(collAllItems, 5000L);
if (!collDone.containsAll(collBigItems)) {
    // wait the remainder of 30 seconds for the important work to finish
    manager.waitForAll(collBigItems, 25000L);
}
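For readers unfamiliar with CommonJ, the schedule-then-wait-with-timeout pattern has a close analogy in `java.util.concurrent`. The sketch below is a local thread-pool model and not the Coherence Work Manager; `WorkBatch`, `runAll`, and the pool size of 4 are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical local model of "submit work, wait with a timeout".
class WorkBatch {
    // Schedules all tasks, waits up to timeoutMillis for them to finish, and
    // returns the results of the tasks that completed normally in time.
    // Tasks that did not finish in time are cancelled by invokeAll.
    static List<String> runAll(List<Callable<String>> work, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures =
                    pool.invokeAll(work, timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> done = new ArrayList<>();
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    continue; // timed out
                }
                try {
                    done.add(future.get());
                } catch (ExecutionException e) {
                    // the task itself failed; it contributes no result
                }
            }
            return done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for work", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling `runAll` twice with different task subsets and timeouts gives the same "wait briefly for everything, then longer for the important items" behavior that the example above expresses with two waitForAll calls.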