41 Implementing Concurrency in a Distributed Environment

The Coherence Concurrent module provides distributed implementations of the concurrency primitives from the java.util.concurrent package such as executors, atomics, locks, semaphores, and latches.

You can implement concurrent applications using the constructs you are already familiar with and also expand the "scope" of concurrency from a single process to potentially hundreds of processes within a Coherence cluster. You can use executors to submit tasks to be executed somewhere in the cluster; you can use locks, latches, and semaphores to synchronize execution across many cluster members; you can use atomics to implement global counters across many processes, and so on.

While these features are extremely powerful and enable you to reuse the knowledge you already have, they may have a detrimental effect on scalability and performance. Whenever you synchronize execution through locks, latches, or semaphores, you introduce a potential bottleneck into the architecture. Whenever you use a distributed atomic to implement a global counter, you turn very simple operations that take mere nanoseconds locally, such as increment and decrement, into fairly expensive network calls that could take milliseconds (and potentially block even longer under heavy load).

So, use these features sparingly. In many cases, there is a better, faster, and more scalable way to accomplish the same goal using Coherence primitives such as entry processors, aggregators, and events. These primitives are designed to perform and scale well in a distributed environment.

Note:

  • To use the concurrency features, Oracle recommends using the Bootstrap API to start the Coherence cluster members. See Using the Bootstrap API.
  • Coherence Concurrent features do not support federation and cannot be configured to use it. Coherence Federation is asynchronous, so it would not make sense to federate data that is inherently atomic in nature.


Using Factory Classes

Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.

For example, you will use factory methods within the Atomics class to get instances of various atomic types, Locks to get lock instances, Latches and Semaphores to get latches and semaphores.

Using Local and Remote Instances

In many cases, the factory classes allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock.

In cases where the JDK does not provide a standard interface, which is the case with atomics, latches, and semaphores, the interface has been extracted from the existing JDK class, and a thin wrapper around the corresponding JDK implementation provides the local variant. For example, Coherence Concurrent provides a Semaphore interface and a LocalSemaphore class that wraps java.util.concurrent.Semaphore. The same is true for CountDownLatch and all atomic types.

The main advantage of using factory classes to construct both the local and the remote instances is that it allows you to name local locks the same way you name the remote locks: calling Locks.localLock("foo") always returns the same Lock instance because the Locks class internally caches both the local and the remote instances it creates. In the case of remote locks, every locally cached remote lock instance is ultimately backed by a shared lock instance somewhere in the cluster, which is used to synchronize lock state across the processes.
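
The caching behavior described above can be pictured with a small name-keyed registry. The following is only a sketch of the idea, not the Coherence implementation: the NamedLockRegistry class and its localLock method are hypothetical names chosen for this illustration, and the sketch covers local locks only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; this is not the Coherence Locks class.
final class NamedLockRegistry {
    // One lock per name, created lazily and then reused.
    private static final ConcurrentMap<String, Lock> LOCKS = new ConcurrentHashMap<>();

    private NamedLockRegistry() {
    }

    // Returns the lock registered under the given name, creating it on first use.
    static Lock localLock(String name) {
        return LOCKS.computeIfAbsent(name, key -> new ReentrantLock());
    }

    public static void main(String[] args) {
        Lock first = localLock("foo");
        Lock second = localLock("foo");
        System.out.println(first == second);            // true: same name, same instance
        System.out.println(first == localLock("bar"));  // false: different name
    }
}
```

Because computeIfAbsent is atomic on a ConcurrentHashMap, two threads that ask for the same name at the same time still receive the same lock instance.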

Using Serialization

Coherence Concurrent supports both Java serialization and POF out of the box, with Java serialization being the default.

If you want to use POF instead, set the coherence.concurrent.serializer system property to pof. You should also include the coherence-concurrent-pof-config.xml file in your own POF configuration file to register the built-in Coherence Concurrent types.

Using Persistence

Coherence Concurrent supports both active and on-demand persistence. As in the rest of Coherence, it is set to on-demand by default.

To use active persistence, you should set the coherence.concurrent.persistence.environment system property to default-active, or use another persistence environment that has active persistence enabled.

Note:

The caches that store lock and semaphore data are configured as transient, and are not persisted when you use active or on-demand persistence.

Using the Coherence Concurrent Features

To use the Coherence Concurrent features, declare the coherence-concurrent module as a dependency in your pom.xml file:

<dependency>
    <groupId>${coherence.groupId}</groupId>
    <artifactId>coherence-concurrent</artifactId>
    <version>${coherence.version}</version>
</dependency>

Using Executors

Coherence Concurrent provides a facility to dispatch tasks, either a Runnable or a Callable, to a Coherence cluster for execution. Executors that actually execute the submitted tasks are configured on each cluster member by defining one or more named executors within a cache configuration resource.


Using Executors - Examples

By default, each Coherence cluster with the coherence-concurrent module on the classpath includes a single-threaded executor that may be used to execute the dispatched tasks.

For example:

RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();

Future<?> result = remoteExecutor.submit(() -> System.out.println("Executed"));

result.get(); // block until completion

To obtain a reference to an executor that has been configured with the name Fixed5, use:

RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");

If no executor has been configured with the given name, the RemoteExecutor throws a RejectedExecutionException.

Each RemoteExecutor instance may hold local resources that should be released when the RemoteExecutor is no longer required. Similar to an ExecutorService, a RemoteExecutor has methods to shut the executor down. Calling these methods has no impact on the executors registered within the cluster.
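
Because the text compares RemoteExecutor to an ExecutorService, the submit, wait, and shut down sequence can be sketched with a plain JDK executor. This sketch uses only java.util.concurrent and makes no claim about the exact RemoteExecutor method signatures; the ExecutorLifecycleSketch class and the runAndShutdown helper are names invented for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only stand-in for the lifecycle described above.
final class ExecutorLifecycleSketch {
    // Submits one task, waits a bounded time for its result, then shuts the executor down.
    static <T> T runAndShutdown(ExecutorService executor, Callable<T> task) {
        try {
            Future<T> result = executor.submit(task);
            return result.get(10, TimeUnit.SECONDS);    // bounded wait instead of blocking forever
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();         // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            executor.shutdown();                        // release the executor's threads
        }
    }

    public static void main(String[] args) {
        int answer = runAndShutdown(Executors.newSingleThreadExecutor(), () -> 6 * 7);
        System.out.println(answer);   // prints 42
    }
}
```

Placing the shutdown call in a finally block releases the local resources even when the task fails or the wait times out.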

Configuring Executors

Several executor types are available for configuration.

Table 41-1 Types of Executors

ExecutorService Type  Description
--------------------  -----------
Single thread         Creates an ExecutorService with a single thread.
Fixed thread          Creates an ExecutorService with a fixed number of threads.
Cached                Creates an ExecutorService that creates new threads as needed and reuses existing threads when possible.
Work stealing         Creates a work-stealing thread pool by using the number of available processors as its target parallelism level.
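
The descriptions in Table 41-1 closely match the factory methods of java.util.concurrent.Executors. Assuming that correspondence (the source does not state how Coherence creates these executors), the four types can be sketched with the JDK alone. The type labels and the ExecutorTypesSketch class are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ExecutorTypesSketch {
    // Maps a type label (hypothetical, chosen for this sketch) to a JDK factory method.
    static ExecutorService create(String type, int threadCount) {
        switch (type) {
            case "single":        return Executors.newSingleThreadExecutor();
            case "fixed":         return Executors.newFixedThreadPool(threadCount);
            case "cached":        return Executors.newCachedThreadPool();
            case "work-stealing": return Executors.newWorkStealingPool();
            default: throw new IllegalArgumentException("unknown executor type: " + type);
        }
    }

    // Runs a trivial task on an executor of the given type and returns the worker thread's name.
    static String runOn(String type) {
        ExecutorService executor = create(type, 5);
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.submit(task).get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String type : new String[] {"single", "fixed", "cached", "work-stealing"}) {
            System.out.println(type + " -> " + runOn(type));
        }
    }
}
```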

Table 41-2 Configuration Elements

Element Name    Required  Expected Type                       Description
--------------  --------  ----------------------------------  -----------
single          no        N/A                                 Defines a single-thread executor.
fixed           no        N/A                                 Defines a fixed-thread pool executor.
cached          no        N/A                                 Defines a cached-thread-pool executor.
work-stealing   no        N/A                                 Defines a work-stealing pool executor.
name            yes       java.lang.String                    Defines the logical name of the executor.
thread-count    yes       java.lang.Integer                   Defines the thread count for a fixed-thread pool executor.
parallelism     no        java.lang.Integer                   Defines the parallelism of a work-stealing-thread pool executor. If not defined, it defaults to the number of processors available on the system.
thread-factory  no        N/A                                 Defines a java.util.concurrent.ThreadFactory. Used by single, fixed, and cached executors.
instance        yes       java.util.concurrent.ThreadFactory  Defines how the ThreadFactory will be instantiated. For information about the instance element, see instance. This element must be a child of the thread-factory element.

For complete details, see schema.

Configuration Examples

To define executors, the cache-config root element should include the coherence-concurrent NamespaceHandler to recognize the configuration elements.
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
               xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
               xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd">
  ...
</cache-config>

Note:

Executors defined by the configuration must precede any other elements in the document. Failing to do so prevents the document from being validated.

The following examples assume that the XML namespace prefix defined for the NamespaceHandler is c:

<!-- creates a single-threaded executor named Single -->
<c:single>
  <c:name>Single</c:name>
</c:single>

<!-- creates a single-threaded executor named SingleTF with a custom thread factory -->
<c:single>
  <c:name>SingleTF</c:name>
  <c:thread-factory>
    <c:instance>
      <c:class-name>my.custom.ThreadFactory</c:class-name>
    </c:instance>
  </c:thread-factory>
</c:single>

<!-- creates a fixed-thread executor named Fixed5 with five threads -->
<c:fixed>
  <c:name>Fixed5</c:name>
  <c:thread-count>5</c:thread-count>
</c:fixed>

Managing Executors

There are various ways to manage and monitor executors. You can use one of the following options:

Managing Executors Over REST

Coherence Management over REST exposes endpoints to query and invoke actions against ExecutorMBean instances.

Table 41-3 REST Endpoints

Description                            Method  Path                                                            Produces
-------------------------------------  ------  --------------------------------------------------------------  --------
View all Executors                     GET     /management/coherence/cluster/executors                         JSON
View all Executors with matching name  GET     /management/coherence/cluster/executors/{name}                  JSON
Reset Executor statistics by name      POST    /management/coherence/cluster/executors/{name}/resetStatistics  JSON

Using CDI

You can inject RemoteExecutors through CDI.

For example:
@Inject
private RemoteExecutor single;    // injects a RemoteExecutor named 'single'.

@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor;    // injects a RemoteExecutor named 'Fixed5'. 

Using Atomics

Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong, and AtomicReference. It also provides local implementations of the same types. The local implementations are thin wrappers around the existing java.util.concurrent.atomic types and implement the same interfaces as their distributed variants, so the two are interchangeable.
To create instances of atomic types, you should call the appropriate factory method on the Atomics class:
// Creates a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
AtomicInteger localFoo  = Atomics.localAtomicInteger("foo");

// Creates a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from the local instance 'foo',
// with an implicit initial value of '0'.
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");

// Creates a remote, distributed instance of an 'AtomicLong' named 'bar', with an initial value of '5'.
AtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

Note:

The AtomicInteger and AtomicLong types used in the code above are not types from the java.util.concurrent.atomic package. They are actually interfaces defined within the com.oracle.coherence.concurrent.atomic package that both LocalAtomicXyz and RemoteAtomicXyz classes implement, which are the instances that are actually returned by the above methods.
Therefore, you can rewrite the above code as:
LocalAtomicInteger  localFoo  = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

However, Oracle strongly recommends that you use interfaces instead of concrete types because interfaces make it easy to switch between local and distributed implementations when necessary.

After the instances are created, you can use them the same way you would use any of the corresponding java.util.concurrent.atomic types:
int  counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);
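
To show what "the same way" looks like when several threads share one counter, here is a minimal sketch that uses the JDK AtomicInteger as a stand-in, since the source says the Coherence atomics are used just like the java.util.concurrent.atomic types. The SharedCounterSketch class and its countWith method are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SharedCounterSketch {
    // Starts the given number of threads; each increments the shared counter a fixed number of times.
    static int countWith(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();   // implicit initial value of 0
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();         // atomic: no update is lost
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();                         // wait for every worker to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 1_000));   // prints 4000
    }
}
```

With a distributed atomic, each incrementAndGet call in such a loop would be a network round trip, which is the cost this chapter warns about.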


Asynchronous Implementations of Atomic Types

The instances of numeric atomic types, such as AtomicInteger and AtomicLong, are frequently used to represent various counters in the application where a client may need to increment the value, but does not necessarily need to know what the new value is.

When working with local atomics, you can use the same API shown earlier (see Using Atomics) and simply ignore the return value. With distributed atomics, however, that approach introduces unnecessary blocking: the client waits for a response from the server only to discard it. This has a negative impact on both the performance and the throughput of the atomics.

To reduce the impact of remote calls in those situations, Coherence Concurrent also provides non-blocking, asynchronous implementations of all atomic types it supports.

To obtain a non-blocking instance of any supported atomic type, simply call the async method on the blocking instance of that type:
// Creates a non-blocking 'AsyncAtomicInteger' for the remote, distributed atomic named 'foo', with an implicit initial value of '0'.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();

// Creates a non-blocking 'AsyncAtomicLong' for the remote, distributed atomic named 'bar', with an initial value of '5'.
AsyncAtomicLong    asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
After you create these instances, you can use them the same way you would use any of the corresponding blocking types. The only difference is that the non-blocking instances will simply return a CompletableFuture for the result, and will not block:
CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long>    futureCounter5 = asyncBar.addAndGet(5L);

Both the blocking and non-blocking instances of any distributed atomic type, with the same name, are backed by the same cluster-side atomic instance state, so they can be used interchangeably.
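
Since the non-blocking atomics return a CompletableFuture, the caller decides whether to ignore the result, chain follow-up work, or wait for it. The sketch below shows those three options with JDK classes only. AsyncCounterSketch is a hypothetical stand-in that simulates the remote call with CompletableFuture.supplyAsync; it is not the Coherence AsyncAtomicLong.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a non-blocking counter; not a Coherence class.
final class AsyncCounterSketch {
    private final AtomicLong value;

    AsyncCounterSketch(long initialValue) {
        this.value = new AtomicLong(initialValue);
    }

    // Simulates a remote call: the update runs on another thread and the caller gets a future.
    CompletableFuture<Long> addAndGet(long delta) {
        return CompletableFuture.supplyAsync(() -> value.addAndGet(delta));
    }

    public static void main(String[] args) {
        AsyncCounterSketch bar = new AsyncCounterSketch(5L);

        // Option 1: the call returns immediately; the future can simply be ignored.
        CompletableFuture<Long> update = bar.addAndGet(5L);

        // Option 2: chain follow-up work instead of blocking.
        CompletableFuture<String> report = update.thenApply(v -> "counter is now " + v);

        // Option 3: block with join() only where the value is actually needed.
        System.out.println(report.join());   // prints "counter is now 10"
    }
}
```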

Using CDI

Atomic types from Coherence Concurrent can also be injected using CDI, which eliminates the need for explicit factory method calls on the Atomics class.

// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;   

// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from 
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;  

// Injects a remote, distributed instance of non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar;

After you obtain an instance of an atomic type through a CDI injection, you can use it the same way you would use an instance obtained directly from the Atomics factory class.

Using Locks

Coherence Concurrent provides distributed implementations of Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, enabling you to implement lock-based concurrency control across cluster members when necessary.

Unlike local JDK implementations, the classes in this package use the cluster member/process ID and the thread ID to identify the lock owner, and store shared lock state within a Coherence NamedMap. This also means that calls to acquire and release locks are remote network calls, because they must update shared state that is likely stored on a different cluster member. These calls may affect the performance of lock and unlock operations.


Using Exclusive Locks

A RemoteLock class provides an implementation of a Lock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the lock at any given time.

To obtain an instance of a RemoteLock, call the Locks.remoteLock factory method:
Lock foo = Locks.remoteLock("foo");
As seen with Atomics, you can obtain a local Lock instance from the Locks class, which will simply return an instance of a standard java.util.concurrent.locks.ReentrantLock, by calling the localLock factory method:
Lock foo = Locks.localLock("foo");
After you create a Lock instance, you can use it as you normally would:
foo.lock();
try {
    // critical section guarded by the exclusive lock `foo`
}
finally {
    foo.unlock();
}
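
When a lock may be contended, bounding the wait is often preferable to blocking indefinitely. The java.util.concurrent.locks.Lock interface defines tryLock(long, TimeUnit) for this purpose. The sketch below demonstrates the pattern with a ReentrantLock, which is what Locks.localLock returns according to this chapter; the BoundedLockSketch class and the runGuarded helper are names invented for this example, and the timeout value is arbitrary.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class BoundedLockSketch {
    // Runs the action under the lock if the lock can be acquired within the timeout.
    // Returns true if the action ran, false if the lock was not acquired in time.
    static boolean runGuarded(Lock lock, long timeoutMillis, Runnable action) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
            return false;
        }
        if (!acquired) {
            return false;                         // give up instead of waiting forever
        }
        try {
            action.run();                         // critical section
            return true;
        } finally {
            lock.unlock();                        // always release what was acquired
        }
    }

    public static void main(String[] args) {
        Lock foo = new ReentrantLock();
        boolean ran = runGuarded(foo, 100, () -> System.out.println("in critical section"));
        System.out.println(ran);   // prints true: the lock was free
    }
}
```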

Using Read/Write Locks

A RemoteReadWriteLock class provides an implementation of a ReadWriteLock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the write lock at any given time, while allowing multiple concurrent readers.

To obtain an instance of a RemoteReadWriteLock, call the Locks.remoteReadWriteLock factory method:
ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
As seen with Atomics, you can obtain a local ReadWriteLock instance from the Locks class, which will simply return an instance of a standard java.util.concurrent.locks.ReentrantReadWriteLock, by calling the localReadWriteLock factory method:
ReadWriteLock bar = Locks.localReadWriteLock("bar");
After you create a ReadWriteLock instance, you can use it as you normally would:
bar.writeLock().lock();
try {
    // critical section guarded by the exclusive write lock `bar`
}
finally {
    bar.writeLock().unlock();
}

Or:

bar.readLock().lock();
try {
    // critical section guarded by the shared read lock `bar`
}
finally {
    bar.readLock().unlock();
}
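
The two patterns above are typically combined in one class that guards a piece of shared state: readers take the shared read lock and writers take the exclusive write lock. The following sketch uses the JDK ReentrantReadWriteLock, which is what Locks.localReadWriteLock returns according to this chapter. GuardedValueSketch is a hypothetical class written for this illustration.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical holder for a value protected by a read/write lock.
final class GuardedValueSketch {
    private final ReadWriteLock lock;
    private String value = "";

    GuardedValueSketch(ReadWriteLock lock) {
        this.lock = lock;
    }

    // Many readers may hold the read lock at the same time.
    String read() {
        lock.readLock().lock();
        try {
            return value;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Only one writer may hold the write lock, and no readers while it is held.
    void write(String newValue) {
        lock.writeLock().lock();
        try {
            value = newValue;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        GuardedValueSketch bar = new GuardedValueSketch(new ReentrantReadWriteLock());
        bar.write("updated");
        System.out.println(bar.read());   // prints "updated"
    }
}
```

Because the class depends only on the ReadWriteLock interface, any implementation of that interface can be passed to the constructor.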

Using CDI

You can also use CDI to inject both the exclusive and read/write lock instances into objects that need them:

// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;           

// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar; 

After you obtain a lock instance through CDI injection, you can use it the same way you would use an instance obtained directly from the Locks factory class.

Using Latches and Semaphores

Coherence Concurrent also provides distributed implementations of the CountDownLatch and Semaphore classes from the java.util.concurrent package, enabling you to synchronize execution across multiple Coherence cluster members as easily as you can within a single process using the two JDK classes. It also provides interfaces for those two concurrency primitives, which both the remote and local implementations conform to.

As seen with Atomics, the local implementations are nothing more than thin wrappers around the corresponding JDK classes.


Using the Count Down Latch

The RemoteCountDownLatch class provides a distributed implementation of a CountDownLatch, and ensures that code waiting for the latch on any cluster member proceeds only when the latch count reaches zero. Any cluster member can both wait for a latch and count down.

To obtain an instance of RemoteCountDownLatch, call the Latches.remoteCountDownLatch factory method:
// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CountDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
As seen with Atomics and Locks, you can obtain a local CountDownLatch instance from the Latches class by calling the localCountDownLatch factory method:
// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CountDownLatch foo = Latches.localCountDownLatch("foo", 10);

After you have a CountDownLatch instance, you can use it as you normally would, by calling the countDown and await methods on it.
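
The countDown and await interplay can be sketched with the JDK java.util.concurrent.CountDownLatch, which the local implementation wraps according to this chapter. In the sketch, worker threads play the role that cluster members would play with a remote latch. The LatchSketch class and its awaitWorkers method are names invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchSketch {
    // Starts one thread per worker and waits until all of them have counted down.
    // Returns the number of workers that finished, or -1 if the wait timed out.
    static int awaitWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                completed.incrementAndGet();   // stands in for the worker's real work
                done.countDown();              // signal that this worker has finished
            }).start();
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                return -1;                     // latch did not reach zero in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the latch", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(awaitWorkers(5));   // prints 5
    }
}
```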

Using a Semaphore

The RemoteSemaphore class provides a distributed implementation of a Semaphore, and enables any cluster member to acquire and release permits from the same semaphore instance.

To obtain an instance of RemoteSemaphore, call the Semaphores.remoteSemaphore factory method:
// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
As seen with Atomics and Locks, you can obtain a local Semaphore instance from the Semaphores class by calling the localSemaphore factory method:
// Creates an instance of a 'LocalSemaphore' with '0' permits.  
Semaphore foo = Semaphores.localSemaphore("foo");

After you create a Semaphore instance, you can use it as you normally would, by calling the release and acquire methods on it.
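
A common use of acquire and release is to cap how many tasks run a section of code at the same time. The sketch below demonstrates this with the JDK java.util.concurrent.Semaphore, which LocalSemaphore wraps according to this chapter. It calls acquireUninterruptibly only to keep the lambda free of checked exceptions. BoundedConcurrencySketch and peakConcurrency are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedConcurrencySketch {
    // Runs the tasks on separate threads and returns the highest number of
    // tasks that held a permit at the same time.
    static int peakConcurrency(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();          // wait for a permit
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);   // record the high-water mark
                    Thread.yield();                          // give other tasks a chance to overlap
                } finally {
                    active.decrementAndGet();
                    semaphore.release();                     // hand the permit back
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining tasks", e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The peak never exceeds the number of permits (2 here).
        System.out.println(peakConcurrency(2, 8));
    }
}
```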

Using CDI

You can also use CDI to inject both the CountDownLatch and Semaphore instances into objects that need them:

// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;

// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;          

// Injects an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
private Semaphore localSemaphoreBar;

// Injects an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;

After you obtain a latch or a semaphore instance through CDI injection, you can use it the same way you would use an instance obtained directly from the Latches or Semaphores factory classes.

The @Name annotation is optional in both cases as long as the member name (in the examples above, the field name) can be obtained from the injection point, but is required otherwise (such as when you use a constructor injection).

The @Count annotation specifies the initial latch count, and if omitted, will default to one. The @Permits annotation specifies the number of available permits for a semaphore, and if omitted, will default to zero, which means that the first acquire call will block until another thread releases one or more permits.
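
The effect of starting with zero permits can be observed with the JDK java.util.concurrent.Semaphore, which LocalSemaphore wraps according to this chapter. The sketch uses the non-blocking tryAcquire so that it can report the outcome instead of hanging. ZeroPermitSketch and its probe method are names invented for this example.

```java
import java.util.concurrent.Semaphore;

final class ZeroPermitSketch {
    // Returns {acquired before release, acquired after release} for a semaphore with zero permits.
    static boolean[] probe() {
        Semaphore bar = new Semaphore(0);      // no permits available initially
        boolean before = bar.tryAcquire();     // a blocking acquire() would wait here
        bar.release();                         // another party makes one permit available
        boolean after = bar.tryAcquire();      // now a permit can be taken
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = probe();
        System.out.println(result[0]);   // prints false
        System.out.println(result[1]);   // prints true
    }
}
```

A semaphore that starts at zero therefore works as a signal: a consumer waits in acquire until a producer calls release.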