Module java.base

Interface Gatherer<T,A,R>

Type Parameters:
T - the type of input elements to the gatherer operation
A - the potentially mutable state type of the gatherer operation (often hidden as an implementation detail)
R - the type of output elements from the gatherer operation

public interface Gatherer<T,A,R>
Gatherer is a preview API of the Java platform.
Programs can only use Gatherer when preview features are enabled.
Preview features may be removed in a future release, or upgraded to permanent features of the Java platform.
An intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when the end of the upstream is reached. The transformation may be stateless or stateful, and may buffer input before producing any output.
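To make the buffering and the final action concrete, here is a minimal sketch of a batching gatherer. It assumes JDK 24 or later, where Gatherer is a permanent API (on JDK 22 or 23, compile and run with --enable-preview). The names BatchDemo and batchesOf are hypothetical and not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class BatchDemo {
    // Hypothetical helper: groups elements into lists of `size` elements. Input is
    // buffered in the state, and a shorter trailing batch is flushed by the finisher.
    static <T> Gatherer<T, ?, List<T>> batchesOf(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be at least 1");
        }
        return Gatherer.<T, List<T>, List<T>>ofSequential(
                ArrayList::new, // initializer: an empty buffer
                Gatherer.Integrator.ofGreedy((buffer, element, downstream) -> {
                    buffer.add(element);
                    if (buffer.size() < size) {
                        return true; // keep buffering; nothing is emitted yet
                    }
                    List<T> batch = new ArrayList<>(buffer);
                    buffer.clear();
                    return downstream.push(batch); // emit one full batch
                }),
                (buffer, downstream) -> { // finisher: runs once the upstream is exhausted
                    if (!buffer.isEmpty()) {
                        downstream.push(new ArrayList<>(buffer));
                    }
                });
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7).gather(batchesOf(3)).toList());
    }
}
```

The built-in Gatherers.windowFixed(int) covers the same use case; the hand-written version only shows where the buffered input (the state) and the end-of-input action (the finisher) live.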

Gatherer operations can be performed either sequentially or in parallel; parallel evaluation is possible only if a combiner function is supplied.
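As an illustrative sketch, the gatherer below supplies a combiner through the four-argument Gatherer.of factory, so a parallel stream may integrate partitions independently and merge their states before the finisher runs. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); ParallelSumDemo, Total, and summing are hypothetical names.

```java
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

final class ParallelSumDemo {
    // Hypothetical mutable state: the running total of one partition of the input.
    static final class Total {
        long sum;
    }

    // Hypothetical gatherer that emits a single element, the sum of its inputs.
    // Because a combiner is supplied, the library may evaluate it in parallel.
    static Gatherer<Integer, Total, Long> summing() {
        return Gatherer.<Integer, Total, Long>of(
                Total::new, // initializer
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    state.sum += element; // accumulate; emit nothing yet
                    return true;
                }),
                (left, right) -> { // combiner: merge the states of two partitions
                    left.sum += right.sum;
                    return left;
                },
                (state, downstream) -> downstream.push(state.sum)); // finisher
    }

    public static void main(String[] args) {
        long total = IntStream.rangeClosed(1, 100).boxed()
                .parallel()
                .gather(summing())
                .findFirst()
                .orElseThrow();
        System.out.println(total); // prints 5050
    }
}
```

A gatherer built with ofSequential(...) has no combiner of its own, so it would be evaluated sequentially even inside a parallel stream.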

There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); and incremental reordering functions. The class Gatherers provides implementations of common gathering operations.
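One of the operations listed above, de-duplicating consecutively similar elements, can be sketched with ofSequential as follows. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); DedupDemo and dropConsecutiveDuplicates are hypothetical names. Unlike Stream.distinct(), it only drops an element that equals its immediate predecessor.

```java
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class DedupDemo {
    // Hypothetical helper: drops an element when it equals the element just before it.
    static <T> Gatherer<T, ?, T> dropConsecutiveDuplicates() {
        // Mutable state: whether any element was seen, and the last element seen.
        class Last {
            boolean seen;
            T value;
        }
        return Gatherer.<T, Last, T>ofSequential(
                Last::new,
                Gatherer.Integrator.ofGreedy((last, element, downstream) -> {
                    if (last.seen && Objects.equals(last.value, element)) {
                        return true; // same as the previous element: skip it
                    }
                    last.seen = true;
                    last.value = element;
                    return downstream.push(element);
                }));
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 1]
        System.out.println(Stream.of(1, 1, 2, 2, 2, 3, 1, 1).gather(dropConsecutiveDuplicates()).toList());
    }
}
```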

API Note:

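A Gatherer is specified by four functions that work together to process input elements, optionally using intermediate state, and optionally performing a final action at the end of input. They are:
  • creating a new, potentially mutable, state (initializer())
  • integrating a new input element (integrator())
  • combining two states into one (combiner())
  • performing an optional final action (finisher())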

Each invocation of initializer(), integrator(), combiner(), and finisher() must return a semantically identical result.
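The four functions can also be supplied by implementing the interface directly. The sketch below overrides all four methods; each call returns a new lambda, but every such lambda behaves identically, which is what the requirement above asks for. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); MinGatherer is a hypothetical class, not a JDK type.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical gatherer that emits the smallest element (if any) once input ends.
final class MinGatherer<T> implements Gatherer<T, MinGatherer.State<T>, T> {

    // Mutable per-partition state: whether an element was seen, and the minimum so far.
    static final class State<T> {
        boolean present;
        T min;
    }

    private final Comparator<? super T> comparator;

    MinGatherer(Comparator<? super T> comparator) {
        this.comparator = Objects.requireNonNull(comparator);
    }

    @Override
    public Supplier<State<T>> initializer() {
        return State::new; // a fresh, empty state
    }

    @Override
    public Integrator<State<T>, T, T> integrator() {
        return Integrator.ofGreedy((state, element, downstream) -> {
            if (!state.present || comparator.compare(element, state.min) < 0) {
                state.present = true;
                state.min = element;
            }
            return true; // nothing is pushed until the finisher runs
        });
    }

    @Override
    public BinaryOperator<State<T>> combiner() {
        return (left, right) -> {
            if (!right.present) return left;
            if (!left.present) return right;
            return comparator.compare(right.min, left.min) < 0 ? right : left;
        };
    }

    @Override
    public BiConsumer<State<T>, Downstream<? super T>> finisher() {
        return (state, downstream) -> {
            if (state.present) {
                downstream.push(state.min);
            }
        };
    }

    public static void main(String[] args) {
        var min = Stream.of(5, 3, 8, 1, 9).parallel()
                .gather(new MinGatherer<Integer>(Comparator.naturalOrder()))
                .toList();
        System.out.println(min); // prints [1]
    }
}
```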

Implementations of Gatherer must not capture, retain, or expose to other threads the references to the state instance or to the downstream Gatherer.Downstream beyond the duration of the method invocation to which they are passed.

Performing a gathering operation with a Gatherer should produce a result equivalent to:

Gatherer.Downstream<? super R> downstream = ...;
A state = gatherer.initializer().get();
for (T t : data) {
    gatherer.integrator().integrate(state, t, downstream);
}
gatherer.finisher().accept(state, downstream);
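The equivalence loop above can be turned into a runnable sequential driver. This is an illustration under stated assumptions, not the library's implementation: gatherSequentially is a hypothetical helper that collects outputs into a list and, unlike the loop above, stops feeding elements once the integrator returns false, as the implementation requirements below describe. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview) and checks itself against Stream.gather using the built-in Gatherers.windowFixed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

final class SequentialDriver {
    // Hypothetical helper mirroring the equivalence loop; outputs go into a list.
    static <T, A, R> List<R> gatherSequentially(Gatherer<T, A, R> gatherer, Iterable<? extends T> data) {
        List<R> out = new ArrayList<>();
        Gatherer.Downstream<R> downstream = element -> {
            out.add(element);
            return true; // this downstream always accepts more elements
        };
        A state = gatherer.initializer().get();
        Gatherer.Integrator<A, T, R> integrator = gatherer.integrator();
        for (T t : data) {
            // false means "no more input wanted": treat it as the end of input.
            if (!integrator.integrate(state, t, downstream)) {
                break;
            }
        }
        gatherer.finisher().accept(state, downstream);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5);
        List<List<Integer>> manual = gatherSequentially(Gatherers.<Integer>windowFixed(2), data);
        List<List<Integer>> viaStream = data.stream().gather(Gatherers.<Integer>windowFixed(2)).toList();
        System.out.println(manual + " " + viaStream); // prints [[1, 2], [3, 4], [5]] [[1, 2], [3, 4], [5]]
    }
}
```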

However, the library is free to partition the input, perform the integrations on the partitions, and then use the combiner function to combine the partial results to achieve a gathering operation. (Depending on the specific gathering operation, this may perform better or worse than sequential evaluation, according to the relative cost of the integrator and combiner functions.)
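A hand-simulated two-partition evaluation shows where the combiner fits. In this sketch, each half of the input gets its own state, the two states are merged by the combiner, and the finisher runs once on the merged state. It is deliberately simplified: it processes the partitions one after the other on a single thread, does not discard later partitions when an earlier one short-circuits, and is only meaningful for gatherers that supply a combiner. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); PartitionDemo, Count, counting, and gatherInTwoPartitions are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;

final class PartitionDemo {
    // Hypothetical mutable per-partition state for a counting gatherer.
    static final class Count {
        long n;
    }

    // Hypothetical gatherer that emits the number of input elements.
    static Gatherer<Object, Count, Long> counting() {
        return Gatherer.<Object, Count, Long>of(
                Count::new,
                Gatherer.Integrator.ofGreedy((count, element, downstream) -> {
                    count.n++;
                    return true;
                }),
                (left, right) -> {
                    left.n += right.n; // combine two partial counts
                    return left;
                },
                (count, downstream) -> downstream.push(count.n));
    }

    // Hypothetical helper: integrates each half in isolation, combines, then finishes once.
    static <T, A, R> List<R> gatherInTwoPartitions(Gatherer<T, A, R> gatherer, List<? extends T> data) {
        List<R> out = new ArrayList<>();
        Gatherer.Downstream<R> downstream = element -> {
            out.add(element);
            return true;
        };
        int mid = data.size() / 2;
        A left = integrateAll(gatherer, data.subList(0, mid), downstream);
        A right = integrateAll(gatherer, data.subList(mid, data.size()), downstream);
        A combined = gatherer.combiner().apply(left, right);
        gatherer.finisher().accept(combined, downstream);
        return out;
    }

    private static <T, A, R> A integrateAll(Gatherer<T, A, R> gatherer, List<? extends T> part,
                                            Gatherer.Downstream<R> downstream) {
        A state = gatherer.initializer().get(); // each partition starts from a fresh state
        for (T t : part) {
            if (!gatherer.integrator().integrate(state, t, downstream)) {
                break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c", "d", "e", "f", "g");
        System.out.println(gatherInTwoPartitions(counting(), data));           // prints [7]
        System.out.println(data.parallelStream().gather(counting()).toList()); // prints [7]
    }
}
```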

In addition to the predefined implementations in Gatherers, the static factory methods of(...) and ofSequential(...) can be used to construct gatherers. For example, you could create a gatherer that implements the equivalent of Stream.map(java.util.function.Function) with:

public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
    return Gatherer.of(
        (unused, element, downstream) -> // integrator
            downstream.push(mapper.apply(element))
    );
}
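A usage sketch for the map gatherer above, paired with a hypothetical filter gatherer built the same way. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); MapFilterDemo and filter are illustrative names. The filter integrator returns true for rejected elements so that the stream keeps going.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class MapFilterDemo {
    // The `map` gatherer from the text above.
    static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    // Hypothetical companion: the equivalent of Stream.filter, built the same way.
    static <T> Gatherer<T, ?, T> filter(Predicate<? super T> predicate) {
        return Gatherer.of((unused, element, downstream) ->
                !predicate.test(element) || downstream.push(element)); // rejected: just continue
    }

    public static void main(String[] args) {
        List<Integer> result = Stream.of(1, 2, 3, 4, 5, 6)
                .gather(filter(n -> n % 2 == 0))
                .gather(map(n -> n * 10))
                .toList();
        System.out.println(result); // prints [20, 40, 60]
    }
}
```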

Gatherers are designed to be composed; two or more Gatherers can be composed into a single Gatherer using the andThen(Gatherer) method.

// using the implementation of `map` as seen above
Gatherer<Integer, ?, Integer> increment = map(i -> i + 1);

Gatherer<Object, ?, String> toString = map(i -> i.toString());

Gatherer<Integer, ?, String> incrementThenToString = increment.andThen(toString);
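The composed gatherer can be checked against applying the two gatherers one after another. This sketch reuses the map gatherer from the text and assumes JDK 24 or later (or JDK 22/23 with --enable-preview); ComposeDemo and its constant names are hypothetical.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ComposeDemo {
    // Same `map` gatherer as in the text above.
    static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    static final Gatherer<Integer, ?, Integer> INCREMENT = map(i -> i + 1);
    static final Gatherer<Object, ?, String> TO_STRING = map(i -> i.toString());
    static final Gatherer<Integer, ?, String> INCREMENT_THEN_TO_STRING = INCREMENT.andThen(TO_STRING);

    public static void main(String[] args) {
        // One composed stage...
        List<String> composed = Stream.of(1, 2, 3).gather(INCREMENT_THEN_TO_STRING).toList();
        // ...behaves like two gather stages applied in sequence.
        List<String> chained = Stream.of(1, 2, 3).gather(INCREMENT).gather(TO_STRING).toList();
        System.out.println(composed + " " + chained); // prints [2, 3, 4] [2, 3, 4]
    }
}
```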

As an example, a Gatherer implementing a sequential prefix scan could be written as follows:

public static <T, R> Gatherer<T, ?, R> scan(
    Supplier<R> initial,
    BiFunction<? super R, ? super T, ? extends R> scanner) {

    class State {
        R current = initial.get();
    }

    return Gatherer.<T, State, R>ofSequential(
        State::new,
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
            state.current = scanner.apply(state.current, element);
            return downstream.push(state.current);
        })
    );
}

Example of usage:

// will contain: ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]
List<String> numberStrings =
    Stream.of(1,2,3,4,5,6,7,8,9)
          .gather(
              scan(() -> "", (string, number) -> string + number)
          )
          .toList();
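For comparison, Gatherers already provides scan with this shape, and Gatherers.fold takes the same kind of functions but emits only the final accumulated value. A minimal sketch, assuming JDK 24 or later (or JDK 22/23 with --enable-preview); BuiltInScanDemo is a hypothetical class name.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class BuiltInScanDemo {
    // Gatherers.scan emits every intermediate result, like the hand-written `scan` above.
    static List<String> prefixes() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .gather(Gatherers.scan(() -> "", (string, number) -> string + number))
                .toList();
    }

    // Gatherers.fold uses the same kind of functions but emits only the final result.
    static String concatenated() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .gather(Gatherers.fold(() -> "", (string, number) -> string + number))
                .findFirst()
                .orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println(prefixes());     // prints [1, 12, 123, ..., 123456789]
        System.out.println(concatenated()); // prints 123456789
    }
}
```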

Implementation Requirements:
Libraries that implement transformations based on Gatherer, such as Stream.gather(Gatherer), must adhere to the following constraints:
  • Gatherers whose initializer is defaultInitializer() are considered to be stateless, and invoking their initializer is optional.
  • Gatherers whose integrator is an instance of Gatherer.Integrator.Greedy can be assumed not to short-circuit, and the return value of invoking Gatherer.Integrator.integrate(Object, Object, Downstream) does not need to be inspected.
  • The first argument passed to the integration function, both arguments passed to the combiner function, and the argument passed to the finisher function must be the result of a previous invocation of the initializer or combiner functions.
  • The implementation should not do anything with the result of any of the initializer or combiner functions other than to pass them again to the integrator, combiner, or finisher functions.
  • Once a state object is passed to the combiner or finisher function, it is never passed to the integrator function again.
  • When the integrator function returns false, it shall be interpreted just as if there were no more elements to pass it.
  • For parallel evaluation, the gathering implementation must ensure that the input is properly partitioned, that partitions are processed in isolation, and that combining happens only after integration is complete for both partitions.
  • Gatherers whose combiner is defaultCombiner() may only be evaluated sequentially. All other combiners allow the operation to be parallelized by initializing each partition separately, invoking the integrator until it returns false, joining each partition's state using the combiner, and then invoking the finisher on the joined state. Outputs and state later in the input sequence will be discarded if processing an earlier partition short-circuits.
  • Gatherers whose finisher is defaultFinisher() are considered to not have an end-of-stream hook and invoking their finisher is optional.
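A short-circuiting gatherer illustrates the rule that an integrator returning false is treated as the end of input. The sketch below uses a non-greedy integrator, so the library must inspect its return value; that is what lets the pipeline finish on an infinite Stream.iterate source. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); ShortCircuitDemo and takeWhile are hypothetical names, the latter roughly mirroring Stream.takeWhile.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ShortCircuitDemo {
    // Hypothetical gatherer similar to Stream.takeWhile. The integrator is NOT greedy:
    // returning false tells the library to stop, as if the input were exhausted.
    static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {
        return Gatherer.ofSequential(
                Gatherer.Integrator.of((unused, element, downstream) ->
                        predicate.test(element) && downstream.push(element)));
    }

    public static void main(String[] args) {
        // Stream.iterate without a predicate is infinite; the false return ends it.
        List<Integer> firstFive = Stream.iterate(1, n -> n + 1)
                .gather(takeWhile(n -> n <= 5))
                .toList();
        System.out.println(firstFive); // prints [1, 2, 3, 4, 5]
    }
}
```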
Since:
22
See Also: