モジュール java.base
パッケージ java.util.concurrent

クラスCountedCompleter<T>

java.lang.Object
java.util.concurrent.ForkJoinTask<T>
java.util.concurrent.CountedCompleter<T>
型パラメータ:
T - コンプリタの結果のタイプ
すべての実装されたインタフェース:
Serializable, Future<T>

public abstract class CountedCompleter<T> extends ForkJoinTask<T>
トリガーされた時点で保留中のアクションが残っていない場合に実行される完了アクションを含むForkJoinTaskです。 CountedCompleterは、一般にサブタスクの停止やブロックが発生した場合は他の形式のForkJoinTaskより堅牢ですが、直感的にプログラミングしにくくなります。 CountedCompleterの使用方法は他の完了ベースのコンポーネント(CompletionHandlerなど)と同様ですが、完了アクションonCompletion(CountedCompleter)をトリガーするためには、1つではなく複数の保留完了が必要になることがあります。 別の方法で初期化された場合を除き、保留カウントはゼロから始まりますが、setPendingCount(int)addToPendingCount(int)およびcompareAndSetPendingCount(int, int)メソッドを使用して(原子的に)変更できます。 tryComplete()を呼び出したときに、保留アクションのカウントがゼロでない場合は、値が減らされます。それ以外の場合は、完了アクションが実行され、このコンプリータ自体がコンプリータを持っている場合はそのコンプリータで処理が続行されます。 PhaserSemaphoreなどの関連する同期化コンポーネントと同様に、これらのメソッドは内部のカウントにしか影響を与えず、それ以上の内部登録を確立しません。 特に、保留タスクの識別情報は保持されません。 次に示すように、必要に応じて保留タスクまたはそれらの結果の一部または全部を記録するサブクラスを作成できます。 次に示すように、完了のトラバースのカスタマイズをサポートするユーティリティ・メソッドも提供されます。 ただし、CountedCompleterは基本的な同期化メカニズムのみを提供するため、一連の関連する用途に適したリンク、フィールドおよび追加のサポート・メソッドを保持するその他の抽象サブクラスを作成するときに便利な場合があります。

具象CountedCompleterクラスは、ほとんどのケースで(次に示すように)復帰する前にtryComplete()を1回呼び出すcompute()メソッドを定義する必要があります。 このクラスでは、オプションで、正常な完了時にアクションを実行するonCompletion(CountedCompleter)メソッドと、例外の発生時にアクションを実行するonExceptionalCompletion(Throwable, CountedCompleter)メソッドをオーバーライドすることもできます。

ほとんどの場合、CountedCompleterは結果を生成しません。通常はCountedCompleter<Void>として宣言され、結果値として常にnullを返します。 それ以外の場合は、getRawResult()メソッドをオーバーライドして、join()、invoke()および関連メソッドから結果を提供するようにしてください。 一般に、このメソッドは完了時の結果を保持するCountedCompleterオブジェクトのフィールド(または1つ以上のフィールドの関数)の値を返す必要があります。 setRawResult(T)メソッドは、デフォルトではCountedCompleterに何の影響も与えません。 適用可能なケースはほとんどありませんが、このメソッドをオーバーライドして、結果データを持つ他のオブジェクトやフィールドを保持することも可能です。

それ自体ではコンプリータを持たない(つまり、getCompleter()nullを返す)CountedCompleterは、この追加機能を含む通常のForkJoinTaskとして使用できます。 ただし、別のコンプリータを持つコンプリータは、他の計算の内部ヘルパーとしてのみ機能するため、(ForkJoinTask.isDone()などのメソッドで報告される)それ自体のタスク・ステータスは不定です。このステータスは、complete(T)ForkJoinTask.cancel(boolean)ForkJoinTask.completeExceptionally(Throwable)の明示的な呼出し時、またはcomputeメソッドの例外完了時にのみ変更されます。 例外完了時に、タスクのコンプリータ(およびそのコンプリータ、以下同様)が存在し、まだ完了していない場合は、それらに例外が中継されます。 同様に、内部のCountedCompleterを取り消しても、そのコンプリータに対するローカルの効果しかないため、多くの場合有効ではありません。

使用例。

並列の再帰的分解。 CountedCompleterは、RecursiveActionで使用されるものと同様のツリーに配置できます。ただし、それらの設定では通常、異なる方法で構築が行われます。 この場合、各タスクのコンプリータは、計算ツリーではタスクの親になります。 必要な登録作業がやや多くなりますが、配列またはコレクションの各要素に時間のかかる(それ以上分割できない)オペレーションを適用する場合(特に、入出力などの固有の差違またはガベージ・コレクションなどの補助効果のいずれかが原因で、そのオペレーションの完了にかかる時間が要素によって大幅に異なる場合)、CountedCompleterはより適切な方法である可能性があります。 CountedCompletersは独自の継続を提供するため、他のタスクは実行を待機してブロックする必要はありません。

たとえば、divide-by-two再帰的分解を使用して作業を1つの部分(リーフ・タスク)に分割するユーティリティ・メソッドの初期バージョンを次に示します。 作業が個別の呼出しに分割されている場合でも、通常はリーフ・タスクを直接フォークするより、スレッド間通信が減少し、負荷分散が向上するツリーベースの手法をお薦めします。 再帰的ケースでは、終了するサブタスクの各ペアの2番目が、親の(結果の組合せが実行されないため、メソッドonCompletionのデフォルトのno-op実装はオーバーライドされません)の完了をトリガーします。 ユーティリティ・メソッドは、ルート・タスクを設定し、それを(ここでは、ForkJoinPool.commonPool()を暗黙的に使用)を呼び出します。 保留数を常に子タスクの数に設定し、戻す直前に tryComplete()をコールするのは、簡単で信頼性の高い(最適ではない)です。

 
 public static <E> void forEach(E[] array, Consumer<E> action) {
   class Task extends CountedCompleter<Void> {
     final int lo, hi;
     Task(Task parent, int lo, int hi) {
       super(parent); this.lo = lo; this.hi = hi;
     }

     public void compute() {
       if (hi - lo >= 2) {
         int mid = (lo + hi) >>> 1;
         // must set pending count before fork
         setPendingCount(2);
         new Task(this, mid, hi).fork(); // right child
         new Task(this, lo, mid).fork(); // left child
       }
       else if (hi > lo)
         action.accept(array[lo]);
       tryComplete();
     }
   }
   new Task(null, 0, array.length).invoke();
 }
再帰的なケースでは、タスクがその右側のタスクをフォークした後で何も実行しないため、復帰する前にその左側のタスクを直接呼び出すことができる点に気づくことで、この設計を改良できます。 (これは末尾再帰の削除に似ています。) また、タスクの最後のアクションがサブタスク(a "テール・コール")をフォークまたは呼び出す場合、 tryComplete()へのコールは、保留カウントを"1つずつオフ"に見せるコストで最適化できます。
 
     public void compute() {
       if (hi - lo >= 2) {
         int mid = (lo + hi) >>> 1;
         setPendingCount(1); // looks off by one, but correct!
         new Task(this, mid, hi).fork(); // right child
         new Task(this, lo, mid).compute(); // direct invoke
       } else {
         if (hi > lo)
           action.accept(array[lo]);
         tryComplete();
       }
     }
さらなる最適化として、左側のタスクも存在する必要がないことに注意してください。 新しいタスクを作成するかわりに、元のタスクを引き続き使用し、フォークごとに保留カウントを追加できます。 また、このツリー内のタスクはonCompletion(CountedCompleter)メソッドを実装していないため、tryCompletepropagateCompletion()に置き換えることができます。
 
     public void compute() {
       int n = hi - lo;
       for (; n >= 2; n /= 2) {
         addToPendingCount(1);
         new Task(this, lo + n/2, lo + n).fork();
       }
       if (n > 0)
         action.accept(array[lo]);
       propagateCompletion();
     }
保留中のカウントを事前計算できる場合は、コンストラクタ内で確立できます:
 
 public static <E> void forEach(E[] array, Consumer<E> action) {
   class Task extends CountedCompleter<Void> {
     final int lo, hi;
     Task(Task parent, int lo, int hi) {
       super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo));
       this.lo = lo; this.hi = hi;
     }

     public void compute() {
       for (int n = hi - lo; n >= 2; n /= 2)
         new Task(this, lo + n/2, lo + n).fork();
       action.accept(array[lo]);
       propagateCompletion();
     }
   }
   if (array.length > 0)
     new Task(null, 0, array.length).invoke();
 }
このようなクラスの追加の最適化には、リーフ・ステップのクラスを専門とし、反復ごとに2つではなく4つずつ細分化し、常に1つの要素に細分化するのではなく適応しきい値を使用することが必要になる場合があります。

検索 CountedCompleterのツリーでは、データ構造の様々な部分に含まれる値またはプロパティを検索し、結果が見つかった場合はただちにAtomicReferenceに結果を報告できます。 他のタスクはその結果をポーリングして、不要な作業を回避できます。 (さらに、他のタスクを取り消すこともできますが、通常は結果が設定されたことをそれらに通知し、その場合は後続の処理を省略させるのがより簡単で効率的です。) 完全なパーティション化を使用する配列を使って再度説明します(この場合も、実際には、リーフ・タスクはほとんど常に複数の要素を処理します)。

 
 class Searcher<E> extends CountedCompleter<E> {
   final E[] array; final AtomicReference<E> result; final int lo, hi;
   Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
     super(p);
     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
   }
   public E getRawResult() { return result.get(); }
   public void compute() { // similar to ForEach version 3
     int l = lo, h = hi;
     while (result.get() == null && h >= l) {
       if (h - l >= 2) {
         int mid = (l + h) >>> 1;
         addToPendingCount(1);
         new Searcher(this, array, result, mid, h).fork();
         h = mid;
       }
       else {
         E x = array[l];
         if (matches(x) && result.compareAndSet(null, x))
           quietlyCompleteRoot(); // root task is now joinable
         break;
       }
     }
     tryComplete(); // normally complete whether or not found
   }
   boolean matches(E e) { ... } // return true if found

   public static <E> E search(E[] array) {
       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
   }
 }
この例では、タスクが共通の結果をcompareAndSet以外に他の効果を持たない他のタスクと同様に、ルート・タスクの完了後に完了を管理するためにブックキーピングが必要ないため、tryCompleteの末尾の無条件呼出しを条件付き(if (result.get() == null) tryComplete();)にできます。

サブタスクの記録。 複数のサブタスクの結果を結合するCountedCompleterタスクは、通常、onCompletion(CountedCompleter)メソッドでこれらの結果にアクセスする必要があります。 次の(マッピングとリダクションの型がすべてEである簡易形式のマップ・リデュースを実行する)クラスに示すように、分割統治型の設計でこれを行う方法の1つは、各サブタスクにその兄弟を記録させ、onCompletionメソッドでそれにアクセスできるようにすることです。 この手法は、左と右の結果を結合する順序が問題にならないようなリダクションに適用されます。順序付けされたリダクションでは、左と右を明示的に指定する必要があります。 前の例に示した他の合理化方法のバリアントを適用することもできます。

 
 class MyMapper<E> { E apply(E v) {  ...  } }
 class MyReducer<E> { E apply(E x, E y) {  ...  } }
 class MapReducer<E> extends CountedCompleter<E> {
   final E[] array; final MyMapper<E> mapper;
   final MyReducer<E> reducer; final int lo, hi;
   MapReducer<E> sibling;
   E result;
   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
              MyReducer<E> reducer, int lo, int hi) {
     super(p);
     this.array = array; this.mapper = mapper;
     this.reducer = reducer; this.lo = lo; this.hi = hi;
   }
   public void compute() {
     if (hi - lo >= 2) {
       int mid = (lo + hi) >>> 1;
       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
       left.sibling = right;
       right.sibling = left;
       setPendingCount(1); // only right is pending
       right.fork();
       left.compute();     // directly execute left
     }
     else {
       if (hi > lo)
           result = mapper.apply(array[lo]);
       tryComplete();
     }
   }
   public void onCompletion(CountedCompleter<?> caller) {
     if (caller != this) {
       MapReducer<E> child = (MapReducer<E>)caller;
       MapReducer<E> sib = child.sibling;
       if (sib == null || sib.result == null)
         result = child.result;
       else
         result = reducer.apply(child.result, sib.result);
     }
   }
   public E getRawResult() { return result; }

   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
     return new MapReducer<E>(null, array, mapper, reducer,
                              0, array.length).invoke();
   }
 }
ここで、onCompletionメソッドは結果を結合する多くの完了設計に共通する形式を取っています。 このコールバック形式のメソッドは、保留カウントがゼロである(またはゼロになる)2つの異なるコンテキストのいずれかで、タスクごとに1回トリガーされます。つまり、(1) tryCompleteの呼出し時にその保留カウントがゼロである場合は、そのタスク自体によってトリガーされ、(2)そのサブタスクのいずれかが完了し、保留カウントをゼロに減らしたときは、そのサブタスクによってトリガーされます。 caller引数はケースを区別します。 ほとんどの場合、呼出し側がthisであれば、アクションは不要です。 それ以外の場合、caller引数は結合される値(または他の値へのリンク、あるいはその両方)を提供するために(通常はキャストを介して)使用されます。 保留カウントが適切に使用されていれば、onCompletion内のアクションはタスクおよびそのサブタスクの完了時に(1回)発生します。 このタスクまたは他の完了したタスクのフィールドへのアクセスのスレッド安全性を確保するために、このメソッド内に同期化を追加する必要はありません。

完了のトラバース onCompletionを使用して完了を処理することが不適当または不都合である場合は、firstComplete()およびnextComplete()メソッドを使用してカスタム・トラバースを作成できます。 たとえば、3番目のForEachの例の形式で右側のタスクを分割するのみのMapReducerを定義するには、使い果たされていないサブタスクのリンクに沿って完了を協調的にリデュースする必要があり、次のように行うことができます。

 
 class MapReducer<E> extends CountedCompleter<E> { // version 2
   final E[] array; final MyMapper<E> mapper;
   final MyReducer<E> reducer; final int lo, hi;
   MapReducer<E> forks, next; // record subtask forks in list
   E result;
   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
     super(p);
     this.array = array; this.mapper = mapper;
     this.reducer = reducer; this.lo = lo; this.hi = hi;
     this.next = next;
   }
   public void compute() {
     int l = lo, h = hi;
     while (h - l >= 2) {
       int mid = (l + h) >>> 1;
       addToPendingCount(1);
       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
       h = mid;
     }
     if (h > l)
       result = mapper.apply(array[l]);
     // process completions by reducing along and advancing subtask links
     for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
       for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next)
         t.result = reducer.apply(t.result, s.result);
     }
   }
   public E getRawResult() { return result; }

   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
     return new MapReducer<E>(null, array, mapper, reducer,
                              0, array.length, null).invoke();
   }
 }

トリガー。 CountedCompleterの中には、それ自体はフォークされないが、かわりに他の設計の構成要素の一部として機能するものがあります。これには、1つ以上の非同期タスクの完了が別の非同期タスクをトリガーするCountedCompleterが含まれます。 たとえば、

 
 class HeaderBuilder extends CountedCompleter<...> { ... }
 class BodyBuilder extends CountedCompleter<...> { ... }
 class PacketSender extends CountedCompleter<...> {
   PacketSender(...) { super(null, 1); ... } // trigger on second completion
   public void compute() { } // never called
   public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
 }
 // sample use:
 PacketSender p = new PacketSender();
 new HeaderBuilder(p, ...).fork();
 new BodyBuilder(p, ...).fork();

導入されたバージョン:
1.8
関連項目: