2 Implement Transactions, Locks, and Concurrency

Coherence provides several different options to support locking, transactions, and concurrent access to data.

2.1 Concurrency Options

Coherence provides several options for managing concurrent access to data.

Table 2-1 Coherence Concurrent Access Options

Option Name Description

Explicit Locking

The ConcurrentMap interface (part of the NamedCache interface) supports explicit locking operations. Many developers find this simple locking API to be the most natural approach.

Transactions

The TransactionMap API builds on top of the explicit locking operations to support ACID-style transactions.

Container Integration

For transaction management in a Java EE container, Coherence provides a JCA resource adaptor to allow transactions to be managed by using JTA. Although Coherence does not currently support XA transactions, it can participate in XA transactions as the last resource.

Entry Processors

Coherence also supports a lock-free programming model through the EntryProcessor API. For many transaction types, this minimizes contention and latency and improves system throughput, without compromising the fault-tolerance of data operations.

Data Source Integration

Guidelines on maintaining caches with local (non XA) data resources.


2.2 Explicit Locking

The standard NamedCache interface extends the ConcurrentMap interface which includes basic locking methods. Locking operations are applied at the entry level by requesting a lock against a specific key in a NamedCache:

Example 2-1 Applying Locking Operations on a Cache

...
NamedCache cache = CacheFactory.getCache("dist-cache");
Object key = "example_key";
cache.lock(key, -1);
try
    {
    Object value = cache.get(key);
    // application logic
    cache.put(key, value);
    }
finally
    {
    // Always unlock in a "finally" block
    // to ensure that uncaught exceptions
    // don't leave data locked
    cache.unlock(key);
    }
...

Coherence lock functionality is similar to the Java synchronized keyword and the C# lock keyword: locks only block locks. Threads must cooperatively coordinate access to data through appropriate use of locking. If a thread has locked the key to an item, another thread can read the item without locking.

Locks are unaffected by server failure (and will failover to a backup server.) Locks are immediately released when the lock owner (client) fails.

Locking behavior varies depending on the timeout requested and the type of cache. A timeout of -1 will block indefinitely until a lock can be obtained, 0 will return immediately, and a value greater than 0 will wait the specified number of milliseconds before timing out. The boolean return value should be examined to ensure the caller has actually obtained the lock. See ConcurrentMap.lock() for more details. Note that if a timeout value is not passed to lock() the default is 0. With replicated caches, the entire cache can be locked by using ConcurrentMap.LOCK_ALL as the key, although this is usually not recommended. This operation is not supported with partitioned caches.

In both replicated and partitioned caches, gets are permitted on keys that are locked. In a replicated cache, puts are blocked, but they are not blocked in a partitioned cache. When a lock is in place, it is the responsibility of the caller (either in the same thread or the same cluster node, depending on the lease-granularity configuration) to release the lock. This is why locks should always be released with a finally clause (or equivalent). If this is not done, unhandled exceptions may leave locks in place indefinitely. For more information on lease-granularity configuration, see "DistributedCache Service Parameters".

2.3 Transactions

A TransactionMap can be used to update multiple items in a Coherence cache in a transaction. To perform transactions with a TransactionMap, the Java Transaction API (JTA) libraries must be present in the classpath. TransactionMaps are created using the CacheFactory:

NamedCache     cache = CacheFactory.getCache("dist-cache");
TransactionMap tmap  = CacheFactory.getLocalTransaction(cache);

Before using a TransactionMap, concurrency and isolation levels should be set to the desired level. For most short lived, highly concurrent transactions, a concurrency level of CONCUR_PESSIMISTIC and an isolation level of TRANSACTION_REPEATABLE_GET is necessary. For most long lived transactions that don't occur as often, CONCUR_OPTIMISTIC and TRANSACTION_REPEATABLE_GET are good defaults. The combination of concurrency level CONCUR_PESSIMISTIC and isolation level TRANSACTION_SERIALIZABLE will lock the entire cache. As mentioned previously, partitioned caches do not allow the entire cache to be locked, thus this mode will not work for partitioned caches. (In general, this level of isolation is not required for reliable transaction processing.) Queries against caches (calling keySet(Filter)) or entrySet(Filter)) are always performed with READ_COMMITTED isolation. For more information about concurrency and isolation levels, see the TransactionMap API.

Here is how to set the isolation and concurrency levels:

tmap.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET);
tmap.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);

Now the TransactionMap can be used to update the cache in a transaction:

Example 2-2 Updating the Cache in a Transaction

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

import com.tangosol.util.Base;
import com.tangosol.util.TransactionMap;

import java.util.Collection;
import java.util.Collections;


public class TxMapSample
        extends Base
    {
    public static void main(String[] args)
        {
        // populate the cache
        NamedCache cache = CacheFactory.getCache("dist-cache");

        String key1 = "key1";
        String key2 = "key2";

        cache.put(key1, new Integer(1));
        cache.put(key2, new Integer(1));

        out("Initial value for key 1: " + cache.get(key1));
        out("Initial value for key 2: " + cache.get(key2));

        // create one TransactionMap per NamedCache
        TransactionMap mapTx = CacheFactory.getLocalTransaction(cache);
        mapTx.setTransactionIsolation(TransactionMap.TRANSACTION_REPEATABLE_GET);
        mapTx.setConcurrency(TransactionMap.CONCUR_PESSIMISTIC);

        // gather the cache(s) into a Collection
        Collection txnCollection = Collections.singleton(mapTx);
        boolean    fTxSucceeded  = false;

        try
            {
            // start the transaction
            mapTx.begin();

            int i1 = ((Integer)mapTx.get(key1)).intValue();
            mapTx.put(key1, new Integer(++i1));

            int i2 = ((Integer)mapTx.get(key2)).intValue();
            mapTx.put(key2, new Integer(++i2));

            // commit the changes
            fTxSucceeded = CacheFactory.commitTransactionCollection(txnCollection, 1);
            }
            
        catch (Throwable t)
            {
            // rollback
            CacheFactory.rollbackTransactionCollection(txnCollection);
            }

        int v1 = ((Integer) cache.get(key1)).intValue();
        int v2 = ((Integer) cache.get(key2)).intValue();

        out("Transaction " + (fTxSucceeded ? "succeeded" : "did not succeed"));

        out("Updated value for key 1: " + v1);
        out("Updated value for key 2: " + v2);

        azzert(v1 == 2, "Expected value for key1 == 2");
        azzert(v2 == 2, "Expected value for key2 == 2");
        }
    }

The CacheFactory API provides helper methods for committing and rolling back a collection of TransactionMap instances. The commit method uses a conventional two-phase commit (2PC) protocol. However, as with any 2PC implementation, if one of the resources fails to commit during the second phase ("commit"), the transaction may end up partially committed. Unlike traditional 2PC implementations, Coherence can guarantee that a given server will not enter an "in doubt" state during the commit phase, but other failures are possible (for example, write-through caching can cause persistent failures). Additionally, as the transaction log is stored only on the client, a client-side failure during the "commit" phase might result in a partial commit. As the "commit" phase is nonblocking (any required locks are acquired before the start of the "commit" phase), the "commit" phase is much shorter (usually no more than a few milliseconds) than the "prepare" phase and thus the exposure, while nonzero, is minimal for typical workloads.

MapListeners registered with caches that participate in a transaction will receive a MapEvent as each item is committed. There are no guarantees that events will be fired in the order that they appear in the transaction. Additionally, if the transaction updates an item multiple times, only one event will be dispatched, reflecting the final state of the item.

2.4 Container Integration

2.4.1 JCA

Coherence ships with a JCA 1.0 compliant resource adaptor that can be used to manage transactions in a Java EE container. It is packaged in a resource adaptor archive (RAR) file that can be deployed to any Java EE container compatible with JCA 1.0. When deployed, JTA can be used to execute the transaction:

Example 2-3 Configuration for a JCA Container

String          key = "key";
Context         ctx = new InitialContext();
UserTransaction tx  = null;
try
    {
    // the transaction manager from container
    tx = (UserTransaction) ctx.lookup("java:comp/UserTransaction");
    tx.begin();

    // the try-catch-finally block below is the block of code
    // that could be on an EJB and therefore automatically within
    // a transactional context
    CacheAdapter adapter = null;
    try
        {
        adapter = new CacheAdapter(ctx, "tangosol.coherenceTx",
                CacheAdapter.CONCUR_OPTIMISTIC,
                CacheAdapter.TRANSACTION_GET_COMMITTED, 0);

        NamedCache cache = adapter.getNamedCache("dist-test",
                getClass().getClassLoader());

        int n = ((Integer)cache.get(key)).intValue();
        cache.put(key, new Integer(++n));
        }
    catch (Throwable t)
        {
        String sMsg = "Failed to connect: " + t;
        System.err.println(sMsg);
        t.printStackTrace(System.err);
        }
    finally
        {
        try
            {
            adapter.close();
            }
        catch (Throwable ex)
            {
            System.err.println("SHOULD NOT HAPPEN: " + ex);
            }
        }
    }
finally
    {
    try
        {
        tx.commit();
        }
    catch (Throwable t)
        {
        String sMsg = "Failed to commit: " + t;
        System.err.println(sMsg);
        }
    }

2.4.2 XA

Coherence can participate in an XA transaction as the last resource. This feature is supported by most transaction managers and is known by various names, such as "Last Resource Commit" or "Last Participant." In this scenario, the completion of a transaction would involve the following steps:

  • prepare is called on all XA resources

  • commit is called on the Coherence transaction

  • if the commit is successful, commit is called on the other XA participants in the transaction.

Refer to your transaction manager's documentation on XA last resource configuration for further details on this technique.

2.5 Entry Processors

The InvocableMap superinterface of NamedCache allows for concurrent lock-free execution of processing code within a cache. This processing is performed by an EntryProcessor. In exchange for reduced flexibility compared to the more general purpose TransactionMap and ConcurrentMap explicit locking APIs, EntryProcessors provide the highest levels of efficiency without compromising data reliability.

Since EntryProcessors perform an implicit low level lock on the entries they are processing, the end user can place processing code in an EntryProcessor without having to worry about concurrency control. Note that this is not the same as the explicit lock(key) functionality provided by ConcurrentMap!

In a replicated cache or a partitioned cache running under Caching Edition, execution will happen locally on the initiating client. In partitioned caches running under Enterprise Edition or greater, the execution occurs on the node that is responsible for primary storage of the data.

InvocableMap provides three methods of starting EntryProcessors:

  • Invoke an EntryProcessor on a specific key. Note that the key need not exist in the cache to invoke an EntryProcessor on it.

  • Invoke an EntryProcessor on a collection of keys.

  • Invoke an EntryProcessor on a Filter. In this case, the Filter will be executed against the cache entries. Each entry that matches the Filter criteria will have the EntryProcessor executed against it. For more information on Filters, see Chapter 6, "Query the Cache".

In partitioned caches running under Enterprise Edition or greater, EntryProcessors will be executed in parallel across the cluster (on the nodes that own the individual entries.) This provides a significant advantage over having a client lock all affected keys, pull all required data from the cache, process the data, place the data back in the cache, and unlock the keys. The processing occurs in parallel across multiple machines (as opposed to serially on one machine) and the network overhead of obtaining and releasing locks is eliminated.

Here is a sample of high level concurrency control. Code that will require network access is commented:

Example 2-4 Concurrency Control without Using EntryProcessors

final NamedCache cache = CacheFactory.getCache("dist-test");
final String key = "key";

cache.put(key, new Integer(1));


// begin processing

// *requires network access*
if (cache.lock(key, 0))
    {
    try
        {
        // *requires network access*
        Integer i = (Integer) cache.get(key);
        // *requires network access*
        cache.put(key, new Integer(i.intValue() + 1));
        }
    finally
        {
        // *requires network access*
        cache.unlock(key);
        }
    }

// end processing

The following is an equivalent technique using an Entry Processor. Again, network access is commented:

Example 2-5 Concurrency Control Using EntryProcessors

final NamedCache cache = CacheFactory.getCache("dist-test");
final String key = "key";

cache.put(key, new Integer(1));


// begin processing

// *requires network access*
cache.invoke(key, new MyCounterProcessor());

// end processing

...

public static class MyCounterProcessor
        extends AbstractProcessor
    {
    // this is executed on the node that owns the data,
    // no network access required
    public Object process(InvocableMap.Entry entry)
        {
        Integer i = (Integer) entry.getValue();
        entry.setValue(new Integer(i.intValue() + 1));
        return null;
        }
    }

EntryProcessors are individually executed atomically, however multiple EntryProcessor invocations by using InvocableMap.invokeAll() will not be executed as one atomic unit. As soon as an individual EntryProcessor has completed, any updates made to the cache will be immediately visible while the other EntryProcessors are executing. Furthermore, an uncaught exception in an EntryProcessor will not prevent the others from executing. Should the primary node for an entry fail while executing an EntryProcessor, the backup node will perform the execution instead. However if the node fails after the completion of an EntryProcessor, the EntryProcessor will not be invoked on the backup.

Note that in general, EntryProcessors should be short lived. Applications with longer running EntryProcessors should increase the size of the distributed service thread pool so that other operations performed by the distributed service are not blocked by the long running EntryProcessor. For more information on the distributed service thread pool, see "DistributedCache Service Parameters".

Coherence includes several EntryProcessor implementations for common use cases. Further details on these EntryProcessors, along with additional information on parallel data processing, can be found in "Provide a Data Grid".

2.6 Data Source Integration

When using write-behind and write-through to a database in a Coherence cache, transactional behavior must be taken into account. With write-through enabled, the put will succeed if the item is successfully stored in the database. Otherwise, the exception that occurred in the CacheStore will be rethrown to the client. (Note: to enable this behavior, set <rollback-cachestore-failures> to true. See "read-write-backing-map-scheme" for more details.) This only applies when updating one cache item at a time; if two cache items are updated at a time then each CacheStore operation will be a distinct database transaction. This limitation will be addressed in a future release of Coherence.

Write-behind caching provides much higher throughput and performance. However, writes to the database are performed after the cache has been updated. Therefore care must be taken to ensure that writes to the database will not fail. Write-behind should only be used in applications where:

  • data constraints will be managed by the application, not the database

  • no other application will update the database

See "Read-Through, Write-Through, Write-Behind Caching and Refresh-Ahead" for more information on cache stores.

If multiple updates to cache entries must be persisted to the database in a transaction, it may be more suitable to implement a cache-aside pattern where the client is responsible for updating the database and the cache. Note that a CacheLoader may still be used to load cache misses from the data source.