Contents
- Built-in Schedulers
- publishOn — Switch Downstream Thread
- subscribeOn — Switch Upstream Thread
- publishOn vs subscribeOn
- Offloading Blocking Calls
- Parallel Processing with parallel()
- Custom Thread Pools
- Context Propagation — MDC & Security
Reactor ships with four built-in scheduler types, each designed for a specific category of work, plus a factory for wrapping your own ExecutorService. Choosing the right one prevents thread starvation and keeps latency predictable.
| Scheduler | Thread Pool | Use For |
| --- | --- | --- |
| Schedulers.immediate() | Current thread | No thread switch — runs work synchronously on the caller's thread |
| Schedulers.single() | 1 reusable thread | Single-threaded, ordered background work |
| Schedulers.boundedElastic() | Elastic pool (bounded — default 10× CPU cores) | Blocking I/O — JDBC, file access, legacy blocking APIs |
| Schedulers.parallel() | Fixed pool = CPU cores | CPU-intensive, non-blocking computation |
| Schedulers.fromExecutorService() | Your own ExecutorService | Custom tuning, integration with existing pools |
// Access built-in schedulers
Scheduler elastic = Schedulers.boundedElastic();
Scheduler parallel = Schedulers.parallel();
Scheduler single = Schedulers.single();
// Always dispose custom schedulers when shutting down
Scheduler custom = Schedulers.newBoundedElastic(20, 100, "my-pool");
// ...
custom.dispose();
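The table lists Schedulers.immediate(), which the snippets above don't show. A minimal sketch of its behavior (assumes reactor-core on the classpath): the task runs synchronously, right on the calling thread.

```java
import reactor.core.scheduler.Schedulers;

public class ImmediateDemo {
    public static void main(String[] args) {
        // immediate() executes the task directly on the caller's thread —
        // handy when an API demands a Scheduler but no thread hop is wanted
        Schedulers.immediate().schedule(() ->
                System.out.println("Ran on: " + Thread.currentThread().getName()));
    }
}
```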
publishOn places a thread-switch boundary in the middle of the pipeline. Every operator after the publishOn call runs on the given scheduler. Operators before it continue on the original thread. You can call publishOn multiple times to switch threads at different pipeline stages.
Flux.range(1, 5)
    // runs on the subscribing thread (e.g. Netty event-loop)
    .map(n -> n * 2)
    .publishOn(Schedulers.boundedElastic())
    // everything BELOW this point runs on boundedElastic
    .map(n -> fetchFromDatabase(n)) // blocking call — safe on boundedElastic
    .doOnNext(v -> log.info("Thread: {}", Thread.currentThread().getName()))
    .publishOn(Schedulers.parallel())
    // switch again for CPU-bound processing
    .map(n -> heavyComputation(n))
    .subscribe(System.out::println);
subscribeOn affects where the subscription and source emission happen — i.e., the thread that calls into the source publisher. Unlike publishOn, its position in the chain doesn't matter, and when a pipeline has several subscribeOn calls, only the one closest to the source decides where the source runs. It is primarily used to move the source itself (e.g., a blocking factory) onto a different thread.
// Source runs on boundedElastic, not the calling thread
Mono.fromCallable(() -> jdbcTemplate.queryForObject("SELECT COUNT(*) FROM orders", Long.class))
    .subscribeOn(Schedulers.boundedElastic()) // source runs here
    .map(count -> "Total orders: " + count)   // also runs on boundedElastic
    .subscribe(System.out::println);
// subscribeOn applies to the whole upstream chain
// defer the source so the blocking read happens at subscribe time, on boundedElastic —
// a plain Flux.fromIterable(readLargeFileLines()) would block the assembling thread
Flux.defer(() -> Flux.fromIterable(readLargeFileLines())) // blocking file read
    .subscribeOn(Schedulers.boundedElastic())
    .filter(line -> !line.isBlank())
    .map(String::trim)
    .subscribe(System.out::println);
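The closest-to-the-source rule can be verified directly. A minimal sketch (assumes reactor-core on the classpath): with two subscribeOn calls, the first one decides where the source callable runs.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnWins {
    public static void main(String[] args) {
        String sourceThread = Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic()) // closest to source — decides the thread
                .subscribeOn(Schedulers.parallel())       // no effect on where the source runs
                .block();
        System.out.println(sourceThread); // a boundedElastic-* thread, not parallel-*
    }
}
```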
The key mental model: publishOn controls where everything downstream of the call runs; subscribeOn controls where the source runs. In practice, use subscribeOn when the source itself is blocking, and publishOn when you need to switch threads mid-pipeline.
| | publishOn | subscribeOn |
| --- | --- | --- |
| Affects | Operators downstream (after the call) | Source and upstream operators |
| Multiple calls | Each one switches threads at that point | Only the one closest to the source wins |
| Typical use | Offload processing mid-pipeline | Wrap a blocking source |
// Correct pattern: subscribeOn wraps blocking source, publishOn switches for downstream
Mono.fromCallable(() -> blockingDbCall())     // blocking
    .subscribeOn(Schedulers.boundedElastic()) // source runs on elastic
    .publishOn(Schedulers.parallel())         // switch for downstream CPU work
    .map(data -> transform(data))
    .subscribe(System.out::println);
The most common threading mistake in reactive code is calling a blocking API (JDBC, REST template, file I/O) on a Netty event-loop thread. The correct pattern is to wrap the blocking call in Mono.fromCallable and offload it to Schedulers.boundedElastic().
// ❌ WRONG — blocks the Netty event-loop thread
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    User user = userRepository.findById(id).orElseThrow(); // BLOCKING!
    return Mono.just(user);
}
// ✅ CORRECT — offload blocking JDBC call to boundedElastic
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    return Mono.fromCallable(() -> userRepository.findById(id).orElseThrow())
        .subscribeOn(Schedulers.boundedElastic());
}
// ✅ Wrapping a blocking service method
public Mono<Report> generateReport(ReportRequest request) {
    return Mono.fromCallable(() -> legacyReportService.generate(request))
        .subscribeOn(Schedulers.boundedElastic())
        .timeout(Duration.ofSeconds(30));
}
// ✅ Calling a blocking REST client from a reactive pipeline
Flux.fromIterable(userIds)
    .flatMap(id ->
        Mono.fromCallable(() -> restTemplate.getForObject("/users/" + id, User.class))
            .subscribeOn(Schedulers.boundedElastic()),
        10) // max 10 concurrent blocking calls
    .subscribe(System.out::println);
Schedulers.boundedElastic() has a default cap of 10× the number of CPU cores (e.g., 80 threads on an 8-core machine). If you exhaust it — by submitting too many long-blocking tasks — new tasks queue up, and beyond the default queued-task cap of 100,000 submissions are rejected. Size the limits explicitly with Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name) for production services.
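A quick sketch of the default sizing and of creating an explicitly sized pool (assumes reactor-core on the classpath; the 50/10,000 limits and the "db-pool" name are illustrative, not recommendations):

```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SizedPool {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // default boundedElastic thread cap is 10 × CPU cores
        System.out.println("Default cap: " + (10 * cores) + " threads");

        // explicit limits for a production service
        Scheduler dbPool = Schedulers.newBoundedElastic(
                50,       // thread cap
                10_000,   // queued-task cap — submissions beyond this are rejected
                "db-pool");
        dbPool.dispose(); // always dispose custom schedulers on shutdown
    }
}
```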
The parallel() operator splits a Flux into N rails (defaulting to the number of CPU cores) and distributes items across them. On its own it introduces no concurrency — you must call runOn(scheduler) to assign a scheduler to the rails, then sequential() to merge them back into a single Flux.
// Process items across all CPU cores in parallel
Flux.range(1, 1000)
    .parallel()                       // splits into CPU-count rails
    .runOn(Schedulers.parallel())     // each rail runs on a parallel thread
    .map(n -> heavyCpuComputation(n)) // computed concurrently
    .sequential()                     // merge results back
    .subscribe(System.out::println);
// Parallel with custom rail count
Flux.fromIterable(tasks)
    .parallel(4) // exactly 4 rails
    .runOn(Schedulers.newParallel("worker", 4))
    .flatMap(task -> Mono.fromCallable(() -> task.execute())
        .subscribeOn(Schedulers.boundedElastic())) // blocking work hops off the rail
    .sequential()
    .collectList()
    .subscribe(results -> System.out.println("All done: " + results.size()));
For fine-grained control — separate pools for different subsystems, custom thread naming for monitoring, or queue size limits — create a named scheduler backed by your own ExecutorService.
// Named elastic pool for outbound HTTP calls
Scheduler httpPool = Schedulers.newBoundedElastic(
        50,          // max threads
        1000,        // max queued tasks
        "http-pool"  // thread name prefix (visible in thread dumps)
);
// Named parallel pool for CPU processing
Scheduler cpuPool = Schedulers.newParallel("cpu-pool", Runtime.getRuntime().availableProcessors());
// From an existing ExecutorService (e.g., Spring's TaskExecutor)
@Bean
public Scheduler reactiveScheduler(ThreadPoolTaskExecutor executor) {
    return Schedulers.fromExecutorService(executor.getThreadPoolExecutor(), "spring-pool");
}
// Always dispose custom schedulers on shutdown
@PreDestroy
public void cleanup() {
    httpPool.dispose();
    cpuPool.dispose();
}
ThreadLocal variables (used by SLF4J MDC, Spring Security's SecurityContextHolder, and Sleuth trace IDs) don't work across thread switches in reactive pipelines. Reactor's Context is the reactive equivalent — an immutable key-value map attached to the subscription that flows upstream. Spring Boot 3.x + Micrometer Tracing handle this automatically for traces; for MDC and security you need explicit propagation.
// Write to Context — use contextWrite at the end of the chain (flows upstream)
Mono.deferContextual(ctx -> {
        String userId = ctx.get("userId");
        return Mono.just("Processing for user: " + userId);
    })
    .contextWrite(Context.of("userId", "alice123"))
    .subscribe(System.out::println);
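The upstream-only flow is the usual gotcha: a contextWrite placed above a reader is invisible to it. A minimal sketch of both placements (assumes reactor-core on the classpath):

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextGotcha {
    public static void main(String[] args) {
        // contextWrite BELOW the reader — visible, because Context flows upstream
        String ok = Mono.deferContextual(ctx ->
                        Mono.just(ctx.getOrDefault("userId", "missing")))
                .contextWrite(Context.of("userId", "alice"))
                .block();

        // contextWrite ABOVE the reader — NOT visible to the inner chain
        String missing = Mono.just("x")
                .contextWrite(Context.of("userId", "alice"))
                .flatMap(v -> Mono.deferContextual(ctx ->
                        Mono.just(ctx.getOrDefault("userId", "missing"))))
                .block();

        System.out.println(ok + " / " + missing); // alice / missing
    }
}
```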
// MDC propagation — bridge Reactor Context to SLF4J MDC
Mono.just("order-42")
    .doOnEach(signal -> {
        if (signal.isOnNext() || signal.isOnError()) {
            String traceId = signal.getContextView().getOrDefault("traceId", "none");
            MDC.put("traceId", traceId);
        }
    })
    .contextWrite(Context.of("traceId", "abc-123"))
    .flatMap(orderId -> processOrder(orderId))
    .subscribe();
// Spring Security — propagate SecurityContext reactively (WebFlux)
// Spring Security WebFlux automatically wires ReactorContextWebFilter
// To read current user in reactive service:
public Mono<String> getCurrentUsername() {
    return ReactiveSecurityContextHolder.getContext()
        .map(ctx -> ctx.getAuthentication().getName());
}
// Micrometer Tracing — automatic propagation in Spring Boot 3
// Just add micrometer-tracing-bridge-otel and it handles Context for you