モジュール java.base
パッケージ java.util.concurrent

クラスSubmissionPublisher<T>

java.lang.Object
java.util.concurrent.SubmissionPublisher<T>
型パラメータ:
T - 公開アイテム型
すべての実装されたインタフェース:
AutoCloseable, Flow.Publisher<T>

public class SubmissionPublisher<T> extends Object implements Flow.Publisher<T>, AutoCloseable
Flow.Publisher。クローズするまで、送信済の(non-null)アイテムを現在のサブスクライバに非同期に発行します。 ドロップまたは例外が発生しないかぎり、現在の各サブスクライバは、新しく発行されたアイテムを同じ順序で受け取ります。 SubmissionPublisherを使用すると、アイテム・ジェネレータは、ドロップ処理やフロー制御のブロックに依存する準拠のreactive-streamsパブリッシャとして機能できます。

SubmissionPublisherは、サブスクライバへの配信にコンストラクタで提供されるExecutorを使用します。 エグゼキュータの最適な選択は、予想される使用状況によって異なります。 送信された項目のthegenerator(s)が別々のスレッドで実行され、サブスクライバの数を推定できる場合は、Executors.newFixedThreadPool(int)の使用を検討してください。 それ以外の場合は、デフォルト(通常はForkJoinPool.commonPool())の使用を検討してください。

バッファリングにより、プロデューサとコンシューマは様々なレートで一時的に動作できます。 各サブスクライバは、独立したバッファを使用します。 バッファは最初の使用時に作成され、必要に応じて指定された最大値まで拡張されます。 (強制容量は、2つの最も近い累乗に切り上げることも、この実装でサポートされている最大値に制限することもできます。) requestを呼び出すと、バッファ拡張は直接発生しませんが、満たされていないリクエストが最大容量を超えると、飽和リスクが発生します。 Flow.defaultBufferSize()のデフォルト値は、予想されるレート、リソースおよび使用状況に基づいて容量を選択する際に役立つ開始点となります。

1つのSubmissionPublisherを複数のソース間で共有することができます。 アイテムを公開する前にソース・スレッド内のアクション、または各サブスクライバによる対応するアクセスに続いてシグナルhappen-beforeアクションを発行します。 しかし、報告された遅延と需要の見積もりは、同期制御ではなく、モニタリングに使用するように設計されており、進行状況の古いか不正確な見解を反映している可能性があります。

パブリケーション・メソッドでは、バッファが飽和した場合に何をするかに関する様々なポリシーがサポートされます。 メソッドsubmitは、リソースが使用可能になるまでブロックします。 これは最も簡単ですが、応答性が最小です。 offerメソッドは、アイテム(直ちにまたは制限されたタイムアウトで)を削除できますが、ハンドラを割り込み、再試行する機会を提供します。

サブスクライバ・メソッドが例外をスローすると、そのサブスクリプションは取り消されます。 コンストラクタ引数としてハンドラが指定されている場合は、メソッドonNextの例外時に取消しの前に呼び出されますが、メソッドonSubscribeonErrorおよびonCompleteの例外は、取消し前に記録または処理されません。 指定されたエグゼキュータがタスクの実行時にRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローするか、ドロップ・ハンドラが削除されたアイテムの処理時に例外をスローすると、例外が再スローされます。 このような場合、すべてのサブスクライバが公開済アイテムを発行されるわけではありません。 このような場合は、通常、closeExceptionallyを使用することをお薦めします。

メソッドconsume(Consumer)は、サブスクライバの唯一のアクションが、指定されたファンクションを使用してすべてのアイテムをリクエストおよび処理することである一般的なケースのサポートを簡略化します。

このクラスは、アイテムを生成するサブクラスの便利なベースとしても機能し、このクラスのメソッドを使用してアイテムを公開します。 たとえば、サプライヤから生成された品目を定期的に公開するクラスがあります。 (実際には、生成を個別に開始および停止したり、パブリッシャ間でエグゼキュータを共有したり、スーパークラスではなくコンポーネントとしてSubmissionPublisherを使用するメソッドを追加できます。)

 
 class PeriodicPublisher<T> extends SubmissionPublisher<T> {
   final ScheduledFuture<?> periodicTask;
   final ScheduledExecutorService scheduler;
   PeriodicPublisher(Executor executor, int maxBufferCapacity,
                     Supplier<? extends T> supplier,
                     long period, TimeUnit unit) {
     super(executor, maxBufferCapacity);
     scheduler = new ScheduledThreadPoolExecutor(1);
     periodicTask = scheduler.scheduleAtFixedRate(
       () -> submit(supplier.get()), 0, period, unit);
   }
   public void close() {
     periodicTask.cancel(false);
     scheduler.shutdown();
     super.close();
   }
 }

次に、Flow.Processor実装の例を示します。 簡単に説明できるように、パブリッシャへのシングル・ステップ・リクエストを使用します。 より適応性の高いバージョンでは、他のユーティリティ・メソッドとともに、submitから返されたラグ見積りを使用してフローを監視できます。

 
 class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {
     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }

導入されたバージョン:
9