Coherence provides several different options to support locking, transactions, and concurrent access to data.
Coherence provides several options for managing concurrent access to data.
Table 2-1 Coherence Concurrent Access Options
Option Name | Description |
---|---|
The |
|
The |
|
For transaction management in a Java EE container, Coherence provides a JCA resource adaptor to allow transactions to be managed by using JTA. Although Coherence does not currently support XA transactions, it can participate in XA transactions as the last resource. |
|
Coherence also supports a lock-free programming model through the |
|
Guidelines on maintaining caches with local (non XA) data resources. |
The standard NamedCache
interface extends the ConcurrentMap
interface which includes basic locking methods. Locking operations are applied at the entry level by requesting a lock against a specific key in a NamedCache
:
Example 2-1 Applying Locking Operations on a Cache
... NamedCache cache = CacheFactory.getCache("dist-cache"); Object key = "example_key"; cache.lock(key, -1); try { Object value = cache.get(key); // application logic cache.put(key, value); } finally { // Always unlock in a "finally" block // to ensure that uncaught exceptions // don't leave data locked cache.unlock(key); } ...
Coherence lock functionality is similar to the Java synchronized
keyword and the C# lock
keyword: locks only block locks. Threads must cooperatively coordinate access to data through appropriate use of locking. If a thread has locked the key to an item, another thread can read the item without locking.
Locks are unaffected by server failure (and will failover to a backup server.) Locks are immediately released when the lock owner (client) fails.
Locking behavior varies depending on the timeout requested and the type of cache. A timeout of -1 will block indefinitely until a lock can be obtained, 0 will return immediately, and a value greater than 0 will wait the specified number of milliseconds before timing out. The boolean return value should be examined to ensure the caller has actually obtained the lock. See ConcurrentMap.lock() for more details. Note that if a timeout value is not passed to lock()
the default is 0. With replicated caches, the entire cache can be locked by using ConcurrentMap.LOCK_ALL
as the key, although this is usually not recommended. This operation is not supported with partitioned caches.
In both replicated and partitioned caches, gets are permitted on keys that are locked. In a replicated cache, puts are blocked, but they are not blocked in a partitioned cache. When a lock is in place, it is the responsibility of the caller (either in the same thread or the same cluster node, depending on the lease-granularity
configuration) to release the lock. This is why locks should always be released with a finally clause (or equivalent). If this is not done, unhandled exceptions may leave locks in place indefinitely. For more information on lease-granularity
configuration, see "DistributedCache Service Parameters".
A TransactionMap
can be used to update multiple items in a Coherence cache in a transaction. To perform transactions with a TransactionMap
, the Java Transaction API (JTA) libraries must be present in the classpath. TransactionMaps
are created using the CacheFactory
:
NamedCache cache = CacheFactory.getCache("dist-cache"); TransactionMap tmap = CacheFactory.getLocalTransaction(cache);
Before using a TransactionMap
, concurrency and isolation levels should be set to the desired level. For most short lived, highly concurrent transactions, a concurrency level of CONCUR_PESSIMISTIC
and an isolation level of TRANSACTION_REPEATABLE_GET
is necessary. For most long lived transactions that don't occur as often, CONCUR_OPTIMISTIC
and TRANSACTION_REPEATABLE_GET
are good defaults. The combination of concurrency level CONCUR_PESSIMISTIC
and isolation level TRANSACTION_SERIALIZABLE
will lock the entire cache. As mentioned previously, partitioned caches do not allow the entire cache to be locked, thus this mode will not work for partitioned caches. (In general, this level of isolation is not required for reliable transaction processing.) Queries against caches (calling keySet(Filter
)
) or entrySet(Filter)
) are always performed with READ_COMMITTED
isolation. For more information about concurrency and isolation levels, see the TransactionMap
API.
Here is how to set the isolation and concurrency levels:
tmap.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET); tmap.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);
Now the TransactionMap
can be used to update the cache in a transaction:
Example 2-2 Updating the Cache in a Transaction
import com.tangosol.net.CacheFactory; import com.tangosol.net.NamedCache; import com.tangosol.util.Base; import com.tangosol.util.TransactionMap; import java.util.Collection; import java.util.Collections; public class TxMapSample extends Base { public static void main(String[] args) { // populate the cache NamedCache cache = CacheFactory.getCache("dist-cache"); String key1 = "key1"; String key2 = "key2"; cache.put(key1, new Integer(1)); cache.put(key2, new Integer(1)); out("Initial value for key 1: " + cache.get(key1)); out("Initial value for key 2: " + cache.get(key2)); // create one TransactionMap per NamedCache TransactionMap mapTx = CacheFactory.getLocalTransaction(cache); mapTx.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET); mapTx.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC); // gather the cache(s) into a Collection Collection txnCollection = Collections.singleton(mapTx); boolean fTxSucceeded = false; try { // start the transaction mapTx.begin(); int i1 = ((Integer)mapTx.get(key1)).intValue(); mapTx.put(key1, new Integer(++i1)); int i2 = ((Integer)mapTx.get(key2)).intValue(); mapTx.put(key2, new Integer(++i2)); // commit the changes fTxSucceeded = CacheFactory.commitTransactionCollection(txnCollection, 1); } catch (Throwable t) { // rollback CacheFactory.rollbackTransactionCollection(txnCollection); } int v1 = ((Integer) cache.get(key1)).intValue(); int v2 = ((Integer) cache.get(key2)).intValue(); out("Transaction " + (fTxSucceeded ? "succeeded" : "did not succeed")); out("Updated value for key 1: " + v1); out("Updated value for key 2: " + v2); azzert(v1 == 2, "Expected value for key1 == 2"); azzert(v2 == 2, "Expected value for key2 == 2"); } }
The CacheFactory
API provides helper methods for committing and rolling back a collection of TransactionMap
instances. The commit
method uses a conventional two-phase commit (2PC) protocol. However, as with any 2PC implementation, if one of the resources fails to commit during the second phase ("commit"), the transaction may end up partially committed. Unlike traditional 2PC implementations, Coherence can guarantee that a given server will not enter an "in doubt" state during the commit phase, but other failures are possible (for example, write-through caching can cause persistent failures). Additionally, as the transaction log is stored only on the client, a client-side failure during the "commit" phase might result in a partial commit. As the "commit" phase is nonblocking (any required locks are acquired before the start of the "commit" phase), the "commit" phase is much shorter (usually no more than a few milliseconds) than the "prepare" phase and thus the exposure, while nonzero, is minimal for typical workloads.
MapListeners
registered with caches that participate in a transaction will receive a MapEvent
as each item is committed. There are no guarantees that events will be fired in the order that they appear in the transaction. Additionally, if the transaction updates an item multiple times, only one event will be dispatched, reflecting the final state of the item.
Coherence ships with a JCA 1.0 compliant resource adaptor that can be used to manage transactions in a Java EE container. It is packaged in a resource adaptor archive (RAR) file that can be deployed to any Java EE container compatible with JCA 1.0. When deployed, JTA can be used to execute the transaction:
Example 2-3 Configuration for a JCA Container
String key = "key"; Context ctx = new InitialContext(); UserTransaction tx = null; try { // the transaction manager from container tx = (UserTransaction) ctx.lookup("java:comp/UserTransaction"); tx.begin(); // the try-catch-finally block below is the block of code // that could be on an EJB and therefore automatically within // a transactional context CacheAdapter adapter = null; try { adapter = new CacheAdapter(ctx, "tangosol.coherenceTx", CacheAdapter.CONCUR_OPTIMISTIC, CacheAdapter.TRANSACTION_GET_COMMITTED, 0); NamedCache cache = adapter.getNamedCache("dist-test", getClass().getClassLoader()); int n = ((Integer)cache.get(key)).intValue(); cache.put(key, new Integer(++n)); } catch (Throwable t) { String sMsg = "Failed to connect: " + t; System.err.println(sMsg); t.printStackTrace(System.err); } finally { try { adapter.close(); } catch (Throwable ex) { System.err.println("SHOULD NOT HAPPEN: " + ex); } } } finally { try { tx.commit(); } catch (Throwable t) { String sMsg = "Failed to commit: " + t; System.err.println(sMsg); } }
Coherence can participate in an XA transaction as the last resource. This feature is supported by most transaction managers and is known by various names, such as "Last Resource Commit" or "Last Participant." In this scenario, the completion of a transaction would involve the following steps:
prepare is called on all XA resources
commit is called on the Coherence transaction
if the commit is successful, commit is called on the other XA participants in the transaction.
Refer to your transaction manager's documentation on XA last resource configuration for further details on this technique.
The InvocableMap
superinterface of NamedCache
allows for concurrent lock-free execution of processing code within a cache. This processing is performed by an EntryProcessor
. In exchange for reduced flexibility compared to the more general purpose TransactionMap
and ConcurrentMap
explicit locking APIs, EntryProcessors
provide the highest levels of efficiency without compromising data reliability.
Since EntryProcessors
perform an implicit low level lock on the entries they are processing, the end user can place processing code in an EntryProcessor
without having to worry about concurrency control. Note that this is not the same as the explicit lock(key)
functionality provided by ConcurrentMap!
In a replicated cache or a partitioned cache running under Caching Edition, execution will happen locally on the initiating client. In partitioned caches running under Enterprise Edition or greater, the execution occurs on the node that is responsible for primary storage of the data.
InvocableMap
provides three methods of starting EntryProcessors:
Invoke an EntryProcessor
on a specific key. Note that the key need not exist in the cache to invoke an EntryProcessor on it.
Invoke an EntryProcessor
on a collection of keys.
Invoke an EntryProcessor
on a Filter
. In this case, the Filter
will be executed against the cache entries. Each entry that matches the Filter
criteria will have the EntryProcessor
executed against it. For more information on Filters, see Chapter 6, "Query the Cache".
In partitioned caches running under Enterprise Edition or greater, EntryProcessors
will be executed in parallel across the cluster (on the nodes that own the individual entries.) This provides a significant advantage over having a client lock all affected keys, pull all required data from the cache, process the data, place the data back in the cache, and unlock the keys. The processing occurs in parallel across multiple machines (as opposed to serially on one machine) and the network overhead of obtaining and releasing locks is eliminated.
Here is a sample of high level concurrency control. Code that will require network access is commented:
Example 2-4 Concurrency Control without Using EntryProcessors
final NamedCache cache = CacheFactory.getCache("dist-test"); final String key = "key"; cache.put(key, new Integer(1)); // begin processing // *requires network access* if (cache.lock(key, 0)) { try { // *requires network access* Integer i = (Integer) cache.get(key); // *requires network access* cache.put(key, new Integer(i.intValue() + 1)); } finally { // *requires network access* cache.unlock(key); } } // end processing
The following is an equivalent technique using an Entry Processor. Again, network access is commented:
Example 2-5 Concurrency Control Using EntryProcessors
final NamedCache cache = CacheFactory.getCache("dist-test"); final String key = "key"; cache.put(key, new Integer(1)); // begin processing // *requires network access* cache.invoke(key, new MyCounterProcessor()); // end processing ... public static class MyCounterProcessor extends AbstractProcessor { // this is executed on the node that owns the data, // no network access required public Object process(InvocableMap.Entry entry) { Integer i = (Integer) entry.getValue(); entry.setValue(new Integer(i.intValue() + 1)); return null; } }
EntryProcessors
are individually executed atomically, however multiple EntryProcessor
invocations by using InvocableMap.invokeAll()
will not be executed as one atomic unit. As soon as an individual EntryProcessor
has completed, any updates made to the cache will be immediately visible while the other EntryProcessors
are executing. Furthermore, an uncaught exception in an EntryProcessor
will not prevent the others from executing. Should the primary node for an entry fail while executing an EntryProcessor
, the backup node will perform the execution instead. However if the node fails after the completion of an EntryProcessor
, the EntryProcessor
will not be invoked on the backup.
Note that in general, EntryProcessors
should be short lived. Applications with longer running EntryProcessors
should increase the size of the distributed service thread pool so that other operations performed by the distributed service are not blocked by the long running EntryProcessor
. For more information on the distributed service thread pool, see "DistributedCache Service Parameters".
Coherence includes several EntryProcessor
implementations for common use cases. Further details on these EntryProcessors
, along with additional information on parallel data processing, can be found in "Provide a Data Grid".
When using write-behind and write-through to a database in a Coherence cache, transactional behavior must be taken into account. With write-through enabled, the put will succeed if the item is successfully stored in the database. Otherwise, the exception that occurred in the CacheStore will be rethrown to the client. (Note: to enable this behavior, set <rollback-cachestore-failures>
to true
. See "read-write-backing-map-scheme" for more details.) This only applies when updating one cache item at a time; if two cache items are updated at a time then each CacheStore
operation will be a distinct database transaction. This limitation will be addressed in a future release of Coherence.
Write-behind caching provides much higher throughput and performance. However, writes to the database are performed after the cache has been updated. Therefore care must be taken to ensure that writes to the database will not fail. Write-behind should only be used in applications where:
data constraints will be managed by the application, not the database
no other application will update the database
See "Read-Through, Write-Through, Write-Behind Caching and Refresh-Ahead" for more information on cache stores.
If multiple updates to cache entries must be persisted to the database in a transaction, it may be more suitable to implement a cache-aside pattern where the client is responsible for updating the database and the cache. Note that a CacheLoader may still be used to load cache misses from the data source.