## Contents
- flatMap vs concatMap vs switchMap
- Combining Streams — zip, merge, concat & combineLatest
- Batching — buffer & window
- groupBy — Splitting by Key
- Aggregation — reduce & scan
- Side-Effect Operators — doOn*
- Time-Based Operators — timeout, delayElements & sample
- Choosing the Right Operator
## flatMap vs concatMap vs switchMap

All three operators apply an async function that returns a publisher for each input item. The difference is how they handle concurrency and what happens when a new item arrives before the previous inner publisher has finished.
| Operator | Concurrency | Order Guaranteed | Use When |
| --- | --- | --- | --- |
| flatMap | Concurrent (all inner publishers run in parallel) | No | Independent async calls where order doesn't matter |
| concatMap | Sequential (waits for each inner publisher to complete) | Yes | Order matters, or inner publishers have side effects |
| switchMap | Cancels the previous inner publisher when a new item arrives | Only the latest | Search-as-you-type, where only the most recent request matters |
// flatMap — concurrent, results may arrive out of order
Flux.just("user-1", "user-2", "user-3")
.flatMap(id -> userService.findById(id) // all 3 calls run at once
.subscribeOn(Schedulers.boundedElastic()))
.subscribe(user -> System.out.println("Got: " + user));
// flatMap with concurrency limit — max 2 concurrent inner subscriptions
Flux.range(1, 100)
.flatMap(id -> fetchOrder(id), 2) // at most 2 in-flight
.subscribe(System.out::println);
// concatMap — sequential, guaranteed order
Flux.just("step-1", "step-2", "step-3")
.concatMap(step -> executeStep(step)) // step-1 completes, then step-2, then step-3
.subscribe(result -> System.out.println("Done: " + result));
// switchMap — cancels previous on new item (perfect for live search)
searchInput // Flux of keystroke events
.sampleTimeout(q -> Mono.delay(Duration.ofMillis(300))) // Reactor has no debounce; sampleTimeout emits only after 300ms of silence
.switchMap(query -> searchService.search(query)) // cancels previous search
.subscribe(results -> displayResults(results));
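The ordering difference is easy to see in a self-contained sketch (assumes reactor-core on the classpath; fetch is a hypothetical stand-in for an async call whose latency varies by input):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class OrderingDemo {
    // Hypothetical async call: lower ids take longer, so concurrent
    // execution finishes them out of input order.
    static Flux<String> fetch(int id) {
        return Flux.just("result-" + id)
                .delayElements(Duration.ofMillis(40L * (4 - id)));
    }

    public static void main(String[] args) {
        // flatMap subscribes to all inner publishers at once
        List<String> concurrent = Flux.just(1, 2, 3)
                .flatMap(OrderingDemo::fetch)
                .collectList()
                .block();
        System.out.println("flatMap:   " + concurrent); // likely [result-3, result-2, result-1]

        // concatMap waits for each inner publisher before starting the next
        List<String> ordered = Flux.just(1, 2, 3)
                .concatMap(OrderingDemo::fetch)
                .collectList()
                .block();
        System.out.println("concatMap: " + ordered); // always [result-1, result-2, result-3]
    }
}
```

With 40ms between completion times the concurrent order is stable in practice, but only concatMap guarantees it.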
## Combining Streams — zip, merge, concat & combineLatest

Reactor provides four distinct operators for combining multiple publishers. Each has different semantics around timing and what triggers emission of a combined element.
// zip — pairs items by position, waits for all publishers to emit
// Completes when the shortest publisher completes
Mono<User> userMono = userService.findById(1L);
Mono<Account> accountMono = accountService.findByUserId(1L);
Mono<UserProfile> profile = Mono.zip(userMono, accountMono)
.map(tuple -> new UserProfile(tuple.getT1(), tuple.getT2()));
// zip accepts up to 8 publishers — results arrive as a TupleN
Mono.zip(fetchName(), fetchEmail(), fetchRole())
.map(t -> new UserDTO(t.getT1(), t.getT2(), t.getT3()))
.subscribe(System.out::println);
// merge — interleaves items from multiple Flux in emission order (concurrent)
Flux<String> fast = Flux.interval(Duration.ofMillis(100)).map(i -> "fast-" + i);
Flux<String> slow = Flux.interval(Duration.ofMillis(300)).map(i -> "slow-" + i);
Flux.merge(fast, slow)
.take(10)
.subscribe(System.out::println); // fast-0, fast-1, fast-2, slow-0, ...
// concat — subscribes to publishers sequentially (second starts when first ends)
Flux<String> first = Flux.just("a", "b", "c");
Flux<String> second = Flux.just("d", "e", "f");
Flux.concat(first, second)
.subscribe(System.out::println); // a, b, c, d, e, f (guaranteed order)
// combineLatest — emits whenever any publisher emits, combining with latest from others
Flux<String> stockPrice = priceStream(); // emits on price change
Flux<String> currency = fxStream(); // emits on FX rate change
Flux.combineLatest(stockPrice, currency,
(price, fx) -> convertPrice(price, fx))
.subscribe(System.out::println);
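zip's two defining behaviors — positional pairing, and completion with the shorter source — can be verified in a minimal sketch (assumes reactor-core):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipDemo {
    static List<String> pairs() {
        Flux<String> letters = Flux.just("a", "b", "c");   // 3 items
        Flux<Integer> numbers = Flux.just(1, 2, 3, 4);     // 4 items — the unmatched 4 is dropped
        return Flux.zip(letters, numbers)
                .map(t -> t.getT1() + t.getT2())           // pair by position
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(pairs()); // [a1, b2, c3]
    }
}
```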
## Batching — buffer & window

buffer collects items into List batches and emits each complete batch downstream — useful for bulk database inserts or batched API calls. window does the same but emits a Flux per batch instead of a List, letting you start processing a window before it closes.
// buffer by count — collect into batches of 10
Flux.range(1, 100)
.buffer(10) // emits List<Integer> of 10 items each
.subscribe(batch -> bulkInsert(batch)); // 10 lists of 10
// buffer by time — collect everything received within 1 second
Flux.interval(Duration.ofMillis(200))
.buffer(Duration.ofSeconds(1)) // batches: [0,1,2,3,4], [5,6,7,8,9], ...
.subscribe(System.out::println);
// buffer by count OR time — whichever comes first
Flux.interval(Duration.ofMillis(200))
.bufferTimeout(10, Duration.ofSeconds(1))
.subscribe(System.out::println);
// window by count — each window is a Flux (start processing before window closes)
Flux.range(1, 20)
.window(5)
.flatMap(windowFlux -> windowFlux
.reduce(0, Integer::sum)) // sum each window of 5
.subscribe(System.out::println); // 15, 40, 65, 90
// Sliding window — overlapping windows
Flux.range(1, 10)
.buffer(3, 1) // buffer size 3, step 1 → [1,2,3], [2,3,4], [3,4,5], ...
.subscribe(System.out::println);
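A practical use of the overlapping form is a moving average — a minimal sketch (assumes reactor-core; note that buffer(3, 1) also emits the partial buffers at the tail, which the size filter discards):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MovingAverage {
    static List<Double> averages() {
        return Flux.just(10, 20, 30, 40, 50)
                .buffer(3, 1)                          // [10,20,30], [20,30,40], [30,40,50], [40,50], [50]
                .filter(window -> window.size() == 3)  // drop the partial trailing buffers
                .map(window -> window.stream().mapToInt(Integer::intValue).average().orElse(0))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(averages()); // [20.0, 30.0, 40.0]
    }
}
```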
## groupBy — Splitting by Key

groupBy partitions a Flux into multiple inner Flux streams, one per key. Each inner GroupedFlux carries its key and must be subscribed to (usually via flatMap) or items will be buffered indefinitely.
// Group orders by status and count each group
Flux.fromIterable(orders)
.groupBy(Order::getStatus)
.flatMap(group -> group
.count()
.map(count -> group.key() + ": " + count))
.subscribe(System.out::println);
// PENDING: 12, CONFIRMED: 8, SHIPPED: 25
// Group incoming events by type and process each type differently
eventFlux
.groupBy(Event::getType)
.flatMap(group -> {
if (group.key() == EventType.PAYMENT) {
return group.flatMap(paymentHandler::handle);
} else if (group.key() == EventType.REFUND) {
return group.flatMap(refundHandler::handle);
}
return group.flatMap(defaultHandler::handle);
})
.subscribe();
Every GroupedFlux must be subscribed to (e.g., via flatMap). If you ignore a group, its items buffer in memory until the upstream completes — causing memory leaks on long-running streams. Always use flatMap or concatMap directly on the groupBy result.
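A self-contained sketch of the subscribe-every-group rule (assumes reactor-core) — each GroupedFlux is consumed via flatMap, so nothing buffers:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class GroupByDemo {
    static List<String> countByParity() {
        return Flux.range(1, 10)
                .groupBy(i -> i % 2 == 0 ? "even" : "odd")  // two GroupedFlux, keyed by parity
                .flatMap(group -> group                      // every group is subscribed
                        .count()
                        .map(count -> group.key() + "=" + count))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(countByParity()); // group order is not guaranteed
    }
}
```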
## Aggregation — reduce & scan

reduce aggregates all items into a single value and emits it when the sequence completes — like a stream reduce(). scan emits the running accumulator value after each item — useful for running totals or state machines that emit intermediate results.
// reduce — sum all items, emit final result when stream completes
Flux.range(1, 10)
.reduce(0, Integer::sum) // Mono<Integer>
.subscribe(System.out::println); // 55
// reduce — collect into a map
Flux.fromIterable(orders)
.reduce(new HashMap<String, Long>(), (acc, order) -> {
acc.merge(order.getCustomerId(), 1L, Long::sum);
return acc;
})
.subscribe(map -> System.out.println("Order counts: " + map));
// scan — emit running total after each item (Flux, not Mono)
Flux.range(1, 5)
.scan(0, Integer::sum)
.subscribe(System.out::println); // 0, 1, 3, 6, 10, 15
// scan as a state machine — accumulate event log
Flux.fromIterable(events)
.scan(OrderState.CREATED, (state, event) -> state.apply(event))
.subscribe(state -> System.out.println("State: " + state));
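The state-machine idea becomes concrete with plain strings standing in for the hypothetical OrderState (assumes reactor-core): scan emits the seed first, then every intermediate state:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ScanDemo {
    static List<String> stateHistory() {
        return Flux.just("PAY", "SHIP", "DELIVER")
                .scan("CREATED", (state, event) -> state + ">" + event) // fold each event into the state
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(stateHistory());
        // [CREATED, CREATED>PAY, CREATED>PAY>SHIP, CREATED>PAY>SHIP>DELIVER]
    }
}
```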
## Side-Effect Operators — doOn*

The doOn* operators let you attach side effects (logging, metrics, debugging) at specific lifecycle points without modifying the stream. They receive the item or signal and return void — the original value passes through unchanged.
Flux.range(1, 5)
.doOnSubscribe(sub -> log.info("Subscribed"))
.doOnNext(item -> log.debug("Received: {}", item))
.doOnError(ex -> log.error("Error: {}", ex.getMessage()))
.doOnComplete(() -> log.info("Stream completed"))
.doOnTerminate(() -> log.info("Terminated (complete or error)"))
.doOnCancel(() -> log.warn("Subscription cancelled"))
.doFinally(signal -> log.info("Final signal: {}", signal)) // always runs
.map(n -> n * 2)
.subscribe(System.out::println);
// doOnNext for metrics
Flux.fromIterable(orders)
.doOnNext(order -> meterRegistry.counter("orders.processed",
"status", order.getStatus().name()).increment())
.flatMap(orderService::process)
.subscribe();
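The firing order of the lifecycle hooks can be captured in a list — a minimal sketch (assumes reactor-core); note that doFinally fires after the terminal signal has already been passed downstream:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class LifecycleDemo {
    static List<String> signals() {
        List<String> log = new ArrayList<>();
        Flux.range(1, 2)
                .doOnSubscribe(s -> log.add("subscribe"))
                .doOnNext(n -> log.add("next:" + n))
                .doOnComplete(() -> log.add("complete"))
                .doFinally(sig -> log.add("finally:" + sig)) // runs last, even on error or cancel
                .blockLast();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(signals()); // [subscribe, next:1, next:2, complete, finally:onComplete]
    }
}
```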
## Time-Based Operators — timeout, delayElements & sample

Reactor integrates with time naturally — you can set deadlines on publishers, introduce artificial delays, and sample high-frequency streams to reduce processing load.
// timeout — error if no item arrives within the duration
Mono.fromCallable(() -> slowRemoteCall())
.timeout(Duration.ofSeconds(3))
.onErrorReturn("fallback-value")
.subscribe(System.out::println);
// timeout with fallback publisher instead of error
Mono.fromCallable(() -> primaryService.fetch())
.timeout(Duration.ofSeconds(2),
Mono.fromCallable(() -> fallbackService.fetch()))
.subscribe(System.out::println);
// delayElements — introduce a pause between items
Flux.just("email-1", "email-2", "email-3")
.delayElements(Duration.ofMillis(500)) // 500ms between each send
.flatMap(emailService::send)
.subscribe();
// delaySubscription — wait before even starting
Flux.range(1, 5)
.delaySubscription(Duration.ofSeconds(1)) // start 1s from now
.subscribe(System.out::println);
// sample — emit most recent item in each time window (lossy)
Flux.interval(Duration.ofMillis(100)) // emits every 100ms
.sample(Duration.ofSeconds(1)) // emit latest value every 1s
.subscribe(System.out::println); // 10 items per second sampled down to 1
// sampleFirst — emit the first item in each window, drop the rest
// (Reactor's equivalent of RxJava's throttleFirst)
Flux.interval(Duration.ofMillis(100))
.sampleFirst(Duration.ofSeconds(1))
.subscribe(System.out::println);
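The timeout-plus-fallback pattern is easy to verify with an artificially slow source (assumes reactor-core):

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class TimeoutDemo {
    static String fetchWithDeadline() {
        return Mono.delay(Duration.ofMillis(500))  // simulated slow remote call
                .map(tick -> "slow-result")
                .timeout(Duration.ofMillis(100))   // deadline fires first → TimeoutException
                .onErrorReturn("fallback-value")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fetchWithDeadline()); // fallback-value
    }
}
```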
## Choosing the Right Operator

| Goal | Operator | Notes |
| --- | --- | --- |
| Transform each item (sync) | map | 1-to-1, no async work |
| Async transform, order unimportant | flatMap | Runs concurrently |
| Async transform, order required | concatMap | Sequential, slower |
| Only last async result matters | switchMap | Cancels previous |
| Combine two Mono results | Mono.zip | Waits for both |
| Merge concurrent streams | Flux.merge | Interleaved by time |
| Append streams in order | Flux.concat | Sequential, ordered |
| Batch items for bulk ops | buffer | Emits List<T> |
| Running total / state machine | scan | Emits each accumulation |
| Final aggregate | reduce | Emits a single Mono |
| Deadline on slow source | timeout | Errors or falls back |