41 Implementing Concurrency in a Distributed Environment
The Coherence Concurrent module provides distributed implementations of the
concurrency primitives from the java.util.concurrent
package such as
executors, atomics, locks, semaphores, and latches.
You can implement concurrent applications using the constructs you are already familiar with and also expand the "scope" of concurrency from a single process to potentially hundreds of processes within a Coherence cluster. You can use executors to submit tasks to be executed somewhere in the cluster; you can use locks, latches, and semaphores to synchronize execution across many cluster members; you can use atomics to implement global counters across many processes, and so on.
While these features are extremely powerful and enable you to reuse the knowledge you already have, they may have detrimental effect on scalability and/or performance. Whenever you synchronize execution through locks, latches, or semaphores, you are introducing a potential bottleneck into the architecture. Whenever you use a distributed atomic to implement a global counter, you are turning very simple operations that take mere nanoseconds locally, such as increment and decrement, into fairly expensive network calls that could take milliseconds (and potentially block even longer under heavy load).
So, use these features sparingly. In many cases, there is a better, faster, and a more scalable way to accomplish the same goal using Coherence primitives such as entry processors, aggregators, and events. These primitives are designed to perform and scale well in a distributed environment.
Note:
- To use the concurrency features, Oracle recommends using the Bootstrap API to start the Coherence cluster members. See Using the Bootstrap API.
- Coherence concurrent features do not support, and cannot be configured to use federation. Coherence Federation is asynchronous. Therefore, it would not make sense to federate data that is inherently atomic in nature.
This chapter includes the following sections:
- Using Factory Classes
Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need. - Using Local and Remote Instances
In many cases, the factory classes allow you to get both the local and the remote instances of various constructs. For example,Locks.localLock
will give you an instance of a standardjava.util.concurrent.locks.ReentrantLock
, whileLocks.remoteLock
will return an instance of aRemoteLock
. - Using Serialization
Coherence Concurrent supports both Java serialization and POF out-of-the-box serialization, with Java serialization being the default. - Using Persistence
Coherence Concurrent supports both active and on-demand persistence, but just like in the rest of Coherence it is set toon-demand
by default. - Using the Coherence Concurrent Features
You can use the Coherence Concurrent features after declaring the features as a dependency in thepom.xml
file. - Using Executors
Coherence Concurrent provides a facility to dispatch tasks, either a Runnable or a Callable, to a Coherence cluster for execution. - Using Atomics
Coherence Concurrent provides distributed implementations of atomic types, such asAtomicInteger
,AtomicLong
, andAtomicReference
. It also provides local implementations of the same types. - Using Locks
Coherence Concurrent provides distributed implementations ofLock
andReadWriteLock
interfaces from thejava.util.concurrent.locks
package, enabling you to implement lock-based concurrency control across cluster members when necessary. - Using Latches and Semaphores
Parent topic: Performing Data Grid Operations
Using Factory Classes
Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.
For example, you will use factory methods within the Atomics
class
to get instances of various atomic types, Locks
to get lock instances,
Latches
and Semaphores
to get latches and
semaphores.
Parent topic: Implementing Concurrency in a Distributed Environment
Using Local and Remote Instances
In many cases, the factory classes allow you to get both the local and the
remote instances of various constructs. For example, Locks.localLock
will
give you an instance of a standard
java.util.concurrent.locks.ReentrantLock
, while Locks.remoteLock
will return an instance of a RemoteLock
.
In cases where JDK does not provide a standard interface, which is the case with
atomics, latches, and semaphores, the interface from the existing JDK class has been
extracted to create a thin wrapper around the corresponding JDK implementation. For
example, Coherence Concurrent provides a Semaphore
interface and a
LocalSemaphore
class that wraps
java.util.concurrent.Semaphore
. The same is true for
CountDownLatch
and all atomic types.
The main advantage of using factory classes to construct both the local and the remote
instances is that it allows you to name local locks the same way you name the remote
locks: calling Locks.localLock("foo")
always returns the same
Lock
instance because the Locks
class internally
caches both the local and the remote instances it creates. In the case of remote locks,
every locally cached remote lock instance is ultimately backed by a shared lock instance
somewhere in the cluster, which is used to synchronize lock state across the
processes.
Parent topic: Implementing Concurrency in a Distributed Environment
Using Serialization
Coherence Concurrent supports both Java serialization and POF out-of-the-box serialization, with Java serialization being the default.
If you want to use POF instead, you have to set the
coherence.concurrent.serializer
system property to
pof
. You should also include the
coherence-concurrent-pof-config.xml
file into your own POF
configuration file to register the built-in Coherence Concurrent types.
Parent topic: Implementing Concurrency in a Distributed Environment
Using Persistence
Coherence Concurrent supports both active and on-demand persistence, but just
like in the rest of Coherence it is set to on-demand
by
default.
To use active persistence, you should set the
coherence.concurrent.persistence.environment
system property to
default-active
, or use another persistence environment that has
active persistence enabled.
Note:
The caches that store lock and semaphore data are configured as transient, and are not persisted when you use active or on-demand persistence.Parent topic: Implementing Concurrency in a Distributed Environment
Using the Coherence Concurrent Features
You can use the Coherence Concurrent features after declaring the features as
a dependency in the pom.xml
file.
To declare, make the following entry in the pom.xml
file.
<dependency>
<groupId>${coherence.groupId}</groupId>
<artifactId>coherence-concurrent</artifactId>
<version>${coherence.version}</version>
</dependency>
Parent topic: Implementing Concurrency in a Distributed Environment
Using Executors
This section includes the following topics:
- Using Executors - Examples
- Configuring Executors
- Managing Executors
- Managing Executors Over REST
- Using CDI
Parent topic: Implementing Concurrency in a Distributed Environment
Using Executors - Examples
By default, each Coherence cluster with the coherence-concurrent
module on the classpath includes a single-threaded executor that may be used to execute
the dispatched tasks.
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();
Future<Void> result = remoteExecutor.submit(() -> System.out.println("Executed"));
result.get(); // block until completion
Fixed5
, the code will
be:RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");
RemoteExecutor
will throw the following
exception:RejectedExecutionException
Each RemoteExecutor
instance may hold local resources that should be
released when the RemoteExecutor
is no longer required. Similar to an
ExecutorService
, a RemoteExecutor
has methods to
shut the executor down. Calling these methods has no impact on the executors registered
within the cluster.
Parent topic: Using Executors
Configuring Executors
Several executor types are available for configuration.
Table 41-1 Types of Executors
ExecutorService Type | Description |
---|---|
Single thread |
Creates an |
Fixed thread |
Creates an |
Cached |
Creates an |
Work stealing |
Creates a work-stealing thread pool by using the number of available processors as its target parallelism level. |
Table 41-2 Configuration Elements
Element Name | Required | Expected Type | Description |
---|---|---|---|
single |
no |
N/A |
Defines a single-thread executor. |
fixed |
no |
N/A |
Defines a fixed-thread pool executor. |
cached |
no |
N/A |
Defines a cached-thread-pool executor |
work-stealing |
no |
N/A |
Defines a work-stealing pool executor. |
name |
yes |
java.lang.String |
Defines the logical |
thread-count |
yes |
java.lang.Integer |
Defines the thread count for a fixed-thread pool executor. |
parallelism |
no |
java.lang.Integer |
Defines the parallelism of a work-stealing-thread pool executor. If not defined, it defaults to the number of processors available on the system. |
thread-factory |
no |
N/A |
Defines a |
instance |
yes |
java.util.concurrent.ThreadFactory |
Defines how the |
For complete details, see schema.
Configuration Examples
cache-config
root element should include
the coherence-concurrent
NamespaceHandler to recognize the
configuration
elements.<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd"> .
.
.
</cache-config>
Note:
Executors defined by the configuration must precede any other elements in the document. Failing to do so, will prevent the document from being validated.xml
namespace
defined for the NamespaceHandler is
c
:<!-- creates a single-threaded executor named <em>Single</em> -->
<c:single>
<c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named <em>Single</em> with a thread factory-->
<c:single>
<c:name>SingleTF</c:name>
<c:thread-factory>
<c:instance>
<c:class-name>my.custom.ThreadFactory</c:class-name>
</c:instance>
</c:thread-factory>
</c:single>
<!-- creates a fixed-thread executor named <em>Fixed5</em> -->
<c:fixed>
<c:name>Single</c:name>
<c:thread-count>5</c:thread-count>
</c:fixed>
Parent topic: Using Executors
Managing Executors
There are various ways to manage and monitor executors. You can use one of the following options:
- MBeans: See ExecutorMBean.
- Reporter: See Understanding the Executor Report.
- Coherence VisualVM plug-in: See coherence-visualvm.
- REST API: See Managing Executors Over REST.
- Distributed Tracing: See Distributed Tracing.
Parent topic: Using Executors
Managing Executors Over REST
Coherence Management over REST exposes endpoints to query and invoke actions against
ExecutorMBean
instances.
Table 41-3 REST Endpoints
Description | Method | Path | Produces |
---|---|---|---|
View all Executors |
|
|
JSON |
View all Executors with matching name |
|
|
JSON |
Reset Executor statistics by name |
|
|
JSON |
Parent topic: Using Executors
Using CDI
You can inject RemoteExecutors through CDI.
@Inject
private RemoteExecutor single; // injects a RemoteExecutor named 'single'.
@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor; // injects a RemoteExecutor named 'Fixed5'.
Parent topic: Using Executors
Using Atomics
AtomicInteger
, AtomicLong
, and
AtomicReference
. It also provides local implementations of the same
types. The local implementations are just thin wrappers around the existing
java.util.concurrent.atomic
types, which implement the same interface
as their distributed variants, to be interchangeable.
Atomics
class:// Creates a local, in-process instance of named 'AtomicInteger' with an implicit initial value of 0.
AtomicInteger localFoo = Atomics.localAtomicInteger("foo");
// Creates a remote, distributed instance of named 'AtomicInteger', distinct from the local instance 'foo',
// with an implicit initial value of '0'.
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
// Creates a remote, distributed instance of named 'AtomicLong', with an initial value of '5'.
AtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
Note:
TheAtomicInteger
and AtomicLong
types used in the
code above are not types from the java.util.concurrent.atomic
package.
They are actually interfaces defined within the
com.oracle.coherence.concurrent.atomic
package that both
LocalAtomicXyz
and RemoteAtomicXyz
classes
implement, which are the instances that are actually returned by the above
methods.
LocalAtomicInteger localFoo = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
However, Oracle strongly recommends that you use interfaces instead of concrete types because interfaces make it easy to switch between local and distributed implementations when necessary.
java.util.concurrent.atomic
types:int counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);
This section includes the following topics:
Parent topic: Implementing Concurrency in a Distributed Environment
Asynchronous Implementations of Atomic Types
The instances of numeric atomic types, such as AtomicInteger
and
AtomicLong
, are frequently used to represent various counters in
the application where a client may need to increment the value, but does not necessarily
need to know what the new value is.
When working with the local atomics, you can use the same API shown earlier (see Using Atomics) and simply ignore the return value. However, when using distributed atomics that would introduce unnecessary blocking on the client while waiting for the response from the server, which would then simply be discarded. Obviously, this will have a negative impact on both performance and throughput of the atomics.
To reduce the impact of remote calls in those situations, Coherence Concurrent also provides non-blocking, asynchronous implementations of all atomic types it supports.
async
method on the blocking instance of that
type:// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicInteger', with an implicit initial value of 0.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();
// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicLong', with an initial value of 5.
AsyncAtomicLong asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
CompletableFuture
for the result, and will not
block:CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long> futureCounter5 = asyncBar.addAndGet(5L);
Both the blocking and non-blocking instances of any distributed atomic type, with the same name, are backed by the same cluster-side atomic instance state, so they can be used interchangeably.
Parent topic: Using Atomics
Using CDI
Atomic types from Coherence Concurrent can also be injected using CDI, which eliminates the need for explicit factory method calls on the Atomics class.
// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;
// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;
// Injects a remote, distributed instance of non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar
After you obtain an instance of an atomic type through a CDI injection, you can use it
the same way you would use an instance obtained directly from the
Atomics
factory class.
Parent topic: Using Atomics
Using Locks
Coherence Concurrent provides distributed implementations of
Lock
and ReadWriteLock
interfaces from the
java.util.concurrent.locks
package, enabling you to implement
lock-based concurrency control across cluster members when necessary.
Unlike local JDK implementations, the classes in this package use cluster
member/process ID and thread ID to identify lock owner, and store shared lock state
within a Coherence NamedMap
. However, this also implies that the calls
to acquire and release locks are remote, network calls, because they need to update
shared state that is likely stored on a different cluster member. This update may impact
the performance of lock
and unlock
operations.
This section includes the following topics:
Parent topic: Implementing Concurrency in a Distributed Environment
Using Exclusive Locks
A RemoteLock
class provides an implementation of a
Lock
interface and enables you to ensure that only one thread on
one member is running a critical section guarded by the lock at any given time.
RemoteLock
, call the Locks.remoteLock
factory
method:Lock foo = Locks.remoteLock("foo");
Atomics
, you can obtain a local Lock
instance from the Locks
class, which will simply return an instance of
a standard java.util.concurrent.locks.ReentrantLock
, by calling the
localLock
factory
method:Lock foo = Locks.localLock("foo");
Lock
instance, you can use it as you normally
would:foo.lock();
try {
// critical section guarded by the exclusive lock `foo`
}
finally {
foo.unlock();
}
Parent topic: Using Locks
Using Read/Write Locks
A RemoteReadWriteLock
class provides an implementation of a
ReadWriteLock
interface and enables you to ensure that only one
thread on one member is running a critical section guarded by the write
lock at any given time, while allowing multiple concurrent readers.
RemoteReadWriteLock
, call the
Locks.remoteReadWriteLock
factory
method:ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
Atomics
, you can obtain a local
ReadWriteLock
instance from the Locks
class, which
will simply return an instance of a standard
java.util.concurrent.locks.ReentrantReadWriteLock
, by calling the
localReadWriteLock
factory
method:ReadWriteLock bar = Locks.localReadWriteLock("bar");
ReadWriteLock
instance, you can use it as you
normally
would:bar.writeLock().lock()
try {
// critical section guarded by the exclusive write lock `bar`
}
finally {
bar.writeLock().unlock();
}
Or:
bar.readLock().lock()
try {
// critical section guarded by the shared read lock `bar`
}
finally {
bar.readLock().unlock();
}
Parent topic: Using Locks
Using CDI
You can also use CDI to inject both the exclusive and read/write lock instances into objects that need them:
// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;
// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar;
After you obtain an instance of lock through a CDI injection, you can use it the same way
you would use an instance obtained directly from the Locks
factory
class.
Parent topic: Using Locks
Using Latches and Semaphores
Coherence Concurrent also provides distributed implementations of a
CountDownLatch
and Semaphore
classes from the
java.util.concurrent
package, enabling you to implement
synchronization of execution across multiple Coherence cluster members as easily as you
can implement it within a single process using the two JDK classes. It also provides
interfaces for those two concurrency primitives, that both remote and local
implementations conform to.
As seen with Atomics
, the local implementations are nothing more than
thin wrappers around the corresponding JDK classes.
This section includes the following topics:
Parent topic: Implementing Concurrency in a Distributed Environment
Using the Count Down Latch
The RemoteCoundDownLatch
class provides a distributed implementation
of a CountDownLatch
, and enables you to ensure that the execution of
the code on any cluster member that is waiting for the latch proceeds only when the
latch reaches zero. Any cluster member can both wait for a latch and count down.
RemoteCountDownLatch
, call the
Latches.remoteCountDownLatch
factory
method:// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CoundDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
Atomics
and Locks
, you can obtain a local
CountDownLatch
instance from the Latches
class by
calling the remoteCountDownLatch
factory
method:// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CoundDownLatch foo = Latches.localCountDownLatch("foo", 10);
After you have a RemoteCountDownLatch
instance, you can use it as you
normally would, by calling the countDown
and await
methods on it.
Parent topic: Using Latches and Semaphores
Using a Semaphore
The RemoteSemaphore
class provides a distributed implementation of a
Semaphore
, and enables any cluster member to acquire and release
permits from the same semaphore
instance.
RemoteSemaphore
, call the
Semaphores.remoteSemaphore
factory
method:// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
Atomics
and Locks
, you can obtain a local
Semaphore
instance from the Semaphores
class by
calling the localSemaphore
factory
method:// Creates an instance of a 'LocalSemaphore' with '0' permits.
Semaphore foo = Semaphores.localSemaphore("foo");
After you create a Semaphore
instance, you can use it as you normally
would, by calling the release
and acquire
methods on
it.
Parent topic: Using Latches and Semaphores
Using CDI
You can also use CDI to inject both the CountDownLatch
and
Semaphore
instances into objects that need them:
// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;
// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;
// Inject an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
@Remote
private Semaphore localSemaphoreBar;
// Inject an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;
After you obtain a latch
or a semaphore
instance
through a CDI injection, you can use the same way as you would use an instance obtained
directly from the Latches
or Semaphores
factory
classes.
The @Name
annotation is optional in both cases as long as the member
name (in the examples above, the field name) can be obtained from the injection point,
but is required otherwise (such as when you use a constructor injection).
The @Count
annotation specifies the initial latch count, and if omitted,
will default to one. The @Permits
annotation specifies the number of
available permits for a semaphore, and if omitted, will default to zero, which means
that the first acquire
call will block until another thread releases
one or more permits.
Parent topic: Using Latches and Semaphores