Contents
- Transforming: map, flatMap, concatMap
- Filtering & Defaulting: filter, switchIfEmpty, defaultIfEmpty
- Combining Streams: zip, merge, concat
- Error Handling: onErrorReturn, onErrorResume, retry
- Backpressure Strategies
- Schedulers — Threading Model
- Context Propagation
- Testing with StepVerifier
Choose the right transform operator based on whether the mapping function is synchronous or itself returns a reactive publisher, and whether order matters.
// map — synchronous 1-to-1 transform (stays on same thread)
Flux<String> upper = Flux.just("alice", "bob", "carol")
.map(String::toUpperCase); // "ALICE", "BOB", "CAROL"
// flatMap — async transform, subscriptions run concurrently (order NOT preserved)
Flux<User> users = Flux.just(1L, 2L, 3L)
.flatMap(id -> userService.findById(id)); // all 3 DB calls fire in parallel
// concatMap — async transform, subscriptions are sequential (order preserved)
Flux<Order> orders = Flux.just("u1", "u2", "u3")
.concatMap(userId -> orderService.getLatestOrder(userId)); // sequential, in order
// flatMapSequential — runs concurrently but re-orders output to preserve input order
Flux<Report> reports = ids.flatMapSequential(id -> reportService.generate(id));
flatMap concurrency can be limited with the concurrency parameter: flatMap(fn, 4) limits to 4 concurrent inner subscriptions — essential when calling downstream services to prevent overwhelming them.
// filter — pass only elements matching the predicate
Flux<Integer> evens = Flux.range(1, 10).filter(n -> n % 2 == 0);
// defaultIfEmpty — emit a fallback value if the source completes empty
Mono<User> user = userRepo.findById(id)
.defaultIfEmpty(User.anonymous());
// switchIfEmpty — subscribe to a fallback publisher if source is empty
Mono<Product> product = cache.get(sku)
.switchIfEmpty(db.findBySku(sku) // miss: query DB
.flatMap(p -> cache.put(sku, p).thenReturn(p))); // populate cache
// take / skip — limit or skip elements
Flux<Event> firstTen = eventStream.take(10);
Flux<Event> afterFirst = eventStream.skip(5);
Flux<Event> window = eventStream.take(Duration.ofSeconds(5)); // time window
// distinct / distinctUntilChanged
Flux<String> unique = tags.distinct();
Flux<Status> changes = statusStream.distinctUntilChanged();
// zip — combine elements pairwise (waits for both to emit, then pairs)
Mono<OrderDetail> detail = Mono.zip(
orderRepo.findById(orderId), // Mono<Order>
userRepo.findById(userId) // Mono<User>
).map(tuple -> new OrderDetail(tuple.getT1(), tuple.getT2()));
// zipWith — zip on the instance
Mono<ProductInfo> info = product
.zipWith(inventory.getStock(productId))
.map(t -> new ProductInfo(t.getT1(), t.getT2()));
// merge — interleave multiple Fluxes, subscribe to all at once (no order guarantee)
Flux<Event> allEvents = Flux.merge(userEvents, orderEvents, paymentEvents);
// concat — subscribe to sources sequentially (first completes, then second starts)
Flux<Message> timeline = Flux.concat(archiveMessages, liveMessages);
// combineLatest — emit a combined value whenever ANY source emits
Flux<Dashboard> dashboard = Flux.combineLatest(
metricsFlux, alertsFlux,
(metrics, alerts) -> Dashboard.of(metrics, alerts));
// onErrorReturn — emit a static fallback value on error
Mono<Integer> safe = riskyCall()
.onErrorReturn(-1);
// onErrorResume — switch to a fallback publisher on error
Mono<Product> withFallback = primaryService.getProduct(id)
.onErrorResume(ServiceUnavailableException.class,
ex -> fallbackService.getProduct(id));
// onErrorMap — translate error type
Mono<User> mapped = repo.findById(id)
.onErrorMap(DataAccessException.class,
ex -> new ServiceException("DB unavailable", ex));
// doOnError — side effect (logging) without altering the error signal
Mono<Order> logged = orderService.create(request)
.doOnError(ex -> log.error("Order creation failed", ex));
// retry — resubscribe on error (use with caution on non-idempotent operations)
Mono<String> withRetry = httpClient.get(url)
.retry(3); // retry up to 3 times immediately
// retryWhen — exponential backoff using Retry spec
Mono<String> backoff = httpClient.get(url)
.retryWhen(Retry.backoff(3, Duration.ofMillis(200))
.maxBackoff(Duration.ofSeconds(2))
.filter(ex -> ex instanceof IOException));
// timeout — fail if no item within duration
Mono<Response> timeLimited = service.call()
.timeout(Duration.ofSeconds(3))
.onErrorReturn(TimeoutException.class, Response.empty());
Backpressure is the mechanism by which a slow consumer signals to a fast producer to slow down. Reactor respects this automatically when subscribers request elements. For hot sources (WebSocket, SSE, message queues) you must choose an overflow strategy.
// onBackpressureBuffer — buffer ALL excess elements (can OOM if unbounded)
Flux<Event> buffered = hotSource.onBackpressureBuffer();
// onBackpressureBuffer with bounded queue and overflow handler
Flux<Event> safeBuf = hotSource
.onBackpressureBuffer(1000,
dropped -> log.warn("Dropped event: {}", dropped),
BufferOverflowStrategy.DROP_OLDEST);
// onBackpressureDrop — silently discard elements when consumer can't keep up
Flux<Tick> dropped = marketDataFeed.onBackpressureDrop();
// onBackpressureLatest — keep only the most recent element (like a 1-slot cache)
Flux<Price> latest = priceFeed.onBackpressureLatest();
// limitRate — request elements in batches from upstream (reduces round trips)
Flux<Record> batched = largeStream
.limitRate(256); // request 256 at a time (replenish at 75%)
// Controlling request size in a subscriber
largeStream.subscribe(new BaseSubscriber<Record>() {
@Override
protected void hookOnSubscribe(Subscription s) {
request(100); // ask for first 100
}
@Override
protected void hookOnNext(Record r) {
process(r);
request(1); // ask for one more after each processed
}
});
Reactor is threading-agnostic — operators run on whichever thread subscribes by default. Use subscribeOn and publishOn to switch threads explicitly.
// subscribeOn — affects the subscription thread (where the source runs)
// Useful for wrapping blocking calls into a dedicated thread pool
Mono<String> fromBlockingIO = Mono.fromCallable(() -> blockingDbCall())
.subscribeOn(Schedulers.boundedElastic()); // elastic pool for blocking I/O
// publishOn — affects everything DOWNSTREAM of the operator (hand off thread)
Flux.range(1, 100)
.map(i -> heavyCpuWork(i)) // runs on subscriber's thread
.publishOn(Schedulers.parallel()) // switch to parallel pool here
.map(i -> lightTransform(i)) // runs on parallel pool
.subscribe();
// Built-in Schedulers:
// Schedulers.immediate() — current thread (no switch)
// Schedulers.single() — single reusable thread
// Schedulers.boundedElastic() — elastic pool for blocking I/O (capped, queued)
// Schedulers.parallel() — fixed pool = CPU cores, for CPU-bound work
// Schedulers.fromExecutor(e) — wrap any ExecutorService
// Spring WebFlux uses Netty's event loop by default for I/O —
// NEVER block on those threads; always offload to boundedElastic
Mono<byte[]> fileBytes = Mono.fromCallable(() -> Files.readAllBytes(path))
.subscribeOn(Schedulers.boundedElastic());
Blocking on a Netty event loop thread (e.g., calling .block(), Thread.sleep(), or a synchronous JDBC call inside a WebFlux handler) will deadlock or starve the server. Always wrap blocking code with subscribeOn(Schedulers.boundedElastic()).
Reactor's Context is an immutable key-value store that flows upstream through the reactive chain — the reactive equivalent of ThreadLocal. It is used by Spring Security, Micrometer, and OpenTelemetry to propagate security principals and trace IDs across async boundaries.
// Writing to Context — contextWrite is called at subscription time
Mono<String> withCtx = Mono.deferContextual(ctx -> {
String tenantId = ctx.getOrDefault("tenantId", "default");
return service.getDataForTenant(tenantId);
})
.contextWrite(Context.of("tenantId", "acme"));
// Reading Context inside an operator — use deferContextual or transformDeferredContextual
Flux<Order> orders = Flux.deferContextual(ctx -> {
String userId = ctx.get("userId");
return orderRepo.findByUser(userId);
});
// Spring Security reactive context — automatically propagated by Security filter chain
// Access the authenticated principal reactively:
Mono<String> currentUser = ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication)
.map(Authentication::getName);
// Propagating MDC (SLF4J) across reactive boundaries with Reactor Context
Mono<Void> traced = Mono.deferContextual(ctx -> {
MDC.put("traceId", ctx.getOrDefault("traceId", "none"));
return doWork().doFinally(s -> MDC.clear());
}).contextWrite(Context.of("traceId", UUID.randomUUID().toString()));
StepVerifier from reactor-test is the standard tool for asserting reactive pipeline behaviour in unit tests without blocking threads.
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;
// Basic assertions
StepVerifier.create(Flux.just("a", "b", "c"))
.expectNext("a")
.expectNext("b")
.expectNext("c")
.verifyComplete();
// Error assertions
StepVerifier.create(Mono.error(new IllegalArgumentException("bad")))
.expectErrorMatches(ex ->
ex instanceof IllegalArgumentException && ex.getMessage().equals("bad"))
.verify();
// Asserting counts (useful for large streams)
StepVerifier.create(Flux.range(1, 1000))
.expectNextCount(1000)
.verifyComplete();
// Virtual time — test time-dependent operators without waiting
StepVerifier.withVirtualTime(() ->
Flux.interval(Duration.ofSeconds(1)).take(3))
.expectSubscription()
.thenAwait(Duration.ofSeconds(3)) // advance virtual clock 3 seconds
.expectNextCount(3)
.verifyComplete();
// Test with Context
StepVerifier.create(
Mono.deferContextual(ctx -> Mono.just(ctx.get("user")))
.contextWrite(Context.of("user", "alice")))
.expectNext("alice")
.verifyComplete();
// Asserting backpressure
StepVerifier.create(Flux.range(1, 10), 3) // request only 3 elements
.expectNext(1, 2, 3)
.thenRequest(2)
.expectNext(4, 5)
.thenCancel()
.verify();
Add reactor-test to your test scope: testImplementation 'io.projectreactor:reactor-test'. It is included transitively when you use spring-boot-starter-webflux in tests.