モジュール java.base
パッケージ java.util.concurrent

クラスPhaser

java.lang.Object
java.util.concurrent.Phaser

public class Phaser extends Object
再使用可能な同期化バリアーで、機能はCyclicBarrierおよびCountDownLatchと同様ですが、より柔軟な使用方法をサポートします。

登録 他のバリアーの場合とは異なり、フェーザ上で同期をとるために登録されるパーティの数は、時間とともに変化することがあります。 タスクは(メソッドregister()bulkRegister(int)、またはパーティの初期の数を確立するコンストラクタの形式を使用して)いつでも登録でき、任意の到着時に(arriveAndDeregister()を使用して)必要に応じて登録を解除できます。 もっとも基本的な同期構造と同様に、登録と登録解除は内部のカウントにしか影響を与えません。それ以上の内部登録は確立されないため、タスクは、自身が登録されているかどうかを照会できません。 (ただし、このクラスのサブクラス化によってこのような登録を導入できます。)

同期 CyclicBarrierと同様に、 Phaserは繰返し待機できます。 arriveAndAwaitAdvance()メソッドには、CyclicBarrier.awaitに類似した効果があります。 フェーザの各世代には、関連付けられたフェーズ番号があります。 このフェーズ番号は0から始まり、すべてのパーティがフェーザに到着すると進められ、 Integer.MAX_VALUEに達すると0にラップされます。 フェーズ番号を使用すると、登録されたいずれかのパーティから呼び出すことのできる2種類のメソッドを通して、フェーザへの到着時や、ほかを待機しているときのアクションを個別に制御できます。

  • 受信 arrive()メソッドとarriveAndDeregister()メソッドは、到着を記録します。 これらのメソッドはブロックを行わず、関連付けられた到着フェーズ番号、つまり、その到着が適用されるフェーザのフェーズ番号を返します。 特定のフェーズの最後のパーティが到着すると、オプションのアクションが実行され、フェーズが進められます。 これらのアクションは、フェーズをトリガーして進めるパーティによって実行され、終了も制御するオーバーライド・メソッドonAdvance(int, int)によって調整されます。 このメソッドのオーバーライドは CyclicBarrierへのバリアー・アクションの提供に似ていますが、それより柔軟です。
  • 待機中。 awaitAdvance(int)メソッドは、到着フェーズ番号を示す引数を必要とし、フェーザが別のフェーズに進むと(または、すでにそのフェーズにあるときは)復帰します。 CyclicBarrierを使用した類似の構築とは異なり、awaitAdvanceメソッドは、待機中のスレッドが割り込まれた場合でも引き続き待機します。 割込み可能なバージョンやタイムアウト・バージョンも使用できますが、タスクが割込み可能な状態またはタイム・アウトで待機中に検出された例外によって、フェーザの状態は変更されません。 必要に応じて、しばしばforceTerminationを呼び出したあとに、これらの例外のハンドラ内で関連する任意の回復を実行できます。 フェイザは、ForkJoinPoolで実行中のタスクでも使用できます。 プールの並列度レベルが同時にブロックされるパーティの最大数に対応できる場合は、進捗が確認されます。

終了 フェーザは終了状態に入ることがあり、これはisTerminated()メソッドを使用してチェックできます。 終了時は、負の戻り値で示されるように、すべての同期メソッドが、フェーズが進むのを待機せずにただちに復帰します。 同様に、終了時に登録しようとしても効果はありません。 終了は、onAdvanceの呼出しがtrueを返したときにトリガーされます。 デフォルトの実装では、登録解除によって登録済みパーティの数が0になった場合に trueを返します。 下に示されているように、フェーザが固定の反復数を使用してアクションを制御しているときは、多くの場合、現在のフェーズ番号がしきい値に達したときに終了するようにこのメソッドをオーバーライドすると便利です。 また、forceTermination()メソッドを使用すると、待機中のスレッドを強制的に解放して終了可能にすることもできます。

階層化。 フェーザは、競合を減らすために、階層型にする(つまり、ツリー構造で構築する)ことができます。 多数のパーティが含まれるために、通常であれば大きな同期競合コストが発生するフェーザを、代わりに、サブフェーザのグループが共通の親を共有するように設定できます。 これにより、操作ごとのオーバーヘッドが増えるとしても、スループットが大幅に向上する可能性があります。

階層型フェーザのツリーでは、子フェーザのその親への登録と登録解除は自動的に管理されます。 子フェーザの登録済みパーティの数が(Phaser(Phaser,int)コンストラクタ、register()、またはbulkRegister(int)で確立された) 0以外になった場合は常に、子フェーザがその親に登録されます。 arriveAndDeregister()の呼出しの結果として登録済みパーティの数が0になった場合は常に、子フェーザがその親から登録解除されます。

監視 同期メソッドを呼び出すことができるのは登録済みパーティだけですが、フェーザの現在の状態はどの呼出し側でも監視できます。 どの時点でも、合計でgetRegisteredParties()パーティが存在し、そのうちのgetArrivedParties()が現在のフェーズ(getPhase())に到着しています。 残りの(getUnarrivedParties())パーティが到着すると、フェーズが進められます。 これらのメソッドによって返された値は一時的な状態を反映している可能性があるため、一般に、同期の制御には役立ちません。 toString()メソッドは、これらの状態クエリーのスナップショットを非公式の監視に便利な形式で返します。

メモリーの一貫性の効果: 任意の形式の到着メソッドhappen-beforeより前のアクションでは、対応するフェーズが前進し、onAdvanceアクション(存在する場合)では、フェーズが進んだ後にhappen-beforeアクションが実行されます。

使用例:

Phaserは、可変数のパーティを処理する単発的なアクションを制御するために、CountDownLatchの代わりに使用できます。 一般的な慣用句は、次のように、これを最初に登録し、次にすべてのアクションを開始してから登録解除するメソッドです:

 
 void runTasks(List<Runnable> tasks) {
   Phaser startingGate = new Phaser(1); // "1" to register self
   // create and start threads
   for (Runnable task : tasks) {
     startingGate.register();
     new Thread(() -> {
       startingGate.arriveAndAwaitAdvance();
       task.run();
     }).start();
   }

   // deregister self to allow threads to proceed
   startingGate.arriveAndDeregister();
 }

一連のスレッドで、指定された反復数だけアクションを繰返し実行させるための1つの方法として、onAdvanceをオーバーライドします。

 
 void startTasks(List<Runnable> tasks, int iterations) {
   Phaser phaser = new Phaser() {
     protected boolean onAdvance(int phase, int registeredParties) {
       return phase >= iterations - 1 || registeredParties == 0;
     }
   };
   phaser.register();
   for (Runnable task : tasks) {
     phaser.register();
     new Thread(() -> {
       do {
         task.run();
         phaser.arriveAndAwaitAdvance();
       } while (!phaser.isTerminated());
     }).start();
   }
   // allow threads to proceed; don't wait for them
   phaser.arriveAndDeregister();
 }
メイン・タスクがあとで終了を待機する必要がある場合は、再登録してから、同様のループを実行できます。
 
   // ...
   phaser.register();
   while (!phaser.isTerminated())
     phaser.arriveAndAwaitAdvance();

関連する構築を使用すると、このフェーズが決してInteger.MAX_VALUEをラップしないことが確実なコンテキストで、特定のフェーズ番号を待機できます。 たとえば、

 
 void awaitPhase(Phaser phaser, int phase) {
   int p = phaser.register(); // assumes caller not already registered
   while (p < phase) {
     if (phaser.isTerminated())
       // ... deal with unexpected termination
     else
       p = phaser.arriveAndAwaitAdvance();
   }
   phaser.arriveAndDeregister();
 }

フェーザのツリーを使用してnタスクのセットを作成するには、構築時に登録するPhaserを受け入れるコンストラクタを備えたTaskクラスを想定すると、次の形式のコードを使用できます。 build(new Task[n], 0, n, new Phaser())の呼出しのあと、たとえばプールに送信することによって、これらのタスクを起動できます。

 
 void build(Task[] tasks, int lo, int hi, Phaser ph) {
   if (hi - lo > TASKS_PER_PHASER) {
     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
       int j = Math.min(i + TASKS_PER_PHASER, hi);
       build(tasks, i, j, new Phaser(ph));
     }
   } else {
     for (int i = lo; i < hi; ++i)
       tasks[i] = new Task(ph);
       // assumes new Task(ph) performs ph.register()
   }
 }
TASKS_PER_PHASERの最適な値は主に、想定される同期の頻度によって異なります。 フェーズごとのタスク本体が極端に小さい(そのため、頻度が高い)場合は4程度の小さい値が、また極端に大きい場合は最大数百の値が適している可能性があります。

実装上のノート: この実装では、パーティの最大数を65535に制限します。 追加のパーティを登録しようとすると、IllegalStateExceptionが発生します。 ただし、任意の大きな参加者のセットを格納するために階層型フェーザを作成できるため、その方法を取るようにしてください。

導入されたバージョン:
1.7