
Reactor ships with four built-in scheduler types, each designed for a specific category of work, and can also wrap an ExecutorService that you supply. Choosing the right one prevents thread starvation and keeps latency predictable.

| Scheduler | Thread pool | Use for |
|---|---|---|
| Schedulers.immediate() | Current thread | Default: no thread switch, synchronous ops |
| Schedulers.single() | 1 reusable thread | Single-threaded, ordered background work |
| Schedulers.boundedElastic() | Elastic pool (bounded, default 10× CPU cores) | Blocking I/O: JDBC, file, legacy blocking APIs |
| Schedulers.parallel() | Fixed pool = CPU cores | CPU-intensive, non-blocking computation |
| Schedulers.fromExecutorService() | Your own ExecutorService | Custom tuning, integration with existing pools |
// Access built-in schedulers
Scheduler elastic = Schedulers.boundedElastic();
Scheduler parallel = Schedulers.parallel();
Scheduler single = Schedulers.single();

// Always dispose custom schedulers when shutting down
Scheduler custom = Schedulers.newBoundedElastic(20, 100, "my-pool");
// ...
custom.dispose();

publishOn places a thread-switch boundary in the middle of the pipeline. Every operator after the publishOn call runs on the given scheduler. Operators before it continue on the original thread. You can call publishOn multiple times to switch threads at different pipeline stages.

Flux.range(1, 5)
    // runs on the subscribing thread (e.g. Netty event-loop)
    .map(n -> n * 2)
    .publishOn(Schedulers.boundedElastic())
    // everything BELOW this point runs on boundedElastic
    .map(n -> fetchFromDatabase(n)) // blocking call, safe on boundedElastic
    .doOnNext(v -> log.info("Thread: {}", Thread.currentThread().getName()))
    .publishOn(Schedulers.parallel()) // switch again for CPU-bound processing
    .map(n -> heavyComputation(n))
    .subscribe(System.out::println);
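To make the boundary visible, the following minimal sketch records the thread name on each side of a publishOn call. The class name, the helper method, and the scheduler name "demo-worker" are illustrative assumptions; it needs reactor-core on the classpath and is meant to be run from a plain (non-Reactor) thread such as main.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class PublishOnBoundaryDemo {

    /** Returns one "before -> after" string per element, built from thread names. */
    static List<String> stageThreads() {
        // Dedicated scheduler (hypothetical name) so its thread is easy to recognise.
        Scheduler worker = Schedulers.newSingle("demo-worker");
        try {
            return Flux.range(1, 3)
                    // Above publishOn: runs on the subscribing thread (the caller of block()).
                    .map(n -> Thread.currentThread().getName())
                    .publishOn(worker)
                    // Below publishOn: runs on the demo-worker thread.
                    .map(before -> before + " -> " + Thread.currentThread().getName())
                    .collectList()
                    .block();
        } finally {
            worker.dispose(); // custom schedulers should always be disposed
        }
    }

    public static void main(String[] args) {
        stageThreads().forEach(System.out::println);
    }
}
```

Each printed line should start with the caller's thread name and end with a thread whose name begins with demo-worker, which is the switch described above.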

subscribeOn affects where the subscription and source emission happen, i.e., the thread that calls into the source publisher. Its position in the chain does not matter, but unlike publishOn it cannot be stacked: when a pipeline contains several subscribeOn calls, only the one closest to the source decides where the source runs, and the later ones have no effect on it. It is primarily used to move the source itself (e.g., a blocking factory) onto a different thread.

// Source runs on boundedElastic, not the calling thread
Mono.fromCallable(() -> jdbcTemplate.queryForObject("SELECT COUNT(*) FROM orders", Long.class))
    .subscribeOn(Schedulers.boundedElastic()) // source runs here
    .map(count -> "Total orders: " + count)   // also runs on boundedElastic
    .subscribe(System.out::println);

// subscribeOn applies to the whole upstream chain.
// Flux.defer postpones the blocking read until subscription time; without it,
// readLargeFileLines() would execute eagerly on the calling thread.
Flux.defer(() -> Flux.fromIterable(readLargeFileLines())) // blocking file read
    .subscribeOn(Schedulers.boundedElastic())
    .filter(line -> !line.isBlank())
    .map(String::trim)
    .subscribe(System.out::println);
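The rule about repeated subscribeOn calls can be checked with a small experiment. This is only a sketch: the class name and the scheduler names "first" and "second" are made up for illustration, and it assumes reactor-core is on the classpath.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class SubscribeOnOrderDemo {

    /** Returns the name of the thread on which the source callable actually ran. */
    static String sourceThread() {
        Scheduler first = Schedulers.newSingle("first");   // hypothetical names, chosen
        Scheduler second = Schedulers.newSingle("second"); // so the threads are recognisable
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(first)   // closest to the source: decides where it runs
                    .subscribeOn(second)  // only forwards the subscription to "first"
                    .block();
        } finally {
            first.dispose();
            second.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + sourceThread());
    }
}
```

The reported thread name should begin with first, regardless of the second subscribeOn further down the chain.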

The key mental model: publishOn controls where the output of the pipeline flows; subscribeOn controls where the source runs. In practice, use subscribeOn when the source itself is blocking, and publishOn when you need to switch threads mid-pipeline.

| | publishOn | subscribeOn |
|---|---|---|
| Affects | Operators downstream (after the call) | The source and every operator up to the next publishOn |
| Multiple calls | Each one switches thread at that point | Only the one closest to the source wins |
| Typical use | Offload processing mid-pipeline | Wrap a blocking source |
// Correct pattern: subscribeOn wraps blocking source, publishOn switches for downstream
Mono.fromCallable(() -> blockingDbCall())     // blocking
    .subscribeOn(Schedulers.boundedElastic()) // source runs on elastic
    .publishOn(Schedulers.parallel())         // switch for downstream CPU work
    .map(data -> transform(data))
    .subscribe(System.out::println);

The most common threading mistake in reactive code is calling a blocking API (JDBC, REST template, file I/O) on a Netty event-loop thread. The correct pattern is to wrap the blocking call in Mono.fromCallable and offload it to Schedulers.boundedElastic().

// ❌ WRONG: blocks the Netty event-loop thread
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    User user = userRepository.findById(id).orElseThrow(); // BLOCKING!
    return Mono.just(user);
}

// ✅ CORRECT: offload blocking JDBC call to boundedElastic
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    return Mono.fromCallable(() -> userRepository.findById(id).orElseThrow())
        .subscribeOn(Schedulers.boundedElastic());
}

// ✅ Wrapping a blocking service method
public Mono<Report> generateReport(ReportRequest request) {
    return Mono.fromCallable(() -> legacyReportService.generate(request))
        .subscribeOn(Schedulers.boundedElastic())
        .timeout(Duration.ofSeconds(30));
}

// ✅ Calling a blocking REST client from a reactive pipeline
Flux.fromIterable(userIds)
    .flatMap(id -> Mono.fromCallable(() -> restTemplate.getForObject("/users/" + id, User.class))
            .subscribeOn(Schedulers.boundedElastic()),
        10) // max 10 concurrent blocking calls
    .subscribe(System.out::println);

Schedulers.boundedElastic() has a default cap of 10× the number of CPU cores (e.g., 80 threads on an 8-core machine). If you exhaust it by submitting too many long-blocking tasks, new tasks queue up (by default up to 100,000 per thread) and are rejected once that queue is full. For production services, size the cap explicitly with Schedulers.newBoundedElastic(threads, queueCap, name).
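The default cap is simple arithmetic, sketched below with plain JDK code. The class and method names are hypothetical. The system property reactor.schedulers.defaultBoundedElasticSize is the documented override for the default size; this helper only mirrors that lookup for illustration and is not Reactor's own code.

```java
final class BoundedElasticSizing {

    /** Default thread cap described above: ten threads per CPU core. */
    static int defaultThreadCap(int cpuCores) {
        return 10 * cpuCores;
    }

    /**
     * Cap that would apply on this JVM: the system property if it is set,
     * otherwise 10 x the number of available processors.
     */
    static int effectiveThreadCap() {
        int fallback = defaultThreadCap(Runtime.getRuntime().availableProcessors());
        return Integer.getInteger("reactor.schedulers.defaultBoundedElasticSize", fallback);
    }

    public static void main(String[] args) {
        System.out.println("8 cores  -> " + defaultThreadCap(8) + " threads");
        System.out.println("this JVM -> " + effectiveThreadCap() + " threads");
    }
}
```

Comparing this number with the expected count of simultaneously blocked calls is a quick way to judge whether the shared pool is large enough or a dedicated newBoundedElastic pool is the better choice.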

The parallel() operator splits a Flux into N rails (defaulting to the number of CPU cores) and distributes items across them. On its own it does not change threads: call runOn(scheduler) to assign a scheduler to the rails so that they are processed concurrently, then sequential() to merge them back into a single Flux.

// Process items across all CPU cores in parallel
Flux.range(1, 1000)
    .parallel()                       // splits into CPU-count rails
    .runOn(Schedulers.parallel())     // each rail runs on a parallel thread
    .map(n -> heavyCpuComputation(n)) // computed concurrently
    .sequential()                     // merge results back
    .subscribe(System.out::println);

// Parallel with custom rail count
Flux.fromIterable(tasks)
    .parallel(4) // exactly 4 rails
    .runOn(Schedulers.newParallel("worker", 4))
    .flatMap(task -> Mono.fromCallable(() -> task.execute())
        .subscribeOn(Schedulers.boundedElastic()))
    .sequential()
    .collectList()
    .subscribe(results -> System.out.println("All done: " + results.size()));
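One consequence of merging rails is that sequential() emits results as the rails produce them, so the original order is not guaranteed. The sketch below squares eight numbers on four rails and sorts the merged output to get a deterministic list. The class name and the scheduler name "rail" are assumptions for illustration, and reactor-core must be on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class ParallelRailsDemo {

    /** Squares 1..8 on four rails and returns the results in ascending order. */
    static List<Integer> squares() {
        Scheduler workers = Schedulers.newParallel("rail", 4); // hypothetical pool name
        try {
            return Flux.range(1, 8)
                    .parallel(4)          // four rails
                    .runOn(workers)       // rails are processed on the "rail" threads
                    .map(n -> n * n)      // runs concurrently across rails
                    .sequential()         // merged back; arrival order may vary
                    .collectSortedList()  // sort to make the result deterministic
                    .block();
        } finally {
            workers.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares());
    }
}
```

If ordering matters downstream, sorting after the merge (as here) or carrying an index with each item are two common options.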

For fine-grained control, such as separate pools for different subsystems, custom thread naming for monitoring, or queue size limits, create a named scheduler with Schedulers.newBoundedElastic or Schedulers.newParallel, or wrap your own ExecutorService.

// Named elastic pool for outbound HTTP calls
Scheduler httpPool = Schedulers.newBoundedElastic(
    50,         // max threads
    1000,       // max queued tasks
    "http-pool" // thread name prefix (visible in thread dumps)
);

// Named parallel pool for CPU processing
Scheduler cpuPool = Schedulers.newParallel("cpu-pool", Runtime.getRuntime().availableProcessors());

// From an existing ExecutorService (e.g., Spring's TaskExecutor)
@Bean
public Scheduler reactiveScheduler(ThreadPoolTaskExecutor executor) {
    return Schedulers.fromExecutorService(executor.getThreadPoolExecutor(), "spring-pool");
}

// Always dispose custom schedulers on shutdown
@PreDestroy
public void cleanup() {
    httpPool.dispose();
    cpuPool.dispose();
}
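When the pool does not come from Spring, the ExecutorService handed to Schedulers.fromExecutorService has to be built by hand. The following JDK-only sketch shows one way to get named threads and a bounded queue. The class name, the helper methods, and the sizes are illustrative assumptions rather than recommended values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedPoolSketch {

    /** Fixed-size pool with "<prefix>-N" thread names and a bounded task queue. */
    static ThreadPoolExecutor create(String prefix, int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // do not keep the JVM alive on shutdown
            return t;
        };
        return new ThreadPoolExecutor(
                threads, threads,                        // core size == max size
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), // bounded queue
                factory,
                new ThreadPoolExecutor.AbortPolicy());   // reject when the queue is full
    }

    /** Runs one task on a fresh pool and returns the name of the thread that ran it. */
    static String firstThreadName(String prefix) {
        ThreadPoolExecutor pool = create(prefix, 2, 10);
        try {
            return pool.submit(() -> Thread.currentThread().getName())
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstThreadName("http-pool")); // thread name as seen in dumps
    }
}
```

An executor built this way can be passed to Schedulers.fromExecutorService, and the prefix then shows up in thread dumps and log output.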

ThreadLocal variables (used by SLF4J MDC, Spring Security's SecurityContextHolder, and Sleuth trace IDs) are bound to a single thread, so their values are lost when a reactive pipeline switches threads. Reactor's Context is the reactive equivalent: an immutable key-value map attached to the subscription. It propagates upstream, from the subscriber toward the source, so a contextWrite call is visible only to the operators above it. Spring Boot 3.x with Micrometer Tracing handles this automatically for traces; for MDC and security you need explicit propagation.

// Write to Context: use contextWrite at the end of the chain (flows upstream)
Mono.deferContextual(ctx -> {
        String userId = ctx.get("userId");
        return Mono.just("Processing for user: " + userId);
    })
    .contextWrite(Context.of("userId", "alice123"))
    .subscribe(System.out::println);

// MDC propagation: bridge Reactor Context to SLF4J MDC
Mono.just("order-42")
    .doOnEach(signal -> {
        if (signal.isOnNext() || signal.isOnError()) {
            String traceId = signal.getContextView().getOrDefault("traceId", "none");
            MDC.put("traceId", traceId);
        }
    })
    .flatMap(orderId -> processOrder(orderId))
    // contextWrite comes last so that processOrder, which sits above it, also sees traceId
    .contextWrite(Context.of("traceId", "abc-123"))
    .subscribe();

// Spring Security: propagate SecurityContext reactively (WebFlux)
// Spring Security WebFlux automatically wires ReactorContextWebFilter
// To read current user in reactive service:
public Mono<String> getCurrentUsername() {
    return ReactiveSecurityContextHolder.getContext()
        .map(ctx -> ctx.getAuthentication().getName());
}

// Micrometer Tracing: automatic propagation in Spring Boot 3
// Just add micrometer-tracing-bridge-otel and it handles Context for you
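The ThreadLocal limitation that motivates Context can be reproduced without Reactor. The JDK-only sketch below sets a ThreadLocal on the calling thread and reads it from a worker thread, first as-is and then with the value copied over by hand. The class name, the method names, and the TRACE_ID variable are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ThreadLocalHopDemo {

    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Sets the value on the caller thread, then reads it on a different thread. */
    static String readOnWorker(String traceId) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        TRACE_ID.set(traceId);
        try {
            // The worker thread has its own, empty ThreadLocal slot.
            return worker.submit(() -> String.valueOf(TRACE_ID.get()))
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            TRACE_ID.remove();
            worker.shutdown();
        }
    }

    /** Same hop, but the value is captured and re-installed on the worker explicitly. */
    static String readWithExplicitCopy(String traceId) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        TRACE_ID.set(traceId);
        try {
            String captured = TRACE_ID.get(); // read while still on the caller thread
            return worker.submit(() -> {
                TRACE_ID.set(captured);
                try {
                    return String.valueOf(TRACE_ID.get());
                } finally {
                    TRACE_ID.remove(); // avoid leaking the value into later tasks
                }
            }).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            TRACE_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without copy: " + readOnWorker("abc-123"));
        System.out.println("with copy:    " + readWithExplicitCopy("abc-123"));
    }
}
```

The explicit copy in the second method is roughly the bookkeeping that Context-based propagation takes off your hands: the value travels with the subscription instead of with the thread.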