モジュール java.base
パッケージ java.util.concurrent

クラス・フロー


  • public final class Flow
    extends Object
    Publishersが1つ以上のSubscribersによって消費されるアイテムを生成し、それぞれがSubscriptionによって管理される、フロー制御コンポーネントを確立するための相互関連インタフェースおよび静的メソッド。

    これらのインタフェースは、reactive-streams仕様に対応しています。 これらは、同時および分散の両方の非同期設定で適用されます。すべての(7つの)メソッドは、 voidの一方向メッセージ・スタイルで定義されます。 通信は、単純な形式のフロー制御(メソッドFlow.Subscription.request(long))に依存します。この方法を使用すると、プッシュ・ベースのシステムで発生する可能性のあるリソース管理の問題を回避できます。

    Flow.Publisherは、通常、独自のFlow.Subscription実装を定義し、メソッドsubscribeで実装を作成し、それをコール元のFlow.Subscriberに発行します。 通常はExecutorを使用して、アイテムをサブスクライバに非同期にパブリッシュします。 たとえば、単一の TRUEアイテムのみを単一のサブスクライバに発行する、非常に単純なパブリッシャを次に示します(リクエストされた場合)。 サブスクライバは1つの項目のみを受信するため、このクラスはほとんどの実装で必要なバッファリングおよび順序付け制御を使用しません(SubmissionPublisherなど)。

     
     class OneShotPublisher implements Publisher<Boolean> {
       private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
       private boolean subscribed; // true after first subscribe
       public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
         if (subscribed)
           subscriber.onError(new IllegalStateException()); // only one allowed
         else {
           subscribed = true;
           subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
         }
       }
       static class OneShotSubscription implements Subscription {
         private final Subscriber<? super Boolean> subscriber;
         private final ExecutorService executor;
         private Future<?> future; // to allow cancellation
         private boolean completed;
         OneShotSubscription(Subscriber<? super Boolean> subscriber,
                             ExecutorService executor) {
           this.subscriber = subscriber;
           this.executor = executor;
         }
         public synchronized void request(long n) {
           if (!completed) {
             completed = true;
             if (n <= 0) {
               IllegalArgumentException ex = new IllegalArgumentException();
               executor.execute(() -> subscriber.onError(ex));
             } else {
               future = executor.submit(() -> {
                 subscriber.onNext(Boolean.TRUE);
                 subscriber.onComplete();
               });
             }
           }
         }
         public synchronized void cancel() {
           completed = true;
           if (future != null) future.cancel(false);
         }
       }
     }

    Flow.Subscriberは、アイテムがリクエストおよび処理されるように配置します。 アイテム(Flow.Subscriber.onNext(T)の呼出し)は、リクエストされないかぎり発行されませんが、複数のアイテムがリクエストされる可能性があります。 サブスクライバ実装の多くは、次の例のスタイルでこれを配置できます。このスタイルでは、バッファ・サイズが1つの単一ステップであり、サイズが大きいほど、通信の少ない効率的なオーバーラップ処理が可能になります。たとえば、値が64の場合、32から64の間の未処理のリクエストの合計が保持されます。 特定のFlow.Subscriptionに対するSubscriberメソッドの呼出しは厳密に順序付けされるため、サブスクライバが複数のサブスクリプションを保持しないかぎり、これらのメソッドでロックまたは揮発性を使用する必要はありません(その場合は、それぞれ独自のサブスクリプションを持つ複数のサブスクライバを定義することをお薦めします)。

     
     class SampleSubscriber<T> implements Subscriber<T> {
       final Consumer<? super T> consumer;
       Subscription subscription;
       final long bufferSize;
       long count;
       SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
         this.bufferSize = bufferSize;
         this.consumer = consumer;
       }
       public void onSubscribe(Subscription subscription) {
         long initialRequestSize = bufferSize;
         count = bufferSize - bufferSize / 2; // re-request when half consumed
         (this.subscription = subscription).request(initialRequestSize);
       }
       public void onNext(T item) {
         if (--count <= 0)
           subscription.request(count = bufferSize - bufferSize / 2);
         consumer.accept(item);
       }
       public void onError(Throwable ex) { ex.printStackTrace(); }
       public void onComplete() {}
     }

    デフォルト値のdefaultBufferSize()は、予想されるレート、リソースおよび使用状況に基づいてフロー・コンポーネントのリクエスト・サイズおよび容量を選択する際に役立つ開始点となる場合があります。 または、フロー制御が不要な場合、サブスクライバは、次のように、事実上無制限の数のアイテムを最初にリクエストできます。

     
     class UnboundedSubscriber<T> implements Subscriber<T> {
       public void onSubscribe(Subscription subscription) {
         subscription.request(Long.MAX_VALUE); // effectively unbounded
       }
       public void onNext(T item) { use(item); }
       public void onError(Throwable ex) { ex.printStackTrace(); }
       public void onComplete() {}
       void use(T item) { ... }
     }

    導入されたバージョン:
    9
    • メソッドの詳細

      • defaultBufferSize

        public static int defaultBufferSize()
        パブリッシャまたはサブスクライバ・バッファリングのデフォルト値を返します。この値は、他の制約がない場合に使用できます。
        実装上のノート:
        返される現在の値は256です。
        戻り値:
        バッファ・サイズ値