- 型パラメータ:
T
- 公開アイテム型
- すべての実装されたインタフェース:
AutoCloseable
,Flow.Publisher<T>
Flow.Publisher
。クローズするまで、送信済の(non-null)アイテムを現在のサブスクライバに非同期に発行します。 ドロップまたは例外が発生しないかぎり、現在の各サブスクライバは、新しく発行されたアイテムを同じ順序で受け取ります。 SubmissionPublisherを使用すると、アイテム・ジェネレータは、ドロップ処理やフロー制御のブロックに依存する準拠のreactive-streamsパブリッシャとして機能できます。
SubmissionPublisherは、サブスクライバへの配信にコンストラクタで提供されるExecutor
を使用します。 エグゼキュータの最適な選択は、予想される使用状況によって異なります。 送信された項目のthegenerator(s)が別々のスレッドで実行され、サブスクライバの数を推定できる場合は、Executors.newFixedThreadPool(int)
の使用を検討してください。 それ以外の場合は、デフォルト(通常はForkJoinPool.commonPool()
)の使用を検討してください。
バッファリングにより、プロデューサとコンシューマは様々なレートで一時的に動作できます。 各サブスクライバは、独立したバッファを使用します。 バッファは最初の使用時に作成され、必要に応じて指定された最大値まで拡張されます。 (強制容量は、2つの最も近い累乗に切り上げることも、この実装でサポートされている最大値に制限することもできます。) request
を呼び出すと、バッファ拡張は直接発生しませんが、満たされていないリクエストが最大容量を超えると、飽和リスクが発生します。 Flow.defaultBufferSize()
のデフォルト値は、予想されるレート、リソースおよび使用状況に基づいて容量を選択する際に役立つ開始点となります。
1つのSubmissionPublisherを複数のソース間で共有することができます。 アイテムを公開する前にソース・スレッド内のアクション、または各サブスクライバによる対応するアクセスに続いてシグナルhappen-beforeアクションを発行します。 しかし、報告された遅延と需要の見積もりは、同期制御ではなく、モニタリングに使用するように設計されており、進行状況の古いか不正確な見解を反映している可能性があります。
パブリケーション・メソッドでは、バッファが飽和した場合に何をするかに関する様々なポリシーがサポートされます。 メソッドsubmit
は、リソースが使用可能になるまでブロックします。 これは最も簡単ですが、応答性が最小です。 offer
メソッドは、アイテム(直ちにまたは制限されたタイムアウトで)を削除できますが、ハンドラを割り込み、再試行する機会を提供します。
サブスクライバ・メソッドが例外をスローすると、そのサブスクリプションは取り消されます。 コンストラクタ引数としてハンドラが指定されている場合は、メソッドonNext
の例外時に取消しの前に呼び出されますが、メソッドonSubscribe
、onError
およびonComplete
の例外は、取消し前に記録または処理されません。 指定されたエグゼキュータがタスクの実行時にRejectedExecutionException
(またはその他のRuntimeExceptionまたはError)をスローするか、ドロップ・ハンドラが削除されたアイテムの処理時に例外をスローすると、例外が再スローされます。 このような場合、すべてのサブスクライバが公開済アイテムを発行されるわけではありません。 このような場合は、通常、closeExceptionally
を使用することをお薦めします。
メソッドconsume(Consumer)
は、サブスクライバの唯一のアクションが、指定されたファンクションを使用してすべてのアイテムをリクエストおよび処理することである一般的なケースのサポートを簡略化します。
このクラスは、アイテムを生成するサブクラスの便利なベースとしても機能し、このクラスのメソッドを使用してアイテムを公開します。 たとえば、サプライヤから生成された品目を定期的に公開するクラスがあります。 (実際には、生成を個別に開始および停止したり、パブリッシャ間でエグゼキュータを共有したり、スーパークラスではなくコンポーネントとしてSubmissionPublisherを使用するメソッドを追加できます。)
class PeriodicPublisher<T> extends SubmissionPublisher<T> {
final ScheduledFuture<?> periodicTask;
final ScheduledExecutorService scheduler;
PeriodicPublisher(Executor executor, int maxBufferCapacity,
Supplier<? extends T> supplier,
long period, TimeUnit unit) {
super(executor, maxBufferCapacity);
scheduler = new ScheduledThreadPoolExecutor(1);
periodicTask = scheduler.scheduleAtFixedRate(
() -> submit(supplier.get()), 0, period, unit);
}
public void close() {
periodicTask.cancel(false);
scheduler.shutdown();
super.close();
}
}
次に、Flow.Processor
実装の例を示します。 簡単に説明できるように、パブリッシャへのシングル・ステップ・リクエストを使用します。 より適応性の高いバージョンでは、他のユーティリティ・メソッドとともに、submit
から返されたラグ見積りを使用してフローを監視できます。
class TransformProcessor<S,T> extends SubmissionPublisher<T>
implements Flow.Processor<S,T> {
final Function<? super S, ? extends T> function;
Flow.Subscription subscription;
TransformProcessor(Executor executor, int maxBufferCapacity,
Function<? super S, ? extends T> function) {
super(executor, maxBufferCapacity);
this.function = function;
}
public void onSubscribe(Flow.Subscription subscription) {
(this.subscription = subscription).request(1);
}
public void onNext(S item) {
subscription.request(1);
submit(function.apply(item));
}
public void onError(Throwable ex) { closeExceptionally(ex); }
public void onComplete() { close(); }
}
- 導入されたバージョン:
- 9
-
コンストラクタのサマリー
コンストラクタ説明サブスクライバ(少なくとも2つの並列度レベルをサポートしていない場合は、各タスクを実行するために新しいスレッドが作成されます)への非同期配信用にForkJoinPool.commonPool()
を使用し、Flow.defaultBufferSize()
の最大バッファ容量を使用し、メソッドonNext
のサブスクライバ例外のハンドラを使用せずに新しいSubmissionPublisherを作成します。SubmissionPublisher
(Executor executor, int maxBufferCapacity) サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して新しいSubmissionPublisherを作成します。各サブスクライバには指定された最大バッファ・サイズを使用し、メソッドonNext
ではサブスクライバ例外のハンドラはありません。SubmissionPublisher
(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して、サブスクライバごとに指定された最大バッファ・サイズで新しいSubmissionPublisherを作成します。null以外の場合、任意のサブスクライバがメソッドonNext
で例外をスローしたときに、指定されたハンドラが呼び出されます。 -
メソッドのサマリー
修飾子と型メソッド説明void
close()
すでにクローズしていないかぎり、onComplete
シグナルを現在のサブスクライバに発行し、後続のパブリッシュの試行を禁止します。void
closeExceptionally
(Throwable error) すでにクローズしていないかぎり、指定されたエラーでonError
シグナルを現在のサブスクライバに発行し、後続のパブリッシュの試行を禁止します。指定されたコンシューマ機能を使用して、すべての公開済項目を処理します。int
現在のすべてのサブスクライバ間で生成され、まだ消費されていないアイテムの最大数を返します。long
すべての現在のサブスクライバの中で、リクエストされた(request
経由で)がまだ生成されていないアイテムの最小数の見積りを返します。closeExceptionally
に関連付けられた例外を返します。閉じていない場合、または正常に閉じられている場合はnullを返します。非同期配信に使用されるエグゼキュータを返します。int
サブスクライバ当たりの最大バッファ容量を返します。int
現在のサブスクライバの数を返します。List
<Flow.Subscriber<? super T>> サブスクライバでFlow.Subscriber
メソッドを呼び出すのではなく、モニタリングおよびトラッキングの目的で現在のサブスクライバのリストを返します。boolean
このパブリッシャにサブスクライバがある場合、trueを返します。boolean
isClosed()
この発行元が発行を受け入れていない場合、trueを返します。boolean
isSubscribed
(Flow.Subscriber<? super T> subscriber) 指定されたサブスクライバが現在サブスクライブされている場合、trueを返します。int
offer
(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) 可能な場合は、指定されたアイテムをonNext
メソッドを非同期に呼び出して現在の各サブスクライバに公開し、任意のサブスクリプションのリソースが使用できない間、指定されたタイムアウトまで、または呼出し側スレッドが中断されるまでブロックします。この時点で、指定されたハンドラ(null以外の場合)が呼び出され、trueが返された場合は1回再試行されます。int
offer
(T item, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) 可能な場合は、onNext
メソッドを非同期に呼び出して、指定されたアイテムを現在の各サブスクライバに公開します。int
onNext
メソッドを非同期に呼び出して、指定されたアイテムを現在の各サブスクライバに公開します。これにより、任意のサブスクライバのリソースが使用できなくなる間、無停止でブロックされます。void
subscribe
(Flow.Subscriber<? super T> subscriber) すでにサブスクライブしていないかぎり、指定されたサブスクライバを追加します。
-
コンストラクタの詳細
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して、サブスクライバごとに指定された最大バッファ・サイズで新しいSubmissionPublisherを作成します。null以外の場合、任意のサブスクライバがメソッドonNext
で例外をスローしたときに、指定されたハンドラが呼び出されます。- パラメータ:
executor
- 非同期配信に使用するエグゼキュータで、1つ以上の独立したスレッドの作成をサポートmaxBufferCapacity
- 各サブスクライバのバッファ(強制された容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されます。メソッドgetMaxBufferCapacity()
は実際の値を返します)の最大容量handler
-null以外の場合、メソッドonNext
でスローされた例外時に起動するプロシージャ- 例外:
NullPointerException
- エグゼキュータがnullの場合IllegalArgumentException
- maxBufferCapacityが正でない場合
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity) サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して新しいSubmissionPublisherを作成します。各サブスクライバには指定された最大バッファ・サイズを使用し、メソッドonNext
ではサブスクライバ例外のハンドラはありません。- パラメータ:
executor
- 非同期配信に使用するエグゼキュータで、1つ以上の独立したスレッドの作成をサポートmaxBufferCapacity
- 各サブスクライバのバッファ(強制された容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されます。メソッドgetMaxBufferCapacity()
は実際の値を返します)の最大容量- 例外:
NullPointerException
- エグゼキュータがnullの場合IllegalArgumentException
- maxBufferCapacityが正でない場合
-
SubmissionPublisher
public SubmissionPublisher()サブスクライバ(少なくとも2つの並列度レベルをサポートしていない場合は、各タスクを実行するために新しいスレッドが作成されます)への非同期配信用にForkJoinPool.commonPool()
を使用し、Flow.defaultBufferSize()
の最大バッファ容量を使用し、メソッドonNext
のサブスクライバ例外のハンドラを使用せずに新しいSubmissionPublisherを作成します。
-
-
メソッドの詳細
-
subscribe
public void subscribe(Flow.Subscriber<? super T> subscriber) すでにサブスクライブしていないかぎり、指定されたサブスクライバを追加します。 すでにサブスクライブしている場合、サブスクライバのonError
メソッドは、IllegalStateException
を使用して既存のサブスクリプションで呼び出されます。 それ以外の場合、成功すると、サブスクライバのonSubscribe
メソッドが、新しいFlow.Subscription
と非同期に呼び出されます。onSubscribe
が例外をスローすると、サブスクリプションは取り消されます。 それ以外の場合、このSubmissionPublisherが例外的にクローズされると、サブスクライバのonError
メソッドが対応する例外で呼び出されるか、例外なしでクローズされると、サブスクライバのonComplete
メソッドが呼び出されます。 サブスクライバは、新しいサブスクリプションのrequest
メソッドを呼び出してアイテムの受信を有効にし、cancel
メソッドを呼び出してサブスクライブを解除できます。- 定義:
- インタフェース
Flow.Publisher<T>
内のsubscribe
- パラメータ:
subscriber
- サブスクライバ- 例外:
NullPointerException
- subscriberがnullの場合
-
submit
public int submit(T item) onNext
メソッドを非同期に呼び出して、指定されたアイテムを現在の各サブスクライバに公開します。これにより、任意のサブスクライバのリソースが使用できなくなる間、無停止でブロックされます。 このメソッドは、現在のすべてのサブスクライバの中で最大ラグ(送信されたがまだ消費されていないアイテムの数)の見積りを返します。 この値は、サブスクライバがある場合は少なくとも1つの(この送信されたアイテムの会計処理)、それ以外の場合はゼロです。このパブリッシャのエグゼキュータがサブスクライバに非同期で通知しようとしたときにRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローした場合、この例外は再スローされます。この場合、すべてのサブスクライバがこのアイテムを発行されるわけではありません。
- パラメータ:
item
- パブリッシュする(non-null)アイテム- 戻り値:
- サブスクライバ間の推定最大遅延
- 例外:
IllegalStateException
- 閉じている場合NullPointerException
- itemがnullの場合RejectedExecutionException
- 実行者によってスローされた場合
-
offer
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) 可能な場合は、onNext
メソッドを非同期に呼び出して、指定されたアイテムを現在の各サブスクライバに公開します。 リソース制限を超えた場合、アイテムは1つ以上のサブスクライバによって削除される可能性があります。この場合、指定されたハンドラ(null以外の場合)が呼び出され、trueが返された場合は、1回再試行されます。 他のスレッドによるこのクラスのメソッドへのその他のコールは、ハンドラの起動中にブロックされます。 リカバリが保証されないかぎり、オプションは通常、エラーのロギングまたはサブスクライバへのonError
シグナルの発行(あるいはその両方)に制限されます。このメソッドは、ステータス・インジケータ: 負の場合、(サブスクライバにアイテムを発行しようとして失敗した)の削除の(negative)数を表します。 それ以外の場合は、現在のすべてのサブスクライバ間の最大ラグ(送信されたがまだ消費されていないアイテムの数)の見積りです。 この値は、サブスクライバがある場合は少なくとも1つの(この送信されたアイテムの会計処理)、それ以外の場合はゼロです。
このパブリッシャのエグゼキュータがサブスクライバに非同期的に通知しようとしたときにRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローした場合、またはドロップ・ハンドラが削除されたアイテムの処理時に例外をスローした場合、この例外は再スローされます。
- パラメータ:
item
- パブリッシュする(non-null)アイテムonDrop
-null以外の場合、サブスクライバおよびアイテムの引数を使用して、サブスクライバへのドロップ時にハンドラが呼び出されます。trueを返す場合、オファーは(once)に再試行されます- 戻り値:
- 負の場合、(negative)のドロップ数。それ以外の場合、最大ラグの見積り
- 例外:
IllegalStateException
- 閉じている場合NullPointerException
- itemがnullの場合RejectedExecutionException
- 実行者によってスローされた場合
-
offer
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) 可能な場合は、指定されたアイテムをonNext
メソッドを非同期に呼び出して現在の各サブスクライバに公開し、任意のサブスクリプションのリソースが使用できない間、指定されたタイムアウトまで、または呼出し側スレッドが中断されるまでブロックします。この時点で、指定されたハンドラ(null以外の場合)が呼び出され、trueが返された場合は1回再試行されます。 (ドロップ・ハンドラは、現在のスレッドが中断されているかどうかをチェックすることによって、タイムアウトと割り込みを区別することがあります。) 他のスレッドによるこのクラスのメソッドへのその他のコールは、ハンドラの起動中にブロックされます。 リカバリが保証されないかぎり、オプションは通常、エラーのロギングまたはサブスクライバへのonError
シグナルの発行(あるいはその両方)に制限されます。このメソッドは、ステータス・インジケータ: 負の場合、(サブスクライバにアイテムを発行しようとして失敗した)の削除の(negative)数を表します。 それ以外の場合は、現在のすべてのサブスクライバ間の最大ラグ(送信されたがまだ消費されていないアイテムの数)の見積りです。 この値は、サブスクライバがある場合は少なくとも1つの(この送信されたアイテムの会計処理)、それ以外の場合はゼロです。
このパブリッシャのエグゼキュータがサブスクライバに非同期的に通知しようとしたときにRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローした場合、またはドロップ・ハンドラが削除されたアイテムの処理時に例外をスローした場合、この例外は再スローされます。
- パラメータ:
item
- パブリッシュする(non-null)アイテムtimeout
- 諦める前に任意のサブスクライバのリソースを待機する時間(unit
単位)unit
-timeout
パラメータの解釈方法を決定するTimeUnit
onDrop
-null以外の場合、サブスクライバおよびアイテムの引数を使用して、サブスクライバへのドロップ時にハンドラが呼び出されます。trueを返す場合、オファーは(once)に再試行されます- 戻り値:
- 負の場合、(negative)のドロップ数。それ以外の場合、最大ラグの見積り
- 例外:
IllegalStateException
- 閉じている場合NullPointerException
- itemがnullの場合RejectedExecutionException
- 実行者によってスローされた場合
-
close
public void close()すでにクローズしていないかぎり、onComplete
シグナルを現在のサブスクライバに発行し、後続のパブリッシュの試行を禁止します。 返却時に、このメソッドは、すべてのサブスクライバがまだ完了していることを保証しません。- 定義:
close
、インタフェースAutoCloseable
-
closeExceptionally
public void closeExceptionally(Throwable error) すでにクローズしていないかぎり、指定されたエラーでonError
シグナルを現在のサブスクライバに発行し、後続のパブリッシュの試行を禁止します。 将来のサブスクライバも、指定されたエラーを受け取ります。 返却時に、このメソッドは、すべてのサブスクライバがまだ完了していることを保証しません。- パラメータ:
error
-onError
引数がサブスクライバに送信された- 例外:
NullPointerException
- エラーがnullの場合
-
isClosed
public boolean isClosed()この発行元が発行を受け入れていない場合、trueを返します。- 戻り値:
- 閉じている場合はtrue
-
getClosedException
public Throwable getClosedException()closeExceptionally
に関連付けられた例外を返します。閉じていない場合、または正常に閉じられている場合はnullを返します。- 戻り値:
- 例外、またはない場合はnull
-
hasSubscribers
public boolean hasSubscribers()このパブリッシャにサブスクライバがある場合、trueを返します。- 戻り値:
- このパブリッシャにサブスクライバがある場合はtrue
-
getNumberOfSubscribers
public int getNumberOfSubscribers()現在のサブスクライバの数を返します。- 戻り値:
- 現在のサブスクライバ数
-
getExecutor
-
getMaxBufferCapacity
public int getMaxBufferCapacity()サブスクライバ当たりの最大バッファ容量を返します。- 戻り値:
- サブスクライバごとの最大バッファ容量
-
getSubscribers
public List<Flow.Subscriber<? super T>> getSubscribers()サブスクライバでFlow.Subscriber
メソッドを呼び出すのではなく、モニタリングおよびトラッキングの目的で現在のサブスクライバのリストを返します。- 戻り値:
- 現在のサブスクライバのリスト
-
isSubscribed
public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) 指定されたサブスクライバが現在サブスクライブされている場合、trueを返します。- パラメータ:
subscriber
- サブスクライバ- 戻り値:
- 現在サブスクライブされている場合はtrue
- 例外:
NullPointerException
- subscriberがnullの場合
-
estimateMinimumDemand
public long estimateMinimumDemand()すべての現在のサブスクライバの中で、リクエストされた(request
経由で)がまだ生成されていないアイテムの最小数の見積りを返します。- 戻り値:
- 見積り、またはサブスクライバがない場合はゼロ
-
estimateMaximumLag
public int estimateMaximumLag()現在のすべてのサブスクライバ間で生成され、まだ消費されていないアイテムの最大数を返します。- 戻り値:
- 見積もり
-
consume
public CompletableFuture<Void> consume(Consumer<? super T> consumer) 指定されたコンシューマ機能を使用して、すべての公開済項目を処理します。 このパブリッシャがonComplete
にシグナルを送ったり、エラーが発生すると例外的に完了したり、コンシューマによって例外がスローされたり、戻されたCompletableFutureが取り消されたりすると、それ以上の項目は処理されない場合に、正常に完了するCompletableFutureを返します。- パラメータ:
consumer
- 各onNextアイテムに適用される関数- 戻り値:
- パブリッシャがonCompleteにシグナルを送るときに正常に完了するCompletableFuture。エラーまたは取消し時に例外的に完了するCompletableFuture
- 例外:
NullPointerException
- 消費者がnullの場合
-