
flatMap, concatMap, and switchMap each apply a function that returns a publisher to every input item. They differ in how many inner publishers run at once and in what happens when a new item arrives before the previous inner publisher has finished.

| Operator | Concurrency | Order Guaranteed | Use When |
|---|---|---|---|
| flatMap | Concurrent (inner publishers run in parallel, up to 256 at once by default) | No | Independent async calls where order doesn't matter |
| concatMap | Sequential (waits for each inner publisher to complete) | Yes | Order matters, or inner publishers have side effects |
| switchMap | Cancels previous inner publisher on new item | Only latest | Search-as-you-type, only the most recent request matters |
    // flatMap: concurrent, results may arrive out of order
    Flux.just("user-1", "user-2", "user-3")
        .flatMap(id -> userService.findById(id)           // all 3 calls run at once
            .subscribeOn(Schedulers.boundedElastic()))
        .subscribe(user -> System.out.println("Got: " + user));

    // flatMap with a concurrency limit: at most 2 inner subscriptions at a time
    Flux.range(1, 100)
        .flatMap(id -> fetchOrder(id), 2)                 // at most 2 in flight
        .subscribe(System.out::println);

    // concatMap: sequential, guaranteed order
    Flux.just("step-1", "step-2", "step-3")
        .concatMap(step -> executeStep(step))             // step-1 completes, then step-2, then step-3
        .subscribe(result -> System.out.println("Done: " + result));

    // switchMap: cancels the previous inner publisher on each new item (suits live search)
    searchInput                                           // Flux of query strings
        // Reactor has no debounce operator; sampleTimeout with a delay is the closest equivalent
        .sampleTimeout(query -> Mono.delay(Duration.ofMillis(300)))
        .switchMap(query -> searchService.search(query))  // cancels the previous search
        .subscribe(results -> displayResults(results));
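To make the ordering difference concrete, here is a minimal self-contained sketch that needs only reactor-core on the classpath. The `slowEcho` helper is a hypothetical stand-in for an async call (it is not part of Reactor); it returns its input after `n * 100` ms, so larger inputs finish later.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class MappingOperatorsDemo {

    // Hypothetical async call: echoes n after n * 100 ms.
    static Mono<Integer> slowEcho(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 100L));
    }

    // All three inner publishers start at once; results arrive fastest-first.
    static List<Integer> viaFlatMap() {
        return Flux.just(3, 2, 1)
                .flatMap(MappingOperatorsDemo::slowEcho)
                .collectList()
                .block();
    }

    // Each inner publisher starts only after the previous one completes.
    static List<Integer> viaConcatMap() {
        return Flux.just(3, 2, 1)
                .concatMap(MappingOperatorsDemo::slowEcho)
                .collectList()
                .block();
    }

    // Each new item cancels the inner publisher of the previous item.
    static List<Integer> viaSwitchMap() {
        return Flux.just(3, 2, 1)
                .switchMap(MappingOperatorsDemo::slowEcho)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + viaFlatMap());   // [1, 2, 3]
        System.out.println("concatMap: " + viaConcatMap()); // [3, 2, 1]
        System.out.println("switchMap: " + viaSwitchMap()); // [1]
    }
}
```

The `block()` calls are there only so the demo can return plain lists; production code would normally stay non-blocking.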

This section covers four operators for combining multiple publishers: zip, merge, concat, and combineLatest. They differ in when they subscribe to their sources and in what triggers the emission of a combined element.

    // zip: pairs items by position and waits for every publisher to emit.
    // Flux.zip completes when the shortest source completes.
    Mono<User> userMono = userService.findById(1L);
    Mono<Account> accountMono = accountService.findByUserId(1L);
    Mono<UserProfile> profile = Mono.zip(userMono, accountMono)
        .map(tuple -> new UserProfile(tuple.getT1(), tuple.getT2()));

    // zip accepts up to 8 publishers with typed tuples (Tuple2 through Tuple8)
    Mono.zip(fetchName(), fetchEmail(), fetchRole())
        .map(t -> new UserDTO(t.getT1(), t.getT2(), t.getT3()))
        .subscribe(System.out::println);

    // merge: interleaves items from multiple Flux in arrival order (sources run concurrently)
    Flux<String> fast = Flux.interval(Duration.ofMillis(100)).map(i -> "fast-" + i);
    Flux<String> slow = Flux.interval(Duration.ofMillis(300)).map(i -> "slow-" + i);
    Flux.merge(fast, slow)
        .take(10)
        .subscribe(System.out::println);  // fast-0, fast-1, then fast-2 and slow-0 in either order, ...

    // concat: subscribes to publishers sequentially (the second starts when the first completes)
    Flux<String> first = Flux.just("a", "b", "c");
    Flux<String> second = Flux.just("d", "e", "f");
    Flux.concat(first, second)
        .subscribe(System.out::println);  // a, b, c, d, e, f (guaranteed order)

    // combineLatest: emits whenever any publisher emits, combining with the latest value from the others
    Flux<String> stockPrice = priceStream();  // emits on price change
    Flux<String> currency = fxStream();       // emits on FX rate change
    Flux.combineLatest(stockPrice, currency, (price, fx) -> convertPrice(price, fx))
        .subscribe(System.out::println);
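The timing differences can be checked with a small runnable sketch covering three of the four operators (combineLatest is left out because its output depends on how the sources interleave). It assumes only reactor-core; the `slow()` and `fast()` sources are made up for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class CombiningDemo {

    // Illustrative sources: one item after 200 ms, and one item immediately.
    static Flux<String> slow() {
        return Flux.just("slow").delayElements(Duration.ofMillis(200));
    }

    static Flux<String> fast() {
        return Flux.just("fast");
    }

    // zip pairs by position and stops with the shorter source, so "c" is never paired.
    static List<String> zipped() {
        return Flux.zip(Flux.just("a", "b", "c"), Flux.just(1, 2), (s, n) -> s + n)
                .collectList()
                .block();
    }

    // merge subscribes to both sources right away and emits in arrival order.
    static List<String> merged() {
        return Flux.merge(slow(), fast()).collectList().block();
    }

    // concat subscribes to the second source only after the first completes.
    static List<String> concatenated() {
        return Flux.concat(slow(), fast()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("zip:    " + zipped());        // [a1, b2]
        System.out.println("merge:  " + merged());        // [fast, slow]
        System.out.println("concat: " + concatenated());  // [slow, fast]
    }
}
```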

buffer collects items into List batches and emits each complete batch downstream — useful for bulk database inserts or batched API calls. window does the same but emits a Flux per batch instead of a List, letting you start processing a window before it closes.

    // buffer by count: collect into batches of 10
    Flux.range(1, 100)
        .buffer(10)                                  // emits List<Integer> of 10 items each
        .subscribe(batch -> bulkInsert(batch));      // 10 lists of 10

    // buffer by time: collect everything received within 1 second
    Flux.interval(Duration.ofMillis(200))
        .buffer(Duration.ofSeconds(1))               // about 5 items per batch; exact boundaries depend on timing
        .subscribe(System.out::println);

    // buffer by count OR time, whichever comes first
    Flux.interval(Duration.ofMillis(200))
        .bufferTimeout(10, Duration.ofSeconds(1))
        .subscribe(System.out::println);

    // window by count: each window is a Flux (start processing before the window closes)
    Flux.range(1, 20)
        .window(5)
        .flatMap(windowFlux -> windowFlux
            .reduce(0, Integer::sum))                // sum each window of 5
        .subscribe(System.out::println);             // 15, 40, 65, 90

    // Sliding buffers: overlapping batches
    Flux.range(1, 10)
        .buffer(3, 1)                                // size 3, new buffer every 1 item → [1,2,3], [2,3,4], [3,4,5], ...
        .subscribe(System.out::println);
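The count-based variants are deterministic, so their output can be verified directly. The following is a minimal sketch assuming only reactor-core; it uses concatMap rather than flatMap for the windows so that the order of the sums is guaranteed by the operator rather than by the source being synchronous.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class BatchingDemo {

    // Non-overlapping batches of 3; the last batch holds the leftover item.
    static List<List<Integer>> fixedBatches() {
        return Flux.range(1, 10).buffer(3).collectList().block();
    }

    // Overlapping batches: size 3, a new batch opened on every item.
    static List<List<Integer>> slidingBatches() {
        return Flux.range(1, 5).buffer(3, 1).collectList().block();
    }

    // Each window is its own Flux; here every window of 5 is reduced to its sum.
    static List<Integer> windowSums() {
        return Flux.range(1, 20)
                .window(5)
                .concatMap(window -> window.reduce(0, Integer::sum))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fixedBatches());   // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
        System.out.println(slidingBatches()); // starts [1, 2, 3], [2, 3, 4], [3, 4, 5]; shorter tail batches follow
        System.out.println(windowSums());     // [15, 40, 65, 90]
    }
}
```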

groupBy partitions a Flux into multiple inner Flux streams, one per key. Each inner GroupedFlux carries its key and must be consumed (usually via flatMap). Items routed to a group that nobody consumes stay buffered, and once groupBy's internal prefetch is exhausted the whole sequence can stall.

    // Group orders by status and count each group
    Flux.fromIterable(orders)
        .groupBy(Order::getStatus)
        .flatMap(group -> group
            .count()
            .map(count -> group.key() + ": " + count))
        .subscribe(System.out::println);  // e.g. PENDING: 12, CONFIRMED: 8, SHIPPED: 25

    // Group incoming events by type and process each type differently
    eventFlux
        .groupBy(Event::getType)
        .flatMap(group -> {
            if (group.key() == EventType.PAYMENT) {
                return group.flatMap(paymentHandler::handle);
            } else if (group.key() == EventType.REFUND) {
                return group.flatMap(refundHandler::handle);
            }
            return group.flatMap(defaultHandler::handle);
        })
        .subscribe();

Every GroupedFlux must be consumed (e.g., via flatMap). If a group is ignored, its items stay buffered and the upstream can stop making progress, which shows up as a hang or as growing memory use on long-running streams. Consume groups directly on the groupBy result, keep the number of distinct keys small, and make sure the flatMap concurrency is at least the number of groups. A sequential operator such as concatMap processes one group at a time and can stall when several groups are open at once.
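A self-contained version of the count-per-group pattern, assuming only reactor-core and using made-up sample words in place of the orders above, might look like this. The results are sorted before being returned because the order in which groups finish is not something to rely on.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class GroupByDemo {

    // Groups words by their first letter and counts each group.
    static List<String> countByInitial() {
        return Flux.just("apple", "avocado", "banana", "blueberry", "cherry")
                .groupBy(word -> word.charAt(0))          // key type: Character
                .flatMap(group -> group.count()           // every group is consumed here
                        .map(n -> group.key() + "=" + n))
                .collectSortedList()                      // sorted for a stable result
                .block();
    }

    public static void main(String[] args) {
        System.out.println(countByInitial()); // [a=2, b=2, c=1]
    }
}
```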

reduce aggregates all items into a single value and emits it when the sequence completes — like a stream reduce(). scan emits the running accumulator value after each item — useful for running totals or state machines that emit intermediate results.

    // reduce: sum all items, emit the final result when the stream completes
    Flux.range(1, 10)
        .reduce(0, Integer::sum)   // Mono<Integer>
        .subscribe(System.out::println);  // 55

    // reduceWith: collect into a map (the supplier gives each subscriber its own mutable seed)
    Flux.fromIterable(orders)
        .reduceWith(() -> new HashMap<String, Long>(), (acc, order) -> {
            acc.merge(order.getCustomerId(), 1L, Long::sum);
            return acc;
        })
        .subscribe(map -> System.out.println("Order counts: " + map));

    // scan: emit the running total after each item (Flux, not Mono)
    Flux.range(1, 5)
        .scan(0, Integer::sum)
        .subscribe(System.out::println);  // 0, 1, 3, 6, 10, 15

    // scan as a state machine: emit the state after each event
    Flux.fromIterable(events)
        .scan(OrderState.CREATED, (state, event) -> state.apply(event))
        .subscribe(state -> System.out.println("State: " + state));
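One detail worth illustrating when the accumulator is mutable: `reduce(seed, ...)` hands the same seed instance to every subscription, while `reduceWith(supplier, ...)` creates a fresh one each time. The sketch below, assuming only reactor-core and made-up keys, subscribes twice to each pipeline and returns the second result.

```java
import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ReduceSeedDemo {

    // Mutable seed passed by value: both subscriptions write into the same map.
    static Map<String, Long> sharedSeedSecondRun() {
        Mono<HashMap<String, Long>> counts = Flux.just("a", "b", "a")
                .reduce(new HashMap<String, Long>(), (acc, key) -> {
                    acc.merge(key, 1L, Long::sum);
                    return acc;
                });
        counts.block();        // first subscription fills the shared map
        return counts.block(); // second subscription adds to it again
    }

    // Seed supplied lazily: each subscription starts from an empty map.
    static Map<String, Long> freshSeedSecondRun() {
        Mono<HashMap<String, Long>> counts = Flux.just("a", "b", "a")
                .reduceWith(() -> new HashMap<String, Long>(), (acc, key) -> {
                    acc.merge(key, 1L, Long::sum);
                    return acc;
                });
        counts.block();
        return counts.block();
    }

    public static void main(String[] args) {
        System.out.println(sharedSeedSecondRun()); // counts doubled: a=4, b=2
        System.out.println(freshSeedSecondRun());  // correct counts: a=2, b=1
    }
}
```

With an immutable seed such as `0` the distinction does not matter, which is why the plain `reduce(0, Integer::sum)` form is fine.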

The doOn* operators let you attach side effects (logging, metrics, debugging) at specific lifecycle points without modifying the stream. They receive the item or signal and return void — the original value passes through unchanged.

    Flux.range(1, 5)
        .doOnSubscribe(sub -> log.info("Subscribed"))
        .doOnNext(item -> log.debug("Received: {}", item))
        .doOnError(ex -> log.error("Error: {}", ex.getMessage()))
        .doOnComplete(() -> log.info("Stream completed"))
        .doOnTerminate(() -> log.info("Terminated (complete or error)"))
        .doOnCancel(() -> log.warn("Subscription cancelled"))
        .doFinally(signal -> log.info("Final signal: {}", signal))  // always runs
        .map(n -> n * 2)
        .subscribe(System.out::println);

    // doOnNext for metrics
    Flux.fromIterable(orders)
        .doOnNext(order -> meterRegistry.counter("orders.processed",
            "status", order.getStatus().name()).increment())
        .flatMap(orderService::process)
        .subscribe();
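To see which hooks fire on which path, the following sketch records the hook calls in a list instead of logging them. It assumes only reactor-core; the event labels are arbitrary strings chosen for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class LifecycleDemo {

    // Normal completion: subscribe, each item, complete, then the final signal.
    static List<String> completionPath() {
        List<String> events = new ArrayList<>();
        Flux.just(1, 2)
                .doOnSubscribe(s -> events.add("subscribe"))
                .doOnNext(n -> events.add("next:" + n))
                .doOnError(e -> events.add("error:" + e.getMessage()))
                .doOnComplete(() -> events.add("complete"))
                .doFinally(signal -> events.add("finally:" + signal.name()))
                .blockLast();
        return events;
    }

    // Error path: doOnNext and doOnComplete are skipped, doFinally still runs.
    static List<String> errorPath() {
        List<String> events = new ArrayList<>();
        Flux.<Integer>error(new IllegalStateException("boom"))
                .doOnNext(n -> events.add("next:" + n))
                .doOnError(e -> events.add("error:" + e.getMessage()))
                .doOnComplete(() -> events.add("complete"))
                .doFinally(signal -> events.add("finally:" + signal.name()))
                .onErrorResume(e -> Flux.empty()) // swallow the error so blockLast() returns
                .blockLast();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(completionPath());
        // [subscribe, next:1, next:2, complete, finally:ON_COMPLETE]
        System.out.println(errorPath());
        // [error:boom, finally:ON_ERROR]
    }
}
```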

Reactor integrates with time naturally — you can set deadlines on publishers, introduce artificial delays, and sample high-frequency streams to reduce processing load.

    // timeout: error if no item arrives within the duration
    Mono.fromCallable(() -> slowRemoteCall())
        .timeout(Duration.ofSeconds(3))
        .onErrorReturn("fallback-value")
        .subscribe(System.out::println);

    // timeout with a fallback publisher instead of an error
    Mono.fromCallable(() -> primaryService.fetch())
        .timeout(Duration.ofSeconds(2), Mono.fromCallable(() -> fallbackService.fetch()))
        .subscribe(System.out::println);

    // delayElements: delay each item by a fixed duration
    Flux.just("email-1", "email-2", "email-3")
        .delayElements(Duration.ofMillis(500))      // each item is held back 500ms, including the first
        .flatMap(emailService::send)
        .subscribe();

    // delaySubscription: wait before even starting
    Flux.range(1, 5)
        .delaySubscription(Duration.ofSeconds(1))   // start 1s from now
        .subscribe(System.out::println);

    // sample: emit the most recent item in each time window (lossy)
    Flux.interval(Duration.ofMillis(100))           // emits every 100ms
        .sample(Duration.ofSeconds(1))              // emit the latest value every 1s
        .subscribe(System.out::println);            // 10 items per second sampled down to 1

    // sampleFirst (called throttleFirst in RxJava): emit the first item in each window, drop the rest
    Flux.interval(Duration.ofMillis(100))
        .sampleFirst(Duration.ofSeconds(1))
        .subscribe(System.out::println);
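The two timeout styles can be tried without any remote service. In this sketch, which assumes only reactor-core, `slowSource()` is a hypothetical stand-in that answers after 500 ms, and the error fallback is restricted to `TimeoutException` so that other failures would still propagate.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Mono;

final class TimeoutDemo {

    // Hypothetical slow call: produces "primary" after 500 ms.
    static Mono<String> slowSource() {
        return Mono.just("primary").delayElement(Duration.ofMillis(500));
    }

    // Deadline missed: timeout signals TimeoutException, which is mapped to a default value.
    static String withErrorFallback() {
        return slowSource()
                .timeout(Duration.ofMillis(100))
                .onErrorReturn(TimeoutException.class, "fallback-value")
                .block();
    }

    // Deadline missed: switch to another publisher instead of signalling an error.
    static String withFallbackPublisher() {
        return slowSource()
                .timeout(Duration.ofMillis(100), Mono.just("secondary"))
                .block();
    }

    // Deadline met: the original value passes through untouched.
    static String withinDeadline() {
        return slowSource()
                .timeout(Duration.ofSeconds(2))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withErrorFallback());     // fallback-value
        System.out.println(withFallbackPublisher()); // secondary
        System.out.println(withinDeadline());        // primary
    }
}
```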

| Goal | Operator | Notes |
|---|---|---|
| Transform each item (sync) | map | 1-to-1, no async work |
| Async transform, order unimportant | flatMap | Runs concurrently |
| Async transform, order required | concatMap | Sequential, slower |
| Only last async result matters | switchMap | Cancels previous |
| Combine two Mono results | Mono.zip | Waits for both |
| Merge concurrent streams | Flux.merge | Interleaved by time |
| Append streams in order | Flux.concat | Sequential, ordered |
| Batch items for bulk ops | buffer | Emits List<T> |
| Running total / state machine | scan | Emits each accumulation |
| Final aggregate | reduce | Emits a single value as a Mono |
| Deadline on slow source | timeout | Errors or falls back |