Choose the right transform operator based on whether the mapping function is synchronous or itself returns a reactive publisher, and whether order matters.

```java
// map — synchronous 1-to-1 transform (stays on the same thread)
Flux<String> upper = Flux.just("alice", "bob", "carol")
    .map(String::toUpperCase); // "ALICE", "BOB", "CAROL"

// flatMap — async transform, inner publishers subscribed concurrently (order NOT preserved)
Flux<User> users = Flux.just(1L, 2L, 3L)
    .flatMap(id -> userService.findById(id)); // all 3 DB calls fire in parallel

// concatMap — async transform, inner publishers subscribed one at a time (order preserved)
Flux<Order> orders = Flux.just("u1", "u2", "u3")
    .concatMap(userId -> orderService.getLatestOrder(userId)); // sequential, in order

// flatMapSequential — subscribes concurrently but re-orders output to preserve input order
Flux<Report> reports = ids.flatMapSequential(id -> reportService.generate(id));
```

flatMap concurrency can be limited with the concurrency parameter: `flatMap(fn, 4)` limits to 4 concurrent inner subscriptions — essential when calling downstream services to prevent overwhelming them.
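The ordering difference between flatMap and concatMap is easiest to see with artificial delays. A minimal, runnable sketch (the class name and the per-element delays are invented for illustration):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class OrderingDemo {

    // "a" gets the longest delay (300 ms), "c" the shortest (100 ms)
    static Duration inverseDelay(String s) {
        return Duration.ofMillis(('d' - s.charAt(0)) * 100L);
    }

    static List<String> withFlatMap() {
        // All three inner publishers are subscribed at once,
        // so results arrive in completion order, not source order
        return Flux.just("a", "b", "c")
            .flatMap(s -> Flux.just(s).delayElements(inverseDelay(s)))
            .collectList()
            .block(); // blocking only for the demo
    }

    static List<String> withConcatMap() {
        // Each inner publisher must complete before the next is subscribed
        return Flux.just("a", "b", "c")
            .concatMap(s -> Flux.just(s).delayElements(inverseDelay(s)))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + withFlatMap());   // arrival order, e.g. [c, b, a]
        System.out.println("concatMap: " + withConcatMap()); // source order: [a, b, c]
    }
}
```

With flatMap the shortest delay wins the race; concatMap trades latency for ordering by waiting on each inner publisher in turn.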
```java
// filter — pass only elements matching the predicate
Flux<Integer> evens = Flux.range(1, 10).filter(n -> n % 2 == 0);

// defaultIfEmpty — emit a fallback value if the source completes empty
Mono<User> user = userRepo.findById(id)
    .defaultIfEmpty(User.anonymous());

// switchIfEmpty — subscribe to a fallback publisher if the source is empty
Mono<Product> product = cache.get(sku)
    .switchIfEmpty(db.findBySku(sku)                     // miss: query DB
        .flatMap(p -> cache.put(sku, p).thenReturn(p))); // populate cache

// take / skip — limit or skip elements
Flux<Event> firstTen = eventStream.take(10);
Flux<Event> afterFirst = eventStream.skip(5);
Flux<Event> window = eventStream.take(Duration.ofSeconds(5)); // time window

// distinct / distinctUntilChanged
Flux<String> unique = tags.distinct();
Flux<Status> changes = statusStream.distinctUntilChanged();

// zip — combine elements pairwise (waits for both to emit, then pairs)
Mono<OrderDetail> detail = Mono.zip(
        orderRepo.findById(orderId), // Mono<Order>
        userRepo.findById(userId))   // Mono<User>
    .map(tuple -> new OrderDetail(tuple.getT1(), tuple.getT2()));

// zipWith — zip on the instance
Mono<ProductInfo> info = product
    .zipWith(inventory.getStock(productId))
    .map(t -> new ProductInfo(t.getT1(), t.getT2()));

// merge — interleave multiple Fluxes, subscribing to all at once (no order guarantee)
Flux<Event> allEvents = Flux.merge(userEvents, orderEvents, paymentEvents);

// concat — subscribe to sources sequentially (first completes, then second starts)
Flux<Message> timeline = Flux.concat(archiveMessages, liveMessages);

// combineLatest — emit a combined value whenever ANY source emits
Flux<Dashboard> dashboard = Flux.combineLatest(
    metricsFlux, alertsFlux,
    (metrics, alerts) -> Dashboard.of(metrics, alerts));

// onErrorReturn — emit a static fallback value on error
Mono<Integer> safe = riskyCall()
    .onErrorReturn(-1);

// onErrorResume — switch to a fallback publisher on error
Mono<Product> withFallback = primaryService.getProduct(id)
    .onErrorResume(ServiceUnavailableException.class,
        ex -> fallbackService.getProduct(id));

// onErrorMap — translate the error type
Mono<User> mapped = repo.findById(id)
    .onErrorMap(DataAccessException.class,
        ex -> new ServiceException("DB unavailable", ex));

// doOnError — side effect (logging) without altering the error signal
Mono<Order> logged = orderService.create(request)
    .doOnError(ex -> log.error("Order creation failed", ex));

// retry — resubscribe on error (use with caution on non-idempotent operations)
Mono<String> withRetry = httpClient.get(url)
    .retry(3); // retry up to 3 times immediately

// retryWhen — exponential backoff using the Retry spec
Mono<String> backoff = httpClient.get(url)
    .retryWhen(Retry.backoff(3, Duration.ofMillis(200))
        .maxBackoff(Duration.ofSeconds(2))
        .filter(ex -> ex instanceof IOException));

// timeout — fail if no item arrives within the duration
Mono<Response> timeLimited = service.call()
    .timeout(Duration.ofSeconds(3))
    .onErrorReturn(TimeoutException.class, Response.empty());
```

Backpressure is the mechanism by which a slow consumer signals to a fast producer to slow down. Reactor respects this automatically when subscribers request elements. For hot sources (WebSocket, SSE, message queues) you must choose an overflow strategy.

```java
// onBackpressureBuffer — buffer ALL excess elements (can OOM if unbounded)
Flux<Event> buffered = hotSource.onBackpressureBuffer();

// onBackpressureBuffer with a bounded queue and overflow handler
Flux<Event> safeBuf = hotSource
    .onBackpressureBuffer(1000,
        dropped -> log.warn("Dropped event: {}", dropped),
        BufferOverflowStrategy.DROP_OLDEST);

// onBackpressureDrop — silently discard elements when the consumer can't keep up
Flux<Tick> dropped = marketDataFeed.onBackpressureDrop();

// onBackpressureLatest — keep only the most recent element (like a 1-slot cache)
Flux<Price> latest = priceFeed.onBackpressureLatest();

// limitRate — request elements in batches from upstream (reduces round trips)
Flux<Record> batched = largeStream
    .limitRate(256); // request 256 at a time (replenish at 75%)

// Controlling request size in a subscriber
largeStream.subscribe(new BaseSubscriber<Record>() {
    @Override
    protected void hookOnSubscribe(Subscription s) {
        request(100); // ask for the first 100
    }

    @Override
    protected void hookOnNext(Record r) {
        process(r);
        request(1); // ask for one more after each processed element
    }
});
```
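To watch a strategy actually engage, the self-contained sketch below (class name, timings, and prefetch size are invented for illustration) pairs a fast Flux.interval with a deliberately slow consumer. Flux.interval cannot slow down, so without the drop operator the pipeline would fail with an OverflowException once the small prefetch is exhausted:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BackpressureDemo {

    // Interval ticks every 1 ms; the consumer takes ~20 ms per element.
    // onBackpressureDrop sheds the ticks the consumer never requested
    // instead of erroring out.
    static List<Long> firstFiveUnderPressure() {
        return Flux.interval(Duration.ofMillis(1))
            .onBackpressureDrop(tick -> System.out.println("dropped " + tick))
            .publishOn(Schedulers.single(), 8) // small prefetch: pressure builds quickly
            .concatMap(t -> Flux.just(t).delayElements(Duration.ofMillis(20)))
            .take(5)
            .collectList()
            .block(); // blocking only for the demo
    }

    public static void main(String[] args) {
        System.out.println("processed: " + firstFiveUnderPressure());
    }
}
```

Only five elements ever reach the consumer; everything the interval emitted beyond the prefetch window is logged and discarded.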

Reactor is concurrency-agnostic: by default, operators execute on the thread that called subscribe(). Use subscribeOn and publishOn to switch threads explicitly.

```java
// subscribeOn — affects the subscription thread (where the source runs)
// Useful for wrapping blocking calls in a dedicated thread pool
Mono<String> fromBlockingIO = Mono.fromCallable(() -> blockingDbCall())
    .subscribeOn(Schedulers.boundedElastic()); // elastic pool for blocking I/O

// publishOn — affects everything DOWNSTREAM of the operator (thread hand-off)
Flux.range(1, 100)
    .map(i -> heavyCpuWork(i))        // runs on the subscriber's thread
    .publishOn(Schedulers.parallel()) // switch to the parallel pool here
    .map(i -> lightTransform(i))      // runs on the parallel pool
    .subscribe();

// Built-in Schedulers:
// Schedulers.immediate()      — current thread (no switch)
// Schedulers.single()         — single reusable thread
// Schedulers.boundedElastic() — elastic pool for blocking I/O (capped, queued)
// Schedulers.parallel()       — fixed pool = CPU cores, for CPU-bound work
// Schedulers.fromExecutor(e)  — wrap any ExecutorService

// Spring WebFlux uses Netty's event loop by default for I/O —
// NEVER block on those threads; always offload to boundedElastic
Mono<byte[]> fileBytes = Mono.fromCallable(() -> Files.readAllBytes(path))
    .subscribeOn(Schedulers.boundedElastic());
```

Blocking on a Netty event-loop thread (e.g., calling .block(), Thread.sleep(), or a synchronous JDBC call inside a WebFlux handler) will deadlock or starve the server. Always wrap blocking code with subscribeOn(Schedulers.boundedElastic()).
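The split between subscribeOn and publishOn can be observed directly by capturing thread names. A small runnable sketch (the class name is invented; thread names such as boundedElastic-1 are typical Reactor defaults, not guaranteed):

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SchedulerDemo {

    static String threadName() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        String result = Mono.fromCallable(SchedulerDemo::threadName) // where the SOURCE runs
            .publishOn(Schedulers.parallel())
            .map(src -> src + " -> " + threadName())                 // downstream of publishOn
            .subscribeOn(Schedulers.boundedElastic())                // moves the subscription
            .block();

        // Typically prints something like "boundedElastic-1 -> parallel-1":
        // the callable ran on the boundedElastic pool (subscribeOn),
        // the map after publishOn ran on the parallel pool.
        System.out.println(result);
    }
}
```

Note that subscribeOn takes effect regardless of its position in the chain, while publishOn only affects the operators below it.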

Reactor's Context is an immutable key-value store that flows upstream through the reactive chain — the reactive equivalent of ThreadLocal. It is used by Spring Security, Micrometer, and OpenTelemetry to propagate security principals and trace IDs across async boundaries.

```java
// Writing to the Context — contextWrite takes effect at subscription time
Mono<String> withCtx = Mono.deferContextual(ctx -> {
        String tenantId = ctx.getOrDefault("tenantId", "default");
        return service.getDataForTenant(tenantId);
    })
    .contextWrite(Context.of("tenantId", "acme"));

// Reading the Context inside an operator — use deferContextual or transformDeferredContextual
Flux<Order> orders = Flux.deferContextual(ctx -> {
    String userId = ctx.get("userId");
    return orderRepo.findByUser(userId);
});

// Spring Security reactive context — automatically propagated by the security filter chain
// Access the authenticated principal reactively:
Mono<String> currentUser = ReactiveSecurityContextHolder.getContext()
    .map(SecurityContext::getAuthentication)
    .map(Authentication::getName);

// Propagating MDC (SLF4J) across reactive boundaries with the Reactor Context
Mono<Void> traced = Mono.deferContextual(ctx -> {
        MDC.put("traceId", ctx.getOrDefault("traceId", "none"));
        return doWork().doFinally(s -> MDC.clear());
    })
    .contextWrite(Context.of("traceId", UUID.randomUUID().toString()));
```
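Because the Context flows upstream from the subscriber, operator order matters: a read only sees contextWrite calls that sit below it in the chain (closer to subscribe). A minimal runnable sketch of the pitfall (the class and method names are invented for illustration):

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextOrderDemo {

    // Write BELOW the read: the read sees the value
    static String readThenWrite() {
        return Mono.deferContextual(ctx ->
                Mono.just(ctx.getOrDefault("key", "missing")))
            .contextWrite(Context.of("key", "visible"))
            .block();
    }

    // Write ABOVE the read: the value never reaches the read,
    // because the Context propagates subscriber -> source, not the other way
    static String writeThenRead() {
        return Mono.just("ignored")
            .contextWrite(Context.of("key", "invisible"))
            .flatMap(x -> Mono.deferContextual(ctx ->
                Mono.just(ctx.getOrDefault("key", "missing"))))
            .block();
    }

    public static void main(String[] args) {
        System.out.println(readThenWrite()); // "visible"
        System.out.println(writeThenRead()); // "missing"
    }
}
```

This is why contextWrite is conventionally placed at the very end of a chain, right before the subscriber.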

StepVerifier from reactor-test is the standard tool for asserting reactive pipeline behaviour in unit tests without blocking threads.

```java
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;

// Basic assertions
StepVerifier.create(Flux.just("a", "b", "c"))
    .expectNext("a")
    .expectNext("b")
    .expectNext("c")
    .verifyComplete();

// Error assertions
StepVerifier.create(Mono.error(new IllegalArgumentException("bad")))
    .expectErrorMatches(ex -> ex instanceof IllegalArgumentException
        && ex.getMessage().equals("bad"))
    .verify();

// Asserting counts (useful for large streams)
StepVerifier.create(Flux.range(1, 1000))
    .expectNextCount(1000)
    .verifyComplete();

// Virtual time — test time-dependent operators without waiting
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
    .expectSubscription()
    .thenAwait(Duration.ofSeconds(3)) // advance the virtual clock 3 seconds
    .expectNextCount(3)
    .verifyComplete();

// Testing with a Context
StepVerifier.create(
        Mono.deferContextual(ctx -> Mono.just(ctx.get("user")))
            .contextWrite(Context.of("user", "alice")))
    .expectNext("alice")
    .verifyComplete();

// Asserting backpressure
StepVerifier.create(Flux.range(1, 10), 3) // request only 3 elements
    .expectNext(1, 2, 3)
    .thenRequest(2)
    .expectNext(4, 5)
    .thenCancel()
    .verify();
```

Add reactor-test to your test scope: testImplementation 'io.projectreactor:reactor-test'. Spring Boot's dependency management supplies the version, but spring-boot-starter-webflux does not pull it in transitively, so declare it explicitly (Spring Initializr adds it automatically for WebFlux projects).
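Virtual time also combines well with retryWhen backoff, whose real-time delays would otherwise make tests slow. A runnable sketch (the flaky callable and class name are invented for illustration):

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.retry.Retry;

public class RetryBackoffTest {

    // Returns the total number of subscription attempts: 1 original + 3 retries = 4
    static int attemptsUntilExhausted() {
        AtomicInteger attempts = new AtomicInteger();

        // Build the pipeline INSIDE the withVirtualTime supplier so the
        // backoff delays are scheduled on the virtual clock, not real time
        StepVerifier.withVirtualTime(() ->
                Mono.fromCallable(() -> {
                        attempts.incrementAndGet();
                        throw new IOException("flaky");
                    })
                    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))))
            .thenAwait(Duration.ofMinutes(1)) // covers all backoff delays instantly
            .expectError()                    // retries exhausted -> terminal error
            .verify();

        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("attempts = " + attemptsUntilExhausted());
    }
}
```

The test completes in milliseconds even though the cumulative backoff would take several seconds of wall-clock time.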