41 分散環境での同時実行性の実装

Coherence同時実行モジュールは、エグゼキュータ、アトミック、ロック、セマフォ、ラッチなどのjava.util.concurrentパッケージの同時実行性プリミティブを分散実装します。

すでに使い慣れた構成を使用して同時実行アプリケーションを実装でき、さらに、同時実行性のスコープを単一のプロセスからCoherenceクラスタ内の数百のプロセスに拡張することもできます。エグゼキュータを使用して、クラスタ内のどこかで実行されるタスクを送信できます。ロック、ラッチおよびセマフォを使用すると、多くのクラスタ・メンバー間で実行を同期できます。また、アトミックを使用すると、多くのプロセスにわたってグローバル・カウンタを実装できます。

これらの機能は非常に強力で、すでにある知識を再利用できますが、スケーラビリティやパフォーマンスに悪影響を及ぼす可能性があります。ロック、ラッチまたはセマフォを介して実行を同期するたびに、アーキテクチャにボトルネックが発生する可能性があります。分散アトミックを使用してグローバル・カウンタを実装する場合、増分や減分など、ローカルではナノ秒しかかからない非常に単純な操作が、ミリ秒かかる(負荷が高い場合は、さらに長くブロックされる可能性がある)かなり高価なネットワーク呼出しに変わります。

そのため、これらの機能は慎重に使用してください。多くの場合、エントリ・プロセッサ、アグリゲータ、イベントなどのCoherenceプリミティブを使用して同じ目標を達成するための、より適切で高速かつスケーラブルな方法があります。これらのプリミティブは、分散環境で適切に実行およびスケーリングするように設計されています。

ノート:

  • 同時実行機能を使用するには、Oracleでは、ブートストラップAPIを使用してCoherenceクラスタ・メンバーを起動することをお薦めします。ブートストラップAPIの使用を参照してください。
  • Coherence同時実行機能は、フェデレーションをサポートしておらず、これを使用するように構成できません。Coherenceフェデレーションは非同期です。したがって、本来アトミックな性質を持つデータをフェデレートすることは意味がありません。

この章の内容は次のとおりです。

ファクトリ・クラスの使用

各機能(エグゼキュータ、アトミック、ロック、セマフォおよびラッチ)は、1つ以上のCoherenceキャッシュ(場合によっては事前構成済のインターセプタ)によってバックアップされます。下位レベルのCoherenceプリミティブとのすべてのやりとりは、必要なクラスのインスタンスを取得できる様々なファクトリ・クラスの背後に隠されています。

たとえば、Atomicsクラス内のファクトリ・メソッドを使用して様々なアトミック型のインスタンスを取得し、Locksを使用してロック・インスタンスを取得し、LatchesおよびSemaphoresを使用してラッチおよびセマフォを取得します。

ローカルおよびリモート・インスタンスの使用

多くの場合、ファクトリ・クラスを使用すると、様々な構造のローカル・インスタンスとリモート・インスタンスの両方を取得できます。たとえば、Locks.localLockは標準 java.util.concurrent.locks.ReentrantLockのインスタンスを提供し、Locks.remoteLockRemoteLockのインスタンスを返します。

JDKが標準インタフェースを提供しない場合(アトミック、ラッチおよびセマフォの場合)、既存のJDKクラスのインタフェースが抽出され、対応するJDK実装を囲むThinラッパーが作成されます。たとえば、Coherence同時実行には、java.util.concurrent.SemaphoreをラップするSemaphoreインタフェースおよびLocalSemaphoreクラスが用意されています。CountDownLatchおよびすべてのアトミック型についても同様です。

ファクトリ・クラスを使用してローカル・インスタンスとリモート・インスタンスの両方を構築する主な利点は、リモート・ロックと同じ方法でローカル・ロックに名前を付けられることです。Locks.localLock("foo")をコールすると、常に同じLockインスタンスが返されます。これは、Locksクラスによって作成されるローカル・インスタンスとリモート・インスタンスの両方が内部的にキャッシュされるためです。リモート・ロックの場合、ローカルにキャッシュされたすべてのリモート・ロック・インスタンスは、最終的にクラスタ内のどこかで共有ロック・インスタンスによってバックアップされます。これは、プロセス間でロック状態を同期するために使用されます。

シリアライズの使用

Coherence同時実行は、JavaシリアライズとPOFの即時利用可能なシリアライズの両方をサポートしており、Javaシリアライズがデフォルトです。

POFを使用する場合は、coherence.concurrent.serializerシステム・プロパティをpofに設定する必要があります。組込みのCoherence同時実行タイプを登録するには、独自のPOF構成ファイルにもcoherence-concurrent-pof-config.xmlファイルを含める必要があります。

永続性の使用

Coherence同時実行では、アクティブとオンデマンドの両方の永続性がサポートされますが、Coherenceの他の部分と同様に、デフォルトでon-demandに設定されます。

アクティブな永続性を使用するには、coherence.concurrent.persistence.environmentシステム・プロパティをdefault-activeに設定するか、アクティブな永続性が有効になっている別の永続性環境を使用する必要があります。

ノート:

ロックおよびセマフォ・データを格納するキャッシュは一時的なものとして構成され、アクティブまたはオンデマンドの永続性を使用する場合は永続化されません。

Coherence同時実行機能の使用

Coherence同時実行機能は、pom.xmlファイルで機能を依存関係として宣言した後に使用できます。

宣言するには、pom.xmlファイルに次のエントリを作成します。

<dependency>
    <groupId>${coherence.groupId}</groupId>
    <artifactId>coherence-concurrent</artifactId>
    <version>${coherence.version}</version>
</dependency>

エグゼキュータの使用

Coherence同時実行は、RunnableまたはCallableのいずれかのタスクをCoherenceクラスタにディスパッチして実行する機能を提供します。送信済タスクを実際に実行するエグゼキュータは、キャッシュ構成リソース内に1つ以上の名前付きエグゼキュータを定義することで、各クラスタ・メンバーに構成されます。

この項には次のトピックが含まれます:

エグゼキュータの使用 - 例

デフォルトでは、クラスパスにcoherence-concurrentモジュールがある各Coherenceクラスタには、ディスパッチされたタスクの実行に使用できる単一スレッド・エグゼキュータが含まれます。

たとえば:
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();

Future<Void> result = remoteExecutor.submit(() -> System.out.println("Executed"));

result.get(); // block until completion
たとえば、Fixed5という名前のエグゼキュータを構成するには、コードは次のようになります。
RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");
指定された名前のエグゼキュータが構成されない場合、RemoteExecutorは次の例外をスローします。
RejectedExecutionException

RemoteExecutorインスタンスには、RemoteExecutorが不要になったときに解放する必要があるローカル・リソースを保持できます。ExecutorServiceと同様に、RemoteExecutorにはエグゼキュータを停止するメソッドがあります。これらのメソッドをコールしても、クラスタ内に登録されたエグゼキュータには影響しません。

エグゼキュータの構成

構成には、いくつかのエグゼキュータ・タイプを使用できます。

表41-1 エグゼキュータのタイプ

ExecutorServiceタイプ 説明

単一スレッド

単一スレッドを持つExecutorServiceを作成します。

固定スレッド

固定数のスレッドを持つExecutorServiceを作成します。

キャッシュ

必要に応じて新しいスレッドを作成し、可能な場合は既存のスレッドを再利用するExecutorServiceを作成します。

ワーク・スティーリング

使用可能なプロセッサの数をターゲットの並列性レベルとして使用し、ワーク・スティーリング・スレッド・プールを作成します。

表41-2 構成要素

要素名 必須 予想される型 説明

single

いいえ

該当なし

単一スレッド・エグゼキュータを定義します。

fixed

いいえ

該当なし

固定スレッド・プール・エグゼキュータを定義します。

cached

いいえ

該当なし

キャッシュ済スレッド・プール・エグゼキュータを定義します

work-stealing

いいえ

該当なし

ワーク・スティーリング・プール・エグゼキュータを定義します。

name

はい

java.lang.String

エグゼキュータの論理nameを定義します。

thread-count

はい

java.lang.Integer

固定スレッド・プール・エグゼキュータのスレッド数を定義します。

parallelism

いいえ

java.lang.Integer

ワーク・スティーリング・スレッド・プール・エグゼキュータの並列性を定義します。定義しない場合、デフォルトでシステムで使用可能なプロセッサの数になります。

thread-factory

いいえ

該当なし

java.util.concurrent.ThreadFactoryを定義します。singlefixedおよびcachedエグゼキュータによって使用されます。

instance

はい

java.util.concurrent.ThreadFactory

ThreadFactoryのインスタンス化方法を定義します。インスタンス要素の詳細は、instanceを参照してください。この要素は、thread-factory要素の子である必要があります。

詳細は、スキーマを参照してください。

構成例

エグゼキュータを定義するには、構成要素を認識するためにcache-configルート要素にcoherence-concurrent NamespaceHandlerを含める必要があります。
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
               xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
               xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd"> .
.
.
</cache-config>

ノート:

構成で定義されたエグゼキュータは、ドキュメント内の他の要素より前にする必要があります。これを行わないと、ドキュメントを検証できなくなります。
次の例では、NamespaceHandlerに定義されているxmlネームスペースがcであると想定しています。
<!-- creates a single-threaded executor named <em>Single</em> -->
<c:single>
  <c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named <em>Single</em> with a thread factory-->
<c:single>
  <c:name>SingleTF</c:name>
  <c:thread-factory>
    <c:instance>
      <c:class-name>my.custom.ThreadFactory</c:class-name>
    </c:instance>
  </c:thread-factory>
</c:single>
<!-- creates a fixed-thread executor named <em>Fixed5</em> -->
<c:fixed>
  <c:name>Single</c:name>
  <c:thread-count>5</c:thread-count>
</c:fixed>

エグゼキュータの管理

エグゼキュータを管理およびモニターするには様々な方法があります。次のいずれかのオプションを使用できます。

RESTを介したエグゼキュータの管理

RESTを介したCoherence管理では、エンドポイントを公開して、ExecutorMBeanインスタンスに対するアクションを問い合せて呼び出します。

表41-3 RESTエンドポイント

説明 メソッド パス 次のものを生成

すべてのエグゼキュータを表示

GET

/management/coherence/cluster/executors

JSON

名前が一致するすべてのエグゼキュータを表示

GET

/management/coherence/cluster/executors/{name}

JSON

エグゼキュータ統計を名前でリセット

POST

/management/coherence/cluster/executors/{name}/resetStatistics

JSON

CDIの使用

CDIを介してRemoteExecutorsを注入できます。

たとえば:
@Inject
private RemoteExecutor single;    // injects a RemoteExecutor named 'single'.

@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor;    // injects a RemoteExecutor named 'Fixed5'. 

アトミックの使用

Coherence同時実行では、AtomicIntegerAtomicLongAtomicReferenceなどのアトミック型の分散実装が提供されます。また、同じタイプのローカル実装も提供されます。ローカル実装は、分散型バリアントと同じインタフェースを実装する既存のjava.util.concurrent.atomic型を囲む単なるThinラッパーで、置換え可能です。
アトミック型のインスタンスを作成するには、Atomicsクラスで適切なファクトリ・メソッドをコールする必要があります。
// Creates a local, in-process instance of named 'AtomicInteger' with an implicit initial value of 0.
AtomicInteger localFoo  = Atomics.localAtomicInteger("foo"); 

// Creates a remote, distributed instance of named 'AtomicInteger', distinct from the local instance 'foo', 
// with an implicit initial value of '0'.   
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");  

// Creates a remote, distributed instance of named 'AtomicLong', with an initial value of '5'.
AtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

ノート:

前述のコードで使用されているAtomicIntegerおよびAtomicLong型は、java.util.concurrent.atomicパッケージの型ではありません。実際には、LocalAtomicXyzクラスとRemoteAtomicXyzクラスの両方が実装するcom.oracle.coherence.concurrent.atomicパッケージ内に定義されているインタフェースであり、これは前述のメソッドによって実際に返されるインスタンスです。
したがって、前述のコードを次のように書き換えることができます。
LocalAtomicInteger  localFoo  = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

ただし、インタフェースを使用すると、必要に応じてローカル実装と分散実装を簡単に切り替えることができるため、具体的な型のかわりにインタフェースを使用することを強くお薦めします。

インスタンスの作成後は、対応するjava.util.concurrent.atomic型のいずれかを使用する場合と同じ方法で使用できます。
int  counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);

この項には次のトピックが含まれます:

アトミック型の非同期実装

AtomicIntegerAtomicLongなどの数値アトミック型のインスタンスは、クライアントがその値を増分する必要があるが、必ずしも新しい値が何であるかを知る必要がないアプリケーション内の様々なカウンタを表すために頻繁に使用されます。

ローカル・アトミックを操作する場合は、前に示したものと同じAPIを使用し(アトミックの使用を参照)、戻り値は無視されるだけです。ただし、サーバーからのレスポンスを待機している間にクライアントで不要なブロッキングが発生する分散アトミックを使用すると、破棄されるだけです。明らかに、これはアトミックのパフォーマンスとスループットの両方に悪影響を及ぼします。

このような状況でのリモート・コールの影響を軽減するために、Coherence同時実行では、サポートするすべてのアトミック型の非ブロッキングの非同期実装も提供されます。

サポートされているアトミック型の非ブロッキング・インスタンスを取得するには、その型のブロッキング・インスタンスでasyncメソッドをコールするだけです。
// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicInteger', with an implicit initial value of 0.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();      

// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicLong', with an initial value of 5.
AsyncAtomicLong    asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
これらのインスタンスを作成した後は、対応するブロッキング・タイプのいずれかを使用する場合と同じ方法で使用できます。唯一の違いは、非ブロッキング・インスタンスは結果に対してCompletableFutureを返すだけで、ブロックしないことです。
CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long>    futureCounter5 = asyncBar.addAndGet(5L);

同じ名前の分散アトミック型のブロッキング・インスタンスと非ブロッキング・インスタンスは、両方とも同じクラスタ側のアトミック・インスタンス状態によってバックアップされるため、これらを同じ意味で使用できます。

CDIの使用

また、Coherence同時実行のアトミック型はCDIを使用して注入できるため、Atomicsクラスでの明示的なファクトリ・メソッド・コールが不要になります。

// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;   

// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from 
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;  

// Injects a remote, distributed instance of non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar 

CDI注入を介してアトミック型のインスタンスを取得した後は、Atomicsファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。

ロックの使用

Coherence同時実行では、java.util.concurrent.locksパッケージのLockおよびReadWriteLockインタフェースの分散実装が提供されるため、必要に応じてクラスタ・メンバー間でロックベースの並行処理制御を実装できます。

ローカルJDK実装とは異なり、このパッケージのクラスは、クラスタ・メンバー/プロセスIDおよびスレッドIDを使用してロック所有者を識別し、共有ロック状態をCoherence NamedMap内に格納します。ただし、これは、ロックを取得および解放する呼出しは、別のクラスタ・メンバーに格納される可能性がある共有状態を更新する必要があるため、リモートのネットワーク呼出しであるということも意味します。この更新は、lockおよびunlock操作のパフォーマンスに影響する可能性があります。

この項には次のトピックが含まれます:

排他的ロックの使用

RemoteLockクラスは、Lockインタフェースの実装を提供し、1つのメンバー上の1つのスレッドのみが、いつでもロックによって保護されるクリティカル・セクションを実行していることを確認できます。

RemoteLockのインスタンスを取得するには、Locks.remoteLock factoryメソッドをコールします。
Lock foo = Locks.remoteLock("foo");
Atomicsに示すように、LocksクラスからローカルLockインスタンスを取得できます。これは、localLockファクトリ・メソッドをコールして、標準のjava.util.concurrent.locks.ReentrantLockのインスタンスを返すだけです。
Lock foo = Locks.localLock("foo");
Lockインスタンスを作成した後、通常どおりに使用できます。
foo.lock();
try {
    // critical section guarded by the exclusive lock `foo`
}
finally {
    foo.unlock();
}

読取り/書込みロックの使用

RemoteReadWriteLockクラスは、ReadWriteLockインタフェースの実装を提供し、複数の同時リーダーを許可しながら、1つのメンバー上の1つのスレッドのみが、いつでもwriteロックによって保護されるクリティカル・セクションを実行していることを確認できます。

RemoteReadWriteLockのインスタンスを取得するには、Locks.remoteReadWriteLockファクトリ・メソッドをコールします。
ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
Atomicsに示すように、LocksクラスからローカルReadWriteLockインスタンスを取得できます。これは、localReadWriteLockファクトリ・メソッドをコールして、標準のjava.util.concurrent.locks.ReentrantReadWriteLockのインスタンスを返すだけです。
ReadWriteLock bar = Locks.localReadWriteLock("bar");
ReadWriteLockインスタンスを作成した後、通常どおりに使用できます。
bar.writeLock().lock()
try {
    // critical section guarded by the exclusive write lock `bar`
}
finally {
    bar.writeLock().unlock();
}

または:

bar.readLock().lock()
try {
    // critical section guarded by the shared read lock `bar`
}
finally {
    bar.readLock().unlock();
}

CDIの使用

CDIを使用して、排他的と読取り/書込みの両方のロック・インスタンスを、それらを必要とするオブジェクトに注入することもできます。

// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;           

// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar; 

CDI注入を介してロックのインスタンスを取得した後は、Locksファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。

ラッチおよびセマフォの使用

Coherence同時実行では、java.util.concurrentパッケージのCountDownLatchおよびSemaphoreクラスの分散実装も提供されるため、複数のCoherenceクラスタ・メンバー間での実行の同期を、2つのJDKクラスを使用して単一のプロセス内で実装するのと同じくらい簡単に実装できます。また、リモート実装とローカル実装の両方が準拠する、これら2つの同時実行性プリミティブのインタフェースも提供されます。

Atomicsに示すように、ローカル実装は、対応するJDKクラスを囲むThinラッパーにすぎません。

この項には次のトピックが含まれます:

カウントダウン・ラッチの使用

RemoteCoundDownLatchクラスは、CountDownLatchの分散実装を提供し、ラッチを待機している任意のクラスタ・メンバー上のコードの実行が、ラッチがゼロに達したときにのみ続行されるようにできます。すべてのクラスタ・メンバーは、ラッチを待機してカウントダウンできます。

RemoteCountDownLatchのインスタンスを取得するには、Latches.remoteCountDownLatchファクトリ・メソッドをコールします。
// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CoundDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
AtomicsおよびLocksに示すように、remoteCountDownLatchファクトリ・メソッドをコールすることで、LatchesクラスからローカルCountDownLatchインスタンスを取得できます。
// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CoundDownLatch foo = Latches.localCountDownLatch("foo", 10);

RemoteCountDownLatchインスタンスを作成したら、countDownおよびawaitメソッドをコールすることで、通常どおりに使用できます。

セマフォの使用

RemoteSemaphoreクラスは、Semaphoreの分散実装を提供し、すべてのクラスタ・メンバーが同じsemaphoreインスタンスから許可を取得および解放できるようにします。

RemoteSemaphoreのインスタンスを取得するには、Semaphores.remoteSemaphoreファクトリ・メソッドをコールします。
// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
AtomicsおよびLocksに示すように、localSemaphoreファクトリ・メソッドをコールすることで、SemaphoresクラスからローカルSemaphoreインスタンスを取得できます。
// Creates an instance of a 'LocalSemaphore' with '0' permits.  
Semaphore foo = Semaphores.localSemaphore("foo");

Semaphoreインスタンスを作成したら、releaseおよびacquireメソッドをコールすることで、通常どおりに使用できます。

CDIの使用

CDIを使用して、CountDownLatchSemaphoreの両方のインスタンスを、それらを必要とするオブジェクトに注入することもできます。

// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;

// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;          

// Inject an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
@Remote
private Semaphore localSemaphoreBar;            

// Inject an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;

CDI注入を介してlatchまたはsemaphoreインスタンスを取得した後は、LatchesまたはSemaphoresファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。

@Name注釈は、注入ポイントからメンバー名(前述の例ではフィールド名)を取得できるかぎり、どちらの場合もオプションですが、それ以外の場合(コンストラクタ注入を使用する場合など)は必須です。

@Count注釈は、最初のラッチ数を指定し、省略するとデフォルトで1になります。@Permits注釈は、セマフォの使用可能な許可数を指定し、省略するとデフォルトで0になります。つまり、最初のacquireコールは、別のスレッドが1つ以上の許可を解放するまでブロックされます。