- java.lang.Object
-
- java.util.concurrent.Flow
-
public final class Flow extends Object
Publishers
がSubscription
によって管理される1つ以上のSubscribers
によって消費されるアイテムを生成するフロー制御コンポーネントを確立するための相互関係のあるインタフェースと静的メソッド。これらのインタフェースは、reactive-streams仕様に対応しています。 これらは、同時および分散非同期設定の両方に適用されます: すべての(seven)メソッドは、
void
"one-way"メッセージ・スタイルで定義されています。 通信は、"push"ベースのシステムで発生するリソース管理の問題を回避するために使用できる単純なフロー制御(メソッドFlow.Subscription.request(long)
)に依存しています。例
Flow.Publisher
は通常、独自のFlow.Subscription
実装を定義しています。メソッドsubscribe
に1を構築し、それを呼び出し元のFlow.Subscriber
に発行します。 通常、Executor
を使用してアイテムを非同期でサブスクライバにパブリッシュします。 たとえば、単一のサブスクライバに(リクエスト時)という単一のTRUE
アイテムのみを発行する非常に単純なパブリッシャがあります。 サブスクライバは単一のアイテムしか受け取らないため、このクラスはほとんどの実装で必要なバッファリングと順序制御を使用しません(たとえばSubmissionPublisher
)。class OneShotPublisher implements Publisher<Boolean> { private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based private boolean subscribed; // true after first subscribe public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { if (subscribed) subscriber.onError(new IllegalStateException()); // only one allowed else { subscribed = true; subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); } } static class OneShotSubscription implements Subscription { private final Subscriber<? super Boolean> subscriber; private final ExecutorService executor; private Future<?> future; // to allow cancellation private boolean completed; OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { this.subscriber = subscriber; this.executor = executor; } public synchronized void request(long n) { if (!completed) { completed = true; if (n <= 0) { IllegalArgumentException ex = new IllegalArgumentException(); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { subscriber.onNext(Boolean.TRUE); subscriber.onComplete(); }); } } } public synchronized void cancel() { completed = true; if (future != null) future.cancel(false); } } }
Flow.Subscriber
はアイテムのリクエストと処理を手配します。 アイテム(Flow.Subscriber.onNext(T)
の呼び出し)はリクエストされない限り発行されませんが、複数のアイテムがリクエストされることがあります。 多くのサブスクライバの実装では、次の例のようにこの方法を整えることができます。バッファ・サイズが1ステップで、サイズが大きいほど、通常は通信のオーバーラップを抑えてより効率的なオーバーラップ処理が可能です。たとえば64の値を指定すると、32と64の合計未処理リクエストが保持されます。 特定のFlow.Subscription
に対するSubscriberメソッドの呼び出しは厳密に順序付けられているため、Subscriberが複数のSubscription (その場合、複数のサブスクライバを定義し、それぞれに独自のサブスクリプションがある方がよい)を保持していない限り、これらのメソッドでロックまたは揮発性を使用する必要はありません。class SampleSubscriber<T> implements Subscriber<T> { final Consumer<? super T> consumer; Subscription subscription; final long bufferSize; long count; SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { this.bufferSize = bufferSize; this.consumer = consumer; } public void onSubscribe(Subscription subscription) { long initialRequestSize = bufferSize; count = bufferSize - bufferSize / 2; // re-request when half consumed (this.subscription = subscription).request(initialRequestSize); } public void onNext(T item) { if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2); consumer.accept(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} }
defaultBufferSize()
のデフォルト値は、期待されるレート、リソース、および用途に基づいてFlowコンポーネント内のリクエスト・サイズと容量を選択する際に役立ちます。 あるいは、フロー制御が決して必要とされないとき、サブスクライバは、以下のように、事実上無制限の数のアイテムを最初にリクエストしてもよい:class UnboundedSubscriber<T> implements Subscriber<T> { public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); // effectively unbounded } public void onNext(T item) { use(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} void use(T item) { ... } }
- 導入されたバージョン:
- 9
-
-
ネストされたクラスのサマリー
ネストされたクラス 修飾子と型 クラス 説明 static interface
Flow.Processor<T,R>
サブスクライバとパブリッシャの両方として機能するコンポーネント。static interface
Flow.Publisher<T>
サブスクライブ者が受け取ったアイテム(および関連する制御メッセージ)のプロデューサ。static interface
Flow.Subscriber<T>
メッセージのレシーバ。static interface
Flow.Subscription
Flow.Publisher
とFlow.Subscriber
をリンクするメッセージ制御。
-