モジュール java.base
パッケージ java.util.concurrent

クラスFlow

java.lang.Object
java.util.concurrent.Flow

public final class Flow extends Object
PublishersSubscriptionによって管理される1つ以上のSubscribersによって消費されるアイテムを生成するフロー制御コンポーネントを確立するための相互関係のあるインタフェースと静的メソッド。

これらのインタフェースは、reactive-streams仕様に対応しています。 これらは、同時および分散非同期設定の両方に適用されます: すべての(seven)メソッドは、 void "one-way"メッセージ・スタイルで定義されます。 通信は、"push"ベースのシステムで発生する可能性のあるリソース管理の問題を回避するために使用できる、単純な形式のフロー制御(メソッドFlow.Subscription.request(long))に依存します。

Flow.Publisherは通常、独自のFlow.Subscription実装を定義しています。メソッドsubscribeに1を構築し、それを呼び出し元のFlow.Subscriberに発行します。 通常はExecutorを使用して、アイテムをサブスクライバに非同期にパブリッシュします。 たとえば、単一のサブスクライバに単一の TRUEアイテムのみ(リクエスト時)を発行する非常に単純なパブリッシャを次に示します。 サブスクライバは1つの項目のみを受け取るため、このクラスはほとんどの実装(たとえばSubmissionPublisher)で必要なバッファリングおよび順序付け制御を使用しません。

 
 class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (!completed) {
         completed = true;
         if (n <= 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }

Flow.Subscriberは、アイテムがリクエストおよび処理されるように配置します。 項目(Flow.Subscriber.onNext(T)の呼び出し)は、リクエストされないかぎり発行されませんが、複数の項目をリクエストできます。 多くのサブスクライバ実装では、次の例のスタイルでこれを配置できます。バッファ・サイズが1つの単一ステップであり、サイズが大きいと、通常、より少ない通信で、より効率的な重複処理が可能になります。たとえば、値が64の場合、未処理のリクエストの合計は32から64の間になります。 特定のFlow.Subscriptionに対するサブスクライバ・メソッド呼出しは厳密に順序付けられているため、サブスクライバが複数のサブスクリプション(その場合は、かわりに、それぞれ独自のサブスクリプションを持つ複数のサブスクライバを定義することをお薦めします)を保持しないかぎり、これらのメソッドでロックまたはボラ・タイルを使用する必要はありません。

 
 class SampleSubscriber<T> implements Subscriber<T> {
   final Consumer<? super T> consumer;
   Subscription subscription;
   final long bufferSize;
   long count;
   SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
     this.bufferSize = bufferSize;
     this.consumer = consumer;
   }
   public void onSubscribe(Subscription subscription) {
     long initialRequestSize = bufferSize;
     count = bufferSize - bufferSize / 2; // re-request when half consumed
     (this.subscription = subscription).request(initialRequestSize);
   }
   public void onNext(T item) {
     if (--count <= 0)
       subscription.request(count = bufferSize - bufferSize / 2);
     consumer.accept(item);
   }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
 }

defaultBufferSize()のデフォルト値は、予想されるレート、リソースおよび使用状況に基づいて、フロー・コンポーネントでリクエスト・サイズと容量を選択する際に役立つ開始点となる場合があります。 または、フロー制御が不要な場合、サブスクライバは、次のように、効果的に無制限の数のアイテムを最初にリクエストできます:

 
 class UnboundedSubscriber<T> implements Subscriber<T> {
   public void onSubscribe(Subscription subscription) {
     subscription.request(Long.MAX_VALUE); // effectively unbounded
   }
   public void onNext(T item) { use(item); }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
   void use(T item) { ... }
 }

導入されたバージョン:
9
  • ネストされたクラスのサマリー

    ネストされたクラス
    修飾子と型
    クラス
    説明
    static interface 
    サブスクライバとパブリッシャの両方として機能するコンポーネント。
    static interface 
    サブスクライバが受信したアイテム(および関連する制御メッセージ)のプロデューサ。
    static interface 
    メッセージのレシーバ。
    static interface 
    Flow.PublisherFlow.Subscriberをリンクするメッセージ制御。
  • メソッドのサマリー

    修飾子と型
    メソッド
    説明
    static int
    パブリッシャまたはサブスクライバ・バッファリングのデフォルト値を返します。他の制約がない場合に使用できます。

    クラスjava.lang.Objectで宣言されたメソッド

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • メソッドの詳細

    • defaultBufferSize

      public static int defaultBufferSize()
      パブリッシャまたはサブスクライバ・バッファリングのデフォルト値を返します。他の制約がない場合に使用できます。
      実装上のノート:
      返される現在の値は256です。
      戻り値:
      バッファ・サイズの値