41 分散環境での同時実行性の実装

Coherence同時実行モジュールは、エグゼキュータ、アトミック、ロック、セマフォ、ラッチなどのjava.util.concurrentパッケージの同時実行性プリミティブを分散実装します。

すでに使い慣れた構成を使用して同時実行アプリケーションを実装でき、さらに、同時実行性のスコープを単一のプロセスからCoherenceクラスタ内の数百のプロセスに拡張することもできます。エグゼキュータを使用して、クラスタ内のどこかで実行されるタスクを送信できます。ロック、ラッチおよびセマフォを使用すると、多くのクラスタ・メンバー間で実行を同期できます。また、アトミックを使用すると、多くのプロセスにわたってグローバル・カウンタを実装できます。

これらの機能は非常に強力で、すでにある知識を再利用できますが、スケーラビリティやパフォーマンスに悪影響を及ぼす可能性があります。ロック、ラッチまたはセマフォを介して実行を同期するたびに、アーキテクチャにボトルネックが発生する可能性があります。分散アトミックを使用してグローバル・カウンタを実装する場合、増分や減分など、ローカルではナノ秒しかかからない非常に単純な操作が、ミリ秒かかる(負荷が高い場合は、さらに長くブロックされる可能性がある)かなり高価なネットワーク呼出しに変わります。

そのため、これらの機能は慎重に使用してください。多くの場合、エントリ・プロセッサ、アグリゲータ、イベントなどのCoherenceプリミティブを使用して同じ目標を達成するための、より適切で高速かつスケーラブルな方法があります。これらのプリミティブは、分散環境で適切に実行およびスケーリングするように設計されています。

ノート:

  • 同時実行機能を使用するには、Oracleでは、ブートストラップAPIを使用してCoherenceクラスタ・メンバーを起動することをお薦めします。ブートストラップAPIの使用を参照してください。
  • Coherence同時実行機能は、フェデレーションをサポートしておらず、これを使用するように構成できません。Coherenceフェデレーションは非同期です。したがって、本来アトミックな性質を持つデータをフェデレートすることは意味がありません。

この章の内容は次のとおりです。

ファクトリ・クラスの使用

各機能(エグゼキュータ、アトミック、ロック、セマフォおよびラッチ)は、1つ以上のCoherenceキャッシュ(場合によっては事前構成済のインターセプタ)によってバックアップされます。下位レベルのCoherenceプリミティブとのすべてのやりとりは、必要なクラスのインスタンスを取得できる様々なファクトリ・クラスの背後に隠されています。

たとえば、Atomicsクラス内のファクトリ・メソッドを使用して様々なアトミック型のインスタンスを取得し、Locksを使用してロック・インスタンスを取得し、LatchesおよびSemaphoresを使用してラッチおよびセマフォを取得します。

ローカルおよびリモート・インスタンスの使用

多くの場合、ファクトリ・クラスを使用すると、様々な構造のローカル・インスタンスとリモート・インスタンスの両方を取得できます。たとえば、Locks.localLockは標準 java.util.concurrent.locks.ReentrantLockのインスタンスを提供し、Locks.remoteLockRemoteLockのインスタンスを返します。

JDKが標準インタフェースを提供しない場合(アトミック、ラッチおよびセマフォの場合)、既存のJDKクラスのインタフェースが抽出され、対応するJDK実装を囲むThinラッパーが作成されます。たとえば、Coherence同時実行には、java.util.concurrent.SemaphoreをラップするSemaphoreインタフェースおよびLocalSemaphoreクラスが用意されています。CountDownLatchおよびすべてのアトミック型についても同様です。

ファクトリ・クラスを使用してローカル・インスタンスとリモート・インスタンスの両方を構築する主な利点は、リモート・ロックと同じ方法でローカル・ロックに名前を付けられることです。Locks.localLock("foo")をコールすると、常に同じLockインスタンスが返されます。これは、Locksクラスによって作成されるローカル・インスタンスとリモート・インスタンスの両方が内部的にキャッシュされるためです。リモート・ロックの場合、ローカルにキャッシュされたすべてのリモート・ロック・インスタンスは、最終的にクラスタ内のどこかで共有ロック・インスタンスによってバックアップされます。これは、プロセス間でロック状態を同期するために使用されます。

シリアライズの使用

Coherence同時実行は、JavaシリアライズとPOFの即時利用可能なシリアライズの両方をサポートしており、Javaシリアライズがデフォルトです。

POFを使用する場合は、coherence.concurrent.serializerシステム・プロパティをpofに設定する必要があります。組込みのCoherence同時実行タイプを登録するには、独自のPOF構成ファイルにもcoherence-concurrent-pof-config.xmlファイルを含める必要があります。

永続性の使用

Coherence同時実行では、アクティブとオンデマンドの両方の永続性がサポートされますが、Coherenceの他の部分と同様に、デフォルトでon-demandに設定されます。

アクティブな永続性を使用するには、coherence.concurrent.persistence.environmentシステム・プロパティをdefault-activeに設定するか、アクティブな永続性が有効になっている別の永続性環境を使用する必要があります。

ノート:

ロックおよびセマフォ・データを格納するキャッシュは一時的なものとして構成され、アクティブまたはオンデマンドの永続性を使用する場合は永続化されません。

Coherence同時実行機能の使用

Coherence同時実行機能は、pom.xmlファイルで機能を依存関係として宣言した後に使用できます。

宣言するには、pom.xmlファイルに次のエントリを作成します。

<dependency>
    <groupId>${coherence.groupId}</groupId>
    <artifactId>coherence-concurrent</artifactId>
    <version>${coherence.version}</version>
</dependency>

エグゼキュータの使用

Coherence同時実行は、RunnableCallableまたはTaskのいずれかのタスクをCoherenceクラスタにディスパッチして実行する機能を提供します。送信されたタスクを実行するエグゼキュータは、キャッシュ構成リソース内に1つ以上の名前付きエグゼキュータを定義することで、各クラスタ・メンバーで構成されます。

この項には次のトピックが含まれます:

エグゼキュータの使用 - 例

デフォルトでは、クラスパスにcoherence-concurrentモジュールがある各Coherenceクラスタには、ディスパッチされたタスクの実行に使用できる単一スレッド・エグゼキュータが含まれます。

この場合、最も単純な例は次のようになります:
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();

Future<Void> result = remoteExecutor.submit(() -> System.out.println("Executed"));

result.get(); // block until completion
エグゼキュータがFixed5という名前で構成されている場合は、次のようにしてエグゼキュータへの参照を取得できます:
RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");
指定された名前のエグゼキュータが構成されない場合、RemoteExecutorは次の例外をスローします。
RejectedExecutionException

RemoteExecutorインスタンスには、RemoteExecutorが不要になったときに解放する必要があるローカル・リソースを保持できます。ExecutorServiceと同様に、RemoteExecutorにはエグゼキュータを停止する同様のメソッドがあります。これらのメソッドをコールしても、クラスタ内に登録されたエグゼキュータには影響しません。

高度なオーケストレーション

RemoteExecutorは、JDKに含まれる標準のExecutorServiceと同様の機能を提供しますが、Coherenceのコンテキストでは十分でない可能性があります。タスクは、複数のCoherenceメンバー間で実行し、中間結果を生成し、タスクを実行しているクラスタ・メンバーに障害が発生しても永続性を維持することが必要な場合があります。

このような場合、タスク・オーケストレーションを使用できます。オーケストレーションの詳細に入る前に、次の概念を理解する必要があります。

表41-1 タスク・オーケストレーション・インタフェース

インタフェース 説明

Task

タスクは、1つ以上のスレッドによって潜在的に実行されるように設計されているという点で、CallableおよびRunnableクラスに似ています。CallableクラスおよびRunnableクラスとは異なり、実行は、異なるJava仮想マシンで発生し、異なるJava Virtual Machineプロセス間で失敗またはリカバリ(あるいはその両方)が発生する可能性があります。

Task.Context

実行時にTaskのコンテキスト情報を提供します。これには、Taskを実行しているExecutorの中間結果にアクセスして更新する機能も含まれます。

Task.Orchestration

特定のRemoteExecutorの複数のCoherenceメンバーにまたがって定義された、一連のエグゼキュータにわたるTaskのオーケストレーションに関する情報を定義します。

Task.Coordinator

収集されたTask結果のパブリッシャで、送信されたTaskの調整を追加で許可します。

Task.Subscriber

Task.Coordinatorによって生成されるアイテムの受信側。

Task.Properties

タスクの状態共有メカニズム。

Task.Collector

可変結果コンテナに結果を蓄積し、オプションですべての結果が処理された後で蓄積された結果を最終的な表現に変換する可変リダクション操作。

タスク

Task実装は、タスクを実行するexecute(Context)という単一のメソッドを定義し、場合によっては後で実行されるようにします。メソッドの実行が完了すると、結果を返すか、例外をスローし(ただし、Yield例外はスローしません)、割り当てられたExecutorに対してタスクが完了したとみなされます。

Taskは、Yield例外をスローすることで、一定時間実行を停止することがあります。この例外タイプは、ExecutorによるTaskの実行が一時停止され、通常同じExecutorによって、後で再開されることを示します。

タスク・コンテキスト

Taskが実行されると、Contextインスタンスが実行引数として渡されます。

Contextは、複数のJava仮想マシンで実行されているタスク間で共有状態を許可するタスク・プロパティへのアクセスを提供します。

Contextは、全体的な実行ステータスの詳細を示します。

表41-2 実行ステータス

実行状態 メソッド 説明

Complete

Context.isDone()

タスクが完了したかどうかをTaskが判断できるようにします。完了は、正常終了、例外または取消によるものです。これらのいずれの場合も、このメソッドはtrueを返します。

Cancelled

Context.isCancelled()

タスクが実際に取り消されたかどうかをTaskが判断できるようにします。

Resuming

Context.isResuming()

ExecutorによるTaskの実行が、(フェイルオーバーなどの)リカバリ後の再開なのか、タスクが以前にYield例外をスローした後の再開によるものなのかを判断します。

タスク・オーケストレーション

オーケストレーションは、RemoteExecutor.orchestrate(Task)をコールすることから始まります。これにより、指定されたTaskに対してTask.Orchestrationインスタンスが返されます。Task.Orchestrationを使用すると、タスクを実行する側面を構成できます。

表41-3 タスク・オーケストレーション・メソッド

メソッド 説明

concurrently()

タスクは、指定されたエグゼキュータが定義/構成されているすべてのJava仮想マシンで同時に実行されます。これはデフォルトです。

sequentially()

タスクは、指定されたエグゼキュータが定義/構成されているすべてのJava仮想マシンで順番に実行されます。

limit(int)

タスクをnエグゼキュータに制限します。これを使用して、タスク実行の対象となるエグゼキュータの数を制限します。設定しない場合、デフォルトの動作では、指定したエグゼキュータが定義/構成されているすべてのJava仮想マシンでタスクが実行されます。

filter(Predicate)

フィルタリングによって、タスクが実行される場所を制限する追加の方法が提供されます。述語は、各Java Virtual Machineの各エグゼキュータに関連付けられたメタデータに適用されます。メタデータの例としては、エグゼキュータが実行されているメンバー、またはメンバーのロールなどがあります。述語は連鎖して、適切なエグゼキュータを決定する際のブール・ロジックを提供できます。

define(String, <V>)

タスクが実行されているJava Virtual Machineに関係なく、すべてのタスクで使用できる初期状態を定義します。

retain(Duration)

指定した場合、タスクは保持され、完了後にタスク計算の最終結果を新規サブスクライバに通知できます。

collect(Collector)

これは、Task.Collectableを返すオーケストレーション・ビルダーのターミナルで、結果の収集方法を定義し、最終的にタスクをグリッドに送信します。

タスク・コレクタおよびコレクタブル

オーケストレーションに渡されるTask.Collectorは、タスクから結果を収集し、オプションで収集された結果を最終形式に変換します。コレクタは、TaskCollectorクラスで使用可能なコレクタの例を使用すると、最もわかりやすいです。

表41-4 タスク・コレクタのメソッド

メソッド 説明

count()

実行中のタスクから収集されたnull以外の結果の数。

firstOf()

実行中のタスクによって提供された最初の結果を収集して返します。

lastOf()

実行中のタスクによって返された最後の結果を収集して返します。

setOf()

null以外のすべての結果を収集し、セットとして返します。

listOf()

null以外のすべての結果を収集し、リストとして返します。

オーケストレーションでcollectをコールして返されるTask.Collectableインスタンスでは、登録済サブスクライバによって結果を収集または公開しなくなる条件を設定できます。Task.Collectablesubmit()をコールすると、タスクのオーケストレーションが開始されます。

タスク・コーディネータ

オーケストレーションCollectablesubmit()をコールすると、Task.Coordinatorが返されます。Task.Collectableと同様に、Task.Coordinatorではサブスクライバを登録できます。さらに、オーケストレーションを取り消したり、完了ステータスを確認したりできます。

タスク・サブスクライバ

Task.Subscriberは、オーケストレーションの実行ステータスに関する様々なイベントを受信します。

表41-5 タスク・サブスクライバ・イベント

メソッド 説明

onComplete()

オーケストレーションの完了を通知します。

onError(Throwable)

回復不可能なエラー(引数として指定)が発生したときにコールされます。

onNext(<T>)

Task.Coordinatorが結果を生成したときにコールされます。

onSubscribe(Task.Subscription)

onComplete()onError(Throwable)またはonNext(<T>)へのコールの前にコールされます。指定されるTask.Subscriptionは、サブスクリプションの取消しまたはTask.Coordinatorへの参照の取得へのアクセスを提供します。

高度なオーケストレーション - 例

最初に、オーケストレーションの例に共通する次のコードについて考えてみます:

// demonstrate orchestration using the default RemoteExecutor
RemoteExecutor executor = RemoteExecutor.getDefault();

// WaitingSubscriber is an implementation of the
// com.oracle.coherence.concurrent.executor.Task.Subscriber interface
// that has a get() method that blocks until Subscriber.onComplete() is
// called and will return the results received by onNext()
WaitingSubscriber subscriber = new WaitingSubscriber();

// ValueTask is an implementation of the
// com.oracle.coherence.concurrent.executor.Task interface
// that returns the value provided at construction time
ValueTask task = new ValueTask("Hello World");

前述の例を考慮すると、オーケストレーションの最も簡単な例は次のとおりです:

// orchestrate the task, subscribe, and submit
executor.orchestrate(task)
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// if this was run on four cluster members running the default executor service,
// the returned Collection will have four results
Collection<String> results = subscriber.get();

前述の例を基に、2つの記憶域メンバーと2つのプロキシ・メンバーを持つクラスタを想定します。クラスタ・メンバーは、それぞれstorageおよびproxyのロールで構成されます。たとえば、storageメンバーでのみタスクを実行する必要がある場合、オーケストレーションは次のようになります:

// orchestrate the task, filtering by a role, subscribe, and submit
executor.orchestrate(task)
        .filter(Predicates.role("storage"))
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// as there are only two storage members in this hypothetical, only two
// results will be returned
Collection<String> results = subscriber.get();

com.oracle.coherence.concurrent.executor.function.Predicatesで使用できる述語はいくつかありますが、ターゲット・ユース・ケースに適用されない場合は、Remote.Predicateインタフェースを実装するだけです。

collect(Collector)およびuntil(Predicate)を使用して、結果の収集およびサブスクライバへの提示方法をカスタマイズできます:

// orchestrate the task, collecting the first non-null result,
// subscribe, and submit
executor.orchestrate(new MayReturnNullTask())
        .collect(TaskCollectors.firstOf())
        .until(Predicates.nonNullValue())
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// the first non-result returned will be the one provided to the
// subscriber
Collection<String> results = subscriber.get();

com.oracle.coherence.concurrent.executor.TaskCollectorsには複数のコレクタが用意されていますが、ターゲット・ユース・ケースに適用されない場合は、Task.Collectorインタフェースを実装してください。

エグゼキュータの構成

構成には、いくつかのエグゼキュータ・タイプを使用できます。

表41-6 エグゼキュータのタイプ

ExecutorServiceタイプ 説明

単一スレッド

単一スレッドを持つExecutorServiceを作成します。

固定スレッド

固定数のスレッドを持つExecutorServiceを作成します。

キャッシュ

必要に応じて新しいスレッドを作成し、可能な場合は既存のスレッドを再利用するExecutorServiceを作成します。

ワーク・スティーリング

使用可能なプロセッサの数をターゲットの並列性レベルとして使用し、ワーク・スティーリング・スレッド・プールを作成します。

カスタム

非標準エグゼキュータの作成を許可します。

VirtualThread

VirtualThread-per-task ExecutorServiceを作成します。JDK 21以上が必要です。

表41-7 構成要素

要素名 必須 予想される型 説明

single

いいえ

該当なし

単一スレッド・エグゼキュータを定義します。

fixed

いいえ

該当なし

固定スレッド・プール・エグゼキュータを定義します。

cached

いいえ

該当なし

キャッシュ済スレッド・プール・エグゼキュータを定義します

work-stealing

いいえ

該当なし

ワーク・スティーリング・プール・エグゼキュータを定義します。

custom-executor

いいえ

java.util.concurrent.ExecutorService

カスタム・エグゼキュータを定義します。

virtual-per-task

いいえ

該当なし

VirtualThread-per-taskエグゼキュータを定義します。

name

はい

java.lang.String

エグゼキュータの論理nameを定義します。

thread-count

はい

java.lang.Integer

固定スレッド・プール・エグゼキュータのスレッド数を定義します。

parallelism

いいえ

java.lang.Integer

ワーク・スティーリング・スレッド・プール・エグゼキュータの並列性を定義します。定義しない場合、デフォルトでシステムで使用可能なプロセッサの数になります。

thread-factory

いいえ

該当なし

java.util.concurrent.ThreadFactoryを定義します。singlefixedおよびcachedエグゼキュータによって使用されます。

instance

はい

java.util.concurrent.ThreadFactory

ThreadFactoryのインスタンス化方法を定義します。インスタンス要素の詳細は、instanceを参照してください。この要素は、thread-factory要素の子である必要があります。

詳細は、スキーマを参照してください。

構成例

エグゼキュータを定義するには、構成要素を認識するためにcache-configルート要素にcoherence-concurrent NamespaceHandlerを含める必要があります。
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
               xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
               xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd"> .
.
.
</cache-config>

ノート:

構成で定義されたエグゼキュータは、ドキュメント内の他の要素より前にする必要があります。これを行わないと、ドキュメントを検証できなくなります。
次の例では、NamespaceHandlerに定義されているxmlネームスペースがcであると想定しています。
<!-- creates a single-threaded executor named <em>Single</em> -->
<c:single>
  <c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named <em>Single</em> with a thread factory-->
<c:single>
  <c:name>SingleTF</c:name>
  <c:thread-factory>
    <c:instance>
      <c:class-name>my.custom.ThreadFactory</c:class-name>
    </c:instance>
  </c:thread-factory>
</c:single>
<!-- creates a fixed-thread executor named <em>Fixed5</em> -->
<c:fixed>
  <c:name>Single</c:name>
  <c:thread-count>5</c:thread-count>
</c:fixed>

エグゼキュータの管理

エグゼキュータを管理およびモニターするには様々な方法があります。次のいずれかのオプションを使用できます。

RESTを介したエグゼキュータの管理

RESTを介したCoherence管理では、エンドポイントを公開して、ExecutorMBeanインスタンスに対するアクションを問い合せて呼び出します。

表41-8 RESTエンドポイント

説明 メソッド パス 次のものを生成

すべてのエグゼキュータを表示

GET

/management/coherence/cluster/executors

JSON

名前が一致するすべてのエグゼキュータを表示

GET

/management/coherence/cluster/executors/{name}

JSON

エグゼキュータ統計を名前でリセット

POST

/management/coherence/cluster/executors/{name}/resetStatistics

JSON

CDIの使用

CDIを介してRemoteExecutorsを注入できます。

たとえば:
@Inject
private RemoteExecutor single;    // injects a RemoteExecutor named 'single'.

@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor;    // injects a RemoteExecutor named 'Fixed5'. 

アトミックの使用

Coherence同時実行では、AtomicIntegerAtomicLongAtomicReferenceなどのアトミック型の分散実装が提供されます。また、同じタイプのローカル実装も提供されます。ローカル実装は、分散型バリアントと同じインタフェースを実装する既存のjava.util.concurrent.atomic型を囲む単なるThinラッパーで、置換え可能です。
アトミック型のインスタンスを作成するには、Atomicsクラスで適切なファクトリ・メソッドをコールする必要があります。
// Creates a local, in-process instance of named 'AtomicInteger' with an implicit initial value of 0.
AtomicInteger localFoo  = Atomics.localAtomicInteger("foo"); 

// Creates a remote, distributed instance of named 'AtomicInteger', distinct from the local instance 'foo', 
// with an implicit initial value of '0'.   
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");  

// Creates a remote, distributed instance of named 'AtomicLong', with an initial value of '5'.
AtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

ノート:

前述のコードで使用されているAtomicIntegerおよびAtomicLong型は、java.util.concurrent.atomicパッケージの型ではありません。実際には、LocalAtomicXyzクラスとRemoteAtomicXyzクラスの両方が実装するcom.oracle.coherence.concurrent.atomicパッケージ内に定義されているインタフェースであり、これは前述のメソッドによって実際に返されるインスタンスです。
したがって、前述のコードを次のように書き換えることができます。
LocalAtomicInteger  localFoo  = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

ただし、インタフェースを使用すると、必要に応じてローカル実装と分散実装を簡単に切り替えることができるため、具体的な型のかわりにインタフェースを使用することを強くお薦めします。

インスタンスの作成後は、対応するjava.util.concurrent.atomic型のいずれかを使用する場合と同じ方法で使用できます。
int  counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);

この項には次のトピックが含まれます:

アトミック型の非同期実装

AtomicIntegerAtomicLongなどの数値アトミック型のインスタンスは、クライアントがその値を増分する必要があるが、必ずしも新しい値が何であるかを知る必要がないアプリケーション内の様々なカウンタを表すために頻繁に使用されます。

ローカル・アトミックを操作する場合は、前に示したものと同じAPIを使用し(アトミックの使用を参照)、戻り値は無視されるだけです。ただし、サーバーからのレスポンスを待機している間にクライアントで不要なブロッキングが発生する分散アトミックを使用すると、破棄されるだけです。明らかに、これはアトミックのパフォーマンスとスループットの両方に悪影響を及ぼします。

このような状況でのリモート・コールの影響を軽減するために、Coherence同時実行では、サポートするすべてのアトミック型の非ブロッキングの非同期実装も提供されます。

サポートされているアトミック型の非ブロッキング・インスタンスを取得するには、その型のブロッキング・インスタンスでasyncメソッドをコールするだけです。
// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicInteger', with an implicit initial value of 0.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();      

// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicLong', with an initial value of 5.
AsyncAtomicLong    asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
これらのインスタンスを作成した後は、対応するブロッキング・タイプのいずれかを使用する場合と同じ方法で使用できます。唯一の違いは、非ブロッキング・インスタンスは結果に対してCompletableFutureを返すだけで、ブロックしないことです。
CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long>    futureCounter5 = asyncBar.addAndGet(5L);

同じ名前の分散アトミック型のブロッキング・インスタンスと非ブロッキング・インスタンスは、両方とも同じクラスタ側のアトミック・インスタンス状態によってバックアップされるため、これらを同じ意味で使用できます。

CDIの使用

また、Coherence同時実行のアトミック型はCDIを使用して注入できるため、Atomicsクラスでの明示的なファクトリ・メソッド・コールが不要になります。

// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;   

// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from 
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;  

// Injects a remote, distributed instance of non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar 

CDI注入を介してアトミック型のインスタンスを取得した後は、Atomicsファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。

ロックの使用

Coherence同時実行では、java.util.concurrent.locksパッケージのLockおよびReadWriteLockインタフェースの分散実装が提供されるため、必要に応じてクラスタ・メンバー間でロックベースの並行処理制御を実装できます。

ローカルJDK実装とは異なり、このパッケージのクラスは、クラスタ・メンバー/プロセスIDおよびスレッドIDを使用してロック所有者を識別し、共有ロック状態をCoherence NamedMap内に格納します。ただし、これは、ロックを取得および解放する呼出しは、別のクラスタ・メンバーに格納される可能性がある共有状態を更新する必要があるため、リモートのネットワーク呼出しであるということも意味します。この更新は、lockおよびunlock操作のパフォーマンスに影響する可能性があります。

この項には次のトピックが含まれます:

排他的ロックの使用

RemoteLockクラスは、Lockインタフェースの実装を提供し、1つのメンバー上の1つのスレッドのみが、いつでもロックによって保護されるクリティカル・セクションを実行していることを確認できます。

RemoteLockのインスタンスを取得するには、Locks.remoteLock factoryメソッドをコールします。
Lock foo = Locks.remoteLock("foo");
Atomicsに示すように、LocksクラスからローカルLockインスタンスを取得できます。これは、localLockファクトリ・メソッドをコールして、標準のjava.util.concurrent.locks.ReentrantLockのインスタンスを返すだけです。
Lock foo = Locks.localLock("foo");
Lockインスタンスを作成した後、通常どおりに使用できます。
foo.lock();
try {
    // critical section guarded by the exclusive lock `foo`
}
finally {
    foo.unlock();
}

読取り/書込みロックの使用

RemoteReadWriteLockクラスは、ReadWriteLockインタフェースの実装を提供し、複数の同時リーダーを許可しながら、1つのメンバー上の1つのスレッドのみが、いつでもwriteロックによって保護されるクリティカル・セクションを実行していることを確認できます。

RemoteReadWriteLockのインスタンスを取得するには、Locks.remoteReadWriteLockファクトリ・メソッドをコールします。
ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
Atomicsに示すように、LocksクラスからローカルReadWriteLockインスタンスを取得できます。これは、localReadWriteLockファクトリ・メソッドをコールして、標準のjava.util.concurrent.locks.ReentrantReadWriteLockのインスタンスを返すだけです。
ReadWriteLock bar = Locks.localReadWriteLock("bar");
ReadWriteLockインスタンスを作成した後、通常どおりに使用できます。
bar.writeLock().lock()
try {
    // critical section guarded by the exclusive write lock `bar`
}
finally {
    bar.writeLock().unlock();
}

または:

bar.readLock().lock()
try {
    // critical section guarded by the shared read lock `bar`
}
finally {
    bar.readLock().unlock();
}

CDIの使用

CDIを使用して、排他的と読取り/書込みの両方のロック・インスタンスを、それらを必要とするオブジェクトに注入することもできます。

// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;           

// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar; 

CDI注入を介してロックのインスタンスを取得した後は、Locksファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。

ラッチおよびセマフォの使用

Coherence同時実行では、java.util.concurrentパッケージのCountDownLatchおよびSemaphoreクラスの分散実装も提供されるため、複数のCoherenceクラスタ・メンバー間での実行の同期を、2つのJDKクラスを使用して単一のプロセス内で実装するのと同じくらい簡単に実装できます。また、リモート実装とローカル実装の両方が準拠する、これら2つの同時実行性プリミティブのインタフェースも提供されます。

Atomicsに示すように、ローカル実装は、対応するJDKクラスを囲むThinラッパーにすぎません。

この項には次のトピックが含まれます:

カウントダウン・ラッチの使用

RemoteCoundDownLatchクラスは、CountDownLatchの分散実装を提供し、ラッチを待機している任意のクラスタ・メンバー上のコードの実行が、ラッチがゼロに達したときにのみ続行されるようにできます。すべてのクラスタ・メンバーは、ラッチを待機してカウントダウンできます。

RemoteCountDownLatchのインスタンスを取得するには、Latches.remoteCountDownLatchファクトリ・メソッドをコールします。
// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CoundDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
AtomicsおよびLocksに示すように、remoteCountDownLatchファクトリ・メソッドをコールすることで、LatchesクラスからローカルCountDownLatchインスタンスを取得できます。
// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CoundDownLatch foo = Latches.localCountDownLatch("foo", 10);

RemoteCountDownLatchインスタンスを作成したら、countDownおよびawaitメソッドをコールすることで、通常どおりに使用できます。

セマフォの使用

RemoteSemaphoreクラスは、Semaphoreの分散実装を提供し、すべてのクラスタ・メンバーが同じsemaphoreインスタンスから許可を取得および解放できるようにします。

RemoteSemaphoreのインスタンスを取得するには、Semaphores.remoteSemaphoreファクトリ・メソッドをコールします。
// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
AtomicsおよびLocksに示すように、localSemaphoreファクトリ・メソッドをコールすることで、SemaphoresクラスからローカルSemaphoreインスタンスを取得できます。
// Creates an instance of a 'LocalSemaphore' with '0' permits.  
Semaphore foo = Semaphores.localSemaphore("foo");

Semaphoreインスタンスを作成したら、releaseおよびacquireメソッドをコールすることで、通常どおりに使用できます。

CDIの使用

CDIを使用して、CountDownLatchSemaphoreの両方のインスタンスを、それらを必要とするオブジェクトに注入することもできます。

// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;

// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;          

// Inject an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
@Remote
private Semaphore localSemaphoreBar;            

// Inject an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;

CDI注入を介してlatchまたはsemaphoreインスタンスを取得した後は、LatchesまたはSemaphoresファクトリ・クラスから直接取得したインスタンスと同じ方法で使用できます。

@Name注釈は、注入ポイントからメンバー名(前述の例ではフィールド名)を取得できるかぎり、どちらの場合もオプションですが、それ以外の場合(コンストラクタ注入を使用する場合など)は必須です。

@Count注釈は、最初のラッチ数を指定し、省略するとデフォルトで1になります。@Permits注釈は、セマフォの使用可能な許可数を指定し、省略するとデフォルトで0になります。つまり、最初のacquireコールは、別のスレッドが1つ以上の許可を解放するまでブロックされます。

ブロッキング・キューの使用

Coherenceでは、Coherence CE 24.03からデータ構造としてキューがサポートされます。Coherence NamedQueuejava.util.Queueの実装で、NamedDequejava.util.Dequeの実装です。

Coherenceには、BlockingQueueの2つの実装があります。1つは単純なサイズ制限付きキューで、2つ目ははるかに大きい容量の分散ページ・キューです。単純なキューは、BlockingQueueと両端キューのBlockingDequeの両方として使用できます。分散ページ・キューは、BlockingQueue実装としてのみ使用できます。

ノート:

Coherenceキューは、キューと同じ名前を持つキャッシュにマップされます。キャッシュがキューに使用されている場合、同じキャッシュを通常のデータ・キャッシュとして使用することはできません。

ブロッキング・キュー

Coherence同時実行モジュールには、NamedBlockingQueueというjava.util.concurrent.BlockingQueueの実装と、NamedBlockingDequeというjava.util.concurrent.BlockingDequeの実装が含まれます。

アプリケーションでCoherenceブロッキング・キューを使用するには、次のようにcoherence-concurrentモジュールに対する依存関係を追加する必要があります:


    <dependency>
        <groupId>com.oracle.coherence</groupId>
        <artifactId>coherence-concurrent</artifactId>
        <version>14.1.2-0-0</version>
    </dependency>

ブロッキング・キューのインスタンスを取得するには、com.oracle.coherence.concurrent.Queuesファクトリ・クラスを使用します。

"my-queue"という名前の単純なサイズ制限付きBlockingQueueを取得するには、次の例を参照してください:

NamedBlockingQueue<String> queue = Queues.queue("my-queue");

"my-deque"という名前の単純なサイズ制限付きBlockingDequeを取得するには、次の例を参照してください:

NamedBlockingDeque<String> queue = Queues.deque("my-deque");

"my-queue"という名前の分散ページBlockingQueueを取得するには、次の例を参照してください:

NamedBlockingQueue<String> queue = Queues.pagedQueue("my-queue");

ブロッキング・キューの実装は、Coherenceイベントを使用して機能します。アプリケーション・コードがブロッキング・メソッドをコールすると、コール元スレッドはブロックされますが、ブロッキングはサーバー上では行われません。アプリケーション・コードは、サーバーからイベントを受信するとブロック解除されます。

たとえば、アプリケーション・コードがNamedBlockingQueue take()メソッドをコールし、キューが空の場合、このメソッドはコール元スレッドをブロックします。要素が別のスレッド(別のJVM上にある場合もある)によってキューに入れられると、コール元アプリケーションはイベントを受信します。これにより、take()が再試行され、成功すると戻ります。take()の再試行が失敗した場合、コール元スレッドはブロックされたままになります。たとえば、別のスレッドまたは別のJVMも同じキューからの取得をブロックされていて、最初の試行で再試行に成功した場合です。

もう1つの例は、NamedBlockingQueue put()メソッドをコールするアプリケーションです。このメソッドは、キューが満杯(2GBのサイズ制限)になるとブロックされます。この場合、コール元スレッドは、キュー内に領域があることを示す削除イベントを受信するまでブロックされます。put()が再試行され、成功した場合はコール元スレッドに制御が戻されます。再試行が失敗した場合、スレッドはブロックされたままになります。たとえば、別のスレッドまたはJVMもput()でブロックされていて、その再試行が成功すると、キューは再度満杯になります。

キューのサイズ設定

2つのCoherenceキュー実装でデータがどのように格納されるか、およびこれがキューのサイズをどのように制限するかを理解することが重要です。

  • 単純なCoherenceキュー – 単純なキュー(およびデキュー)実装では、単一のCoherenceキャッシュ・パーティションにデータが格納されます。Coherenceキャッシュ・パーティションのサイズは2GBを超えることはできないため、これによって2GBのサイズ制限が適用されます。実際には、パーティションはこれよりもはるかに小さいです。大きいパーティションでは、記憶域が有効なメンバーがクラスタを離れるとリカバリが遅くなります。最新の高速ネットワークでは、300MB - 500MBが適切な最大パーティション・サイズです。10GBネットワークでは、1GBまで大きくできる可能性があります。

  • 分散ページ・キュー - 分散ページ・キューは、通常のキャッシュ・データと同じように、Coherenceクラスタの複数のパーティションにわたって分散されたページにデータを格納します。つまり、ページ・キューでは2GBをはるかに超える容量を格納できます。パーティション・サイズによってキューの合計サイズが制限されることにも注意することが重要です。

パーティションごとに2GBの絶対的なハード制限によって、サイズは次のようになります:

2 GB x 257 = 514 GB

しかし、これは本番での使用で信頼するには大きすぎます。サイズ制限を500MBにし、デフォルトのパーティション数を257にすると、これがキュー・サイズに与える影響がわかります。

500 MB x 257 = 128 GB

したがって、デフォルトでは、ページ・キューの現実的な上限は約128GBです。パーティション数を1087に増やすと、キュー・サイズは次のようになります:

500 MB x 1087 = 543 GB

もちろん、これらのすべての例では、キュー・データをメモリーに格納するのに十分な大きさのヒープ・サイズのJVMがクラスタ内に存在することを想定しています。

制限

Coherenceでの現在のキュー実装には、次の制限があります:

  • 前述のように、単純なキューのハード・サイズ制限は2GBです。単純なキューまたはデキューを使用する場合、サイズが2GBを超えると、Coherenceサーバーはキューへのオファーの受入れを拒否します。java.util.Queueコンタクトでは、キューがオファーを拒否できるため、このサイズ制限はキュー契約に準拠します。アプリケーション開発者は、オファリング・データからキューへのレスポンスをチェックして、オファーが成功したかどうかを判断する必要があります。ここでは「オファー」という用語を使用して、キューにデータを追加するすべてのキューおよびデキュー・メソッドをカバーします。offer()コールからの戻り値のブール値をチェックするかわりに、キューが満杯の場合にput()メソッドがブロックされるNamedBlockingQueueを使用することもできます。

  • 通常の操作では、キューは巨大になりません。キューが巨大になるということは通常、キューから読み取っているプロセスがキューに書き込むプロセスに追いついていないことを意味します。アプリケーション開発者は、キューを使用してアプリケーションの負荷テストを行い、容量に問題がないことを確認する必要があります。

  • オファリングやポーリングなどのキュー操作は、特定のデータ構造で競合し、これにより、パラレル・リクエストの数とリクエストの処理速度が制限されます。順序付けを維持するために、ポーリングは、キューのどちらの端がポーリングされているかに応じて、先頭または末尾のエントリのいずれかで競合します。つまり、ポーリング・メソッドは順次でしか処理できないため、ポーリングが効率的で高速であっても、多数の同時ポーリング・リクエストはキューに入れられ、一度に1つずつ処理されます。Offerメソッドは、先頭または末尾で競合しませんが、先頭および末尾の識別子を維持するために使用されるアトミック・カウンタで競合します。Coherenceでは、異なるワーカー・スレッドで複数のオファー・リクエストを処理できますが、AtomicLongの更新には軽微な競合があります。

  • オファリングやポーリングなど、先頭と末尾で作業するキュー操作は効率的です。java.util.Queueおよびjava.util.Dequeのその他のメソッドには効率の悪いものもあります。たとえば、イテレータ・メソッド、contains()などです。これらは、基本的なキュー機能を必要とするアプリケーションではあまり使用されません。キューを変更するjava.util.Queue APIの一部のオプション・メソッドは、UnsupportedOperationExceptionをスローします(これはJavaキュー・コントラクトで許可されます)。たとえば、retainAll()removeAll()、イテレータを使用した削除などです。