Contents
- Built-in Gatherers
- Windowing — Fixed and Sliding
- fold and scan — Running Aggregations
- Writing a Custom Gatherer
- Composing Gatherers
The java.util.stream.Gatherers class (preview in Java 22 and 23, final in Java 24) provides factory methods for commonly needed intermediate operations: windowFixed(n) for non-overlapping batches, windowSliding(n) for overlapping windows, fold() for a single accumulated result, scan() for a running result at each step, and mapConcurrent(n, mapper) for concurrent mapping with at most n calls in flight. These replace patterns that previously required workarounds such as custom Collector implementations or external libraries.
import java.util.stream.*;
import static java.util.stream.Gatherers.*;
// Gatherers class — built-in implementations
// windowFixed(n) — partition into non-overlapping windows of size n
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7)
.gather(windowFixed(3))
.toList();
System.out.println(windows);
// [[1, 2, 3], [4, 5, 6], [7]] — last window may be smaller
// windowSliding(n) — overlapping windows of size n
List<List<Integer>> sliding = Stream.of(1, 2, 3, 4, 5)
.gather(windowSliding(3))
.toList();
System.out.println(sliding);
// [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
// fold(initial, folder): ordered reduction that emits exactly one result
// The single output element is the final accumulated value
int sum = Stream.of(1, 2, 3, 4, 5)
.gather(fold(() -> 0, (acc, x) -> acc + x))
.findFirst().orElse(0);
System.out.println(sum); // 15
// scan(initial, folder) — like fold but emits EACH intermediate state
List<Integer> runningSum = Stream.of(1, 2, 3, 4, 5)
.gather(scan(() -> 0, (acc, x) -> acc + x))
.toList();
System.out.println(runningSum); // [1, 3, 6, 10, 15]
// mapConcurrent(n, mapper) — concurrent map with at most n tasks in flight
// Preserves encounter order; uses virtual threads for concurrency
List<String> results = Stream.of("url1", "url2", "url3", "url4")
.gather(mapConcurrent(4, url -> fetch(url))) // 4 concurrent fetches
.toList();
Gatherers were finalized in Java 24 (JEP 485). In Java 22 and 23 they are a preview API (JEP 461 and JEP 473) and require --enable-preview at both compile time and run time. Check your JDK version before using gatherers in production.
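The mapConcurrent snippet above calls a fetch method that is not defined in this article. As a minimal sketch, the class below supplies a hypothetical fetch that only sleeps and returns a string, so the bounded concurrency and preserved encounter order can be tried without a network. The names MapConcurrentSketch, fetch, and fetchAll are illustrative assumptions, not library API.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class MapConcurrentSketch {

    // Hypothetical stand-in for a slow remote call; not part of any library.
    static String fetch(String url) {
        try {
            Thread.sleep(50); // simulate I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while fetching " + url, e);
        }
        return "body-of-" + url;
    }

    // At most maxConcurrency calls to fetch run at once; results keep encounter order.
    static List<String> fetchAll(List<String> urls, int maxConcurrency) {
        return urls.stream()
                .gather(Gatherers.mapConcurrent(maxConcurrency, MapConcurrentSketch::fetch))
                .toList();
    }

    public static void main(String[] args) {
        List<String> urls = Stream.of("url1", "url2", "url3", "url4").toList();
        System.out.println(fetchAll(urls, 2));
        // [body-of-url1, body-of-url2, body-of-url3, body-of-url4]
    }
}
```

With a limit of 2, the four simulated calls complete in roughly two rounds rather than four, and the output order still matches the input order.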
windowFixed(n) groups elements into non-overlapping lists of n elements; the final window is smaller when the element count is not a multiple of n. windowSliding(n) produces overlapping windows where each successive window advances by one element, so a window of [1,2,3] is followed by [2,3,4]. Fixed windows are ideal for batch processing and pagination. Sliding windows are ideal for moving averages, trend detection, and sequence analysis where context from adjacent elements matters.
// windowFixed — batch processing
// Perfect for: chunk API requests, process CSV in batches, paging
List<String> emails = getEmailList(); // 1000 emails
emails.stream()
.gather(windowFixed(50)) // groups of 50
.forEach(batch -> sendBulkEmail(batch)); // 20 API calls instead of 1000
// windowSliding — rolling window calculations
// Moving average of temperature readings
List<Double> temperatures = List.of(20.0, 22.0, 19.5, 23.0, 21.5, 24.0, 22.5);
List<Double> movingAvg = temperatures.stream()
.gather(windowSliding(3))
.map(window -> window.stream()
.mapToDouble(Double::doubleValue)
.average()
.orElse(0.0))
.toList();
System.out.println(movingAvg);
// approximately [20.5, 21.5, 21.33, 22.83, 22.67] (the printed doubles are not rounded)
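// Boundaries between runs: windowSliding(2) pairs each element with its successor
List<Integer> data = List.of(1, 1, 2, 2, 2, 3, 1, 1);
List<List<Integer>> boundaries = data.stream()
    .gather(windowSliding(2))
    .filter(pair -> !pair.get(0).equals(pair.get(1))) // keep pairs whose elements differ
    .toList();
// [[1, 2], [2, 3], [3, 1]]
// Grouping whole runs (run-length encoding) needs a custom gatherer with its own state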
// windowFixed in a parallel stream: the grouping is the same as in a sequential stream
List<List<Integer>> parallelWindows = IntStream.rangeClosed(1, 100)
    .boxed()
    .parallel()
    .gather(windowFixed(10))
    .toList(); // [[1..10], [11..20], ...]: windowFixed is a sequential gatherer, so encounter order is kept
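Trend detection is mentioned above without an example. The following is a minimal sketch, assuming a "trend" simply means a window whose values are strictly increasing. The class and method names (RisingWindows, isRising, risingWindows) and the sample readings are illustrative assumptions.

```java
import java.util.List;
import java.util.stream.Gatherers;

final class RisingWindows {

    // True when every element is strictly greater than the one before it.
    static boolean isRising(List<Double> window) {
        for (int i = 1; i < window.size(); i++) {
            if (window.get(i) <= window.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    // Returns the sliding windows of the given size that are strictly increasing.
    static List<List<Double>> risingWindows(List<Double> readings, int size) {
        return readings.stream()
                .gather(Gatherers.windowSliding(size))
                .filter(w -> w.size() == size)       // ignore the short window emitted for short inputs
                .filter(RisingWindows::isRising)
                .toList();
    }

    public static void main(String[] args) {
        List<Double> readings = List.of(1.0, 2.0, 3.0, 2.5, 4.0, 5.0);
        System.out.println(risingWindows(readings, 3));
        // [[1.0, 2.0, 3.0], [2.5, 4.0, 5.0]]
    }
}
```

The size check matters because windowSliding emits a single smaller window when the stream has fewer elements than the window size.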
fold() behaves like reduce() integrated into a gather pipeline: it accumulates all elements into a single value and emits exactly one output element at the end of the stream. scan() is like fold() but emits the accumulated state after every element, producing a running total, running maximum, or any other prefix aggregation. Both take an initial-value supplier and a folder function that maps the current state and the next element to the new state; neither takes a combiner, so both process elements sequentially in encounter order. scan() is particularly useful for computing running account balances, cumulative statistics, or prefix-sum arrays in a single stream pass.
// scan — prefix sums, running min/max, cumulative products
// Running maximum
List<Integer> nums = List.of(3, 1, 4, 1, 5, 9, 2, 6);
List<Integer> runningMax = nums.stream()
.gather(scan(() -> Integer.MIN_VALUE, Math::max))
.toList();
System.out.println(runningMax); // [3, 3, 4, 4, 5, 9, 9, 9]
// Running concatenation
List<String> words = List.of("the", "quick", "brown", "fox");
List<String> sentences = words.stream()
.gather(scan(() -> "", (acc, w) -> acc.isEmpty() ? w : acc + " " + w))
.toList();
System.out.println(sentences);
// [the, the quick, the quick brown, the quick brown fox]
// fold — like reduce but always produces exactly one element in the output stream
// Useful when you want a single accumulated value at the end of a pipeline stage
Optional<String> combined = Stream.of("a", "b", "c")
.gather(fold(() -> new StringBuilder(),
(sb, s) -> { sb.append(s); return sb; }))
.map(StringBuilder::toString)
.findFirst();
System.out.println(combined.orElse("")); // "abc"
// Practical: running balance for bank transactions
record Transaction(String type, double amount) {}
List<Transaction> txns = List.of(
new Transaction("credit", 1000),
new Transaction("debit", 200),
new Transaction("credit", 500),
new Transaction("debit", 150)
);
List<Double> balance = txns.stream()
.map(t -> t.type().equals("credit") ? t.amount() : -t.amount())
.gather(scan(() -> 0.0, Double::sum))
.toList();
System.out.println(balance); // [1000.0, 800.0, 1300.0, 1150.0]
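The accumulated state does not have to be the same type as the stream elements. As a sketch of that idea, the example below scans over a small record that tracks a count and a sum, then maps each intermediate state to a running mean. The names RunningMean and Stats are assumptions made for this illustration.

```java
import java.util.List;
import java.util.stream.Gatherers;

final class RunningMean {

    // Immutable accumulator: how many values have been seen and their sum.
    record Stats(long count, double sum) {
        Stats add(double value) {
            return new Stats(count + 1, sum + value);
        }

        double mean() {
            return sum / count;
        }
    }

    // Emits the mean of all values seen so far, once per input element.
    static List<Double> runningMean(List<Double> values) {
        return values.stream()
                .gather(Gatherers.scan(() -> new Stats(0, 0.0), Stats::add))
                .map(Stats::mean)
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(runningMean(List.of(2.0, 4.0, 6.0, 8.0)));
        // [2.0, 3.0, 4.0, 5.0]
    }
}
```

Because scan emits only after integrating an element, the initial Stats(0, 0.0) is never emitted and mean() never divides by zero here.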
A custom gatherer implements Gatherer<T,A,R> with three type parameters: T is the input element type, A is the mutable state type (the accumulator), and R is the output element type. It has four components, of which only the integrator is mandatory: initializer() creates the initial state, integrator() processes each input element and optionally pushes output elements downstream, combiner() merges two states during parallel evaluation, and finisher() can emit final output from the accumulated state when the stream ends. For sequential-only gatherers that do not support parallelism, use Gatherer.ofSequential().
import java.util.*;
import java.util.stream.*;
import java.util.function.*;
// Gatherer<T, A, R> — input T, accumulator/state A, output R
// Components:
// initializer() — creates the mutable state A
// integrator() — processes each element T, updates state A, emits R via Downstream
// combiner() — merges two states for parallel operation (optional)
// finisher() — called at stream end to emit final state (optional)
// Example 1: distinct by key (like distinctByKey from common libraries)
static <T, K> Gatherer<T, ?, T> distinctByKey(Function<? super T, ? extends K> keyExtractor) {
    // Explicit type arguments spell out the state type instead of leaving it to inference
    return Gatherer.<T, HashSet<K>, T>ofSequential(
        HashSet::new, // initializer: empty set of seen keys
        Gatherer.Integrator.ofGreedy((seen, element, downstream) -> {
            K key = keyExtractor.apply(element);
            if (seen.add(key)) { // key not seen before
                return downstream.push(element); // emit; false if downstream wants no more
            }
            return true; // duplicate key: skip it and continue
        })
    );
}
// Usage
record Person(String name, String city) {}
List<Person> people = List.of(
new Person("Alice", "NYC"),
new Person("Bob", "LA"),
new Person("Carol", "NYC"),
new Person("Dave", "LA")
);
List<Person> firstPerCity = people.stream()
.gather(distinctByKey(Person::city))
.toList();
System.out.println(firstPerCity);
// [Person[name=Alice, city=NYC], Person[name=Bob, city=LA]]
// Example 2: take while consecutive (emit elements while condition holds on adjacent pair)
static Gatherer<Integer, ?, Integer> takeWhileIncreasing() {
return Gatherer.ofSequential(
() -> new int[]{Integer.MIN_VALUE}, // state: last seen value
Gatherer.Integrator.of((state, element, downstream) -> {
if (element > state[0]) {
state[0] = element;
return downstream.push(element); // continue if downstream wants more
}
return false; // stop — sequence no longer increasing
})
);
}
Stream.of(1, 3, 5, 4, 7, 9)
.gather(takeWhileIncreasing())
.forEach(System.out::print); // 135 — stops at 4 (not > 5)
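Neither example above uses a finisher. The sketch below groups consecutive equal elements into runs, which needs one: the last run is still open when the stream ends and can only be emitted from the finisher. The names RunGrouping, groupRuns, and runsOf are illustrative assumptions, and the code assumes non-null elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;

final class RunGrouping {

    // Groups consecutive equal elements: 1,1,2,2,2,3 becomes [1,1], [2,2,2], [3].
    // Assumes non-null elements (equals is called on them and List.copyOf rejects null).
    static <T> Gatherer<T, ?, List<T>> groupRuns() {
        return Gatherer.<T, ArrayList<T>, List<T>>ofSequential(
                ArrayList::new,                               // state: the run being built
                (run, element, downstream) -> {
                    if (run.isEmpty() || run.getLast().equals(element)) {
                        run.add(element);                     // the current run continues
                        return true;
                    }
                    List<T> finished = List.copyOf(run);      // run ended: take a snapshot
                    run.clear();
                    run.add(element);                         // start the next run
                    return downstream.push(finished);
                },
                (run, downstream) -> {
                    if (!run.isEmpty()) {
                        downstream.push(List.copyOf(run));    // emit the final, still-open run
                    }
                });
    }

    // Convenience wrapper for trying the gatherer on a list.
    static <T> List<List<T>> runsOf(List<T> items) {
        return items.stream().gather(groupRuns()).toList();
    }

    public static void main(String[] args) {
        System.out.println(runsOf(List.of(1, 1, 2, 2, 2, 3, 1, 1)));
        // [[1, 1], [2, 2, 2], [3], [1, 1]]
    }
}
```

Mapping each emitted run to its first element and size would turn this into a run-length encoding.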
andThen() chains two gatherers: the output of the first becomes the input of the second. This enables building complex stateful pipelines from small, focused, reusable pieces — similar to how Collector can be composed with collectingAndThen(). Multiple gatherers can also be applied as sequential gather() calls in the stream pipeline without andThen(); both approaches produce the same result but andThen() keeps related transformations together.
// Gatherers can be composed with andThen()
// Result: elements go through first gatherer, then second
// Sliding window of running sums
Gatherer<Integer, ?, List<Integer>> windowedScan =
Gatherers.scan(() -> 0, Integer::sum)
.andThen(Gatherers.windowSliding(3));
List<List<Integer>> result = Stream.of(1, 2, 3, 4, 5)
.gather(windowedScan)
.toList();
// scan first: [1, 3, 6, 10, 15]
// windowSliding(3): [[1,3,6], [3,6,10], [6,10,15]]
System.out.println(result);
// Chaining in a stream pipeline
// (assumes an Employee record with department() and name(), and a Stream<Employee> employeeStream)
List<String> processed = employeeStream
    .gather(distinctByKey(Employee::department)) // first employee seen per department
    .gather(Gatherers.windowFixed(3)) // batch into groups of 3
    .filter(batch -> batch.size() == 3) // only full batches
    .flatMap(Collection::stream)
    .map(Employee::name)
    .toList();
// Sequential-only gatherer: Gatherer.ofSequential(...)
// Parallel-capable gatherer: Gatherer.of(initializer, integrator, combiner, finisher)
// A gatherer without a combiner still works in a parallel stream, but its gather
// stage is evaluated sequentially while the other stages may run in parallel
Use Gatherer.ofSequential() for stateful gatherers that are not safe to parallelize (like distinct-by-key with a shared set). For parallel-safe gatherers, implement a combiner that merges two independent state instances from different stream segments.
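As a sketch of a parallel-capable gatherer, the example below finds the largest element using Gatherer.of with a combiner. Each segment of a parallel stream gets its own state, the combiner merges two states, and the finisher emits the single result. Stream.max already covers this use case; the gatherer is only meant to show where the combiner fits. The names ParallelMax, Best, max, and maxOfRange are illustrative assumptions.

```java
import java.util.Optional;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

final class ParallelMax {

    // Mutable holder for the largest value seen in one segment of the stream.
    private static final class Best {
        Integer value; // null until the first element arrives

        void accept(Integer candidate) {
            if (value == null || candidate > value) {
                value = candidate;
            }
        }
    }

    static Gatherer<Integer, ?, Integer> max() {
        return Gatherer.<Integer, Best, Integer>of(
                Best::new,                                        // one state per segment
                Gatherer.Integrator.ofGreedy((best, element, downstream) -> {
                    best.accept(element);                         // nothing is emitted yet
                    return true;
                }),
                (left, right) -> {                                // combiner: merge two segment states
                    if (right.value != null) {
                        left.accept(right.value);
                    }
                    return left;
                },
                (best, downstream) -> {                           // finisher: emit the single result
                    if (best.value != null) {
                        downstream.push(best.value);
                    }
                });
    }

    // Runs the gatherer over a parallel stream of the integers from..to (inclusive).
    static Optional<Integer> maxOfRange(int from, int to) {
        return IntStream.rangeClosed(from, to)
                .boxed()
                .parallel()
                .gather(max())
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(maxOfRange(1, 1000)); // Optional[1000]
    }
}
```

The combiner is safe here because taking a maximum is associative and does not depend on which segment an element came from; a gatherer whose output depends on the previous element, such as takeWhileIncreasing, has no such merge and belongs with Gatherer.ofSequential().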