Publishers
がSubscription
によって管理される1つ以上のSubscribers
によって消費されるアイテムを生成するフロー制御コンポーネントを確立するための相互関係のあるインタフェースと静的メソッド。
これらのインタフェースは、reactive-streams仕様に対応しています。 これらは、同時および分散非同期設定の両方に適用されます: すべての(seven)メソッドは、 void
"one-way"メッセージ・スタイルで定義されています。 通信は、"push"ベースのシステムで発生するリソース管理の問題を回避するために使用できる単純なフロー制御(メソッドFlow.Subscription.request(long)
)に依存しています。
例 Flow.Publisher
は通常、独自のFlow.Subscription
実装を定義しています。メソッドsubscribe
に1を構築し、それを呼び出し元のFlow.Subscriber
に発行します。 通常、Executor
を使用してアイテムを非同期でサブスクライバにパブリッシュします。 たとえば、単一のサブスクライバに(リクエスト時)という単一の TRUE
アイテムのみを発行する非常に単純なパブリッシャがあります。 サブスクライバは単一のアイテムしか受け取らないため、このクラスはほとんどの実装で必要なバッファリングと順序制御を使用しません(たとえばSubmissionPublisher
)。
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (!completed) {
completed = true;
if (n <= 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
Flow.Subscriber
はアイテムのリクエストと処理を手配します。 アイテム(Flow.Subscriber.onNext(T)
の呼び出し)はリクエストされない限り発行されませんが、複数のアイテムがリクエストされることがあります。 多くのサブスクライバの実装では、次の例のようにこの方法を整えることができます。バッファ・サイズが1ステップで、サイズが大きいほど、通常は通信のオーバーラップを抑えてより効率的なオーバーラップ処理が可能です。たとえば64の値を指定すると、32と64の合計未処理リクエストが保持されます。 特定のFlow.Subscription
に対するSubscriberメソッドの呼び出しは厳密に順序付けられているため、Subscriberが複数のSubscription (その場合、複数のサブスクライバを定義し、それぞれに独自のサブスクリプションがある方がよい)を保持していない限り、これらのメソッドでロックまたは揮発性を使用する必要はありません。
class SampleSubscriber<T> implements Subscriber<T> {
final Consumer<? super T> consumer;
Subscription subscription;
final long bufferSize;
long count;
SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
this.bufferSize = bufferSize;
this.consumer = consumer;
}
public void onSubscribe(Subscription subscription) {
long initialRequestSize = bufferSize;
count = bufferSize - bufferSize / 2; // re-request when half consumed
(this.subscription = subscription).request(initialRequestSize);
}
public void onNext(T item) {
if (--count <= 0)
subscription.request(count = bufferSize - bufferSize / 2);
consumer.accept(item);
}
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
}
defaultBufferSize()
のデフォルト値は、期待されるレート、リソース、および用途に基づいてFlowコンポーネント内のリクエスト・サイズと容量を選択する際に役立ちます。 あるいは、フロー制御が決して必要とされないとき、サブスクライバは、以下のように、事実上無制限の数のアイテムを最初にリクエストしてもよい:
class UnboundedSubscriber<T> implements Subscriber<T> {
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE); // effectively unbounded
}
public void onNext(T item) { use(item); }
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
void use(T item) { ... }
}
- 導入されたバージョン:
- 9
-
ネストされたクラスのサマリー
修飾子と型クラス説明static interface
Flow.Processor<T,R>
サブスクライバとパブリッシャの両方として機能するコンポーネント。static interface
サブスクライブ者が受け取ったアイテム(および関連する制御メッセージ)のプロデューサ。static interface
メッセージのレシーバ。static interface
Flow.Publisher
とFlow.Subscriber
をリンクするメッセージ制御。 -
メソッドのサマリー
修飾子と型メソッド説明static int
パブリッシャまたはサブスクライバのバッファリングのデフォルト値を返します。これは他の制約がない場合に使用されます。
-
メソッドの詳細
-
defaultBufferSize
public static int defaultBufferSize()パブリッシャまたはサブスクライバのバッファリングのデフォルト値を返します。これは他の制約がない場合に使用されます。- 実装上のノート:
- 返される現在の値は256です。
- 戻り値:
- バッファ・サイズの値
-