モジュール java.base
パッケージ java.util.concurrent

クラスSubmissionPublisher<T>

  • 型パラメータ:
    T - 公開済項目タイプ
    すべての実装されたインタフェース:
    AutoCloseable, Flow.Publisher<T>

    public class SubmissionPublisher<T>
    extends Object
    implements Flow.Publisher<T>, AutoCloseable
    現在のサブスクライバに送信された(null以外の)アイテムがクローズされるまで非同期に発行されるFlow.Publisher 現行のサブスクライバは、ドロップまたは例外が発生しないかぎり、新しく送信されたアイテムを同じ順序で受信します。 SubmissionPublisherを使用すると、アイテム・ジェネレータは、ドロップ処理またはフロー制御のブロック(あるいはその両方)に依存する、準拠したリアクティブ・ストリーム・パブリッシャとして機能できます。

    SubmissionPublisherは、コンストラクタに指定されたExecutorを使用して、サブスクライバに配信します。 エグゼキュータの最適な選択は、予想される使用状況によって異なります。 送信されたアイテムのジェネレータが別々のスレッドで実行され、サブスクライバの数を推定できる場合は、Executors.newFixedThreadPool(int)の使用を検討してください。 それ以外の場合は、デフォルト(通常は ForkJoinPool.commonPool())の使用を検討してください。

    バッファリングにより、生産者と消費者は異なる速度で一時的に運用できます。 各サブスクライバは、独立したバッファを使用します。 バッファは最初の使用時に作成され、必要に応じて指定された最大値まで拡張されます。 (強制容量は、2のもっとも近い累乗に切り上げたり、この実装でサポートされる最大値で制限したりできます)。 requestを呼び出すと、バッファが直接拡張されるわけではなく、満たされていないリクエストが最大容量を超えると、リスクが飽和します。 デフォルト値のFlow.defaultBufferSize()は、予測されるレート、リソースおよび使用量に基づいて容量を選択する際の有用な開始点となります。

    単一のSubmissionPublisherは、複数のソース間で共有できます。 アイテムを公開する前のソース・スレッド内のアクション、または各サブスクライバによる対応するアクセスの後に発生前のシグナル・アクションを発行するアクション。 ただし、報告されたラグと需要の見積りは、同期制御用ではなくモニタリング用に設計されており、進捗の古いビューまたは不正確なビューを反映している可能性があります。

    パブリケーション・メソッドは、バッファが飽和状態になったときに実行する処理に関する様々なポリシーをサポートします。 メソッドsubmitは、リソースが使用可能になるまでブロックします。 これは最も単純ですが、応答性が低くなります。 offerメソッドは、項目を(即時または制限付きタイムアウトで)削除できますが、ハンドラを中断してから再試行する機会を提供します。

    Subscriberメソッドで例外がスローされた場合、そのサブスクリプションは取り消されます。 コンストラクタ引数としてハンドラが指定されている場合は、メソッドonNextの例外が取り消される前に呼び出されますが、メソッドonSubscribeonErrorおよびonCompleteの例外は、取り消す前に記録または処理されません。 指定されたエグゼキュータがタスクを実行しようとしたときにRejectedExecutionException (または他のRuntimeExceptionまたはエラー)をスローするか、削除されたアイテムの処理中にドロップ・ハンドラが例外をスローした場合、例外は再スローされます。 このような場合、すべてのサブスクライバが発行されるわけではありません。 通常、このような場合はcloseExceptionallyを使用することをお薦めします。

    メソッドconsume(Consumer)は、加入者の唯一のアクションが、指定されたファンクションを使用してすべてのアイテムをリクエストおよび処理する一般的なケースのサポートを簡略化します。

    このクラスは、アイテムを生成するサブクラスの便利なベースとしても機能し、このクラスのメソッドを使用してアイテムを公開します。 たとえば、サプライヤから生成された品目を定期的に公開する区分を次に示します。 (実際には、生成を個別に開始および停止したり、パブリッシャ間でエグゼキュータを共有したり、スーパークラスではなくコンポーネントとしてSubmissionPublisherを使用するメソッドを追加できます。)

     
     class PeriodicPublisher<T> extends SubmissionPublisher<T> {
       final ScheduledFuture<?> periodicTask;
       final ScheduledExecutorService scheduler;
       PeriodicPublisher(Executor executor, int maxBufferCapacity,
                         Supplier<? extends T> supplier,
                         long period, TimeUnit unit) {
         super(executor, maxBufferCapacity);
         scheduler = new ScheduledThreadPoolExecutor(1);
         periodicTask = scheduler.scheduleAtFixedRate(
           () -> submit(supplier.get()), 0, period, unit);
       }
       public void close() {
         periodicTask.cancel(false);
         scheduler.shutdown();
         super.close();
       }
     }

    次に、Flow.Processor実装の例を示します。 イラストレーションを簡略化するために、パブリッシャへの単一ステップのリクエストを使用します。 より適応性の高いバージョンでは、submitから返されたラグ見積りと他のユーティリティ・メソッドを使用してフローを監視できます。

     
     class TransformProcessor<S,T> extends SubmissionPublisher<T>
       implements Flow.Processor<S,T> {
       final Function<? super S, ? extends T> function;
       Flow.Subscription subscription;
       TransformProcessor(Executor executor, int maxBufferCapacity,
                          Function<? super S, ? extends T> function) {
         super(executor, maxBufferCapacity);
         this.function = function;
       }
       public void onSubscribe(Flow.Subscription subscription) {
         (this.subscription = subscription).request(1);
       }
       public void onNext(S item) {
         subscription.request(1);
         submit(function.apply(item));
       }
       public void onError(Throwable ex) { closeExceptionally(ex); }
       public void onComplete() { close(); }
     }

    導入されたバージョン:
    9
    • コンストラクタのサマリー

      コンストラクタ 
      コンストラクタ 説明
      SubmissionPublisher()
      サブスクライバへの非同期配信にForkJoinPool.commonPool()を使用して新しいSubmissionPublisherを作成します(少なくとも2つの並列処理レベルをサポートしていない場合、この場合、各タスクを実行するために新しいスレッドが作成されます)、最大バッファ容量はFlow.defaultBufferSize()で、サブスクライバ例外のハンドラはメソッドonNextにありません)。
      SubmissionPublisher​(Executor executor, int maxBufferCapacity)
      サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して新しいSubmissionPublisherを作成し、サブスクライバごとに指定された最大バッファ・サイズを指定し、メソッドonNextでサブスクライバ例外のハンドラも作成しません。
      SubmissionPublisher​(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,​? super Throwable> handler)
      サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して、各サブスクライバに指定された最大バッファ・サイズで新しいSubmissionPublisherを作成し、null以外の場合は、サブスクライバがメソッドonNextで例外をスローしたときに呼び出される指定されたハンドラを作成します。
    • メソッドのサマリー

      すべてのメソッド インスタンス・メソッド 具象メソッド 
      修飾子と型 メソッド 説明
      void close()
      すでにクローズされていないかぎり、onCompleteは現行のサブスクライバにシグナルを発行し、その後のパブリッシュの試行を禁止します。
      void closeExceptionally​(Throwable error)
      すでにクローズしていないかぎり、onErrorは、指定されたエラーで現在のサブスクライバにシグナルを発行し、その後の公開試行を禁止します。
      CompletableFuture<Void> consume​(Consumer<? super T> consumer)
      指定されたコンシューマ機能を使用して、すべての公開済項目を処理します。
      int estimateMaximumLag()
      すべての現在のサブスクライバ間で生成済だがまだ消費されていないアイテムの最大数の見積を返します。
      long estimateMinimumDemand()
      現在のすべてのサブスクライバ間で、(requestを介して)リクエストされたが、まだ生成されていないアイテムの最小数の見積りを返します。
      Throwable getClosedException()
      closeExceptionallyに関連付けられた例外を返します。閉じていない場合、または正常に閉じられている場合はnullを返します。
      Executor getExecutor()
      非同期配信に使用されるエグゼキュータを返します。
      int getMaxBufferCapacity()
      サブスクライバ当たりの最大バッファ容量を返します。
      int getNumberOfSubscribers()
      現在のサブスクライバの数を返します。
      List<Flow.Subscriber<? super T>> getSubscribers()
      サブスクライバでFlow.Subscriberメソッドを起動するのではなく、監視および追跡の目的で現在のサブスクライバのリストを返します。
      boolean hasSubscribers()
      このパブリッシャにサブスクライバがある場合、trueを返します。
      boolean isClosed()
      このパブリッシャが発行を受け入れていない場合はtrueを返します。
      boolean isSubscribed​(Flow.Subscriber<? super T> subscriber)
      指定されたサブスクライバが現在サブスクライブされている場合はtrueを返します。
      int offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
      可能な場合は、onNextメソッドを非同期的に起動し、サブスクリプションのリソースが使用できない間、指定されたタイムアウトまで、または呼出し側スレッドが中断されるまでブロックし、その時点で指定されたハンドラ(null以外の場合)が呼び出され、trueが返された場合は1回再試行することで、指定された項目を現在の各サブスクライバに公開します。
      int offer​(T item, BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
      可能な場合は、onNextメソッドを非同期に呼び出して、指定された項目を現在の各サブスクライバに公開します。
      int submit​(T item)
      onNextメソッドを非同期的に起動し、任意のサブスクライバのリソースが使用できない間に中断せずにブロックすることで、指定された項目を各現行のサブスクライバに公開します。
      void subscribe​(Flow.Subscriber<? super T> subscriber)
      すでにサブスクライブしていないかぎり、指定されたサブスクライバを追加します。
    • コンストラクタの詳細

      • SubmissionPublisher

        public SubmissionPublisher​(Executor executor,
                                   int maxBufferCapacity,
                                   BiConsumer<? super Flow.Subscriber<? super T>,​? super Throwable> handler)
        サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して、各サブスクライバに指定された最大バッファ・サイズで新しいSubmissionPublisherを作成し、null以外の場合は、サブスクライバがメソッドonNextで例外をスローしたときに呼び出される指定されたハンドラを作成します。
        パラメータ:
        executor - 非同期配信に使用するエグゼキュータで、少なくとも1つの独立したスレッドの作成をサポートします。
        maxBufferCapacity - 各サブスクライバのバッファの最大容量(強制容量は、2のもっとも近い累乗に切り上げられるか、またはこの実装でサポートされている最大値によって制限されます。メソッドgetMaxBufferCapacity()は実際の値を返します)
        handler - NULL以外の場合、メソッドonNextでスローされた例外時に呼び出すプロシージャ
        例外:
        NullPointerException - executorがnullの場合
        IllegalArgumentException - maxBufferCapacityが正でない場合
      • SubmissionPublisher

        public SubmissionPublisher​(Executor executor,
                                   int maxBufferCapacity)
        サブスクライバへの非同期配信用に指定されたエグゼキュータを使用して新しいSubmissionPublisherを作成し、サブスクライバごとに指定された最大バッファ・サイズを指定し、メソッドonNextでサブスクライバ例外のハンドラも作成しません。
        パラメータ:
        executor - 非同期配信に使用するエグゼキュータで、少なくとも1つの独立したスレッドの作成をサポートします。
        maxBufferCapacity - 各サブスクライバのバッファの最大容量(強制容量は、2のもっとも近い累乗に切り上げられるか、またはこの実装でサポートされている最大値によって制限されます。メソッドgetMaxBufferCapacity()は実際の値を返します)
        例外:
        NullPointerException - executorがnullの場合
        IllegalArgumentException - maxBufferCapacityが正でない場合
      • SubmissionPublisher

        public SubmissionPublisher()
        サブスクライバへの非同期配信にForkJoinPool.commonPool()を使用して新しいSubmissionPublisherを作成します(少なくとも2つの並列処理レベルをサポートしていない場合、この場合、各タスクを実行するために新しいスレッドが作成されます)、最大バッファ容量はFlow.defaultBufferSize()で、サブスクライバ例外のハンドラはメソッドonNextにありません)。
    • メソッドの詳細

      • subscribe

        public void subscribe​(Flow.Subscriber<? super T> subscriber)
        すでにサブスクライブしていないかぎり、指定されたサブスクライバを追加します。 すでにサブスクライブされている場合、サブスクライバのonErrorメソッドは、IllegalStateExceptionを持つ既存のサブスクリプションで起動されます。 それ以外の場合は、成功すると、サブスクライバのonSubscribeメソッドが新しいFlow.Subscriptionで非同期に呼び出されます。 onSubscribeが例外をスローすると、サブスクリプションは取り消されます。 それ以外の場合、このSubmissionPublisherが例外的にクローズされた場合、サブスクライバのonErrorメソッドが対応する例外とともに呼び出されるか、例外なしでクローズされた場合、サブスクライバのonCompleteメソッドが呼び出されます。 サブスクライバは、新規サブスクリプションのrequestメソッドを起動してアイテムの受信を有効にし、cancelメソッドを起動してサブスクライブを解除できます。
        定義:
        インタフェースFlow.Publisher<T>subscribe
        パラメータ:
        subscriber - サブスクライバ
        例外:
        NullPointerException - サブスクライバがnullの場合
      • submit

        public int submit​(T item)
        onNextメソッドを非同期的に起動し、任意のサブスクライバのリソースが使用できない間に中断せずにブロックすることで、指定された項目を各現行のサブスクライバに公開します。 このメソッドは、現在のすべてのサブスクライバの最大ラグ(送信済だがまだ消費されていないアイテムの数)の見積りを返します。 サブスクライバが存在する場合、この値は少なくとも1つ(この発行済品目の会計処理)であり、それ以外の場合はゼロです。

        このパブリッシャのエグゼキュータが、サブスクライバへの非同期通知を試行したときにRejectedExecutionException (またはその他のRuntimeExceptionまたはエラー)をスローした場合、この例外は再スローされます。この場合、すべてのサブスクライバがこの項目を発行したわけではありません。

        パラメータ:
        item - 公開する(null以外)項目
        戻り値:
        サブスクライバ間の推定最大ラグ
        例外:
        IllegalStateException - クローズした場合
        NullPointerException - 項目がnullの場合
        RejectedExecutionException - エグゼキュータによってスローされた場合
      • offer

        public int offer​(T item,
                         BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
        可能な場合は、onNextメソッドを非同期に呼び出して、指定された項目を現在の各サブスクライバに公開します。 リソース制限を超えた場合、アイテムは1つ以上のサブスクライバによって削除され、その場合、指定されたハンドラ(null以外の場合)が呼び出され、trueが返された場合は1回再試行されます。 ハンドラが呼び出される間、このクラスの他のスレッドによるメソッドへのコールはブロックされます。 リカバリが保証されないかぎり、オプションは通常、エラーのロギングやサブスクライバへのonErrorシグナルの発行に制限されます。

        このメソッドは、ステータス・インジケータを返します。負の場合は、(マイナスの)ドロップ数を表します(サブスクライバへのアイテムの発行の失敗)。 それ以外の場合は、現在のすべてのサブスクライバの最大ラグ(送信済だがまだ消費されていないアイテムの数)の見積りです。 サブスクライバが存在する場合、この値は少なくとも1つ(この発行済品目の会計処理)であり、それ以外の場合はゼロです。

        このパブリッシャのエグゼキュータがサブスクライバに非同期的に通知しようとしたときにRejectedExecutionException (または他のRuntimeExceptionまたはエラー)をスローした場合、または削除されたアイテムの処理時にドロップ・ハンドラが例外をスローした場合、この例外は再スローされます。

        パラメータ:
        item - 公開する(null以外)項目
        onDrop - null以外の場合、サブスクライバおよびアイテムの引数を使用してサブスクライバへの削除時にハンドラが呼び出されます。trueが返された場合、オファーが再試行されます(1回)
        戻り値:
        負の場合、ドロップの(負)数。負の場合、最大ラグの見積り
        例外:
        IllegalStateException - クローズした場合
        NullPointerException - 項目がnullの場合
        RejectedExecutionException - エグゼキュータによってスローされた場合
      • offer

        public int offer​(T item,
                         long timeout,
                         TimeUnit unit,
                         BiPredicate<Flow.Subscriber<? super T>,​? super T> onDrop)
        可能な場合は、onNextメソッドを非同期的に起動し、サブスクリプションのリソースが使用できない間、指定されたタイムアウトまで、または呼出し側スレッドが中断されるまでブロックし、その時点で指定されたハンドラ(null以外の場合)が呼び出され、trueが返された場合は1回再試行することで、指定された項目を現在の各サブスクライバに公開します。 (削除ハンドラは、現在のスレッドが中断されているかどうかをチェックすることで、タイムアウトと割り込みを区別できます。) ハンドラが呼び出される間、このクラスの他のスレッドによるメソッドへのコールはブロックされます。 リカバリが保証されないかぎり、オプションは通常、エラーのロギングやサブスクライバへのonErrorシグナルの発行に制限されます。

        このメソッドは、ステータス・インジケータを返します。負の場合は、(マイナスの)ドロップ数を表します(サブスクライバへのアイテムの発行の失敗)。 それ以外の場合は、現在のすべてのサブスクライバの最大ラグ(送信済だがまだ消費されていないアイテムの数)の見積りです。 サブスクライバが存在する場合、この値は少なくとも1つ(この発行済品目の会計処理)であり、それ以外の場合はゼロです。

        このパブリッシャのエグゼキュータがサブスクライバに非同期的に通知しようとしたときにRejectedExecutionException (または他のRuntimeExceptionまたはエラー)をスローした場合、または削除されたアイテムの処理時にドロップ・ハンドラが例外をスローした場合、この例外は再スローされます。

        パラメータ:
        item - 公開する(null以外)項目
        timeout - 任意のサブスクライバのリソースを待機してから諦めるまでの時間(単位はunit)
        unit - timeoutパラメータの解釈方法を決定するTimeUnit
        onDrop - null以外の場合、サブスクライバおよびアイテムの引数を使用してサブスクライバへの削除時にハンドラが呼び出されます。trueが返された場合、オファーが再試行されます(1回)
        戻り値:
        負の場合、ドロップの(負)数。負の場合、最大ラグの見積り
        例外:
        IllegalStateException - クローズした場合
        NullPointerException - 項目がnullの場合
        RejectedExecutionException - エグゼキュータによってスローされた場合
      • close

        public void close()
        すでにクローズされていないかぎり、onCompleteは現行のサブスクライバにシグナルを発行し、その後のパブリッシュの試行を禁止します。 戻ると、このメソッドでは、すべてのサブスクライバがまだ完了していることは保証されません
        定義:
        close、インタフェースAutoCloseable
      • closeExceptionally

        public void closeExceptionally​(Throwable error)
        すでにクローズしていないかぎり、onErrorは、指定されたエラーで現在のサブスクライバにシグナルを発行し、その後の公開試行を禁止します。 将来のサブスクライバも、指定されたエラーを受け取ります。 戻ると、このメソッドでは、すべてのサブスクライバがまだ完了していることは保証されません
        パラメータ:
        error - サブスクライバに送信されるonError引数
        例外:
        NullPointerException - エラーがnullの場合
      • isClosed

        public boolean isClosed()
        このパブリッシャが発行を受け入れていない場合はtrueを返します。
        戻り値:
        閉じている場合はtrue
      • getClosedException

        public Throwable getClosedException()
        closeExceptionallyに関連付けられた例外を返します。閉じていない場合、または正常に閉じられている場合はnullを返します。
        戻り値:
        例外。ない場合はnull
      • hasSubscribers

        public boolean hasSubscribers()
        このパブリッシャにサブスクライバがある場合、trueを返します。
        戻り値:
        このパブリッシャにサブスクライバがある場合、true
      • getNumberOfSubscribers

        public int getNumberOfSubscribers()
        現在のサブスクライバの数を返します。
        戻り値:
        現在のサブスクライバの数
      • getExecutor

        public Executor getExecutor()
        非同期配信に使用されるエグゼキュータを返します。
        戻り値:
        非同期配信に使用されるエグゼキュータ
      • getMaxBufferCapacity

        public int getMaxBufferCapacity()
        サブスクライバ当たりの最大バッファ容量を返します。
        戻り値:
        サブスクライバ当たりの最大バッファ容量
      • getSubscribers

        public List<Flow.Subscriber<? super T>> getSubscribers()
        サブスクライバでFlow.Subscriberメソッドを起動するのではなく、監視および追跡の目的で現在のサブスクライバのリストを返します。
        戻り値:
        現在のサブスクライバのリスト
      • isSubscribed

        public boolean isSubscribed​(Flow.Subscriber<? super T> subscriber)
        指定されたサブスクライバが現在サブスクライブされている場合はtrueを返します。
        パラメータ:
        subscriber - サブスクライバ
        戻り値:
        現在サブスクライブしている場合はtrue
        例外:
        NullPointerException - サブスクライバがnullの場合
      • estimateMinimumDemand

        public long estimateMinimumDemand()
        現在のすべてのサブスクライバ間で、(requestを介して)リクエストされたが、まだ生成されていないアイテムの最小数の見積りを返します。
        戻り値:
        見積り(サブスクライバがない場合はゼロ)
      • estimateMaximumLag

        public int estimateMaximumLag()
        すべての現在のサブスクライバ間で生成済だがまだ消費されていないアイテムの最大数の見積を返します。
        戻り値:
        見積
      • consume

        public CompletableFuture<Void> consume​(Consumer<? super T> consumer)
        指定されたコンシューマ機能を使用して、すべての公開済項目を処理します。 このパブリッシャがonCompleteにシグナルを送信したとき、またはエラー発生時に例外的に完了したとき、コンシューマによって例外がスローされたとき、または戻されたCompletableFutureが取り消されたときに正常に完了したCompletableFutureを返します。この場合、これ以上の項目は処理されません。
        パラメータ:
        consumer - 各onNext項目に適用される関数
        戻り値:
        パブリッシャがonCompleteにシグナルを送信したときに正常に完了し、エラーまたは取消時に例外的に完了するCompletableFuture
        例外:
        NullPointerException - コンシューマがnullの場合