モジュール java.base
パッケージ java.util.concurrent

クラスFlow


  • public final class Flow
    extends Object
    PublishersSubscriptionによって管理される1つ以上のSubscribersによって消費されるアイテムを生成するフロー制御コンポーネントを確立するための相互関係のあるインタフェースと静的メソッド。

    これらのインタフェースは、reactive-streams仕様に対応しています。 これらは、同時および分散非同期設定の両方に適用されます: すべての(seven)メソッドは、 void "one-way"メッセージ・スタイルで定義されています。 通信は、"push"ベースのシステムで発生するリソース管理の問題を回避するために使用できる単純なフロー制御(メソッドFlow.Subscription.request(long))に依存しています。

    Flow.Publisherは通常、独自のFlow.Subscription実装を定義しています。メソッドsubscribeに1を構築し、それを呼び出し元のFlow.Subscriberに発行します。 通常、Executorを使用してアイテムを非同期でサブスクライバにパブリッシュします。 たとえば、単一のサブスクライバに(リクエスト時)という単一の TRUEアイテムのみを発行する非常に単純なパブリッシャがあります。 サブスクライバは単一のアイテムしか受け取らないため、このクラスはほとんどの実装で必要なバッファリングと順序制御を使用しません(たとえばSubmissionPublisher)。

     
     class OneShotPublisher implements Publisher<Boolean> {
       private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
       private boolean subscribed; // true after first subscribe
       public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
         if (subscribed)
           subscriber.onError(new IllegalStateException()); // only one allowed
         else {
           subscribed = true;
           subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
         }
       }
       static class OneShotSubscription implements Subscription {
         private final Subscriber<? super Boolean> subscriber;
         private final ExecutorService executor;
         private Future<?> future; // to allow cancellation
         private boolean completed;
         OneShotSubscription(Subscriber<? super Boolean> subscriber,
                             ExecutorService executor) {
           this.subscriber = subscriber;
           this.executor = executor;
         }
         public synchronized void request(long n) {
           if (!completed) {
             completed = true;
             if (n <= 0) {
               IllegalArgumentException ex = new IllegalArgumentException();
               executor.execute(() -> subscriber.onError(ex));
             } else {
               future = executor.submit(() -> {
                 subscriber.onNext(Boolean.TRUE);
                 subscriber.onComplete();
               });
             }
           }
         }
         public synchronized void cancel() {
           completed = true;
           if (future != null) future.cancel(false);
         }
       }
     }

    Flow.Subscriberはアイテムのリクエストと処理を手配します。 アイテム(Flow.Subscriber.onNext(T)の呼び出し)はリクエストされない限り発行されませんが、複数のアイテムがリクエストされることがあります。 多くのサブスクライバの実装では、次の例のようにこの方法を整えることができます。バッファ・サイズが1ステップで、サイズが大きいほど、通常は通信のオーバーラップを抑えてより効率的なオーバーラップ処理が可能です。たとえば64の値を指定すると、32と64の合計未処理リクエストが保持されます。 特定のFlow.Subscriptionに対するSubscriberメソッドの呼び出しは厳密に順序付けられているため、Subscriberが複数のSubscription (その場合、複数のサブスクライバを定義し、それぞれに独自のサブスクリプションがある方がよい)を保持していない限り、これらのメソッドでロックまたは揮発性を使用する必要はありません。

     
     class SampleSubscriber<T> implements Subscriber<T> {
       final Consumer<? super T> consumer;
       Subscription subscription;
       final long bufferSize;
       long count;
       SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
         this.bufferSize = bufferSize;
         this.consumer = consumer;
       }
       public void onSubscribe(Subscription subscription) {
         long initialRequestSize = bufferSize;
         count = bufferSize - bufferSize / 2; // re-request when half consumed
         (this.subscription = subscription).request(initialRequestSize);
       }
       public void onNext(T item) {
         if (--count <= 0)
           subscription.request(count = bufferSize - bufferSize / 2);
         consumer.accept(item);
       }
       public void onError(Throwable ex) { ex.printStackTrace(); }
       public void onComplete() {}
     }

    defaultBufferSize()のデフォルト値は、期待されるレート、リソース、および用途に基づいてFlowコンポーネント内のリクエスト・サイズと容量を選択する際に役立ちます。 あるいは、フロー制御が決して必要とされないとき、サブスクライバは、以下のように、事実上無制限の数のアイテムを最初にリクエストしてもよい:

     
     class UnboundedSubscriber<T> implements Subscriber<T> {
       public void onSubscribe(Subscription subscription) {
         subscription.request(Long.MAX_VALUE); // effectively unbounded
       }
       public void onNext(T item) { use(item); }
       public void onError(Throwable ex) { ex.printStackTrace(); }
       public void onComplete() {}
       void use(T item) { ... }
     }

    導入されたバージョン:
    9
    • メソッドの詳細

      • defaultBufferSize

        public static int defaultBufferSize()
        パブリッシャまたはサブスクライバのバッファリングのデフォルト値を返します。これは他の制約がない場合に使用されます。
        実装上のノート:
        返される現在の値は256です。
        戻り値:
        バッファ・サイズの値