モジュール java.base
パッケージ java.util.concurrent

クラスSubmissionPublisher<T>

java.lang.Object
java.util.concurrent.SubmissionPublisher<T>
型パラメータ:
T - 公開アイテム型
すべての実装されたインタフェース:
AutoCloseable, Flow.Publisher<T>

public class SubmissionPublisher<T>
extends Object
implements Flow.Publisher<T>, AutoCloseable
(non-null)アイテムを非同期で発行するFlow.Publisherは、現在のサブスクライバが閉じられるまでそれを発行します。 現在の各サブスクライバは、ドロップまたは例外が発生しない限り、新しく送信されたアイテムを同じ順序で受信します。 SubmissionPublisherを使用すると、アイテム・ジェネレータは、フロー制御のドロップ処理やブロッキングに依存する準拠のreactive-streamsパブリッシャとして動作することができます。

SubmissionPublisherは、コンストラクタで提供されたExecutorを使用して、サブスクライバに配信します。 エグゼキュータの最良の選択は、予想される使用法に依存します。 送信されたアイテムのジェネレータが別のスレッドで実行され、サブスクライバの数を見積もることができる場合は、Executors.newFixedThreadPool(int)の使用を検討してください。 それ以外の場合は、デフォルトのForkJoinPool.commonPool()を使用することを検討してください。

バッファリングにより、プロデューサとコンシューマは異なる速度で一時的に動作することができます。 各サブスクライバは独立したバッファを使用します。 バッファは最初の使用時に作成され、必要に応じて指定された最大値まで展開されます。 (強制的な容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されることがあります。) requestの呼び出しでは、バッファ拡張は直接行われませんが、満たされていないリクエストが最大容量を超えると、リスク飽和が発生します。 Flow.defaultBufferSize()のデフォルト値は、予想される料金、リソース、および用途に基づいて容量を選択するための出発点となります。

1つのSubmissionPublisherを複数のソース間で共有することができます。 アイテムを公開する前にソース・スレッド内のアクション、または各サブスクライバによる対応するアクセスに続いてシグナルhappen-beforeアクションを発行します。 しかし、報告された遅延と需要の見積もりは、同期制御ではなく、モニタリングに使用するように設計されており、進行状況の古いか不正確な見解を反映している可能性があります。

パブリケーション・メソッドは、バッファが飽和したときの対処方法に関するさまざまなポリシーをサポートします。 メソッドsubmitは、リソースが利用可能になるまでブロックします。 これは最もシンプルですが、レスポンスは最も低くなります。 offerメソッドは、アイテム(直ちにまたは制限されたタイムアウトで)を削除しますが、ハンドラを挿入して再試行する機会を提供します。

サブスクライバ・メソッドが例外をスローすると、そのサブスクリプションは取消されます。 ハンドラがコンストラクタ引数として指定された場合は、メソッドonNextの例外時に取消前に呼び出されますが、メソッドonSubscribeonError、およびonCompleteの例外は取消前に記録または処理されません。 タスクを実行しようとするときに指定されたExecutorがRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローした場合、またはドロップ・ハンドラがドロップされたアイテムを処理するときに例外をスローすると、例外が再スローされます。 このような場合、すべてのサブスクライバが発行されたアイテムを発行されるわけではありません。 これらの場合、通常はcloseExceptionallyを実行することをお勧めします。

メソッドconsume(Consumer)は、サブスクライバの唯一のアクションが、指定された関数を使用してすべてのアイテムをリクエストし処理するという共通のケースのサポートを簡素化します。

このクラスは、アイテムを生成するサブクラスの便利な基盤としても機能し、このクラスのメソッドを使用してそれらを公開します。 たとえば、サプライヤから生成されたアイテムを定期的に公開するクラスがあります。 (実際には、生成を独立して開始および停止するメソッド、パブリッシャ間でExecutorを共有するメソッドなどを追加したり、SubmissionPublisherをスーパークラスではなくコンポーネントとして使用したりできます。)

 
 class PeriodicPublisher<T> extends SubmissionPublisher<T> {
   final ScheduledFuture<?> periodicTask;
   final ScheduledExecutorService scheduler;
   PeriodicPublisher(Executor executor, int maxBufferCapacity,
                     Supplier<? extends T> supplier,
                     long period, TimeUnit unit) {
     super(executor, maxBufferCapacity);
     scheduler = new ScheduledThreadPoolExecutor(1);
     periodicTask = scheduler.scheduleAtFixedRate(
       () -> submit(supplier.get()), 0, period, unit);
   }
   public void close() {
     periodicTask.cancel(false);
     scheduler.shutdown();
     super.close();
   }
 }

次に、Flow.Processor実装の例を示します。 これは、説明を簡単にするため、パブリッシャにシングル・ステップ・リクエストを使用しています。 適応性の高いバージョンでは、submitから返されたラグ推定値を使用して、他のユーティリティ・メソッドとともにフローを監視できます。

 
 class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {
     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }

導入されたバージョン:
9
  • コンストラクタのサマリー

    コンストラクタ 
    コンストラクタ 説明
    SubmissionPublisher()
    サブスクライバ(少なくとも2つの並列処理レベルをサポートしていない場合を除き、この場合、各タスクを実行するために新しいスレッドが作成されます)への非同期配信用にForkJoinPool.commonPool()を使用し、Flow.defaultBufferSize()の最大バッファ容量を使用し、メソッドonNextのサブスクライバ例外のハンドラを使用せずに新しいSubmissionPublisherを作成します。
    SubmissionPublisher​(Executor executor, int maxBufferCapacity)
    指定されたExecutorを使用してサブスクライバに非同期に配信するための新しいSubmissionPublisherを作成します。各サブスクライバの最大バッファ・サイズは指定されており、メソッドonNextのサブスクライバ例外ハンドラはありません。
    SubmissionPublisher​(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,​? super Throwable> handler)
    各サブスクライバに対して指定された最大バッファ・サイズでサブスクライバへの非同期配信用に、指定されたExecutorを使用して新しいSubmissionPublisherを作成し、非nullの場合、サブスクライバがonNextメソッドで例外をスローしたときに呼び出されます。
  • メソッドのサマリー

    修飾子と型 メソッド 説明
    void close()
    すでにクローズされていない限り、onCompleteは現在のサブスクライバに信号を送り、後続のパブリッシュを許可しません。
    void closeExceptionally​(Throwable error)
    すでにクローズされていない限り、onErrorは指定されたエラーを持つ現在のサブスクライバに通知し、その後の発行を許可しません。
    CompletableFuture<Void> consume​(Consumer<? super T> consumer)
    指定したコンシューマ関数を使用して、公開されたすべてのアイテムを処理します。
    int estimateMaximumLag()
    現在のすべてのサブスクライバ間で生成されたがまだ消費されていないアイテムの最大数の見積もりを返します。
    long estimateMinimumDemand()
    すべての現在のサブスクライバの中でリクエスト(request経由で)されたがまだ生成されていないアイテムの最小数の見積もりを返します。
    Throwable getClosedException()
    closeExceptionallyに関連付けられた例外を返します。閉じていない場合、または正常に終了した場合はnullを返します。
    Executor getExecutor()
    非同期配信に使用されるExecutorを返します。
    int getMaxBufferCapacity()
    サブスクライバごとの最大バッファ容量を返します。
    int getNumberOfSubscribers()
    現在のサブスクライバの数を返します。
    List<Flow.Subscriber<? super T>> getSubscribers()
    サブスクライバでFlow.Subscriberメソッドを呼び出すためではなく、モニタリングとトラッキングのために現在のサブスクライバのリストを返します。
    boolean hasSubscribers()
    このパブリッシャにサブスクライバがある場合はtrueを返します。
    boolean isClosed()
    このサイト運営者が投稿を受け入れていない場合はtrueを返します。
    boolean isSubscribed​(Flow.Subscriber<? super T> subscriber)
    指定されたサブスクライバが現在サブスクライブされている場合はtrueを返します。
    int offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
    指定されたアイテムを、可能であれば現在の各サブスクライバに公開します。これは、onNextメソッドを非同期的に呼び出し、任意のサブスクリプションのリソースを使用できなくしてもブロックします。指定されたタイムアウトまで、またはコールアウト・スレッドが中断するまで、指定されたハンドラ(非nullの場合)が呼び出され、trueが返された場合は1回再試行します。
    int offer​(T item, BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
    onNextメソッドを非同期に呼び出すことによって、可能であれば、指定されたアイテムを各現在のサブスクライバにパブリッシュします。
    int submit​(T item)
    onNextメソッドを非同期に呼び出すことによって、現在の各サブスクライバに指定されたアイテムをパブリッシュし、任意のサブスクライバのリソースを使用できない間中断しないようにブロックします。
    void subscribe​(Flow.Subscriber<? super T> subscriber)
    指定されたサブスクライバを、すでにサブスクライブしていない限り追加します。

    クラス java.lang.Objectで宣言されたメソッド

    cloneequalsfinalizegetClasshashCodenotifynotifyAlltoStringwaitwaitwait
  • コンストラクタの詳細

    • SubmissionPublisher

      public SubmissionPublisher​(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,​? super Throwable> handler)
      各サブスクライバに対して指定された最大バッファ・サイズでサブスクライバへの非同期配信用に、指定されたExecutorを使用して新しいSubmissionPublisherを作成し、非nullの場合、サブスクライバがonNextメソッドで例外をスローしたときに呼び出されます。
      パラメータ:
      executor - エグゼキュータが非同期配信に使用し、少なくとも1つの独立したスレッドの作成をサポート
      maxBufferCapacity - 各サブスクライバ・バッファの最大容量(強制された容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されます。メソッドgetMaxBufferCapacity()は実際の値を返します)
      handler -nullでない場合は、メソッドonNextでスローされた例外の発生時に呼び出されるプロシージャ
      例外:
      NullPointerException - executorがnullの場合
      IllegalArgumentException - maxBufferCapacityが正でない場合
    • SubmissionPublisher

      public SubmissionPublisher​(Executor executor, int maxBufferCapacity)
      指定されたExecutorを使用してサブスクライバに非同期に配信するための新しいSubmissionPublisherを作成します。各サブスクライバの最大バッファ・サイズは指定されており、メソッドonNextのサブスクライバ例外ハンドラはありません。
      パラメータ:
      executor - エグゼキュータが非同期配信に使用し、少なくとも1つの独立したスレッドの作成をサポート
      maxBufferCapacity - 各サブスクライバ・バッファの最大容量(強制された容量は、2の最も近い累乗に切り上げることができ、および/またはこの実装によってサポートされる最大値によって制限されます。メソッドgetMaxBufferCapacity()は実際の値を返します)
      例外:
      NullPointerException - executorがnullの場合
      IllegalArgumentException - maxBufferCapacityが正でない場合
    • SubmissionPublisher

      public SubmissionPublisher()
      サブスクライバ(少なくとも2つの並列処理レベルをサポートしていない場合を除き、この場合、各タスクを実行するために新しいスレッドが作成されます)への非同期配信用にForkJoinPool.commonPool()を使用し、Flow.defaultBufferSize()の最大バッファ容量を使用し、メソッドonNextのサブスクライバ例外のハンドラを使用せずに新しいSubmissionPublisherを作成します。
  • メソッドの詳細

    • subscribe

      public void subscribe​(Flow.Subscriber<? super T> subscriber)
      指定されたサブスクライバを、すでにサブスクライブしていない限り追加します。 すでにサブスクライブされている場合、IllegalStateExceptionを使用して既存のサブスクリプションでサブスクライバonErrorメソッドが呼び出されます。 それ以外の場合は、成功すると、Subscriber onSubscribeメソッドが新しいFlow.Subscriptionと非同期に呼び出されます。 onSubscribeが例外をスローすると、サブスクリプションは取り消されます。 それ以外の場合、このSubmissionPublisherが例外的に閉じられた場合、サブスクライバのonErrorメソッドが対応する例外とともに呼び出されるか、または例外なく閉じられた場合、サブスクライバのonCompleteメソッドが呼び出されます。 サブスクライバは、新しいサブスクリプションのrequestメソッドを呼び出してアイテムを受信できるようにし、cancelメソッドを呼び出すことでサブスクリプションを解除することができます。
      定義:
      インタフェースFlow.Publisher<T>内のsubscribe
      パラメータ:
      subscriber - サブスクライバ
      例外:
      NullPointerException - subscriberがnullの場合
    • submit

      public int submit​(T item)
      onNextメソッドを非同期に呼び出すことによって、現在の各サブスクライバに指定されたアイテムをパブリッシュし、任意のサブスクライバのリソースを使用できない間中断しないようにブロックします。 このメソッドは、現在のすべてのサブスクライバ間の最大遅延(送信されたがまだ消費されていないアイテムの数)の推定値を返します。 この値は、サブスクライバが存在する場合は少なくとも1つの(この送信されたアイテムの会計処理)です。それ以外の場合はゼロです。

      このパブリッシャのExecutorが、非同期的にサブスクライバに通知しようとするときにRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローすると、この例外が再スローされます。この場合、すべてのサブスクライバがこのアイテムを発行されるわけではありません。

      パラメータ:
      item - パブリッシュする(非null)アイテム
      戻り値:
      サブスクライバ間の推定最大遅延
      例外:
      IllegalStateException - 閉じている場合
      NullPointerException - itemがnullの場合
      RejectedExecutionException - 実行者によってスローされた場合
    • offer

      public int offer​(T item, BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
      onNextメソッドを非同期に呼び出すことによって、可能であれば、指定されたアイテムを各現在のサブスクライバにパブリッシュします。 リソースの制限を超えた場合、指定されたハンドラ(非nullの場合)が呼び出され、trueを返す場合は、1回再試行されると、そのアイテムは1人以上のユーザーによって削除される可能性があります。 このクラスのメソッドへの他のスレッドによる他の呼び出しは、ハンドラが呼び出されている間はブロックされます。 リカバリが保証されていない限り、オプションは通常、エラーを記録したり、サブスクライバにonError信号を発行することに限定されます。

      このメソッドは、ステータス・インジケータ: 負の場合は、(サブスクライバにアイテムを発行しようとして失敗した)の(negative)数を表します。 それ以外の場合は、現在のすべてのサブスクライバの中での最大遅延(送信されたがまだ消費されていないアイテムの数)の見積もりです。 この値は、サブスクライバが存在する場合は少なくとも1つの(この送信されたアイテムの会計処理)です。それ以外の場合はゼロです。

      サブスクライバに非同期的に通知しようとすると、このパブリッシャのExecutorがRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローするか、ドロップされたアイテムを処理するときにドロップ・ハンドラが例外をスローすると、この例外が再送出されます。

      パラメータ:
      item - パブリッシュする(非null)アイテム
      onDrop - 非nullの場合は、サブスクライバとアイテムの引数を持つサブスクライバへのドロップ時にハンドラが呼び出されます。それがtrueを返す場合、オファーは再試行されます。(once)
      戻り値:
      負の値の場合は、(negative)のドロップ数。そうでなければ、最大遅延の推定値
      例外:
      IllegalStateException - 閉じている場合
      NullPointerException - itemがnullの場合
      RejectedExecutionException - 実行者によってスローされた場合
    • offer

      public int offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
      指定されたアイテムを、可能であれば現在の各サブスクライバに公開します。これは、onNextメソッドを非同期的に呼び出し、任意のサブスクリプションのリソースを使用できなくしてもブロックします。指定されたタイムアウトまで、またはコールアウト・スレッドが中断するまで、指定されたハンドラ(非nullの場合)が呼び出され、trueが返された場合は1回再試行します。 (ドロップ・ハンドラは、現在のスレッドが中断されているかどうかをチェックすることによって、タイムアウトと割り込みとを区別することができます。) このクラスのメソッドへの他のスレッドによる他の呼び出しは、ハンドラが呼び出されている間はブロックされます。 リカバリが保証されていない限り、オプションは通常、エラーを記録したり、サブスクライバにonError信号を発行することに限定されます。

      このメソッドは、ステータス・インジケータ: 負の場合は、(サブスクライバにアイテムを発行しようとして失敗した)の(negative)数を表します。 それ以外の場合は、現在のすべてのサブスクライバの中での最大遅延(送信されたがまだ消費されていないアイテムの数)の見積もりです。 この値は、サブスクライバが存在する場合は少なくとも1つの(この送信されたアイテムの会計処理)です。それ以外の場合はゼロです。

      サブスクライバに非同期的に通知しようとすると、このパブリッシャのExecutorがRejectedExecutionException (またはその他のRuntimeExceptionまたはError)をスローするか、ドロップされたアイテムを処理するときにドロップ・ハンドラが例外をスローすると、この例外が再送出されます。

      パラメータ:
      item - パブリッシュする(非null)アイテム
      timeout - あきらめる前に任意のサブスクライバのリソースをどれくらい待つか、unit単位
      unit - timeoutパラメータの解釈方法を決定するTimeUnit
      onDrop - 非nullの場合は、サブスクライバとアイテムの引数を持つサブスクライバへのドロップ時にハンドラが呼び出されます。それがtrueを返す場合、オファーは再試行されます。(once)
      戻り値:
      負の値の場合は、(negative)のドロップ数。そうでなければ、最大遅延の推定値
      例外:
      IllegalStateException - 閉じている場合
      NullPointerException - itemがnullの場合
      RejectedExecutionException - 実行者によってスローされた場合
    • close

      public void close()
      すでにクローズされていない限り、onCompleteは現在のサブスクライバに信号を送り、後続のパブリッシュを許可しません。 返却時に、このメソッドは、すべてのサブスクライバがまだ完了していることを保証するものではありません。
      定義:
      close、インタフェース: AutoCloseable
    • closeExceptionally

      public void closeExceptionally​(Throwable error)
      すでにクローズされていない限り、onErrorは指定されたエラーを持つ現在のサブスクライバに通知し、その後の発行を許可しません。 将来のサブスクライバにも所定のエラーが発生します。 返却時に、このメソッドは、すべてのサブスクライバがまだ完了していることを保証するものではありません。
      パラメータ:
      error - onError引数がサブスクライバに送信された
      例外:
      NullPointerException - エラーがnullの場合
    • isClosed

      public boolean isClosed()
      このサイト運営者が投稿を受け入れていない場合はtrueを返します。
      戻り値:
      閉じている場合はtrue
    • getClosedException

      public Throwable getClosedException()
      closeExceptionallyに関連付けられた例外を返します。閉じていない場合、または正常に終了した場合はnullを返します。
      戻り値:
      例外。ない場合はnull
    • hasSubscribers

      public boolean hasSubscribers()
      このパブリッシャにサブスクライバがある場合はtrueを返します。
      戻り値:
      このパブリッシャにサブスクライバがある場合はtrue
    • getNumberOfSubscribers

      public int getNumberOfSubscribers()
      現在のサブスクライバの数を返します。
      戻り値:
      現在のサブスクライバ数
    • getExecutor

      public Executor getExecutor()
      非同期配信に使用されるExecutorを返します。
      戻り値:
      非同期配信に使用されるExecutor
    • getMaxBufferCapacity

      public int getMaxBufferCapacity()
      サブスクライバごとの最大バッファ容量を返します。
      戻り値:
      サブスクライバごとの最大バッファ容量
    • getSubscribers

      public List<Flow.Subscriber<? super T>> getSubscribers()
      サブスクライバでFlow.Subscriberメソッドを呼び出すためではなく、モニタリングとトラッキングのために現在のサブスクライバのリストを返します。
      戻り値:
      現在のサブスクライバのリスト
    • isSubscribed

      public boolean isSubscribed​(Flow.Subscriber<? super T> subscriber)
      指定されたサブスクライバが現在サブスクライブされている場合はtrueを返します。
      パラメータ:
      subscriber - サブスクライバ
      戻り値:
      現在サブスクライブされている場合はtrue
      例外:
      NullPointerException - subscriberがnullの場合
    • estimateMinimumDemand

      public long estimateMinimumDemand()
      すべての現在のサブスクライバの中でリクエスト(request経由で)されたがまだ生成されていないアイテムの最小数の見積もりを返します。
      戻り値:
      推定値、またはサブスクライバがない場合はゼロ
    • estimateMaximumLag

      public int estimateMaximumLag()
      現在のすべてのサブスクライバ間で生成されたがまだ消費されていないアイテムの最大数の見積もりを返します。
      戻り値:
      見積もり
    • consume

      public CompletableFuture<Void> consume​(Consumer<? super T> consumer)
      指定したコンシューマ関数を使用して、公開されたすべてのアイテムを処理します。 このパブリッシャがonCompleteを通知したとき、またはエラーが発生したときに例外的に完了したとき、またはコンシューマによって例外がスローされたとき、または返されたCompletableFutureが取消されたときに正常に完了するCompletableFutureを返します。
      パラメータ:
      consumer - 各onNextアイテムに適用される関数
      戻り値:
      パブリッシャがonCompleteにシグナルを送るときに正常に完了するCompletableFuture、例外的にエラーまたは取消が発生したとき
      例外:
      NullPointerException - 消費者がnullの場合