Transactions, Locks and Concurrency

Concurrency Options

Coherence provides several options for managing concurrent access to data.

Explicit locking: The ConcurrentMap interface (part of the NamedCache interface) supports explicit locking operations. Many developers find this simple locking API to be the most natural approach.
Transactions: The TransactionMap API builds on top of the explicit locking operations to support ACID-style transactions.
Container Integration: For transaction management in a Java EE container, Coherence provides a JCA resource adaptor that allows transactions to be managed via JTA. Although Coherence does not support XA transactions at this time, it can participate in an XA transaction as the last resource.
EntryProcessors: Coherence also supports a lock-free programming model through the EntryProcessor API. For many transaction types, this minimizes contention and latency and improves system throughput, without compromising the fault tolerance of data operations.
Data Source Integration: Guidelines on maintaining caches backed by local (non-XA) data resources.

Explicit Locking

The standard NamedCache interface extends the ConcurrentMap interface which includes basic locking methods. Locking operations are applied at the entry level by requesting a lock against a specific key in a NamedCache:

NamedCache cache = CacheFactory.getCache("dist-cache");
Object key = "example_key";
cache.lock(key, -1);
try
    {
    Object value = cache.get(key);
    // application logic
    cache.put(key, value);
    }
finally
    {
    // Always unlock in a "finally" block
    // to ensure that uncaught exceptions
    // don't leave data locked
    cache.unlock(key);
    }

Coherence lock functionality is similar to the Java synchronized keyword and the C# lock keyword: locks only block locks. Threads must cooperatively coordinate access to data through appropriate use of locking. If a thread has locked the key to an item, another thread can read the item without locking.

Locks are unaffected by server failure (they fail over to a backup server). Locks are immediately released when the lock owner (the client) fails.

Locking behavior varies depending on the timeout requested and the type of cache. A timeout of -1 blocks indefinitely until a lock can be obtained, 0 returns immediately, and a value greater than 0 waits the specified number of milliseconds before timing out. The boolean return value should be examined to ensure that the caller has actually obtained the lock; see ConcurrentMap.lock() for more details. Note that if a timeout value is not passed to lock(), the default is 0. With replicated caches, the entire cache can be locked by using ConcurrentMap.LOCK_ALL as the key, although this is usually not recommended. This operation is not supported with partitioned caches.
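
For example, a caller that cannot afford to block indefinitely can request the lock with a finite timeout and check the return value before proceeding. This is a minimal sketch; the cache name and one-second wait are illustrative:

NamedCache cache = CacheFactory.getCache("dist-cache");
Object key = "example_key";

// wait up to one second for the lock; false means the lock was not obtained
if (cache.lock(key, 1000))
    {
    try
        {
        Object value = cache.get(key);
        // application logic
        cache.put(key, value);
        }
    finally
        {
        cache.unlock(key);
        }
    }
else
    {
    // the lock could not be obtained within the timeout;
    // retry, fail, or fall back as appropriate for the application
    }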

In both replicated and partitioned caches, gets are permitted on keys that are locked. In a replicated cache, puts are blocked, but they are not blocked in a partitioned cache. Once a lock is in place, it is the responsibility of the caller (either in the same thread or the same cluster node, depending on the lease-granularity configuration) to release the lock. This is why locks should always be released with a finally clause (or equivalent). If this is not done, unhandled exceptions may leave locks in place indefinitely.

Transactions

A TransactionMap can be used to update multiple items in a Coherence cache in a transaction. In order to perform transactions with a TransactionMap, the Java Transaction API (JTA) libraries must be present in the classpath. TransactionMaps are created using the CacheFactory:

NamedCache     cache = CacheFactory.getCache("dist-cache");
TransactionMap tmap  = CacheFactory.getLocalTransaction(cache);

Before using a TransactionMap, the concurrency and isolation levels should be set to the desired values. For most short-lived, highly concurrent transactions, a concurrency level of CONCUR_PESSIMISTIC and an isolation level of TRANSACTION_REPEATABLE_GET should be used. For longer-lived, less frequent transactions, CONCUR_OPTIMISTIC and TRANSACTION_REPEATABLE_GET are good defaults. The combination of concurrency level CONCUR_PESSIMISTIC and isolation level TRANSACTION_SERIALIZABLE locks the entire cache. As mentioned previously, partitioned caches do not allow the entire cache to be locked, so this mode will not work for partitioned caches. (In general, this level of isolation is not required for reliable transaction processing.) Queries against caches (calling keySet(Filter) or entrySet(Filter)) are always performed with READ_COMMITTED isolation. For more information about concurrency and isolation levels, see the TransactionMap API.

Here is how to set the isolation and concurrency levels:

tmap.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET);
tmap.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);

Now the TransactionMap can be used to update the cache in a transaction:

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

import com.tangosol.util.Base;
import com.tangosol.util.TransactionMap;

import java.util.Collection;
import java.util.Collections;


public class TxMapSample
        extends Base
    {
    public static void main(String[] args)
        {
        // populate the cache
        NamedCache cache = CacheFactory.getCache("dist-cache");

        String key1 = "key1";
        String key2 = "key2";

        cache.put(key1, new Integer(1));
        cache.put(key2, new Integer(1));

        out("Initial value for key 1: " + cache.get(key1));
        out("Initial value for key 2: " + cache.get(key2));

        // create one TransactionMap per NamedCache
        TransactionMap mapTx = CacheFactory.getLocalTransaction(cache);
        mapTx.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET);
        mapTx.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);

        // gather the cache(s) into a Collection
        Collection txnCollection = Collections.singleton(mapTx);
        boolean    fTxSucceeded  = false;

        try
            {
            // start the transaction
            mapTx.begin();

            int i1 = ((Integer)mapTx.get(key1)).intValue();
            mapTx.put(key1, new Integer(++i1));

            int i2 = ((Integer)mapTx.get(key2)).intValue();
            mapTx.put(key2, new Integer(++i2));

            // commit the changes
            fTxSucceeded = CacheFactory.commitTransactionCollection(txnCollection, 1);
            }
        catch (Throwable t)
            {
            // rollback
            CacheFactory.rollbackTransactionCollection(txnCollection);
            }

        int v1 = ((Integer) cache.get(key1)).intValue();
        int v2 = ((Integer) cache.get(key2)).intValue();

        out("Transaction " + (fTxSucceeded ? "succeeded" : "did not succeed"));

        out("Updated value for key 1: " + v1);
        out("Updated value for key 2: " + v2);

        azzert(v1 == 2, "Expected value for key1 == 2");
        azzert(v2 == 2, "Expected value for key2 == 2");
        }
    }

The CacheFactory API provides helper methods for committing and rolling back a collection of TransactionMap instances. The commit method uses a conventional two-phase commit (2PC) protocol. However, as with any 2PC implementation, if one of the resources fails to commit during the second ("commit") phase, the transaction may end up partially committed. Unlike traditional 2PC implementations, Coherence can guarantee that a given server will not enter an "in doubt" state during the commit phase, but other failures are possible (e.g., write-through caching can fail against the underlying database). Additionally, as the transaction log is stored only on the client, a client-side failure during the commit phase might result in a partial commit. Because any required locks are acquired before the commit phase starts, the commit phase is non-blocking and much shorter (usually no more than a few milliseconds) than the prepare phase; thus the exposure, while non-zero, is minimal for typical workloads.

MapListeners registered with caches that participate in a transaction will receive a MapEvent as each item is committed. There are no guarantees that events will be fired in the order that they appear in the transaction. Additionally, if the transaction updates an item multiple times, only one event will be dispatched, reflecting the final state of the item.
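
As an illustration, a listener registered on the cache before the commit observes one event per modified entry as the commit is applied. This is a minimal sketch using MultiplexingMapListener and MapEvent from com.tangosol.util; the listener simply logs updates and is not itself part of the transaction:

cache.addMapListener(new MultiplexingMapListener()
    {
    protected void onMapEvent(MapEvent evt)
        {
        // invoked once per committed entry; ordering across the
        // entries within the transaction is not guaranteed
        System.out.println("Committed " + evt.getKey() + ": "
                + evt.getOldValue() + " -> " + evt.getNewValue());
        }
    });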

Container Integration

JCA

Coherence ships with a JCA 1.0 compliant resource adaptor that can be used to manage transactions in a Java EE container. It is packaged in a resource adaptor archive (RAR) file that can be deployed to any Java EE container compatible with JCA 1.0. Once deployed, JTA can be used to execute the transaction:

String          key = "key";
Context         ctx = new InitialContext();
UserTransaction tx  = null;
try
    {
    // the transaction manager from the container
    tx = (UserTransaction) ctx.lookup("java:comp/UserTransaction");
    tx.begin();

    // the try-catch-finally block below is the block of code
    // that could be on an EJB and therefore automatically within
    // a transactional context
    CacheAdapter adapter = null;
    try
        {
        adapter = new CacheAdapter(ctx, "tangosol.coherenceTx",
                CacheAdapter.CONCUR_OPTIMISTIC,
                CacheAdapter.TRANSACTION_GET_COMMITTED, 0);

        NamedCache cache = adapter.getNamedCache("dist-test",
                getClass().getClassLoader());

        int n = ((Integer)cache.get(key)).intValue();
        cache.put(key, new Integer(++n));
        }
    catch (Throwable t)
        {
        String sMsg = "Failed to connect: " + t;
        System.err.println(sMsg);
        t.printStackTrace(System.err);
        }
    finally
        {
        try
            {
            if (adapter != null)
                {
                adapter.close();
                }
            }
        catch (Throwable ex)
            {
            System.err.println("SHOULD NOT HAPPEN: " + ex);
            }
        }
    }
finally
    {
    try
        {
        tx.commit();
        }
    catch (Throwable t)
        {
        String sMsg = "Failed to commit: " + t;
        System.err.println(sMsg);
        }
    }

XA

Coherence can participate in an XA transaction as the last resource. This feature is supported by most transaction managers and is known by various names, such as "Last Resource Commit" or "Last Participant." In this scenario, the completion of a transaction involves the following steps: the transaction manager first prepares all of the XA resources enlisted in the transaction; if the prepare phase succeeds, it commits the Coherence (non-XA) transaction; the XA resources are then committed only if the Coherence commit succeeded, and are rolled back otherwise.

Refer to your transaction manager's documentation on XA last resource configuration for further details on this technique.

Entry Processors

The InvocableMap superinterface of NamedCache allows for concurrent lock-free execution of processing code within a cache. This processing is performed by an EntryProcessor. In exchange for reduced flexibility compared to the more general purpose TransactionMap and ConcurrentMap explicit locking APIs, EntryProcessors provide the highest levels of efficiency without compromising data reliability.

Since EntryProcessors obtain an implicit low-level lock on the entries they process, processing code can be placed in an EntryProcessor without worrying about concurrency control. Note that this is not the same as the explicit lock(key) functionality provided by ConcurrentMap!

In a replicated cache or a partitioned cache running under Caching Edition, execution will happen locally on the initiating client. In partitioned caches running under Enterprise Edition or greater, the execution occurs on the node that is responsible for primary storage of the data.

InvocableMap provides three methods of starting EntryProcessors: invoke(Object key, EntryProcessor agent), which processes the entry for a single key; invokeAll(Collection collKeys, EntryProcessor agent), which processes the entries for a collection of keys; and invokeAll(Filter filter, EntryProcessor agent), which processes every entry that matches the specified filter.
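
For example, the same processor can be applied to a single key, an explicit collection of keys, or all entries matching a filter. This sketch reuses the MyCounterProcessor class shown later in this section and assumes AlwaysFilter from com.tangosol.util.filter (java.util.Arrays and java.util.Map imports are omitted for brevity); the cache name and keys are illustrative:

NamedCache cache = CacheFactory.getCache("dist-test");

// process a single entry
cache.invoke("key1", new MyCounterProcessor());

// process an explicit collection of keys; the result is a Map of the
// values returned by process(), keyed by entry key
Map mapResults = cache.invokeAll(
        Arrays.asList(new Object[] {"key1", "key2"}), new MyCounterProcessor());

// process every entry that matches a filter
cache.invokeAll(AlwaysFilter.INSTANCE, new MyCounterProcessor());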

In partitioned caches running under Enterprise Edition or greater, EntryProcessors will be executed in parallel across the cluster (on the nodes that own the individual entries). This provides a significant advantage over having a client lock all affected keys, pull all required data from the cache, process the data, place the data back in the cache, and unlock the keys. The processing occurs in parallel across multiple machines (as opposed to serially on one machine) and the network overhead of obtaining and releasing locks is eliminated.

Here is a sample using explicit locking for concurrency control. The operations that require network access are indicated in comments:

final NamedCache cache = CacheFactory.getCache("dist-test");
final String key = "key";

cache.put(key, new Integer(1));


// begin processing

// *requires network access*
if (cache.lock(key, 0))
    {
    try
        {
        // *requires network access*
        Integer i = (Integer) cache.get(key);
        // *requires network access*
        cache.put(key, new Integer(i.intValue() + 1));
        }
    finally
        {
        // *requires network access*
        cache.unlock(key);
        }
    }

// end processing

The following is an equivalent technique using an EntryProcessor. Again, the operations that require network access are indicated in comments:

final NamedCache cache = CacheFactory.getCache("dist-test");
final String key = "key";

cache.put(key, new Integer(1));


// begin processing

// *requires network access*
cache.invoke(key, new MyCounterProcessor());

// end processing

...

public static class MyCounterProcessor
        extends AbstractProcessor
    {
    // this is executed on the node that owns the data,
    // no network access required
    public Object process(InvocableMap.Entry entry)
        {
        Integer i = (Integer) entry.getValue();
        entry.setValue(new Integer(i.intValue() + 1));
        return null;
        }
    }

EntryProcessors are individually executed atomically; however, multiple EntryProcessor invocations via InvocableMap.invokeAll() will not be executed as one atomic unit. As soon as an individual EntryProcessor has completed, any updates it made to the cache are immediately visible while the other EntryProcessors are executing. Furthermore, an uncaught exception in an EntryProcessor will not prevent the others from executing. Should the primary node for an entry fail while executing an EntryProcessor, the backup node will perform the execution instead. However, if the node fails after the EntryProcessor has completed, the EntryProcessor will not be re-invoked on the backup.

Note that, in general, EntryProcessors should be short-lived. Applications with longer-running EntryProcessors should increase the size of the distributed service thread pool so that other operations performed by the distributed service are not blocked by a long-running EntryProcessor.

Coherence includes several EntryProcessor implementations for common use cases. Further details on these EntryProcessors, along with additional information on parallel data processing, can be found in this document: Provide a Data Grid.
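
For instance, the bundled ConditionalPut processor (com.tangosol.util.processor) can be combined with filters from com.tangosol.util.filter to implement a put-if-absent operation without explicit locking. This is a small sketch; the cache name and value are illustrative:

NamedCache cache = CacheFactory.getCache("dist-test");

// insert the value only if no entry currently exists for the key;
// the check and the put are performed atomically on the owning node
cache.invoke("example_key",
        new ConditionalPut(new NotFilter(PresentFilter.INSTANCE), new Integer(1)));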

Data Source Integration

When using write-behind or write-through to a database from a Coherence cache, transactional behavior must be taken into account. With write-through enabled, a put succeeds only if the item is successfully stored in the database; otherwise, the exception that occurred in the CacheStore is rethrown to the client. (Note: to enable this behavior, set <rollback-cachestore-failures> to true; see read-write-backing-map-scheme for more details.) This applies only when updating one cache item at a time; if two cache items are updated at once, each CacheStore operation is a distinct database transaction. This limitation will be addressed in a future release of Coherence.
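
As an illustration, a CacheStore whose store() method propagates a database failure will cause the corresponding put to fail on the client when <rollback-cachestore-failures> is set to true. This is a minimal sketch that extends AbstractCacheStore; the actual database access is left as a placeholder:

import com.tangosol.net.cache.AbstractCacheStore;

public class DatabaseCacheStore
        extends AbstractCacheStore
    {
    public Object load(Object oKey)
        {
        // read-through: load the value for the key from the database;
        // returning null indicates the key is not present in the database
        return null;
        }

    public void store(Object oKey, Object oValue)
        {
        // write-through: perform the database write here (e.g. via JDBC);
        // if this method throws an exception and <rollback-cachestore-failures>
        // is true, the exception is rethrown to the client's put() call
        }
    }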

Write-behind caching provides much higher throughput and performance. However, writes to the database are performed after the cache has been updated, so care must be taken to ensure that writes to the database will not fail. Write-behind should only be used in applications that can tolerate the database temporarily lagging behind the cache and in which database writes are expected to succeed (for example, because data is validated before it is placed in the cache).

See Read-Through, Write-Through, Refresh-Ahead and Write-Behind Caching for more information on cache stores.

If multiple updates to cache entries must be persisted to the database in a transaction, it may be more suitable to implement a cache-aside pattern where the client is responsible for updating the database and the cache. Note that a CacheLoader may still be used to load cache misses from the data source.
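
A cache-aside update might look like the following sketch: the client commits the database transaction first and only updates the cache once the commit succeeds, so the cache never holds data the database rejected. The DAO and its methods are hypothetical placeholders:

NamedCache cache = CacheFactory.getCache("dist-cache");

// 1. persist both changes in a single database transaction
//    (the DAO and its methods are hypothetical placeholders)
dao.beginTransaction();
dao.update("key1", new Integer(2));
dao.update("key2", new Integer(2));
dao.commitTransaction();

// 2. only after the database commit succeeds, update the cache
cache.put("key1", new Integer(2));
cache.put("key2", new Integer(2));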