Publishers
がSubscription
によって管理される1つ以上のSubscribers
によって消費されるアイテムを生成するフロー制御コンポーネントを確立するための相互関係のあるインタフェースと静的メソッド。
これらのインタフェースは、reactive-streams仕様に対応しています。 これらは、同時および分散非同期設定の両方に適用されます: すべての(seven)メソッドは、 void
"one-way"メッセージ・スタイルで定義されます。 通信は、"push"ベースのシステムで発生する可能性のあるリソース管理の問題を回避するために使用できる、単純な形式のフロー制御(メソッドFlow.Subscription.request(long)
)に依存します。
例 Flow.Publisher
は通常、独自のFlow.Subscription
実装を定義しています。メソッドsubscribe
に1を構築し、それを呼び出し元のFlow.Subscriber
に発行します。 通常はExecutor
を使用して、アイテムをサブスクライバに非同期にパブリッシュします。 たとえば、単一のサブスクライバに単一の TRUE
アイテムのみ(リクエスト時)を発行する非常に単純なパブリッシャを次に示します。 サブスクライバは1つの項目のみを受け取るため、このクラスはほとんどの実装(たとえばSubmissionPublisher
)で必要なバッファリングおよび順序付け制御を使用しません。
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (!completed) {
completed = true;
if (n <= 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
Flow.Subscriber
は、アイテムがリクエストおよび処理されるように配置します。 項目(Flow.Subscriber.onNext(T)
の呼び出し)は、リクエストされないかぎり発行されませんが、複数の項目をリクエストできます。 多くのサブスクライバ実装では、次の例のスタイルでこれを配置できます。バッファ・サイズが1つの単一ステップであり、サイズが大きいと、通常、より少ない通信で、より効率的な重複処理が可能になります。たとえば、値が64の場合、未処理のリクエストの合計は32から64の間になります。 特定のFlow.Subscription
に対するサブスクライバ・メソッド呼出しは厳密に順序付けられているため、サブスクライバが複数のサブスクリプション(その場合は、かわりに、それぞれ独自のサブスクリプションを持つ複数のサブスクライバを定義することをお薦めします)を保持しないかぎり、これらのメソッドでロックまたはボラ・タイルを使用する必要はありません。
class SampleSubscriber<T> implements Subscriber<T> {
final Consumer<? super T> consumer;
Subscription subscription;
final long bufferSize;
long count;
SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
this.bufferSize = bufferSize;
this.consumer = consumer;
}
public void onSubscribe(Subscription subscription) {
long initialRequestSize = bufferSize;
count = bufferSize - bufferSize / 2; // re-request when half consumed
(this.subscription = subscription).request(initialRequestSize);
}
public void onNext(T item) {
if (--count <= 0)
subscription.request(count = bufferSize - bufferSize / 2);
consumer.accept(item);
}
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
}
defaultBufferSize()
のデフォルト値は、予想されるレート、リソースおよび使用状況に基づいて、フロー・コンポーネントでリクエスト・サイズと容量を選択する際に役立つ開始点となる場合があります。 または、フロー制御が不要な場合、サブスクライバは、次のように、効果的に無制限の数のアイテムを最初にリクエストできます:
class UnboundedSubscriber<T> implements Subscriber<T> {
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE); // effectively unbounded
}
public void onNext(T item) { use(item); }
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
void use(T item) { ... }
}
- 導入されたバージョン:
- 9
-
ネストされたクラスのサマリー
修飾子と型クラス説明static interface
サブスクライバとパブリッシャの両方として機能するコンポーネント。static interface
サブスクライバが受信したアイテム(および関連する制御メッセージ)のプロデューサ。static interface
メッセージのレシーバ。static interface
Flow.Publisher
とFlow.Subscriber
をリンクするメッセージ制御。 -
メソッドのサマリー
-
メソッドの詳細
-
defaultBufferSize
public static int defaultBufferSize()パブリッシャまたはサブスクライバ・バッファリングのデフォルト値を返します。他の制約がない場合に使用できます。- 実装上のノート:
- 返される現在の値は256です。
- 戻り値:
- バッファ・サイズの値
-