- java.lang.Object
-
- java.util.concurrent.Flow
-
public final class Flow extends Object
Publishersが1つ以上のSubscribersによって消費されるアイテムを生成し、それぞれがSubscriptionによって管理される、フロー制御コンポーネントを確立するための相互関連インタフェースおよび静的メソッド。これらのインタフェースは、reactive-streams仕様に対応しています。 これらは、同時および分散の両方の非同期設定で適用されます。すべての(7つの)メソッドは、
voidの一方向メッセージ・スタイルで定義されます。 通信は、単純な形式のフロー制御(メソッドFlow.Subscription.request(long))に依存します。この方法を使用すると、プッシュ・ベースのシステムで発生する可能性のあるリソース管理の問題を回避できます。例
Flow.Publisherは、通常、独自のFlow.Subscription実装を定義し、メソッドsubscribeで実装を作成し、それをコール元のFlow.Subscriberに発行します。 通常はExecutorを使用して、アイテムをサブスクライバに非同期にパブリッシュします。 たとえば、単一のTRUEアイテムのみを単一のサブスクライバに発行する、非常に単純なパブリッシャを次に示します(リクエストされた場合)。 サブスクライバは1つの項目のみを受信するため、このクラスはほとんどの実装で必要なバッファリングおよび順序付け制御を使用しません(SubmissionPublisherなど)。class OneShotPublisher implements Publisher<Boolean> { private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based private boolean subscribed; // true after first subscribe public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { if (subscribed) subscriber.onError(new IllegalStateException()); // only one allowed else { subscribed = true; subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); } } static class OneShotSubscription implements Subscription { private final Subscriber<? super Boolean> subscriber; private final ExecutorService executor; private Future<?> future; // to allow cancellation private boolean completed; OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { this.subscriber = subscriber; this.executor = executor; } public synchronized void request(long n) { if (!completed) { completed = true; if (n <= 0) { IllegalArgumentException ex = new IllegalArgumentException(); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { subscriber.onNext(Boolean.TRUE); subscriber.onComplete(); }); } } } public synchronized void cancel() { completed = true; if (future != null) future.cancel(false); } } }Flow.Subscriberは、アイテムがリクエストおよび処理されるように配置します。 アイテム(Flow.Subscriber.onNext(T)の呼出し)は、リクエストされないかぎり発行されませんが、複数のアイテムがリクエストされる可能性があります。 サブスクライバ実装の多くは、次の例のスタイルでこれを配置できます。このスタイルでは、バッファ・サイズが1つの単一ステップであり、サイズが大きいほど、通信の少ない効率的なオーバーラップ処理が可能になります。たとえば、値が64の場合、32から64の間の未処理のリクエストの合計が保持されます。 特定のFlow.Subscriptionに対するSubscriberメソッドの呼出しは厳密に順序付けされるため、サブスクライバが複数のサブスクリプションを保持しないかぎり、これらのメソッドでロックまたは揮発性を使用する必要はありません(その場合は、それぞれ独自のサブスクリプションを持つ複数のサブスクライバを定義することをお薦めします)。class SampleSubscriber<T> implements Subscriber<T> { final Consumer<? super T> consumer; Subscription subscription; final long bufferSize; long count; SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { this.bufferSize = bufferSize; this.consumer = consumer; } public void onSubscribe(Subscription subscription) { long initialRequestSize = bufferSize; count = bufferSize - bufferSize / 2; // re-request when half consumed (this.subscription = subscription).request(initialRequestSize); } public void onNext(T item) { if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2); consumer.accept(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} }デフォルト値の
defaultBufferSize()は、予想されるレート、リソースおよび使用状況に基づいてフロー・コンポーネントのリクエスト・サイズおよび容量を選択する際に役立つ開始点となる場合があります。 または、フロー制御が不要な場合、サブスクライバは、次のように、事実上無制限の数のアイテムを最初にリクエストできます。class UnboundedSubscriber<T> implements Subscriber<T> { public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); // effectively unbounded } public void onNext(T item) { use(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} void use(T item) { ... } }- 導入されたバージョン:
- 9
-
-
ネストされたクラスのサマリー
ネストされたクラス 修飾子と型 クラス 説明 static interfaceFlow.Processor<T,R>サブスクライバとパブリッシャの両方として機能するコンポーネント。static interfaceFlow.Publisher<T>サブスクライバが受信したアイテム(および関連する制御メッセージ)のプロデューサ。static interfaceFlow.Subscriber<T>メッセージの受信者。static interfaceFlow.SubscriptionFlow.PublisherとFlow.Subscriberをリンクするメッセージ制御。
-