15 Stream Gatherers

Stream Gatherersを使用すると、カスタムの中間操作を作成できます。これにより、ストリーム・パイプラインは、既存の組込み中間操作では困難だった方法でデータを変換できるようになります。

ノート:

これはプレビュー機能です。プレビュー機能は、設計、仕様および実装が完了したが、永続的でない機能です。プレビュー機能は、将来のJava SEリリースで、異なる形式で存在することもあれば、まったく存在しないこともあります。プレビュー機能が含まれているコードをコンパイルして実行するには、追加のコマンド行オプションを指定する必要があります。『Preview Language and VM Features』を参照してください。

Stream Gatherersの背景情報は、JEP 473を参照してください。

ギャザラとは

ギャザラは、入力要素のストリームを出力要素のストリームに変換する中間操作であり、入力要素のストリームの最後に到達したときに、オプションで最終アクションを適用します。

Stream.map(Function)などの中間操作では新しいストリームが生成され、Stream.forEach(Consumer)などの終端操作ではストリーム以外の結果が生成されることに注意してください。ストリーム以外の結果は、プリミティブ値(double値など)やコレクションのこともあれば、forEachの場合など、値がまったく存在しないこともあります。

Gatherersは、次のことを実行できます。

  • 要素を1対1、1対多、多対1、多対多の方法で変換します。
  • 以前に確認した要素を追跡して、後の要素の変換に影響を与えます。
  • 短絡、または入力要素の処理を停止して、無限ストリームを有限ストリームに変換します
  • ストリームをパラレルで処理します

    ノート:

    収集では、ギャザラの作成時にコンバイナ関数を指定した場合にのみ、ストリームがパラレルで処理されます。「ギャザラの作成」「コンバイナ関数」を参照してください。parallel()をコールした場合でも、ギャザラのデフォルト・コンバイナによってパラレル化がオフになります。

収集操作の例を次に示します。

  • 要素のバッチへのグループ化
  • 連続した類似要素の重複除外
  • 増分累積関数
  • 増分順序変更関数

ギャザラの作成

ギャザラを作成するには、Gathererインタフェースを実装します。

次の例では、整数のストリームから最大の整数を返すギャザラを作成します。ただし、ギャザラがその引数limit以上の整数を検出した場合は、その整数を返し、ストリームの整数の処理を停止します。

    record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {

        // The initializer creates a new private ArrayList to keep track of the
        // largest integer across elements.
        
        @Override
        public Supplier<List<Integer>> initializer() {
            return () -> new ArrayList<Integer>(1);
        }
        
        // The integrator 
       
        @Override
        public Integrator<List<Integer>, Integer, Integer> integrator() {
            return Integrator.of(
                (max, element, downstream) -> {
                    
                    // Save the integer if it's the largest so far.
                    if (max.isEmpty()) max.addFirst(element);
                    else if (element > max.getFirst()) max.set(0, element);
                
                    // If the integer is equal or greater to the limit,
                    // "short-circuit": emit the current integer downstream
                    // and return false to stop processing stream elements
                    if (element >= limit) {
                        downstream.push(element);
                        return false;
                    }
                    
                    // Return true to continue processing stream elements
                    return true;
                }
            );
        }
        
        // The combiner, which is used during parallel evaluation
        
        @Override
        public BinaryOperator<List<Integer>> combiner() {
            return (leftMax, rightMax) -> {
                
                // If either the "left" or "right" ArrayLists contain
                // no value, then return the other
                if (leftMax.isEmpty()) return rightMax;
                if (rightMax.isEmpty()) return leftMax;
                
                // Return the ArrayList that contains the larger integer
                int leftVal = leftMax.getFirst();
                int rightVal = rightMax.getFirst();
                if (leftVal > rightVal) return leftMax;
                else return rightMax;
            };
        }
        
        @Override
        public BiConsumer<List<Integer>, Downstream<? super Integer>> finisher() {
            
            // Emit the largest integer, if there is one, downstream
            return (max, downstream) -> {
                if (!max.isEmpty()) {
                    downstream.push(max.getFirst());
                }
            };
        }
    }

この収集は、次のように使用できます。

    System.out.println(Stream.of(5,4,2,1,6,12,8,9)
                             .gather(new BiggestInt(11))
                             .findFirst()
                             .get());

次のように出力されます。

12

このギャザラをパラレルで使用することもできます。

    System.out.println(Stream.of(5,4,2,1,6,12,8,9)
                             .gather(new BiggestInt(11))
                             .parallel()
                             .findFirst()
                             .get());

Gatherer<T,A,R>インタフェースには、次の3つの型パラメータがあります。

  • T: 収集操作の入力要素の型。この例では、Integer要素のストリームを処理します。
  • A: ギャザラの非公開状態オブジェクトの型。ギャザラは、以前に確認した要素を追跡して、後の要素の変換に影響を与えるために使用できます。この例では、List<Integer>を使用して、これまでに検出した最大のIntegerを入力ストリームに格納します。
  • R: 収集操作からの出力要素の型。この例では、Integer値を返します。

ギャザラを作成するには、入力要素を処理する連携した4つの関数を定義します。これらの関数の一部は、ギャザラの操作に応じてオプションとなります。

  • initializer(): ギャザラの非公開状態オブジェクトを作成します
  • integrator(): 入力ストリームから新しい要素を統合し、場合によっては非公開状態オブジェクトを検査して、出力ストリームに要素を発行します。
  • combiner(): ギャザラがストリームをパラレルで処理しているときに、2つの非公開状態オブジェクトを1つに結合します。
  • finisher(): オプションで、ギャザラがすべての入力要素を処理した後、アクションを実行します。これにより、非公開状態オブジェクトを検査したり、追加の出力要素を発行できます。

イニシャライザ関数

オプションのイニシャライザ関数は、ギャザラの非公開状態オブジェクトを作成します。この例では、1つのIntegerのみの容量を持つ空のArrayListを作成します。これは、ギャザラがこれまでに検出した最大のIntegerを格納するためのものです。

        @Override
        public Supplier<List<Integer>> initializer() {
            return () -> new ArrayList<Integer>(1);
        }

インテグレータ関数

すべてのギャザラにはインテグレータ関数が必要です。インテグレータ関数を作成するには、Gatherer.Integrator.of(Gatherer.Integrator)またはGatherer.Integrator.ofGreedy(Gatherer.Integrator)をコールします。これらのメソッドは、引数として3つのパラメータを含むラムダ式を取ります。この例では、次のラムダ式を使用します。

 (max, element, downstream) -> {
                    
                    // Save the integer if it's the largest so far.
                    if (max.isEmpty()) max.addFirst(element);
                    else if (element > max.getFirst()) max.set(0, element);
                
                    // If the integer is equal or greater to the limit,
                    // "short-circuit": emit the current integer downstream
                    // and return false to stop processing stream elements
                    if (element >= limit) {
                        downstream.push(element);
                        return false;
                    }
                    
                    // Return true to continue processing stream elements
                    return true;
                }

パラメータmaxは非公開状態オブジェクトです。

パラメータelementは、インテグレータ関数が現在処理している入力要素です。

パラメータdownstreamは、Gatherer.Downstreamオブジェクトです。pushメソッドをコールすると、その引数がパイプラインの次のステージに渡されます。

インテグレータ関数は、boolean値を返します。trueを返す場合、入力ストリームの次の要素が処理されます。falseを返す場合、入力要素の処理は短絡され、停止されます。

ヒント:

Downstream::pushメソッドは、下流で追加の要素をプッシュする場合はtrueを返すため、ストリーム要素の処理を続行する場合は、インテグレータ関数がその戻り値を返すことができます。

この例では、elementlimitと等しいか大きい場合、インテグレータ関数はパイプラインの次のステージにelementを渡し、falseを返します。インテグレータはこれ以上入力要素を処理せず、下流のオブジェクトは値をプッシュできなくなります。

ノート:

インテグレータ関数が短絡することを想定せず、入力ストリームのすべての要素を処理する場合は、Integrator::ofのかわりにIntegrator::ofGreedyを使用します。

コンバイナ関数

オプションのコンバイナ関数は、ギャザラをパラレルに実行している場合にのみコールされます。コンバイナ関数は、2つの非公開状態オブジェクトを表す2つのパラメータを含むラムダ式です。

        @Override
        public BinaryOperator<List<Integer>> combiner() {
            return (leftMax, rightMax) -> {
                
                // If either the "left" or "right" ArrayLists contain
                // no value, then return the other
                if (leftMax.isEmpty()) return rightMax;
                if (rightMax.isEmpty()) return leftMax;
                
                // Return the ArrayList that contains the larger integer
                int leftVal = leftMax.getFirst();
                int rightVal = rightMax.getFirst();
                if (leftVal > rightVal) return leftMax;
                else return rightMax;
            };
        }

この例では、最大の整数を含む非公開状態オブジェクト(ArrayList)を返します。

フィニッシャ関数

オプションのフィニッシャ関数は、次の2つのパラメータを含むラムダ式です。

        @Override
        public BiConsumer<List<Integer>, Downstream<? super Integer>> finisher() {
            
            // Emit the largest integer, if there is one, downstream
            return (max, downstream) -> {
                if (!max.isEmpty()) {
                    downstream.push(max.getFirst());
                }
            };
        }

パラメータmaxは非公開状態オブジェクトで、downstreamGatherer.Downstreamオブジェクトです。

この例では、フィニッシャ関数は非公開状態オブジェクトに含まれる値をプッシュします。インテグレータ関数がfalseを返した場合、この値はプッシュされません。下流のオブジェクトが入力要素を処理していないかどうかを確認するには、メソッドGatherer.Downstream::isRejectingをコールします。trueが返されると、入力要素は処理されなくなります。

ノート:

フィニッシャ関数が値を下流にプッシュすると、その値はOptionalオブジェクトに含まれます。

ファクトリ・メソッドを使用したギャザラの作成

Gathererインタフェースを実装するかわりに、Gathererインタフェースでファクトリ・メソッドの1つをコールして、ギャザラを作成できます。

次の例は、Gatherer::ofメソッドをコールする点を除き、「ギャザラの作成」で説明した例と同じです。

    static Gatherer<Integer, List<Integer>, Integer> biggestInt(int limit) {
    
        return Gatherer.of(
        
            // Supplier
            
            () -> { return new ArrayList<Integer>(1); },
            
            // Integrator
            
            Gatherer.Integrator.of(
                (max, element, downstream) -> {
                    System.out.println("Processing " + element);
                    if (max.isEmpty()) max.addFirst(element);
                    else if (element > max.getFirst()) max.set(0, element);
                
                    if (element >= limit) {
                        downstream.push(element);
                        return false;
                    }
                    return true;
                }            
            ),
            
            // Combiner
            
            (leftMax, rightMax) -> {
                if (leftMax.isEmpty()) return rightMax;
                if (rightMax.isEmpty()) return leftMax;
                int leftVal = leftMax.getFirst();
                int rightVal = rightMax.getFirst();
                if (leftVal > rightVal) return leftMax;
                else return rightMax;
            },
            
            // Finisher
            
            (max, downstream) -> {
                if (!max.isEmpty()) {
                    downstream.push(max.getFirst());
                }
            }
        );
    }

このギャザラは、次のようにコールできます。

    System.out.println(Stream.of(5,4,2,1,6,12,8,9)
                             .gather(biggestInt(11))
                             .parallel()
                             .findFirst()
                             .get());

組込みのギャザラ

Gatherersクラスには、次の組込みのギャザラが含まれています。

  • fold(Supplier initial, BiFunction folder): これは、入力要素がなくなるまで集計を増分的に構築する、多対1のギャザラです。これには2つのパラメータがあります。

    • initial: これは、入力ストリームに要素が含まれていない場合にギャザラが発行するID値または値です。
    • folder: これは、2つのパラメータを含むラムダ式です: 1つ目はギャザラが構築している集計、2つ目は現在処理中の要素です。

    次の例では、foldギャザラを使用して、数値のストリームをセミコロンで区切られた文字列に変換します。

        var semicolonSeparated =
            Stream.of(1,2,3,4,5,6,7,8,9)
                  .gather(
                       Gatherers.fold(
                           () -> "",
                           (result, element) -> {
                               if (result.equals("")) return element.toString();
                               return result + ";" + element;
                           }
                       )
                   )
                  .findFirst()
                  .get();
        
        System.out.println(semicolonSeparated);

    出力は次のようになります。

    1;2;3;4;5;6;7;8;9
  • mapConcurrent(int maxConcurrency, Function mapper): これは、ストリーム内の入力要素ごとに、mapperを、maxConcurrencyで指定された制限まで同時に呼び出す1対1のギャザラです。この制限は、次の場合に使用できます。

    • 外部サービスやデータベースなどに対して、ギャザラが同時リクエストを発行しすぎないようにするレート制限構成として。
    • ストリーム全体をパラレル・ストリームに変換せずに、複数の個別の操作を同時に実行できるようにするパフォーマンス・エンハンサとして。

    このギャザラは、ストリームの順序を保持します。

  • scan(Supplier initial, BiFunction scanner): これは、接頭辞スキャン(増分累積)を実行する1対1のギャザラです。パラメータinitialから取得した初期値から始まり、scannerを現在の値と次の入力要素に適用することで、後続の値を取得します。その後、ギャザラは値を下流に発行します。次の例は、このギャザラを示しています。

        Stream.of(1,2,3,4,5,6,7,8,9)
              .gather(Gatherers.scan(() -> 100,
                                     (current, next) -> current + next))
              .forEach(System.out::println);

    次のように出力されます。

    101
    103
    106
    110
    115
    121
    128
    136
    145
  • windowFixed(int windowSize): これは、ウィンドウ内の要素を収集する多対多のギャザラで、検出順の要素のグループです。パラメータwindowSizeは、ウィンドウのサイズを指定します。次の例は、このギャザラを示しています。

        List<List<Integer>> windows =
            Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
        windows.forEach(System.out::println);

    次のように出力されます。

    [1, 2, 3]
    [4, 5, 6]
    [7, 8]
  • windowSliding(int windowSize): windowFixedと同様に、これはウィンドウ内の要素を収集する多対多のギャザラです。ただし、後続の各ウィンドウには、最初の要素を除く前のウィンドウのすべての要素が含まれ、ストリームに次の要素が追加されます。次の例は、このギャザラを示しています。

        List<List<Integer>> moreWindows =
            Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(3)).toList();
        moreWindows.forEach(System.out::println);

    次のように出力されます。

    [1, 2, 3]
    [2, 3, 4]
    [3, 4, 5]
    [4, 5, 6]
    [5, 6, 7]
    [6, 7, 8]

ギャザラの構成

Gatherer.andThen(Gatherer)を使用して、2つ以上のギャザラを1つのギャザラに構成できます。

次の例では、前の項「組込みのギャザラ」で説明したように、scanおよびfoldギャザラの例を使用して新しいギャザラを構成します。

    Gatherer<Integer, ?, Integer> sc =
        Gatherers.scan(() -> 100,
                       (current, next) -> current + next);
                       
    Gatherer<Integer, ?, String> fo =
        Gatherers.fold(() -> "",
                       (result, element) -> {
                           if (result.equals("")) return element.toString();
                           return result + ";" + element;
                       });

    var t = Stream.of(1,2,3,4,5,6,7,8,9)
          .gather(sc.andThen(fo))
          .findFirst()
          .get();
          
    System.out.println(t);   

この例の出力は次のとおりです。

101;103;106;110;115;121;128;136;145

tの値を生成する次の文は、前述の例と同じです。

    var t = Stream.of(1,2,3,4,5,6,7,8,9)
          .gather(sc)
          .gather(fo)
          .findFirst()
          .get();

gatherメソッドの連続したコールは、andThen(Gatherer)のコールと同じです。次の2つの文は同等です。ここで、aおよびbはギャザラです。

stream.gather(a).gather(b);
stream.gather(a.andThen(b));