15 Stream Gatherers

Stream gatherers let you create custom intermediate operations, so that stream pipelines can transform data in ways that aren't easily achievable with the existing built-in intermediate operations.

Note:

This is a preview feature. A preview feature is a feature whose design, specification, and implementation are complete, but is not permanent. A preview feature may exist in a different form or not at all in future Java SE releases. To compile and run code that contains preview features, you must specify additional command-line options. See Preview Language and VM Features.

For background information about stream gatherers, see JEP 461.

What Is a Gatherer?

A gatherer is an intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when it reaches the end of the stream of input elements.

Remember that an intermediate operation, such as Stream.map(Function), produces a new stream, while a terminal operation, such as Stream.forEach(Consumer), produces a non-stream result. A non-stream result could be a primitive value (like a double value), a collection, or in the case of forEach, no value at all.

Gatherers can do the following:

  • Transform elements in a one-to-one, one-to-many, many-to-one, or many-to-many fashion
  • Track previously seen elements to influence the transformation of later elements
  • Short-circuit, or stop processing input elements, to transform infinite streams into finite ones
  • Process a stream in parallel

    Note:

    A gatherer processes a stream in parallel only if you specify a combiner function when you create the gatherer. See The Combiner Function in Creating a Gatherer. A gatherer's default combiner turns parallelization off even if you call parallel().

Examples of gathering operations include the following:

  • Grouping elements into batches
  • Deduplicating consecutively similar elements
  • Incremental accumulation functions
  • Incremental reordering functions
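As an illustration of one of these operations, the following is a minimal sketch of a gatherer that deduplicates consecutive equal elements. The names DedupConsecutiveDemo, Last, and dedupConsecutive are hypothetical and not part of the JDK; the sketch assumes a JDK on which the java.util.stream.Gatherer API is available (with preview features enabled where required).

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class DedupConsecutiveDemo {

    // Hypothetical state holder: remembers the previously seen element.
    static final class Last<T> {
        T value;
        boolean present;
    }

    // Hypothetical helper, not part of the JDK: drops an element when it
    // equals the element immediately before it in encounter order.
    static <T> Gatherer<T, ?, T> dedupConsecutive() {
        Gatherer.Integrator<Last<T>, T, T> integrator =
            (last, element, downstream) -> {
                if (last.present && Objects.equals(last.value, element)) {
                    return true; // skip the repeat, keep processing
                }
                last.value = element;
                last.present = true;
                // Emit the element; stop if downstream wants no more.
                return downstream.push(element);
            };
        // Sequential only: the result depends on encounter order.
        return Gatherer.ofSequential(() -> new Last<T>(), integrator);
    }

    static List<Integer> run() {
        return Stream.of(1, 1, 2, 2, 2, 3, 1, 1)
                     .gather(dedupConsecutive())
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 1]
    }
}
```

The gatherer tracks only the most recent element, so a value that reappears later in the stream (the trailing 1 in this input) is emitted again.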

Creating a Gatherer

To create a gatherer, implement the Gatherer interface.

The following example creates a gatherer that returns the largest integer from a stream of integers. However, if the gatherer encounters an integer equal to or larger than its argument limit, then it returns that integer and stops processing the stream's integers.

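    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Supplier;
    import java.util.stream.Gatherer;

    record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {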

        // The initializer creates a new private ArrayList to keep track of the
        // largest integer across elements.
        
        @Override
        public Supplier<List<Integer>> initializer() {
            return () -> new ArrayList<Integer>(1);
        }
        
        // The integrator processes each input element and updates the state
       
        @Override
        public Integrator<List<Integer>, Integer, Integer> integrator() {
            return Integrator.of(
                (max, element, downstream) -> {
                    
                    // Save the integer if it's the largest so far.
                    if (max.isEmpty()) max.addFirst(element);
                    else if (element > max.getFirst()) max.set(0, element);
                
                    // If the integer is greater than or equal to the limit,
                    // "short-circuit": emit the current integer downstream
                    // and return false to stop processing stream elements
                    if (element >= limit) {
                        downstream.push(element);
                        return false;
                    }
                    
                    // Return true to continue processing stream elements
                    return true;
                }
            );
        }
        
        // The combiner, which is used during parallel evaluation
        
        @Override
        public BinaryOperator<List<Integer>> combiner() {
            return (leftMax, rightMax) -> {
                
                // If either the "left" or "right" ArrayLists contain
                // no value, then return the other
                if (leftMax.isEmpty()) return rightMax;
                if (rightMax.isEmpty()) return leftMax;
                
                // Return the ArrayList that contains the larger integer
                int leftVal = leftMax.getFirst();
                int rightVal = rightMax.getFirst();
                if (leftVal > rightVal) return leftMax;
                else return rightMax;
            };
        }
        
        @Override
        public BiConsumer<List<Integer>, Downstream<? super Integer>> finisher() {
            
            // Emit the largest integer, if there is one, downstream
            return (max, downstream) -> {
                if (!max.isEmpty()) {
                    downstream.push(max.getFirst());
                }
            };
        }
    }

You can use this gatherer as follows:

    System.out.println(Stream.of(5,4,2,1,6,12,8,9)
                             .gather(new BiggestInt(11))
                             .findFirst()
                             .get());

It prints the following output:

12

You can also use this gatherer in parallel:

    System.out.println(Stream.of(5,4,2,1,6,12,8,9)
                             .gather(new BiggestInt(11))
                             .parallel()
                             .findFirst()
                             .get());

The Gatherer<T,A,R> interface has three type parameters:

  • T: The type of input elements to the gather operation. This example processes a stream of Integer elements.
  • A: The type of the gatherer's private state object, which the gatherer can use to track previously seen elements to influence the transformation of later elements. This example uses a List<Integer> to store the largest Integer it has encountered so far in the input stream.
  • R: The type of output elements from the gather operation. This example returns an Integer value.

You create a gatherer by defining four functions that work together to process input elements. Some of these functions are optional, depending on your gatherer's operation:

  • initializer(): Creates the gatherer's private state object
  • integrator(): Integrates a new element from the input stream, possibly inspects the private state object, and possibly emits elements to the output stream
  • combiner(): Combines two private state objects into one when the gatherer is processing the stream in parallel
  • finisher(): Optionally performs an action after the gatherer has processed all input elements; it could inspect the private state object or emit additional output elements

The Initializer Function

The optional initializer function creates the gatherer's private state object. This example creates an empty ArrayList with an initial capacity of one because it's meant to store only the largest Integer the gatherer has encountered so far.

        @Override
        public Supplier<List<Integer>> initializer() {
            return () -> new ArrayList<Integer>(1);
        }

The Integrator Function

Every gatherer requires an integrator function. To create an integrator function, call either Gatherer.Integrator.of(Gatherer.Integrator) or Gatherer.Integrator.ofGreedy(Gatherer.Integrator). These methods take as an argument a lambda expression that has three parameters. This example uses the following lambda expression:

    (max, element, downstream) -> {

        // Save the integer if it's the largest so far.
        if (max.isEmpty()) max.addFirst(element);
        else if (element > max.getFirst()) max.set(0, element);

        // If the integer is greater than or equal to the limit,
        // "short-circuit": emit the current integer downstream
        // and return false to stop processing stream elements
        if (element >= limit) {
            downstream.push(element);
            return false;
        }

        // Return true to continue processing stream elements
        return true;
    }

The parameter max is the private state object.

The parameter element is the input element that the integrator function is currently processing.

The parameter downstream is a Gatherer.Downstream object. When you call its push method, it passes its argument to the next stage in the pipeline.

An integrator function returns a boolean value. If it returns true, then the gatherer continues with the next element of the input stream. If it returns false, then the gatherer short-circuits and stops processing input elements.

Tip:

The Downstream::push method returns true if the downstream is willing to accept additional elements, so your integrator function can return the value that push returns to continue processing stream elements for as long as the downstream accepts them.
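To illustrate this tip, the following is a minimal sketch of a stateless gatherer whose integrator returns the result of push. The names PushResultDemo and evensOnly are hypothetical and used only for illustration.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class PushResultDemo {

    // Hypothetical stateless gatherer: forwards even numbers only.
    static Gatherer<Integer, ?, Integer> evensOnly() {
        Gatherer.Integrator<Void, Integer, Integer> integrator =
            (unused, element, downstream) -> {
                if (element % 2 != 0) {
                    return true; // nothing to emit, keep processing
                }
                // push returns false once the downstream wants no more
                // elements, so returning it propagates the short-circuit.
                return downstream.push(element);
            };
        return Gatherer.of(integrator);
    }

    static List<Integer> firstTwoEvens() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
                     .gather(evensOnly())
                     .limit(2)
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstTwoEvens()); // prints [2, 4]
    }
}
```

Because the pipeline ends with limit(2), the downstream stops accepting elements after two values, and the integrator relays that signal instead of always returning true.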

In this example, if element is equal to or greater than limit, the integrator function passes element to the next stage in the pipeline, then returns false. The integrator won't process any more input elements, and the Downstream object can no longer push values.

Note:

If you don't expect your integrator function to short-circuit and you want it to process all elements of your input stream, use Integrator::ofGreedy instead of Integrator::of.
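The following is a minimal sketch of a gatherer built with Integrator::ofGreedy. It never initiates a short-circuit itself; it only relays the value that push returns. The names GreedyIntegratorDemo and withIndex are hypothetical.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class GreedyIntegratorDemo {

    // Hypothetical helper: prefixes each element with its zero-based
    // position in the stream. The state is a one-element int counter.
    static <T> Gatherer<T, ?, String> withIndex() {
        return Gatherer.ofSequential(
            () -> new int[1],
            Gatherer.Integrator.<int[], T, String>ofGreedy(
                (counter, element, downstream) ->
                    downstream.push(counter[0]++ + ":" + element)));
    }

    static List<String> run() {
        return Stream.of("a", "b", "c")
                     .gather(withIndex())
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0:a, 1:b, 2:c]
    }
}
```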

The Combiner Function

The optional combiner function is called only if you're running the gatherer in parallel. The combiner function is a lambda expression that contains two parameters, which represent two private state objects.

        @Override
        public BinaryOperator<List<Integer>> combiner() {
            return (leftMax, rightMax) -> {
                
                // If either the "left" or "right" ArrayLists contain
                // no value, then return the other
                if (leftMax.isEmpty()) return rightMax;
                if (rightMax.isEmpty()) return leftMax;
                
                // Return the ArrayList that contains the larger integer
                int leftVal = leftMax.getFirst();
                int rightVal = rightMax.getFirst();
                if (leftVal > rightVal) return leftMax;
                else return rightMax;
            };
        }

This example returns the private state object (an ArrayList) that contains the largest integer.
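As a further illustration of a combiner, the following is a minimal sketch of a many-to-one gatherer that sums its input and can run in parallel because it supplies a combiner. The names ParallelSumDemo and sum are hypothetical.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class ParallelSumDemo {

    // Hypothetical gatherer: emits the sum of all input elements.
    // The state is a one-element long array used as an accumulator.
    static Gatherer<Integer, long[], Long> sum() {
        return Gatherer.of(
            // Initializer: one accumulator per partition of the stream
            () -> new long[1],
            // Integrator: add each element, never short-circuit
            Gatherer.Integrator.<long[], Integer, Long>ofGreedy(
                (acc, element, downstream) -> {
                    acc[0] += element;
                    return true;
                }),
            // Combiner: merge the accumulators of two partitions
            (left, right) -> {
                left[0] += right[0];
                return left;
            },
            // Finisher: emit the total once all input is processed
            (acc, downstream) -> downstream.push(acc[0]));
    }

    static List<Long> run() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                     .parallel()
                     .gather(sum())
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [55]
    }
}
```

Addition is associative, so the result doesn't depend on how the stream is partitioned before the accumulators are combined.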

The Finisher Function

The optional finisher function is a lambda expression that contains two parameters:

        @Override
        public BiConsumer<List<Integer>, Downstream<? super Integer>> finisher() {
            
            // Emit the largest integer, if there is one, downstream
            return (max, downstream) -> {
                if (!max.isEmpty()) {
                    downstream.push(max.getFirst());
                }
            };
        }

The parameter max is the private state object and downstream is a Gatherer.Downstream object.

In this example, the finisher function pushes the value contained in the private state object. Note that this value won't be pushed if the integrator function returned false. You can check whether a Downstream object is no longer accepting elements by calling the method Gatherer.Downstream::isRejecting. If it returns true, then the downstream is known not to want any more elements.
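The following is a minimal sketch of a finisher that calls isRejecting before it emits leftover state. The gatherer batches elements into pairs (similar in spirit to the built-in windowFixed gatherer) and is shown only for illustration; the names FinisherDemo and pairs are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class FinisherDemo {

    // Hypothetical gatherer: groups elements into lists of two.
    static Gatherer<Integer, ?, List<Integer>> pairs() {
        Supplier<List<Integer>> initializer = ArrayList::new;

        Gatherer.Integrator<List<Integer>, Integer, List<Integer>> integrator =
            (buffer, element, downstream) -> {
                buffer.add(element);
                if (buffer.size() == 2) {
                    List<Integer> pair = List.copyOf(buffer);
                    buffer.clear();
                    return downstream.push(pair);
                }
                return true;
            };

        BiConsumer<List<Integer>, Gatherer.Downstream<? super List<Integer>>> finisher =
            (buffer, downstream) -> {
                // Emit the incomplete last group only if the downstream
                // still accepts elements.
                if (!buffer.isEmpty() && !downstream.isRejecting()) {
                    downstream.push(List.copyOf(buffer));
                }
            };

        return Gatherer.ofSequential(initializer, integrator, finisher);
    }

    static List<List<Integer>> all() {
        return Stream.of(1, 2, 3, 4, 5).gather(pairs()).toList();
    }

    static List<List<Integer>> firstOnly() {
        return Stream.of(1, 2, 3, 4, 5).gather(pairs()).limit(1).toList();
    }

    public static void main(String[] args) {
        System.out.println(all());       // prints [[1, 2], [3, 4], [5]]
        System.out.println(firstOnly()); // prints [[1, 2]]
    }
}
```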

Note:

In these examples, a value that the finisher function pushes downstream reaches the caller in an Optional object because the pipeline ends with the terminal operation findFirst().

Creating Gatherers with Factory Methods

Instead of implementing the Gatherer interface, you can call one of the factory methods in the Gatherer interface to create a gatherer.

The following example is the same one as described in Creating a Gatherer except that it calls the Gatherer::of method and its integrator prints each element that it processes:

    static Gatherer<Integer, List<Integer>, Integer> biggestInt(int limit) {
    
        return Gatherer.of(
        
            // Supplier
            
            () -> { return new ArrayList<Integer>(1); },
            
            // Integrator
            
            Gatherer.Integrator.of(
                (max, element, downstream) -> {
                    System.out.println("Processing " + element);
                    if (max.isEmpty()) max.addFirst(element);
                    else if (element > max.getFirst()) max.set(0, element);
                
                    if (element >= limit) {
                        downstream.push(element);
                        return false;
                    }
                    return true;
                }            
            ),
            
            // Combiner
            
            (leftMax, rightMax) -> {
                if (leftMax.isEmpty()) return rightMax;
                if (rightMax.isEmpty()) return leftMax;
                int leftVal = leftMax.getFirst();
                int rightVal = rightMax.getFirst();
                if (leftVal > rightVal) return leftMax;
                else return rightMax;
            },
            
            // Finisher
            
            (max, downstream) -> {
                if (!max.isEmpty()) {
                    downstream.push(max.getFirst());
                }
            }
        );
    }

You can call this gatherer as follows:

    System.out.println(Stream.of(5,4,2,1,6,12,8,9)
                             .gather(biggestInt(11))
                             .parallel()
                             .findFirst()
                             .get());

Built-In Gatherers

The Gatherers class contains the following built-in gatherers:

  • fold(Supplier initial, BiFunction folder): This is a many-to-one gatherer that constructs an aggregate incrementally until no more input elements exist. It has two parameters:

    • initial: This is the identity value or the value that the gatherer emits if the input stream contains no elements.
    • folder: This is a lambda expression that contains two parameters: the first is the aggregate the gatherer is constructing and the second is the element that's currently being processed.

    The following example uses the fold gatherer to convert a stream of numbers to a semicolon-separated string:

        var semicolonSeparated =
            Stream.of(1,2,3,4,5,6,7,8,9)
                  .gather(
                       Gatherers.fold(
                           () -> "",
                           (result, element) -> {
                               if (result.equals("")) return element.toString();
                               return result + ";" + element;
                           }
                       )
                   )
                  .findFirst()
                  .get();
        
        System.out.println(semicolonSeparated);

    It prints the following:

    1;2;3;4;5;6;7;8;9

  • mapConcurrent(int maxConcurrency, Function mapper): This is a one-to-one gatherer that invokes mapper for each input element in the stream concurrently, up to the limit specified by maxConcurrency. You can use this limit for the following:

    • As a rate-limiting construct to prevent the gatherer from issuing too many concurrent requests to things like an external service or a database
    • As a performance enhancer to enable multiple, separate operations to be performed concurrently while avoiding converting the entire stream into a parallel stream

    This gatherer preserves the ordering of the stream.

  • scan(Supplier initial, BiFunction scanner): This is a one-to-one gatherer that performs a prefix scan, which is an incremental accumulation. Starting with an initial value obtained from the parameter initial, it obtains subsequent values by applying scanner to the current value and the next input element. The gatherer then emits the value downstream. The following example demonstrates this gatherer:

        Stream.of(1,2,3,4,5,6,7,8,9)
              .gather(Gatherers.scan(() -> 100,
                                     (current, next) -> current + next))
              .forEach(System.out::println);

    It prints the following output:

    101
    103
    106
    110
    115
    121
    128
    136
    145

  • windowFixed(int windowSize): This is a many-to-many gatherer that gathers elements in windows, which are encounter-ordered groups of elements. The parameter windowSize specifies the size of the windows. The following example demonstrates this gatherer:

        List<List<Integer>> windows =
            Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
        windows.forEach(System.out::println);

    It prints the following output:

    [1, 2, 3]
    [4, 5, 6]
    [7, 8]

  • windowSliding(int windowSize): Similar to windowFixed, this is a many-to-many gatherer that gathers elements in windows. However, each subsequent window includes all elements of the previous window except for its first element, and adds the next element in the stream. The following example demonstrates this gatherer:

        List<List<Integer>> moreWindows =
            Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(3)).toList();
        moreWindows.forEach(System.out::println);

    It prints the following output:

    [1, 2, 3]
    [2, 3, 4]
    [3, 4, 5]
    [4, 5, 6]
    [5, 6, 7]
    [6, 7, 8]
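The mapConcurrent gatherer in the preceding list has no accompanying example. The following is a minimal sketch that maps strings to their lengths with at most two mapper invocations running at the same time. The class name MapConcurrentDemo and the choice of String::length as the mapper are assumptions made for illustration; in practice the mapper would more likely be a blocking call, such as a request to an external service.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class MapConcurrentDemo {

    static List<Integer> lengths() {
        return Stream.of("a", "bb", "ccc", "dddd")
                     // At most two mapper calls run concurrently.
                     .gather(Gatherers.<String, Integer>mapConcurrent(
                                 2, String::length))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(lengths()); // prints [1, 2, 3, 4]
    }
}
```

Because the gatherer preserves the ordering of the stream, the results appear in the same order as the input elements regardless of which mapper call finishes first.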

Composing Gatherers

You can compose two or more gatherers into a single gatherer with the Gatherer.andThen(Gatherer) method.

The following example composes a new gatherer from the scan and fold gatherer examples described in the previous section, Built-In Gatherers:

    Gatherer<Integer, ?, Integer> sc =
        Gatherers.scan(() -> 100,
                       (current, next) -> current + next);
                       
    Gatherer<Integer, ?, String> fo =
        Gatherers.fold(() -> "",
                       (result, element) -> {
                           if (result.equals("")) return element.toString();
                           return result + ";" + element;
                       });

    var t = Stream.of(1,2,3,4,5,6,7,8,9)
          .gather(sc.andThen(fo))
          .findFirst()
          .get();
          
    System.out.println(t);   

This example prints the following output:

101;103;106;110;115;121;128;136;145

Note that the following statement produces the same value of t as the previous example:

    var t = Stream.of(1,2,3,4,5,6,7,8,9)
          .gather(sc)
          .gather(fo)
          .findFirst()
          .get();

Successively calling the gather method is the same as calling andThen(Gatherer). The following two statements are equivalent, where a and b are gatherers:

    stream.gather(a).gather(b);
    stream.gather(a.andThen(b));
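The following is a minimal sketch that checks this equivalence for one concrete pair of built-in gatherers, a running sum followed by fixed windows of two elements. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class ComposeDemo {

    // Running sum of the input elements, starting from 0
    static final Gatherer<Integer, ?, Integer> RUNNING_SUM =
        Gatherers.scan(() -> 0, (sum, next) -> sum + next);

    // Groups elements into windows of two
    static final Gatherer<Integer, ?, List<Integer>> PAIRS =
        Gatherers.windowFixed(2);

    // Two successive gather calls
    static List<List<Integer>> chained() {
        return Stream.of(1, 2, 3, 4, 5)
                     .gather(RUNNING_SUM)
                     .gather(PAIRS)
                     .toList();
    }

    // One gather call with a composed gatherer
    static List<List<Integer>> composed() {
        return Stream.of(1, 2, 3, 4, 5)
                     .gather(RUNNING_SUM.andThen(PAIRS))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(chained());  // prints [[1, 3], [6, 10], [15]]
        System.out.println(composed()); // prints [[1, 3], [6, 10], [15]]
    }
}
```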