15 Stream Gatherers
Stream gatherers let you create custom intermediate operations, so that stream pipelines can transform data in ways that aren't easily achievable with the existing built-in intermediate operations.
This is a preview feature. A preview feature is a feature whose design, specification, and implementation are complete, but which is not permanent. A preview feature may exist in a different form or not at all in future Java SE releases. To compile and run code that contains preview features, you must specify additional command-line options. See Preview Language and VM Features.
For background information about stream gatherers, see JEP 461.
What Is a Gatherer?
A gatherer is an intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when it reaches the end of the stream of input elements.
Remember that an intermediate operation, such as Stream.map(Function), produces a new stream, while a terminal operation, such as Stream.forEach(Consumer), produces a non-stream result. A non-stream result could be a primitive value (like a double value), a collection, or in the case of forEach, no value at all.
Gatherers can do the following:
- Transform elements in a one-to-one, one-to-many, many-to-one, or many-to-many fashion
- Track previously seen elements to influence the transformation of later elements
- Short-circuit, or stop processing input elements to transform infinite streams to finite ones
- Process a stream in parallel
A gatherer will process a stream in parallel only if you specify a combiner function when you create the gatherer. See The Combiner Function in Creating a Gatherer. A gatherer's default combiner turns parallelization off even if you call parallel().
Examples of gathering operations include the following:
- Grouping elements into batches
- Deduplicating consecutively similar elements
- Incremental accumulation functions
- Incremental reordering functions
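As a minimal sketch of one of these operations, deduplicating consecutive equal elements might look like the following. The class ConsecutiveDedup, its State holder, and the helper dedupConsecutive are hypothetical names introduced for illustration; they are not part of the JDK.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ConsecutiveDedup {

    // Private state: whether an element has been seen yet, and the last one seen.
    private static final class State<T> {
        boolean seen;
        T last;
    }

    // Hypothetical helper (not a JDK method): drops an element when it
    // equals the element immediately before it.
    static <T> Gatherer<T, ?, T> dedupConsecutive() {
        Supplier<State<T>> initializer = State::new;
        Gatherer.Integrator<State<T>, T, T> integrator = (state, element, downstream) -> {
            if (state.seen && Objects.equals(state.last, element)) {
                return true; // same as the previous element: skip it, keep going
            }
            state.seen = true;
            state.last = element;
            // Emit the element; stop if the downstream wants no more elements.
            return downstream.push(element);
        };
        // The result depends on encounter order, so this sketch uses the
        // sequential factory method and supplies no combiner.
        return Gatherer.ofSequential(initializer, integrator);
    }

    static List<Integer> demo() {
        return Stream.of(1, 1, 2, 2, 2, 3, 1, 1)
                     .gather(ConsecutiveDedup.<Integer>dedupConsecutive())
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3, 1]
    }
}
```

Note that only adjacent duplicates are removed: the trailing 1 is kept because it follows a 3.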
Creating a Gatherer
To create a gatherer, implement the Gatherer interface.
The following example creates a gatherer that returns the largest integer from a stream of integers. However, if the gatherer encounters an integer greater than or equal to its argument limit, then it returns that integer and stops processing the stream's integers.
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {

    // The initializer creates a new private ArrayList to keep track of the
    // largest integer across elements.
    public Supplier<List<Integer>> initializer() {
        return () -> new ArrayList<Integer>(1);
    }

    // The integrator
    public Integrator<List<Integer>, Integer, Integer> integrator() {
        return Integrator.of(
            (max, element, downstream) -> {
                // Save the integer if it's the largest so far.
                if (max.isEmpty()) max.addFirst(element);
                else if (element > max.getFirst()) max.set(0, element);
                // If the integer is greater than or equal to the limit,
                // "short-circuit": emit the current integer downstream
                // and return false to stop processing stream elements
                if (element >= limit) {
                    downstream.push(element);
                    return false;
                }
                // Return true to continue processing stream elements
                return true;
            }
        );
    }

    // The combiner, which is used during parallel evaluation
    public BinaryOperator<List<Integer>> combiner() {
        return (leftMax, rightMax) -> {
            // If either the "left" or "right" ArrayLists contain
            // no value, then return the other
            if (leftMax.isEmpty()) return rightMax;
            if (rightMax.isEmpty()) return leftMax;
            // Return the ArrayList that contains the larger integer
            int leftVal = leftMax.getFirst();
            int rightVal = rightMax.getFirst();
            if (leftVal > rightVal) return leftMax;
            else return rightMax;
        };
    }

    public BiConsumer<List<Integer>, Downstream<? super Integer>> finisher() {
        // Emit the largest integer, if there is one, downstream
        return (max, downstream) -> {
            if (!max.isEmpty()) {
                downstream.push(max.getFirst());
            }
        };
    }
}
You can use this gatherer as follows:
.gather(new BiggestInt(11))
It prints the following output:
You can also use this gatherer in parallel:
.gather(new BiggestInt(11))
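The stream sources for the calls above are not shown. As a self-contained sketch, the harness below repeats a compact BiggestInt and runs it sequentially and in parallel. The wrapper class BiggestIntDemo and the input values are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class BiggestIntDemo {

    // Compact copy of the BiggestInt gatherer, so that this sketch compiles on its own.
    record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {
        public Supplier<List<Integer>> initializer() {
            return () -> new ArrayList<>(1);
        }
        public Integrator<List<Integer>, Integer, Integer> integrator() {
            return Integrator.of((max, element, downstream) -> {
                if (max.isEmpty()) max.addFirst(element);
                else if (element > max.getFirst()) max.set(0, element);
                if (element >= limit) {
                    downstream.push(element);
                    return false;
                }
                return true;
            });
        }
        public BinaryOperator<List<Integer>> combiner() {
            return (left, right) -> {
                if (left.isEmpty()) return right;
                if (right.isEmpty()) return left;
                return left.getFirst() > right.getFirst() ? left : right;
            };
        }
        public BiConsumer<List<Integer>, Downstream<? super Integer>> finisher() {
            return (max, downstream) -> {
                if (!max.isEmpty()) downstream.push(max.getFirst());
            };
        }
    }

    // Assumed input: 12 is the first element that reaches the limit of 11.
    static int sequential() {
        return Stream.of(5, 4, 2, 1, 6, 12, 8, 9)
                     .gather(new BiggestInt(11))
                     .findFirst()
                     .get();
    }

    static int parallel() {
        return Stream.of(5, 4, 2, 1, 6, 12, 8, 9)
                     .parallel()
                     .gather(new BiggestInt(11))
                     .findFirst()
                     .get();
    }

    // Assumed input in which no element reaches the limit: the finisher emits the maximum.
    static int belowLimit() {
        return Stream.of(5, 4, 2, 1, 6)
                     .gather(new BiggestInt(11))
                     .findFirst()
                     .get();
    }

    public static void main(String[] args) {
        System.out.println(sequential()); // 12
        System.out.println(parallel());   // 12
        System.out.println(belowLimit()); // 6
    }
}
```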
The Gatherer<T,A,R> interface has three type parameters:
- T: The type of input elements to the gather operation. This example processes a stream of Integer elements.
- A: The type of the gatherer's private state object, which the gatherer can use to track previously seen elements to influence the transformation of later elements. This example uses a List<Integer> to store the largest Integer it has encountered so far in the input stream.
- R: The type of output elements from the gather operation. This example returns an Integer.
You create a gatherer by defining four functions that work together to process input elements. Some of these functions are optional, depending on your gatherer's operation:
- initializer(): Creates the gatherer's private state object
- integrator(): Integrates a new element from the input stream, possibly inspects the private state object, and possibly emits elements to the output stream
- combiner(): Combines two private state objects into one when the gatherer is processing the stream in parallel
- finisher(): Optionally performs an action after the gatherer has processed all input elements; it could inspect the private state object or emit additional output elements
The Initializer Function
The optional initializer function creates the gatherer's private state object. This example creates an empty ArrayList with a capacity of only one Integer, as it's meant to store the largest Integer the gatherer has encountered so far.
public Supplier<List<Integer>> initializer() {
    return () -> new ArrayList<Integer>(1);
}
The Integrator Function
Every gatherer requires an integrator function. To create an integrator function, call either Gatherer.Integrator.of(Gatherer.Integrator) or Gatherer.Integrator.ofGreedy(Gatherer.Integrator). These methods take as an argument a lambda expression that contains three parameters. This example uses the following lambda expression:
(max, element, downstream) -> {
    // Save the integer if it's the largest so far.
    if (max.isEmpty()) max.addFirst(element);
    else if (element > max.getFirst()) max.set(0, element);
    // If the integer is greater than or equal to the limit,
    // "short-circuit": emit the current integer downstream
    // and return false to stop processing stream elements
    if (element >= limit) {
        downstream.push(element);
        return false;
    }
    // Return true to continue processing stream elements
    return true;
}
The parameter max is the private state object.
The parameter element is the input element that the integrator function is currently processing.
The parameter downstream is a Gatherer.Downstream object. When you call its push method, it passes its argument to the next stage in the pipeline.
An integrator function returns a boolean value. If it returns true, then it will process the next element of the input stream. If it returns false, then it will short-circuit and stop processing input elements.
The Downstream::push method returns true if the downstream is willing to accept additional elements, so your integrator function can return the value that push returns if you want to continue processing stream elements only while the downstream accepts them.
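As a minimal sketch of an integrator that returns the result of push directly, consider the stateless gatherer below. The class PushResultDemo and the helper doubled are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class PushResultDemo {

    // Hypothetical stateless gatherer (state type Void) that doubles each element.
    static Gatherer<Integer, ?, Integer> doubled() {
        Gatherer.Integrator<Void, Integer, Integer> integrator =
            // Return whatever push returns: true to continue, false once
            // the downstream no longer wants elements.
            (state, element, downstream) -> downstream.push(element * 2);
        return Gatherer.of(integrator);
    }

    static List<Integer> firstThree() {
        return Stream.iterate(1, i -> i + 1) // infinite source
                     .gather(doubled())
                     .limit(3)
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstThree()); // [2, 4, 6]
    }
}
```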
In this example, if element is greater than or equal to limit, the integrator function passes element to the next stage in the pipeline, then returns false. The integrator won't process any more input elements, and the Downstream object can no longer push values.
If you don't expect your integrator function to short-circuit and you want it to process all elements of your input stream, use Integrator::ofGreedy instead of Integrator::of.
The Combiner Function
The optional combiner function is called only if you're running the gatherer in parallel. The combiner function is a lambda expression that contains two parameters, which represent two private state objects.
public BinaryOperator<List<Integer>> combiner() {
    return (leftMax, rightMax) -> {
        // If either the "left" or "right" ArrayLists contain
        // no value, then return the other
        if (leftMax.isEmpty()) return rightMax;
        if (rightMax.isEmpty()) return leftMax;
        // Return the ArrayList that contains the larger integer
        int leftVal = leftMax.getFirst();
        int rightVal = rightMax.getFirst();
        if (leftVal > rightVal) return leftMax;
        else return rightMax;
    };
}
This example returns the private state object (an ArrayList) that contains the larger of the two integers.
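To illustrate a combiner merging partial results rather than choosing one of them, here is a minimal sketch of a summing gatherer. The class ParallelSum and the helper sum are hypothetical names. Because the integrator never short-circuits, the sketch creates it with Integrator.ofGreedy.

```java
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

final class ParallelSum {

    // Hypothetical many-to-one gatherer that emits the sum of all input elements.
    static Gatherer<Integer, ?, Integer> sum() {
        // Private state: a one-element array holding the running total.
        Supplier<int[]> initializer = () -> new int[1];
        // Greedy integrator: it consumes every element and never short-circuits.
        Gatherer.Integrator.Greedy<int[], Integer, Integer> integrator =
            Gatherer.Integrator.ofGreedy((total, element, downstream) -> {
                total[0] += element;
                return true;
            });
        // Combiner: merge the totals of two partial states into the left one.
        BinaryOperator<int[]> combiner = (left, right) -> {
            left[0] += right[0];
            return left;
        };
        // Finisher: emit the final total once all input has been processed.
        BiConsumer<int[], Gatherer.Downstream<? super Integer>> finisher =
            (total, downstream) -> downstream.push(total[0]);
        return Gatherer.of(initializer, integrator, combiner, finisher);
    }

    static int sumTo100(boolean parallel) {
        var numbers = IntStream.rangeClosed(1, 100).boxed();
        if (parallel) numbers = numbers.parallel();
        return numbers.gather(sum()).findFirst().orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println(sumTo100(false)); // 5050
        System.out.println(sumTo100(true));  // 5050
    }
}
```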
The Finisher Function
The optional finisher function is a lambda expression that contains two parameters:
public BiConsumer<List<Integer>, Downstream<? super Integer>> finisher() {
    // Emit the largest integer, if there is one, downstream
    return (max, downstream) -> {
        if (!max.isEmpty()) {
            downstream.push(max.getFirst());
        }
    };
}
The parameter max is the private state object and downstream is a Gatherer.Downstream object.
In this example, the finisher function pushes the value contained in the private state object. Note that this value won't be pushed if the integrator function returned false. You can check whether a Downstream object is no longer processing input elements by calling the method Gatherer.Downstream::isRejecting. If it returns true, it's no longer processing input elements.
If the finisher function pushes a value downstream and you retrieve it with a terminal operation such as findFirst(), then that value is contained in an Optional object.
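As a minimal sketch of checking isRejecting in a finisher, the gatherer below forwards every element and then appends a -1 marker only if the downstream is still accepting elements. The class RejectingDemo and the helper withEndMarker are hypothetical names, and the marker value is an arbitrary choice for illustration.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class RejectingDemo {

    // Hypothetical gatherer: passes elements through, then emits -1 at the
    // end of input unless the downstream has stopped accepting elements.
    static Gatherer<Integer, ?, Integer> withEndMarker() {
        Gatherer.Integrator<Void, Integer, Integer> forward =
            (state, element, downstream) -> downstream.push(element);
        return Gatherer.ofSequential(
            forward,
            (state, downstream) -> {
                if (!downstream.isRejecting()) {
                    downstream.push(-1);
                }
            });
    }

    // The downstream (toList) accepts everything, so the marker is appended.
    static List<Integer> unlimited() {
        return Stream.of(1, 2, 3).gather(withEndMarker()).toList();
    }

    // limit(2) stops accepting after two elements, so no marker appears.
    static List<Integer> limited() {
        return Stream.of(1, 2, 3).gather(withEndMarker()).limit(2).toList();
    }

    public static void main(String[] args) {
        System.out.println(unlimited()); // [1, 2, 3, -1]
        System.out.println(limited());   // [1, 2]
    }
}
```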
Creating Gatherers with Factory Methods
Instead of implementing the Gatherer interface, you can call one of the factory methods in the Gatherer interface to create a gatherer.
The following example is the same one as described in Creating a Gatherer except it calls the Gatherer::of method:
static Gatherer<Integer, List<Integer>, Integer> biggestInt(int limit) {
    return Gatherer.of(
        // Supplier
        () -> { return new ArrayList<Integer>(1); },
        // Integrator
        (max, element, downstream) -> {
            System.out.println("Processing " + element);
            if (max.isEmpty()) max.addFirst(element);
            else if (element > max.getFirst()) max.set(0, element);
            if (element >= limit) {
                downstream.push(element);
                return false;
            }
            return true;
        },
        // Combiner
        (leftMax, rightMax) -> {
            if (leftMax.isEmpty()) return rightMax;
            if (rightMax.isEmpty()) return leftMax;
            int leftVal = leftMax.getFirst();
            int rightVal = rightMax.getFirst();
            if (leftVal > rightVal) return leftMax;
            else return rightMax;
        },
        // Finisher
        (max, downstream) -> {
            if (!max.isEmpty()) {
                downstream.push(max.getFirst());
            }
        }
    );
}
You can call this gatherer as follows:
Built-In Gatherers
The Gatherers class contains the following built-in gatherers:
fold(Supplier initial, BiFunction folder): This is a many-to-one gatherer that constructs an aggregate incrementally until no more input elements exist. It has two parameters:
- initial: This is the identity value or the value that the gatherer emits if the input stream contains no elements.
- folder: This is a lambda expression that contains two parameters: the first is the aggregate the gatherer is constructing and the second is the element that's currently being processed.
The following example uses the fold gatherer to convert a stream of numbers to a semicolon-separated string:
var semicolonSeparated =
    Stream.of(1,2,3,4,5,6,7,8,9)
          .gather(
              Gatherers.fold(
                  () -> "",
                  (result, element) -> {
                      if (result.equals("")) return element.toString();
                      return result + ";" + element;
                  }
              )
          )
          .findFirst()
          .get();
System.out.println(semicolonSeparated);
It prints the following:
1;2;3;4;5;6;7;8;9
mapConcurrent(int maxConcurrency, Function mapper): This is a one-to-one gatherer that invokes mapper for each input element in the stream concurrently, up to the limit specified by maxConcurrency. You can use this limit for the following:
- As a rate-limiting construct to prevent the gatherer from issuing too many concurrent requests to things like an external service or a database
- As a performance enhancer to enable multiple, separate operations to be performed concurrently while avoiding converting the entire stream into a parallel stream
This gatherer preserves the ordering of the stream.
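A minimal sketch of mapConcurrent follows. The method slowLength is a hypothetical stand-in for a slow call such as a remote lookup, and the 50-millisecond delay and the concurrency limit of 2 are arbitrary assumptions.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class MapConcurrentDemo {

    // Hypothetical slow operation, simulated with a short sleep.
    static int slowLength(String s) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return s.length();
    }

    static List<Integer> lengths() {
        return Stream.of("a", "bb", "ccc", "dddd")
                     // At most two invocations of slowLength run at the same time.
                     .gather(Gatherers.mapConcurrent(2, MapConcurrentDemo::slowLength))
                     .toList();
    }

    public static void main(String[] args) {
        // Results arrive in the encounter order of the input stream.
        System.out.println(lengths()); // [1, 2, 3, 4]
    }
}
```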
scan(Supplier initial, BiFunction scanner): This is a one-to-one gatherer that performs a prefix scan, which is an incremental accumulation. Starting with an initial value obtained from the parameter initial, it obtains subsequent values by applying scanner to the current value and the next input element. The gatherer then emits the value downstream. The following example demonstrates this gatherer:
Stream.of(1,2,3,4,5,6,7,8,9)
      .gather(Gatherers.scan(() -> 100, (current, next) -> current + next))
      .forEach(System.out::println);
It prints the following output:
101
103
106
110
115
121
128
136
145
windowFixed(int windowSize): This is a many-to-many gatherer that gathers elements in windows, which are encounter-ordered groups of elements. The parameter windowSize specifies the size of the windows. The following example demonstrates this gatherer:
List<List<Integer>> windows =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
windows.forEach(System.out::println);
It prints the following output:
[1, 2, 3]
[4, 5, 6]
[7, 8]
windowSliding(int windowSize): Similar to windowFixed, this is a many-to-many gatherer that gathers elements in windows. However, each subsequent window includes all elements of the previous window except for its first element, and adds the next element in the stream. The following example demonstrates this gatherer:
List<List<Integer>> moreWindows =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(3)).toList();
moreWindows.forEach(System.out::println);
It prints the following output:
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
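Sliding windows pair naturally with a following map step. As a minimal sketch, the hypothetical class SlidingDifferences below computes the difference between each pair of neighboring elements; the input values are arbitrary, and the sketch assumes the stream has at least two elements.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class SlidingDifferences {

    // Assumes at least two input elements, so that every window holds two entries.
    static List<Integer> differences() {
        return Stream.of(1, 4, 9, 16, 25)
                     .gather(Gatherers.windowSliding(2))       // [1, 4], [4, 9], ...
                     .map(window -> window.get(1) - window.get(0))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(differences()); // [3, 5, 7, 9]
    }
}
```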
Composing Gatherers
You can compose two or more gatherers into a single gatherer with the Gatherer.andThen(Gatherer) method.
The following example composes a new gatherer from the scan and fold gatherer examples described in the previous section, Built-In Gatherers:
Gatherer<Integer, ?, Integer> sc =
    Gatherers.scan(() -> 100,
                   (current, next) -> current + next);
Gatherer<Integer, ?, String> fo =
    Gatherers.fold(() -> "",
                   (result, element) -> {
                       if (result.equals("")) return element.toString();
                       return result + ";" + element;
                   });
var t = Stream.of(1,2,3,4,5,6,7,8,9)
              .gather(sc.andThen(fo))
              .findFirst()
              .get();
System.out.println(t);
This example prints the following output:
101;103;106;110;115;121;128;136;145
Note that the following statement generates the same value of t as the previous example:
var t = Stream.of(1,2,3,4,5,6,7,8,9)
              .gather(sc)
              .gather(fo)
              .findFirst()
              .get();
Successively calling the gather method is the same as calling andThen. For any two gatherers a and b, stream.gather(a).gather(b) is equivalent to stream.gather(a.andThen(b)).
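A minimal sketch of that equivalence, assuming a is a scan gatherer and b is a windowFixed gatherer (the class ComposeDemo and the input values are chosen only for illustration):

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class ComposeDemo {

    // a: running sum starting from 0; b: groups of two elements.
    static Gatherer<Integer, ?, Integer> a() {
        return Gatherers.scan(() -> 0, (sum, next) -> sum + next);
    }

    static Gatherer<Integer, ?, List<Integer>> b() {
        return Gatherers.windowFixed(2);
    }

    // Two successive gather calls.
    static List<List<Integer>> chained() {
        return Stream.of(1, 2, 3, 4).gather(a()).gather(b()).toList();
    }

    // One gather call with the composed gatherer.
    static List<List<Integer>> composed() {
        return Stream.of(1, 2, 3, 4).gather(a().andThen(b())).toList();
    }

    public static void main(String[] args) {
        System.out.println(chained());  // [[1, 3], [6, 10]]
        System.out.println(composed()); // [[1, 3], [6, 10]]
    }
}
```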