41 分散環境での同時実行性の実装
Coherence同時実行モジュールは、エグゼキュータ、アトミック、ロック、セマフォ、ラッチなどのjava.util.concurrent
パッケージの同時実行性プリミティブを分散実装します。
すでに使い慣れた構成を使用して同時実行アプリケーションを実装でき、さらに、同時実行性のスコープを単一のプロセスからCoherenceクラスタ内の数百のプロセスに拡張することもできます。エグゼキュータを使用して、クラスタ内のどこかで実行されるタスクを送信できます。ロック、ラッチおよびセマフォを使用すると、多くのクラスタ・メンバー間で実行を同期できます。また、アトミックを使用すると、多くのプロセスにわたってグローバル・カウンタを実装できます。
これらの機能は非常に強力で、すでにある知識を再利用できますが、スケーラビリティやパフォーマンスに悪影響を及ぼす可能性があります。ロック、ラッチまたはセマフォを介して実行を同期するたびに、アーキテクチャにボトルネックが発生する可能性があります。分散アトミックを使用してグローバル・カウンタを実装する場合、増分や減分など、ローカルではナノ秒しかかからない非常に単純な操作が、ミリ秒かかる(負荷が高い場合は、さらに長くブロックされる可能性がある)かなり高価なネットワーク呼出しに変わります。
そのため、これらの機能は慎重に使用してください。多くの場合、エントリ・プロセッサ、アグリゲータ、イベントなどのCoherenceプリミティブを使用して同じ目標を達成するための、より適切で高速かつスケーラブルな方法があります。これらのプリミティブは、分散環境で適切に実行およびスケーリングするように設計されています。
ノート:
- 同時実行機能を使用するには、Oracleでは、ブートストラップAPIを使用してCoherenceクラスタ・メンバーを起動することをお薦めします。ブートストラップAPIの使用を参照してください。
- Coherence同時実行機能は、フェデレーションをサポートしておらず、これを使用するように構成できません。Coherenceフェデレーションは非同期です。したがって、本来アトミックな性質を持つデータをフェデレートすることは意味がありません。
この章の内容は次のとおりです。
- ファクトリ・クラスの使用
各機能(エグゼキュータ、アトミック、ロック、セマフォおよびラッチ)は、1つ以上のCoherenceキャッシュ(場合によっては事前構成済のインターセプタ)によってバックアップされます。下位レベルのCoherenceプリミティブとのすべてのやりとりは、必要なクラスのインスタンスを取得できる様々なファクトリ・クラスの背後に隠されています。 - ローカルおよびリモート・インスタンスの使用
多くの場合、ファクトリ・クラスを使用すると、様々な構造のローカル・インスタンスとリモート・インスタンスの両方を取得できます。たとえば、Locks.localLock
は標準java.util.concurrent.locks.ReentrantLock
のインスタンスを提供し、Locks.remoteLock
はRemoteLock
のインスタンスを返します。 - シリアライズの使用
Coherence同時実行は、JavaシリアライズとPOFの即時利用可能なシリアライズの両方をサポートしており、Javaシリアライズがデフォルトです。 - 永続性の使用
Coherence同時実行では、アクティブとオンデマンドの両方の永続性がサポートされますが、Coherenceの他の部分と同様に、デフォルトでon-demand
に設定されます。 - Coherence同時実行機能の使用
Coherence同時実行機能は、pom.xml
ファイルで機能を依存関係として宣言した後に使用できます。 - エグゼキュータの使用
Coherence同時実行は、Runnable
、Callable
またはTask
のいずれかのタスクをCoherenceクラスタにディスパッチして実行する機能を提供します。 - アトミックの使用
Coherence同時実行では、AtomicInteger
、AtomicLong
、AtomicReference
などのアトミック型の分散実装が提供されます。また、同じタイプのローカル実装も提供されます。 - ロックの使用
Coherence同時実行では、java.util.concurrent.locks
パッケージのLock
およびReadWriteLock
インタフェースの分散実装が提供されるため、必要に応じてクラスタ・メンバー間でロックベースの並行処理制御を実装できます。 - ラッチおよびセマフォの使用
- ブロッキング・キューの使用
親トピック: データ・グリッド操作の実行
ファクトリ・クラスの使用
各機能(エグゼキュータ、アトミック、ロック、セマフォおよびラッチ)は、1つ以上のCoherenceキャッシュ(場合によっては事前構成済のインターセプタ)によってバックアップされます。下位レベルのCoherenceプリミティブとのすべてのやりとりは、必要なクラスのインスタンスを取得できる様々なファクトリ・クラスの背後に隠されています。
たとえば、Atomics
クラス内のファクトリ・メソッドを使用して様々なアトミック型のインスタンスを取得し、Locks
を使用してロック・インスタンスを取得し、Latches
およびSemaphores
を使用してラッチおよびセマフォを取得します。
親トピック: 分散環境での同時実行性の実装
ローカルおよびリモート・インスタンスの使用
多くの場合、ファクトリ・クラスを使用すると、様々な構造のローカル・インスタンスとリモート・インスタンスの両方を取得できます。たとえば、Locks.localLock
は標準 java.util.concurrent.locks.ReentrantLock
のインスタンスを提供し、Locks.remoteLock
はRemoteLock
のインスタンスを返します。
JDKが標準インタフェースを提供しない場合(アトミック、ラッチおよびセマフォの場合)、既存のJDKクラスのインタフェースが抽出され、対応するJDK実装を囲むThinラッパーが作成されます。たとえば、Coherence同時実行には、java.util.concurrent.Semaphore
をラップするSemaphore
インタフェースおよびLocalSemaphore
クラスが用意されています。CountDownLatch
およびすべてのアトミック型についても同様です。
ファクトリ・クラスを使用してローカル・インスタンスとリモート・インスタンスの両方を構築する主な利点は、リモート・ロックと同じ方法でローカル・ロックに名前を付けられることです。Locks.localLock("foo")
をコールすると、常に同じLock
インスタンスが返されます。これは、Locks
クラスによって作成されるローカル・インスタンスとリモート・インスタンスの両方が内部的にキャッシュされるためです。リモート・ロックの場合、ローカルにキャッシュされたすべてのリモート・ロック・インスタンスは、最終的にクラスタ内のどこかで共有ロック・インスタンスによってバックアップされます。これは、プロセス間でロック状態を同期するために使用されます。
親トピック: 分散環境での同時実行性の実装
シリアライズの使用
Coherence同時実行は、JavaシリアライズとPOFの即時利用可能なシリアライズの両方をサポートしており、Javaシリアライズがデフォルトです。
POFを使用する場合は、coherence.concurrent.serializer
システム・プロパティをpof
に設定する必要があります。組込みのCoherence同時実行タイプを登録するには、独自のPOF構成ファイルにもcoherence-concurrent-pof-config.xml
ファイルを含める必要があります。
親トピック: 分散環境での同時実行性の実装
永続性の使用
Coherence同時実行では、アクティブとオンデマンドの両方の永続性がサポートされますが、Coherenceの他の部分と同様に、デフォルトでon-demand
に設定されます。
アクティブな永続性を使用するには、coherence.concurrent.persistence.environment
システム・プロパティをdefault-active
に設定するか、アクティブな永続性が有効になっている別の永続性環境を使用する必要があります。
ノート:
ロックおよびセマフォ・データを格納するキャッシュは一時的なものとして構成され、アクティブまたはオンデマンドの永続性を使用する場合は永続化されません。親トピック: 分散環境での同時実行性の実装
Coherence同時実行機能の使用
Coherence同時実行機能は、pom.xml
ファイルで機能を依存関係として宣言した後に使用できます。
宣言するには、pom.xml
ファイルに次のエントリを作成します。
<dependency>
<groupId>${coherence.groupId}</groupId>
<artifactId>coherence-concurrent</artifactId>
<version>${coherence.version}</version>
</dependency>
親トピック: 分散環境での同時実行性の実装
エグゼキュータの使用
Runnable
、Callable
またはTask
のいずれかのタスクをCoherenceクラスタにディスパッチして実行する機能を提供します。送信されたタスクを実行するエグゼキュータは、キャッシュ構成リソース内に1つ以上の名前付きエグゼキュータを定義することで、各クラスタ・メンバーで構成されます。
この項には次のトピックが含まれます:
親トピック: 分散環境での同時実行性の実装
エグゼキュータの使用 - 例
デフォルトでは、クラスパスにcoherence-concurrent
モジュールがある各Coherenceクラスタには、ディスパッチされたタスクの実行に使用できる単一スレッド・エグゼキュータが含まれます。
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();
Future<Void> result = remoteExecutor.submit(() -> System.out.println("Executed"));
result.get(); // block until completion
Fixed5
という名前で構成されている場合は、次のようにしてエグゼキュータへの参照を取得できます:RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");
RemoteExecutor
は次の例外をスローします。RejectedExecutionException
各RemoteExecutor
インスタンスには、RemoteExecutor
が不要になったときに解放する必要があるローカル・リソースを保持できます。ExecutorService
と同様に、RemoteExecutor
にはエグゼキュータを停止する同様のメソッドがあります。これらのメソッドをコールしても、クラスタ内に登録されたエグゼキュータには影響しません。
親トピック: エグゼキュータの使用
高度なオーケストレーション
RemoteExecutor
は、JDKに含まれる標準のExecutorService
と同様の機能を提供しますが、Coherenceのコンテキストでは十分でない可能性があります。タスクは、複数のCoherenceメンバー間で実行し、中間結果を生成し、タスクを実行しているクラスタ・メンバーに障害が発生しても永続性を維持することが必要な場合があります。
このような場合、タスク・オーケストレーションを使用できます。オーケストレーションの詳細に入る前に、次の概念を理解する必要があります。
表41-1 タスク・オーケストレーション・インタフェース
インタフェース | 説明 |
---|---|
|
タスクは、1つ以上のスレッドによって潜在的に実行されるように設計されているという点で、 |
|
実行時に |
|
特定の |
|
収集された |
|
|
|
タスクの状態共有メカニズム。 |
|
可変結果コンテナに結果を蓄積し、オプションですべての結果が処理された後で蓄積された結果を最終的な表現に変換する可変リダクション操作。 |
タスク
Task
実装は、タスクを実行するexecute(Context)
という単一のメソッドを定義し、場合によっては後で実行されるようにします。メソッドの実行が完了すると、結果を返すか、例外をスローし(ただし、Yield
例外はスローしません)、割り当てられたExecutor
に対してタスクが完了したとみなされます。
Task
は、Yield
例外をスローすることで、一定時間実行を停止することがあります。この例外タイプは、Executor
によるTask
の実行が一時停止され、通常同じExecutor
によって、後で再開されることを示します。
親トピック: 高度なオーケストレーション
タスク・コンテキスト
Task
が実行されると、Context
インスタンスが実行引数として渡されます。
Context
は、複数のJava仮想マシンで実行されているタスク間で共有状態を許可するタスク・プロパティへのアクセスを提供します。
Context
は、全体的な実行ステータスの詳細を示します。
表41-2 実行ステータス
実行状態 | メソッド | 説明 |
---|---|---|
|
|
タスクが完了したかどうかを |
|
|
タスクが実際に取り消されたかどうかを |
|
|
|
親トピック: 高度なオーケストレーション
タスク・オーケストレーション
オーケストレーションは、RemoteExecutor.orchestrate(Task)
をコールすることから始まります。これにより、指定されたTask
に対してTask.Orchestration
インスタンスが返されます。Task.Orchestration
を使用すると、タスクを実行する側面を構成できます。
表41-3 タスク・オーケストレーション・メソッド
メソッド | 説明 |
---|---|
|
タスクは、指定されたエグゼキュータが定義/構成されているすべてのJava仮想マシンで同時に実行されます。これはデフォルトです。 |
|
タスクは、指定されたエグゼキュータが定義/構成されているすべてのJava仮想マシンで順番に実行されます。 |
|
タスクを |
|
フィルタリングによって、タスクが実行される場所を制限する追加の方法が提供されます。述語は、各Java Virtual Machineの各エグゼキュータに関連付けられたメタデータに適用されます。メタデータの例としては、エグゼキュータが実行されているメンバー、またはメンバーのロールなどがあります。述語は連鎖して、適切なエグゼキュータを決定する際のブール・ロジックを提供できます。 |
|
タスクが実行されているJava Virtual Machineに関係なく、すべてのタスクで使用できる初期状態を定義します。 |
|
指定した場合、タスクは保持され、完了後にタスク計算の最終結果を新規サブスクライバに通知できます。 |
|
これは、 |
親トピック: 高度なオーケストレーション
タスク・コレクタおよびコレクタブル
オーケストレーションに渡されるTask.Collector
は、タスクから結果を収集し、オプションで収集された結果を最終形式に変換します。コレクタは、TaskCollector
クラスで使用可能なコレクタの例を使用すると、最もわかりやすいです。
表41-4 タスク・コレクタのメソッド
メソッド | 説明 |
---|---|
|
実行中のタスクから収集されたnull以外の結果の数。 |
|
実行中のタスクによって提供された最初の結果を収集して返します。 |
|
実行中のタスクによって返された最後の結果を収集して返します。 |
|
null以外のすべての結果を収集し、セットとして返します。 |
|
null以外のすべての結果を収集し、リストとして返します。 |
オーケストレーションでcollect
をコールして返されるTask.Collectable
インスタンスでは、登録済サブスクライバによって結果を収集または公開しなくなる条件を設定できます。Task.Collectable
でsubmit()
をコールすると、タスクのオーケストレーションが開始されます。
親トピック: 高度なオーケストレーション
タスク・コーディネータ
オーケストレーションCollectable
でsubmit()
をコールすると、Task.Coordinator
が返されます。Task.Collectable
と同様に、Task.Coordinator
ではサブスクライバを登録できます。さらに、オーケストレーションを取り消したり、完了ステータスを確認したりできます。
親トピック: 高度なオーケストレーション
タスク・サブスクライバ
Task.Subscriber
は、オーケストレーションの実行ステータスに関する様々なイベントを受信します。
表41-5 タスク・サブスクライバ・イベント
メソッド | 説明 |
---|---|
|
オーケストレーションの完了を通知します。 |
|
回復不可能なエラー(引数として指定)が発生したときにコールされます。 |
|
|
|
|
親トピック: 高度なオーケストレーション
高度なオーケストレーション - 例
最初に、オーケストレーションの例に共通する次のコードについて考えてみます:
// demonstrate orchestration using the default RemoteExecutor RemoteExecutor executor = RemoteExecutor.getDefault(); // WaitingSubscriber is an implementation of the // com.oracle.coherence.concurrent.executor.Task.Subscriber interface // that has a get() method that blocks until Subscriber.onComplete() is // called and will return the results received by onNext() WaitingSubscriber subscriber = new WaitingSubscriber(); // ValueTask is an implementation of the // com.oracle.coherence.concurrent.executor.Task interface // that returns the value provided at construction time ValueTask task = new ValueTask("Hello World");
前述の例を考慮すると、オーケストレーションの最も簡単な例は次のとおりです:
// orchestrate the task, subscribe, and submit executor.orchestrate(task) .subscribe(subscriber) .submit(); // wait for the task to complete // if this was run on four cluster members running the default executor service, // the returned Collection will have four results Collection<String> results = subscriber.get();
前述の例を基に、2つの記憶域メンバーと2つのプロキシ・メンバーを持つクラスタを想定します。クラスタ・メンバーは、それぞれstorage
およびproxy
のロールで構成されます。たとえば、storage
メンバーでのみタスクを実行する必要がある場合、オーケストレーションは次のようになります:
// orchestrate the task, filtering by a role, subscribe, and submit executor.orchestrate(task) .filter(Predicates.role("storage")) .subscribe(subscriber) .submit(); // wait for the task to complete // as there are only two storage members in this hypothetical, only two // results will be returned Collection<String> results = subscriber.get();
com.oracle.coherence.concurrent.executor.function.Predicates
で使用できる述語はいくつかありますが、ターゲット・ユース・ケースに適用されない場合は、Remote.Predicate
インタフェースを実装するだけです。
collect(Collector)
およびuntil(Predicate)
を使用して、結果の収集およびサブスクライバへの提示方法をカスタマイズできます:
// orchestrate the task, collecting the first non-null result, // subscribe, and submit executor.orchestrate(new MayReturnNullTask()) .collect(TaskCollectors.firstOf()) .until(Predicates.nonNullValue()) .subscribe(subscriber) .submit(); // wait for the task to complete // the first non-result returned will be the one provided to the // subscriber Collection<String> results = subscriber.get();
com.oracle.coherence.concurrent.executor.TaskCollectors
には複数のコレクタが用意されていますが、ターゲット・ユース・ケースに適用されない場合は、Task.Collector
インタフェースを実装してください。
親トピック: エグゼキュータの使用
エグゼキュータの構成
構成には、いくつかのエグゼキュータ・タイプを使用できます。
表41-6 エグゼキュータのタイプ
ExecutorServiceタイプ | 説明 |
---|---|
単一スレッド |
単一スレッドを持つ |
固定スレッド |
固定数のスレッドを持つ |
キャッシュ |
必要に応じて新しいスレッドを作成し、可能な場合は既存のスレッドを再利用する |
ワーク・スティーリング |
使用可能なプロセッサの数をターゲットの並列性レベルとして使用し、ワーク・スティーリング・スレッド・プールを作成します。 |
カスタム |
非標準エグゼキュータの作成を許可します。 |
VirtualThread |
VirtualThread-per-task ExecutorServiceを作成します。JDK 21以上が必要です。 |
表41-7 構成要素
要素名 | 必須 | 予想される型 | 説明 |
---|---|---|---|
|
いいえ |
該当なし |
単一スレッド・エグゼキュータを定義します。 |
|
いいえ |
該当なし |
固定スレッド・プール・エグゼキュータを定義します。 |
|
いいえ |
該当なし |
キャッシュ済スレッド・プール・エグゼキュータを定義します |
|
いいえ |
該当なし |
ワーク・スティーリング・プール・エグゼキュータを定義します。 |
|
いいえ |
|
カスタム・エグゼキュータを定義します。 |
|
いいえ |
該当なし |
VirtualThread-per-taskエグゼキュータを定義します。 |
|
はい |
|
エグゼキュータの論理 |
|
はい |
|
固定スレッド・プール・エグゼキュータのスレッド数を定義します。 |
|
いいえ |
|
ワーク・スティーリング・スレッド・プール・エグゼキュータの並列性を定義します。定義しない場合、デフォルトでシステムで使用可能なプロセッサの数になります。 |
|
いいえ |
該当なし |
|
|
はい |
|
|
詳細は、スキーマを参照してください。
構成例
cache-config
ルート要素にcoherence-concurrent
NamespaceHandlerを含める必要があります。<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd"> .
.
.
</cache-config>
ノート:
構成で定義されたエグゼキュータは、ドキュメント内の他の要素より前にする必要があります。これを行わないと、ドキュメントを検証できなくなります。xml
ネームスペースがc
であると想定しています。<!-- creates a single-threaded executor named <em>Single</em> -->
<c:single>
<c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named <em>Single</em> with a thread factory-->
<c:single>
<c:name>SingleTF</c:name>
<c:thread-factory>
<c:instance>
<c:class-name>my.custom.ThreadFactory</c:class-name>
</c:instance>
</c:thread-factory>
</c:single>
<!-- creates a fixed-thread executor named <em>Fixed5</em> -->
<c:fixed>
<c:name>Single</c:name>
<c:thread-count>5</c:thread-count>
</c:fixed>
親トピック: エグゼキュータの使用
エグゼキュータの管理
エグゼキュータを管理およびモニターするには様々な方法があります。次のいずれかのオプションを使用できます。
- MBean: ExecutorMBeanに関する項を参照してください。
- Reporter: エグゼキュータ・レポートの理解に関する項を参照してください。
- Coherence VisualVMプラグイン: coherence-visualvmを参照してください。
- REST API: RESTを介したエグゼキュータの管理を参照してください。
- 分散トレース: 分散トレースを参照してください。
親トピック: エグゼキュータの使用
RESTを介したエグゼキュータの管理
RESTを介したCoherence管理では、エンドポイントを公開して、ExecutorMBean
インスタンスに対するアクションを問い合せて呼び出します。
表41-8 RESTエンドポイント
説明 | メソッド | パス | 次のものを生成 |
---|---|---|---|
すべてのエグゼキュータを表示 |
|
|
JSON |
名前が一致するすべてのエグゼキュータを表示 |
|
|
JSON |
エグゼキュータ統計を名前でリセット |
|
|
JSON |
親トピック: エグゼキュータの使用
CDIの使用
CDIを介してRemoteExecutorsを注入できます。
@Inject
private RemoteExecutor single; // injects a RemoteExecutor named 'single'.
@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor; // injects a RemoteExecutor named 'Fixed5'.
親トピック: エグゼキュータの使用
アトミックの使用
AtomicInteger
、AtomicLong
、AtomicReference
などのアトミック型の分散実装が提供されます。また、同じタイプのローカル実装も提供されます。ローカル実装は、分散型バリアントと同じインタフェースを実装する既存のjava.util.concurrent.atomic
型を囲む単なるThinラッパーで、置換え可能です。
Atomics
クラスで適切なファクトリ・メソッドをコールする必要があります。// Creates a local, in-process instance of named 'AtomicInteger' with an implicit initial value of 0.
AtomicInteger localFoo = Atomics.localAtomicInteger("foo");
// Creates a remote, distributed instance of named 'AtomicInteger', distinct from the local instance 'foo',
// with an implicit initial value of '0'.
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
// Creates a remote, distributed instance of named 'AtomicLong', with an initial value of '5'.
AtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
ノート:
前述のコードで使用されているAtomicInteger
およびAtomicLong
型は、java.util.concurrent.atomic
パッケージの型ではありません。実際には、LocalAtomicXyz
クラスとRemoteAtomicXyz
クラスの両方が実装するcom.oracle.coherence.concurrent.atomic
パッケージ内に定義されているインタフェースであり、これは前述のメソッドによって実際に返されるインスタンスです。
LocalAtomicInteger localFoo = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
ただし、インタフェースを使用すると、必要に応じてローカル実装と分散実装を簡単に切り替えることができるため、具体的な型のかわりにインタフェースを使用することを強くお薦めします。
java.util.concurrent.atomic
型のいずれかを使用する場合と同じ方法で使用できます。int counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);
この項には次のトピックが含まれます:
親トピック: 分散環境での同時実行性の実装
アトミック型の非同期実装
AtomicInteger
や AtomicLong
などの数値アトミック型のインスタンスは、クライアントがその値を増分する必要があるが、必ずしも新しい値が何であるかを知る必要がないアプリケーション内の様々なカウンタを表すために頻繁に使用されます。
ローカル・アトミックを操作する場合は、前に示したものと同じAPIを使用し(アトミックの使用を参照)、戻り値は無視されるだけです。ただし、サーバーからのレスポンスを待機している間にクライアントで不要なブロッキングが発生する分散アトミックを使用すると、破棄されるだけです。明らかに、これはアトミックのパフォーマンスとスループットの両方に悪影響を及ぼします。
このような状況でのリモート・コールの影響を軽減するために、Coherence同時実行では、サポートするすべてのアトミック型の非ブロッキングの非同期実装も提供されます。
async
メソッドをコールするだけです。// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicInteger', with an implicit initial value of 0.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();
// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicLong', with an initial value of 5.
AsyncAtomicLong asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
CompletableFuture
を返すだけで、ブロックしないことです。CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long> futureCounter5 = asyncBar.addAndGet(5L);
同じ名前の分散アトミック型のブロッキング・インスタンスと非ブロッキング・インスタンスは、両方とも同じクラスタ側のアトミック・インスタンス状態によってバックアップされるため、これらを同じ意味で使用できます。
親トピック: アトミックの使用
CDIの使用
また、Coherence同時実行のアトミック型はCDIを使用して注入できるため、Atomicsクラスでの明示的なファクトリ・メソッド・コールが不要になります。
// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;
// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;
// Injects a remote, distributed instance of non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar
CDI注入を介してアトミック型のインスタンスを取得した後は、Atomics
ファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。
親トピック: アトミックの使用
ロックの使用
Coherence同時実行では、java.util.concurrent.locks
パッケージのLock
およびReadWriteLock
インタフェースの分散実装が提供されるため、必要に応じてクラスタ・メンバー間でロックベースの並行処理制御を実装できます。
ローカルJDK実装とは異なり、このパッケージのクラスは、クラスタ・メンバー/プロセスIDおよびスレッドIDを使用してロック所有者を識別し、共有ロック状態をCoherence NamedMap
内に格納します。ただし、これは、ロックを取得および解放する呼出しは、別のクラスタ・メンバーに格納される可能性がある共有状態を更新する必要があるため、リモートのネットワーク呼出しであるということも意味します。この更新は、lock
およびunlock
操作のパフォーマンスに影響する可能性があります。
この項には次のトピックが含まれます:
排他的ロックの使用
RemoteLock
クラスは、Lock
インタフェースの実装を提供し、1つのメンバー上の1つのスレッドのみが、いつでもロックによって保護されるクリティカル・セクションを実行していることを確認できます。
RemoteLock
のインスタンスを取得するには、Locks.remoteLock factory
メソッドをコールします。Lock foo = Locks.remoteLock("foo");
Atomics
に示すように、Locks
クラスからローカルLock
インスタンスを取得できます。これは、localLock
ファクトリ・メソッドをコールして、標準のjava.util.concurrent.locks.ReentrantLock
のインスタンスを返すだけです。Lock foo = Locks.localLock("foo");
Lock
インスタンスを作成した後、通常どおりに使用できます。foo.lock();
try {
// critical section guarded by the exclusive lock `foo`
}
finally {
foo.unlock();
}
親トピック: ロックの使用
読取り/書込みロックの使用
RemoteReadWriteLock
クラスは、ReadWriteLock
インタフェースの実装を提供し、複数の同時リーダーを許可しながら、1つのメンバー上の1つのスレッドのみが、いつでもwrite
ロックによって保護されるクリティカル・セクションを実行していることを確認できます。
RemoteReadWriteLock
のインスタンスを取得するには、Locks.remoteReadWriteLock
ファクトリ・メソッドをコールします。ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
Atomics
に示すように、Locks
クラスからローカルReadWriteLock
インスタンスを取得できます。これは、localReadWriteLock
ファクトリ・メソッドをコールして、標準のjava.util.concurrent.locks.ReentrantReadWriteLock
のインスタンスを返すだけです。ReadWriteLock bar = Locks.localReadWriteLock("bar");
ReadWriteLock
インスタンスを作成した後、通常どおりに使用できます。bar.writeLock().lock()
try {
// critical section guarded by the exclusive write lock `bar`
}
finally {
bar.writeLock().unlock();
}
または:
bar.readLock().lock()
try {
// critical section guarded by the shared read lock `bar`
}
finally {
bar.readLock().unlock();
}
親トピック: ロックの使用
CDIの使用
CDIを使用して、排他的と読取り/書込みの両方のロック・インスタンスを、それらを必要とするオブジェクトに注入することもできます。
// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;
// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar;
CDI注入を介してロックのインスタンスを取得した後は、Locks
ファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。
親トピック: ロックの使用
ラッチおよびセマフォの使用
Coherence同時実行では、java.util.concurrent
パッケージのCountDownLatch
およびSemaphore
クラスの分散実装も提供されるため、複数のCoherenceクラスタ・メンバー間での実行の同期を、2つのJDKクラスを使用して単一のプロセス内で実装するのと同じくらい簡単に実装できます。また、リモート実装とローカル実装の両方が準拠する、これら2つの同時実行性プリミティブのインタフェースも提供されます。
Atomics
に示すように、ローカル実装は、対応するJDKクラスを囲むThinラッパーにすぎません。
この項には次のトピックが含まれます:
カウントダウン・ラッチの使用
RemoteCoundDownLatch
クラスは、CountDownLatch
の分散実装を提供し、ラッチを待機している任意のクラスタ・メンバー上のコードの実行が、ラッチがゼロに達したときにのみ続行されるようにできます。すべてのクラスタ・メンバーは、ラッチを待機してカウントダウンできます。
RemoteCountDownLatch
のインスタンスを取得するには、Latches.remoteCountDownLatch
ファクトリ・メソッドをコールします。// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CoundDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
Atomics
およびLocks
に示すように、remoteCountDownLatch
ファクトリ・メソッドをコールすることで、Latches
クラスからローカルCountDownLatch
インスタンスを取得できます。// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CoundDownLatch foo = Latches.localCountDownLatch("foo", 10);
RemoteCountDownLatch
インスタンスを作成したら、countDown
およびawait
メソッドをコールすることで、通常どおりに使用できます。
親トピック: ラッチおよびセマフォの使用
セマフォの使用
RemoteSemaphore
クラスは、Semaphore
の分散実装を提供し、すべてのクラスタ・メンバーが同じsemaphore
インスタンスから許可を取得および解放できるようにします。
RemoteSemaphore
のインスタンスを取得するには、Semaphores.remoteSemaphore
ファクトリ・メソッドをコールします。// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
Atomics
およびLocks
に示すように、localSemaphore
ファクトリ・メソッドをコールすることで、Semaphores
クラスからローカルSemaphore
インスタンスを取得できます。// Creates an instance of a 'LocalSemaphore' with '0' permits.
Semaphore foo = Semaphores.localSemaphore("foo");
Semaphore
インスタンスを作成したら、release
およびacquire
メソッドをコールすることで、通常どおりに使用できます。
親トピック: ラッチおよびセマフォの使用
CDIの使用
CDIを使用して、CountDownLatch
とSemaphore
の両方のインスタンスを、それらを必要とするオブジェクトに注入することもできます。
// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;
// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;
// Inject an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
@Remote
private Semaphore localSemaphoreBar;
// Inject an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;
CDI注入を介してlatch
またはsemaphore
インスタンスを取得した後は、Latches
またはSemaphores
ファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。
@Name
注釈は、注入ポイントからメンバー名(前述の例ではフィールド名)を取得できるかぎり、どちらの場合もオプションですが、それ以外の場合(コンストラクタ注入を使用する場合など)は必須です。
@Count
注釈は、最初のラッチ数を指定し、省略するとデフォルトで1になります。@Permits
注釈は、セマフォの使用可能な許可数を指定し、省略するとデフォルトで0になります。つまり、最初のacquire
コールは、別のスレッドが1つ以上の許可を解放するまでブロックされます。
親トピック: ラッチおよびセマフォの使用
ブロッキング・キューの使用
Coherenceでは、Coherence CE 24.03からデータ構造としてキューがサポートされます。Coherence NamedQueue
はjava.util.Queue
の実装で、NamedDeque
はjava.util.Deque
の実装です。
Coherenceには、BlockingQueue
の2つの実装があります。1つは単純なサイズ制限付きキューで、2つ目ははるかに大きい容量の分散ページ・キューです。単純なキューは、BlockingQueue
と両端キューのBlockingDeque
の両方として使用できます。分散ページ・キューは、BlockingQueue
実装としてのみ使用できます。
ノート:
Coherenceキューは、キューと同じ名前を持つキャッシュにマップされます。キャッシュがキューに使用されている場合、同じキャッシュを通常のデータ・キャッシュとして使用することはできません。
ブロッキング・キュー
Coherence同時実行モジュールには、NamedBlockingQueue
というjava.util.concurrent.BlockingQueue
の実装と、NamedBlockingDeque
というjava.util.concurrent.BlockingDeque
の実装が含まれます。
アプリケーションでCoherenceブロッキング・キューを使用するには、次のようにcoherence-concurrent
モジュールに対する依存関係を追加する必要があります:
<dependency>
<groupId>com.oracle.coherence</groupId>
<artifactId>coherence-concurrent</artifactId>
<version>14.1.2-0-0</version>
</dependency>
ブロッキング・キューのインスタンスを取得するには、com.oracle.coherence.concurrent.Queues
ファクトリ・クラスを使用します。
"my-queue"という名前の単純なサイズ制限付きBlockingQueue
を取得するには、次の例を参照してください:
NamedBlockingQueue<String> queue = Queues.queue("my-queue");
"my-deque"という名前の単純なサイズ制限付きBlockingDeque
を取得するには、次の例を参照してください:
NamedBlockingDeque<String> queue = Queues.deque("my-deque");
"my-queue"という名前の分散ページBlockingQueue
を取得するには、次の例を参照してください:
NamedBlockingQueue<String> queue = Queues.pagedQueue("my-queue");
ブロッキング・キューの実装は、Coherenceイベントを使用して機能します。アプリケーション・コードがブロッキング・メソッドをコールすると、コール元スレッドはブロックされますが、ブロッキングはサーバー上では行われません。アプリケーション・コードは、サーバーからイベントを受信するとブロック解除されます。
たとえば、アプリケーション・コードがNamedBlockingQueue take()
メソッドをコールし、キューが空の場合、このメソッドはコール元スレッドをブロックします。要素が別のスレッド(別のJVM上にある場合もある)によってキューに入れられると、コール元アプリケーションはイベントを受信します。これにより、take()
が再試行され、成功すると戻ります。take()
の再試行が失敗した場合、コール元スレッドはブロックされたままになります。たとえば、別のスレッドまたは別のJVMも同じキューからの取得をブロックされていて、最初の試行で再試行に成功した場合です。
もう1つの例は、NamedBlockingQueue put()
メソッドをコールするアプリケーションです。このメソッドは、キューが満杯(2GBのサイズ制限)になるとブロックされます。この場合、コール元スレッドは、キュー内に領域があることを示す削除イベントを受信するまでブロックされます。put()
が再試行され、成功した場合はコール元スレッドに制御が戻されます。再試行が失敗した場合、スレッドはブロックされたままになります。たとえば、別のスレッドまたはJVMもput()
でブロックされていて、その再試行が成功すると、キューは再度満杯になります。
キューのサイズ設定
2つのCoherenceキュー実装でデータがどのように格納されるか、およびこれがキューのサイズをどのように制限するかを理解することが重要です。
-
単純なCoherenceキュー – 単純なキュー(およびデキュー)実装では、単一のCoherenceキャッシュ・パーティションにデータが格納されます。Coherenceキャッシュ・パーティションのサイズは2GBを超えることはできないため、これによって2GBのサイズ制限が適用されます。実際には、パーティションはこれよりもはるかに小さいです。大きいパーティションでは、記憶域が有効なメンバーがクラスタを離れるとリカバリが遅くなります。最新の高速ネットワークでは、300MB - 500MBが適切な最大パーティション・サイズです。10GBネットワークでは、1GBまで大きくできる可能性があります。
-
分散ページ・キュー - 分散ページ・キューは、通常のキャッシュ・データと同じように、Coherenceクラスタの複数のパーティションにわたって分散されたページにデータを格納します。つまり、ページ・キューでは2GBをはるかに超える容量を格納できます。パーティション・サイズによってキューの合計サイズが制限されることにも注意することが重要です。
パーティションごとに2GBの絶対的なハード制限によって、サイズは次のようになります:
2 GB x 257 = 514 GB
しかし、これは本番での使用で信頼するには大きすぎます。サイズ制限を500MBにし、デフォルトのパーティション数を257にすると、これがキュー・サイズに与える影響がわかります。
500 MB x 257 = 128 GB
したがって、デフォルトでは、ページ・キューの現実的な上限は約128GBです。パーティション数を1087に増やすと、キュー・サイズは次のようになります:
500 MB x 1087 = 543 GB
もちろん、これらのすべての例では、キュー・データをメモリーに格納するのに十分な大きさのヒープ・サイズのJVMがクラスタ内に存在することを想定しています。
制限
Coherenceでの現在のキュー実装には、次の制限があります:
-
前述のように、単純なキューのハード・サイズ制限は2GBです。単純なキューまたはデキューを使用する場合、サイズが2GBを超えると、Coherenceサーバーはキューへのオファーの受入れを拒否します。
java.util.Queue
コンタクトでは、キューがオファーを拒否できるため、このサイズ制限はキュー契約に準拠します。アプリケーション開発者は、オファリング・データからキューへのレスポンスをチェックして、オファーが成功したかどうかを判断する必要があります。ここでは「オファー」という用語を使用して、キューにデータを追加するすべてのキューおよびデキュー・メソッドをカバーします。offer()
コールからの戻り値のブール値をチェックするかわりに、キューが満杯の場合にput()
メソッドがブロックされるNamedBlockingQueue
を使用することもできます。 -
通常の操作では、キューは巨大になりません。キューが巨大になるということは通常、キューから読み取っているプロセスがキューに書き込むプロセスに追いついていないことを意味します。アプリケーション開発者は、キューを使用してアプリケーションの負荷テストを行い、容量に問題がないことを確認する必要があります。
-
オファリングやポーリングなどのキュー操作は、特定のデータ構造で競合し、これにより、パラレル・リクエストの数とリクエストの処理速度が制限されます。順序付けを維持するために、ポーリングは、キューのどちらの端がポーリングされているかに応じて、先頭または末尾のエントリのいずれかで競合します。つまり、ポーリング・メソッドは順次でしか処理できないため、ポーリングが効率的で高速であっても、多数の同時ポーリング・リクエストはキューに入れられ、一度に1つずつ処理されます。Offerメソッドは、先頭または末尾で競合しませんが、先頭および末尾の識別子を維持するために使用されるアトミック・カウンタで競合します。Coherenceでは、異なるワーカー・スレッドで複数のオファー・リクエストを処理できますが、
AtomicLong
の更新には軽微な競合があります。 -
オファリングやポーリングなど、先頭と末尾で作業するキュー操作は効率的です。
java.util.Queue
およびjava.util.Deque
のその他のメソッドには効率の悪いものもあります。たとえば、イテレータ・メソッド、contains()
などです。これらは、基本的なキュー機能を必要とするアプリケーションではあまり使用されません。キューを変更するjava.util.Queue
APIの一部のオプション・メソッドは、UnsupportedOperationException
をスローします(これはJavaキュー・コントラクトで許可されます)。たとえば、retainAll()
、removeAll()
、イテレータを使用した削除などです。
親トピック: 分散環境での同時実行性の実装