- java.lang.Object
-
- java.util.concurrent.SubmissionPublisher<T>
-
- 型パラメータ:
T
- 公開アイテム型
- すべての実装されたインタフェース:
AutoCloseable
,Flow.Publisher<T>
public class SubmissionPublisher<T> extends Object implements Flow.Publisher<T>, AutoCloseable
(non-null)アイテムを非同期で発行するFlow.Publisher
は、現在のサブスクライバが閉じられるまでそれを発行します。 現在の各サブスクライバは、ドロップまたは例外が発生しない限り、新しく送信されたアイテムを同じ順序で受信します。 SubmissionPublisherを使用すると、アイテム・ジェネレータは、フロー制御のドロップ処理やブロッキングに依存する準拠のreactive-streamsパブリッシャとして動作することができます。SubmissionPublisherは、コンストラクタで提供された
Executor
を使用して、サブスクライバに配信します。 エグゼキュータの最良の選択は、予想される使用法に依存します。 送信されたアイテムのジェネレータが別のスレッドで実行され、サブスクライバの数を見積もることができる場合は、Executors.newFixedThreadPool(int)
の使用を検討してください。 それ以外の場合は、デフォルトのForkJoinPool.commonPool()
を使用することを検討してください。バッファリングにより、プロデューサとコンシューマは異なる速度で一時的に動作することができます。 各サブスクライバは独立したバッファを使用します。 バッファは最初の使用時に作成され、必要に応じて指定された最大値まで展開されます。 (強制的な容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されることがあります。)
request
の呼び出しでは、バッファ拡張は直接行われませんが、満たされていないリクエストが最大容量を超えると、リスク飽和が発生します。Flow.defaultBufferSize()
のデフォルト値は、予想される料金、リソース、および用途に基づいて容量を選択するための出発点となります。1つのSubmissionPublisherを複数のソース間で共有することができます。 アイテムを公開する前にソース・スレッド内のアクション、または各サブスクライバによる対応するアクセスに続いてシグナルhappen-beforeアクションを発行します。 しかし、報告された遅延と需要の見積もりは、同期制御ではなく、モニタリングに使用するように設計されており、進行状況の古いか不正確な見解を反映している可能性があります。
パブリケーション・メソッドは、バッファが飽和したときの対処方法に関するさまざまなポリシーをサポートします。 メソッド
submit
は、リソースが利用可能になるまでブロックします。 これは最もシンプルですが、レスポンスは最も低くなります。offer
メソッドは、アイテム(直ちにまたは制限されたタイムアウトで)を削除しますが、ハンドラを挿入して再試行する機会を提供します。サブスクライバ・メソッドが例外をスローすると、そのサブスクリプションは取消されます。 ハンドラがコンストラクタ引数として指定された場合は、メソッド
onNext
の例外時に取消前に呼び出されますが、メソッドonSubscribe
、onError
、およびonComplete
の例外は取消前に記録または処理されません。 タスクを実行しようとするときに指定されたExecutorがRejectedExecutionException
(またはその他のRuntimeExceptionまたはError)をスローした場合、またはドロップ・ハンドラがドロップされたアイテムを処理するときに例外をスローすると、例外が再スローされます。 このような場合、すべてのサブスクライバが発行されたアイテムを発行されるわけではありません。 これらの場合、通常はcloseExceptionally
を実行することをお勧めします。メソッド
consume(Consumer)
は、サブスクライバの唯一のアクションが、指定された関数を使用してすべてのアイテムをリクエストし処理するという共通のケースのサポートを簡素化します。このクラスは、アイテムを生成するサブクラスの便利な基盤としても機能し、このクラスのメソッドを使用してそれらを公開します。 たとえば、サプライヤから生成されたアイテムを定期的に公開するクラスがあります。 (実際には、生成を独立して開始および停止するメソッド、パブリッシャ間でExecutorを共有するメソッドなどを追加したり、SubmissionPublisherをスーパークラスではなくコンポーネントとして使用したりできます。)
class PeriodicPublisher<T> extends SubmissionPublisher<T> { final ScheduledFuture<?> periodicTask; final ScheduledExecutorService scheduler; PeriodicPublisher(Executor executor, int maxBufferCapacity, Supplier<? extends T> supplier, long period, TimeUnit unit) { super(executor, maxBufferCapacity); scheduler = new ScheduledThreadPoolExecutor(1); periodicTask = scheduler.scheduleAtFixedRate( () -> submit(supplier.get()), 0, period, unit); } public void close() { periodicTask.cancel(false); scheduler.shutdown(); super.close(); } }
次に、
Flow.Processor
実装の例を示します。 これは、説明を簡単にするため、パブリッシャにシングル・ステップ・リクエストを使用しています。 適応性の高いバージョンでは、submit
から返されたラグ推定値を使用して、他のユーティリティ・メソッドとともにフローを監視できます。class TransformProcessor<S,T> extends SubmissionPublisher<T> implements Flow.Processor<S,T> { final Function<? super S, ? extends T> function; Flow.Subscription subscription; TransformProcessor(Executor executor, int maxBufferCapacity, Function<? super S, ? extends T> function) { super(executor, maxBufferCapacity); this.function = function; } public void onSubscribe(Flow.Subscription subscription) { (this.subscription = subscription).request(1); } public void onNext(S item) { subscription.request(1); submit(function.apply(item)); } public void onError(Throwable ex) { closeExceptionally(ex); } public void onComplete() { close(); } }
- 導入されたバージョン:
- 9
-
-
コンストラクタのサマリー
コンストラクタ コンストラクタ 説明 SubmissionPublisher()
サブスクライバ(少なくとも2つの並列処理レベルをサポートしていない場合を除き、この場合、各タスクを実行するために新しいスレッドが作成されます)への非同期配信用にForkJoinPool.commonPool()
を使用し、Flow.defaultBufferSize()
の最大バッファ容量を使用し、メソッドonNext
のサブスクライバ例外のハンドラを使用せずに新しいSubmissionPublisherを作成します。SubmissionPublisher(Executor executor, int maxBufferCapacity)
指定されたExecutorを使用してサブスクライバに非同期に配信するための新しいSubmissionPublisherを作成します。各サブスクライバの最大バッファ・サイズは指定されており、メソッドonNext
のサブスクライバ例外ハンドラはありません。SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
各サブスクライバに対して指定された最大バッファ・サイズでサブスクライバへの非同期配信用に、指定されたExecutorを使用して新しいSubmissionPublisherを作成し、非nullの場合、サブスクライバがonNext
メソッドで例外をスローしたときに呼び出されます。
-
メソッドのサマリー
修飾子と型 メソッド 説明 void
close()
すでにクローズされていない限り、onComplete
は現在のサブスクライバに信号を送り、後続のパブリッシュを許可しません。void
closeExceptionally(Throwable error)
すでにクローズされていない限り、onError
は指定されたエラーを持つ現在のサブスクライバに通知し、その後の発行を許可しません。CompletableFuture<Void>
consume(Consumer<? super T> consumer)
指定したコンシューマ関数を使用して、公開されたすべてのアイテムを処理します。int
estimateMaximumLag()
現在のすべてのサブスクライバ間で生成されたがまだ消費されていないアイテムの最大数の見積もりを返します。long
estimateMinimumDemand()
すべての現在のサブスクライバの中でリクエスト(request
経由で)されたがまだ生成されていないアイテムの最小数の見積もりを返します。Throwable
getClosedException()
closeExceptionally
に関連付けられた例外を返します。閉じていない場合、または正常に終了した場合はnullを返します。Executor
getExecutor()
非同期配信に使用されるExecutorを返します。int
getMaxBufferCapacity()
サブスクライバごとの最大バッファ容量を返します。int
getNumberOfSubscribers()
現在のサブスクライバの数を返します。List<Flow.Subscriber<? super T>>
getSubscribers()
サブスクライバでFlow.Subscriber
メソッドを呼び出すためではなく、モニタリングとトラッキングのために現在のサブスクライバのリストを返します。boolean
hasSubscribers()
このパブリッシャにサブスクライバがある場合はtrueを返します。boolean
isClosed()
このサイト運営者が投稿を受け入れていない場合はtrueを返します。boolean
isSubscribed(Flow.Subscriber<? super T> subscriber)
指定されたサブスクライバが現在サブスクライブされている場合はtrueを返します。int
offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
指定されたアイテムを、可能であれば現在の各サブスクライバに公開します。これは、onNext
メソッドを非同期的に呼び出し、任意のサブスクリプションのリソースを使用できなくしてもブロックします。指定されたタイムアウトまで、またはコールアウト・スレッドが中断するまで、指定されたハンドラ(非nullの場合)が呼び出され、trueが返された場合は1回再試行します。int
offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
onNext
メソッドを非同期に呼び出すことによって、可能であれば、指定されたアイテムを各現在のサブスクライバにパブリッシュします。int
submit(T item)
onNext
メソッドを非同期に呼び出すことによって、現在の各サブスクライバに指定されたアイテムをパブリッシュし、任意のサブスクライバのリソースを使用できない間中断しないようにブロックします。void
subscribe(Flow.Subscriber<? super T> subscriber)
指定されたサブスクライバを、すでにサブスクライブしていない限り追加します。
-
-
-
コンストラクタの詳細
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
各サブスクライバに対して指定された最大バッファ・サイズでサブスクライバへの非同期配信用に、指定されたExecutorを使用して新しいSubmissionPublisherを作成し、非nullの場合、サブスクライバがonNext
メソッドで例外をスローしたときに呼び出されます。- パラメータ:
executor
- エグゼキュータが非同期配信に使用し、少なくとも1つの独立したスレッドの作成をサポートmaxBufferCapacity
- 各サブスクライバ・バッファの最大容量(強制された容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されます。メソッドgetMaxBufferCapacity()
は実際の値を返します)handler
-nullでない場合は、メソッドonNext
でスローされた例外の発生時に呼び出されるプロシージャ- 例外:
NullPointerException
- executorがnullの場合IllegalArgumentException
- maxBufferCapacityが正でない場合
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity)
指定されたExecutorを使用してサブスクライバに非同期に配信するための新しいSubmissionPublisherを作成します。各サブスクライバの最大バッファ・サイズは指定されており、メソッドonNext
のサブスクライバ例外ハンドラはありません。- パラメータ:
executor
- エグゼキュータが非同期配信に使用し、少なくとも1つの独立したスレッドの作成をサポートmaxBufferCapacity
- 各サブスクライバ・バッファの最大容量(強制された容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されます。メソッドgetMaxBufferCapacity()
は実際の値を返します)- 例外:
NullPointerException
- executorがnullの場合IllegalArgumentException
- maxBufferCapacityが正でない場合
-
SubmissionPublisher
public SubmissionPublisher()
サブスクライバ(少なくとも2つの並列処理レベルをサポートしていない場合を除き、この場合、各タスクを実行するために新しいスレッドが作成されます)への非同期配信用にForkJoinPool.commonPool()
を使用し、Flow.defaultBufferSize()
の最大バッファ容量を使用し、メソッドonNext
のサブスクライバ例外のハンドラを使用せずに新しいSubmissionPublisherを作成します。
-
-
メソッドの詳細
-
subscribe
public void subscribe(Flow.Subscriber<? super T> subscriber)
指定されたサブスクライバを、すでにサブスクライブしていない限り追加します。 すでにサブスクライブされている場合、IllegalStateException
を使用して既存のサブスクリプションでサブスクライバonError
メソッドが呼び出されます。 それ以外の場合は、成功すると、SubscriberonSubscribe
メソッドが新しいFlow.Subscription
と非同期に呼び出されます。onSubscribe
が例外をスローすると、サブスクリプションは取り消されます。 それ以外の場合、このSubmissionPublisherが例外的に閉じられた場合、サブスクライバのonError
メソッドが対応する例外とともに呼び出されるか、または例外なく閉じられた場合、サブスクライバのonComplete
メソッドが呼び出されます。 サブスクライバは、新しいサブスクリプションのrequest
メソッドを呼び出してアイテムを受信できるようにし、cancel
メソッドを呼び出すことでサブスクリプションを解除することができます。- 定義:
- インタフェース
Flow.Publisher<T>
内のsubscribe
- パラメータ:
subscriber
- サブスクライバ- 例外:
NullPointerException
- subscriberがnullの場合
-
submit
public int submit(T item)
onNext
メソッドを非同期に呼び出すことによって、現在の各サブスクライバに指定されたアイテムをパブリッシュし、任意のサブスクライバのリソースを使用できない間中断しないようにブロックします。 このメソッドは、現在のすべてのサブスクライバ間の最大遅延(送信されたがまだ消費されていないアイテムの数)の推定値を返します。 この値は、サブスクライバが存在する場合は少なくとも1つの(この送信されたアイテムの会計処理)です。それ以外の場合はゼロです。このパブリッシャのExecutorが、非同期的にサブスクライバに通知しようとするときにRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローすると、この例外が再スローされます。この場合、すべてのサブスクライバがこのアイテムを発行されるわけではありません。
- パラメータ:
item
- パブリッシュする(非null)アイテム- 戻り値:
- サブスクライバ間の推定最大遅延
- 例外:
IllegalStateException
- 閉じている場合NullPointerException
- itemがnullの場合RejectedExecutionException
- 実行者によってスローされた場合
-
offer
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
onNext
メソッドを非同期に呼び出すことによって、可能であれば、指定されたアイテムを各現在のサブスクライバにパブリッシュします。 リソースの制限を超えた場合、指定されたハンドラ(非nullの場合)が呼び出され、trueを返す場合は、1回再試行されると、そのアイテムは1人以上のユーザーによって削除される可能性があります。 このクラスのメソッドへの他のスレッドによる他の呼び出しは、ハンドラが呼び出されている間はブロックされます。 リカバリが保証されていない限り、オプションは通常、エラーを記録したり、サブスクライバにonError
信号を発行することに限定されます。このメソッドは、ステータス・インジケータ: 負の場合は、(サブスクライバにアイテムを発行しようとして失敗した)の(negative)数を表します。 それ以外の場合は、現在のすべてのサブスクライバの中での最大遅延(送信されたがまだ消費されていないアイテムの数)の見積もりです。 この値は、サブスクライバが存在する場合は少なくとも1つの(この送信されたアイテムの会計処理)です。それ以外の場合はゼロです。
サブスクライバに非同期的に通知しようとすると、このパブリッシャのExecutorがRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローするか、ドロップされたアイテムを処理するときにドロップ・ハンドラが例外をスローすると、この例外が再送出されます。
- パラメータ:
item
- パブリッシュする(非null)アイテムonDrop
- 非nullの場合は、サブスクライバとアイテムの引数を持つサブスクライバへのドロップ時にハンドラが呼び出されます。それがtrueを返す場合、オファーは再試行されます。(once)- 戻り値:
- 負の値の場合は、(negative)のドロップ数。そうでなければ、最大遅延の推定値
- 例外:
IllegalStateException
- 閉じている場合NullPointerException
- itemがnullの場合RejectedExecutionException
- 実行者によってスローされた場合
-
offer
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
指定されたアイテムを、可能であれば現在の各サブスクライバに公開します。これは、onNext
メソッドを非同期的に呼び出し、任意のサブスクリプションのリソースを使用できなくしてもブロックします。指定されたタイムアウトまで、またはコールアウト・スレッドが中断するまで、指定されたハンドラ(非nullの場合)が呼び出され、trueが返された場合は1回再試行します。 (ドロップ・ハンドラは、現在のスレッドが中断されているかどうかをチェックすることによって、タイムアウトと割り込みとを区別することができます。) このクラスのメソッドへの他のスレッドによる他の呼び出しは、ハンドラが呼び出されている間はブロックされます。 リカバリが保証されていない限り、オプションは通常、エラーを記録したり、サブスクライバにonError
信号を発行することに限定されます。このメソッドは、ステータス・インジケータ: 負の場合は、(サブスクライバにアイテムを発行しようとして失敗した)の(negative)数を表します。 それ以外の場合は、現在のすべてのサブスクライバの中での最大遅延(送信されたがまだ消費されていないアイテムの数)の見積もりです。 この値は、サブスクライバが存在する場合は少なくとも1つの(この送信されたアイテムの会計処理)です。それ以外の場合はゼロです。
サブスクライバに非同期的に通知しようとすると、このパブリッシャのExecutorがRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローするか、ドロップされたアイテムを処理するときにドロップ・ハンドラが例外をスローすると、この例外が再送出されます。
- パラメータ:
item
- パブリッシュする(非null)アイテムtimeout
- あきらめる前に任意のサブスクライバのリソースをどれくらい待つか、unit
単位unit
-timeout
パラメータの解釈方法を決定するTimeUnit
onDrop
- 非nullの場合は、サブスクライバとアイテムの引数を持つサブスクライバへのドロップ時にハンドラが呼び出されます。それがtrueを返す場合、オファーは再試行されます。(once)- 戻り値:
- 負の値の場合は、(negative)のドロップ数。そうでなければ、最大遅延の推定値
- 例外:
IllegalStateException
- 閉じている場合NullPointerException
- itemがnullの場合RejectedExecutionException
- 実行者によってスローされた場合
-
close
public void close()
すでにクローズされていない限り、onComplete
は現在のサブスクライバに信号を送り、後続のパブリッシュを許可しません。 返却時に、このメソッドは、すべてのサブスクライバがまだ完了していることを保証するものではありません。- 定義:
close
、インタフェース:AutoCloseable
-
closeExceptionally
public void closeExceptionally(Throwable error)
すでにクローズされていない限り、onError
は指定されたエラーを持つ現在のサブスクライバに通知し、その後の発行を許可しません。 将来のサブスクライバにも所定のエラーが発生します。 返却時に、このメソッドは、すべてのサブスクライバがまだ完了していることを保証するものではありません。- パラメータ:
error
-onError
引数がサブスクライバに送信された- 例外:
NullPointerException
- エラーがnullの場合
-
isClosed
public boolean isClosed()
このサイト運営者が投稿を受け入れていない場合はtrueを返します。- 戻り値:
- 閉じている場合はtrue
-
getClosedException
public Throwable getClosedException()
closeExceptionally
に関連付けられた例外を返します。閉じていない場合、または正常に終了した場合はnullを返します。- 戻り値:
- 例外。ない場合はnull
-
hasSubscribers
public boolean hasSubscribers()
このパブリッシャにサブスクライバがある場合はtrueを返します。- 戻り値:
- このパブリッシャにサブスクライバがある場合はtrue
-
getNumberOfSubscribers
public int getNumberOfSubscribers()
現在のサブスクライバの数を返します。- 戻り値:
- 現在のサブスクライバ数
-
getExecutor
public Executor getExecutor()
非同期配信に使用されるExecutorを返します。- 戻り値:
- 非同期配信に使用されるExecutor
-
getMaxBufferCapacity
public int getMaxBufferCapacity()
サブスクライバごとの最大バッファ容量を返します。- 戻り値:
- サブスクライバごとの最大バッファ容量
-
getSubscribers
public List<Flow.Subscriber<? super T>> getSubscribers()
サブスクライバでFlow.Subscriber
メソッドを呼び出すためではなく、モニタリングとトラッキングのために現在のサブスクライバのリストを返します。- 戻り値:
- 現在のサブスクライバのリスト
-
isSubscribed
public boolean isSubscribed(Flow.Subscriber<? super T> subscriber)
指定されたサブスクライバが現在サブスクライブされている場合はtrueを返します。- パラメータ:
subscriber
- サブスクライバ- 戻り値:
- 現在サブスクライブされている場合はtrue
- 例外:
NullPointerException
- subscriberがnullの場合
-
estimateMinimumDemand
public long estimateMinimumDemand()
すべての現在のサブスクライバの中でリクエスト(request
経由で)されたがまだ生成されていないアイテムの最小数の見積もりを返します。- 戻り値:
- 推定値、またはサブスクライバがない場合はゼロ
-
estimateMaximumLag
public int estimateMaximumLag()
現在のすべてのサブスクライバ間で生成されたがまだ消費されていないアイテムの最大数の見積もりを返します。- 戻り値:
- 見積もり
-
consume
public CompletableFuture<Void> consume(Consumer<? super T> consumer)
指定したコンシューマ関数を使用して、公開されたすべてのアイテムを処理します。 このパブリッシャがonComplete
を通知したとき、またはエラーが発生したときに例外的に完了したとき、またはコンシューマによって例外がスローされたとき、または返されたCompletableFutureが取消されたときに正常に完了するCompletableFutureを返します。- パラメータ:
consumer
- 各onNextアイテムに適用される関数- 戻り値:
- パブリッシャがonCompleteにシグナルを送るときに正常に完了するCompletableFuture、例外的にエラーまたは取消が発生したとき
- 例外:
NullPointerException
- 消費者がnullの場合
-
-