Contents
- Dependencies
- Mono — Zero or One Value
- Flux — Zero to Many Values
- Cold vs Hot Publishers
- Transforming — map & flatMap
- Filtering — filter, take, skip & distinct
- Error Handling
- Backpressure Basics
- Subscribing & Blocking
Add the following to your pom.xml. When using Spring Boot, spring-boot-starter-webflux pulls in Reactor automatically — you only need the standalone dependency for non-Spring projects or unit tests. Note that the snippets below omit <version>: outside Spring Boot's dependency management you must supply one explicitly or import the reactor-bom.
<!-- Standalone Reactor (included automatically with spring-boot-starter-webflux) -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!-- For testing with StepVerifier -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
A Mono represents an asynchronous sequence that will emit at most one item, then complete — or emit an error. Think of it as a reactive equivalent of CompletableFuture<T> or Optional<T>, but with full operator support and lazy evaluation. Nothing executes until someone subscribes.
// Creating a Mono with a known value
Mono<String> greeting = Mono.just("Hello, Reactor!");
// Empty Mono — completes without emitting
Mono<String> empty = Mono.empty();
// Mono from a Callable — evaluated lazily on each subscription
Mono<String> lazy = Mono.fromCallable(() -> fetchUserFromDatabase(userId));
// Mono from a CompletableFuture (note: the future starts eagerly here;
// pass a Supplier to fromFuture for lazy start)
Mono<User> fromFuture = Mono.fromFuture(userService.findUserAsync(id));
// Mono that always errors
Mono<String> error = Mono.error(new RuntimeException("Something went wrong"));
// Transform the value when it arrives
Mono<String> upper = Mono.just("hello")
.map(String::toUpperCase); // "HELLO"
// Subscribe with value, error, and completion callbacks — subscribe() does
// not block; for Mono.just the callbacks simply run synchronously
upper.subscribe(
value -> System.out.println("Value: " + value),
err -> System.err.println("Error: " + err),
() -> System.out.println("Completed")
);
A Flux is a reactive sequence that can emit 0 to N items before completing or erroring. It is the reactive equivalent of Stream<T> or Iterable<T> — but asynchronous and non-blocking. Every operator on a Flux returns a new Flux, forming a declarative pipeline.
// Create from known values
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
// Create from a collection
Flux<String> fromList = Flux.fromIterable(List.of("a", "b", "c"));
// Create from a range
Flux<Integer> range = Flux.range(1, 10); // 1 through 10
// Emit items on a fixed interval (useful for polling)
Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1)); // 0, 1, 2, ...
// Create from a Stream (consumed lazily)
Flux<String> fromStream = Flux.fromStream(() -> Stream.of("x", "y", "z"));
// Programmatic creation with FluxSink
Flux<String> generated = Flux.create(sink -> {
    for (String item : externalDataSource.readAll()) {
        if (sink.isCancelled()) return; // stop before emitting once cancelled
        sink.next(item);
    }
    sink.complete();
});
// Chain operators
Flux.range(1, 10)
.filter(n -> n % 2 == 0) // keep even numbers
.map(n -> "item-" + n) // transform
.take(3) // limit to first 3
.subscribe(System.out::println); // item-2, item-4, item-6
A cold publisher re-executes its source for every new subscriber — like a file read or a database query. A hot publisher is shared: it emits items regardless of how many subscribers exist, and late subscribers only see future items. Understanding this distinction is critical for avoiding duplicate work or missed events.
// COLD — each subscriber triggers a fresh HTTP call
Flux<Order> cold = Flux.defer(() -> webClient.get().uri("/orders").retrieve()
.bodyToFlux(Order.class));
cold.subscribe(o -> System.out.println("Sub1: " + o.getId()));
cold.subscribe(o -> System.out.println("Sub2: " + o.getId()));
// Two separate HTTP requests are made
// HOT — convert to hot with share() or publish().autoConnect()
Flux<Long> hot = Flux.interval(Duration.ofMillis(500))
.share(); // shared multicast — all subscribers see the same ticks
hot.subscribe(t -> System.out.println("Sub1: " + t));
Thread.sleep(1200);
hot.subscribe(t -> System.out.println("Sub2: " + t)); // joins mid-stream
// Cache — replay items to late subscribers (cache(N) keeps only the last N)
Flux<String> cached = Flux.just("a", "b", "c")
.cache(); // replays all items to every subscriber
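share() is shorthand for publish().refCount(): the source connects when the first subscriber arrives and disconnects when the last one cancels. The publish().autoConnect(n) variant mentioned above instead waits for n subscribers before connecting and never auto-disconnects. A minimal sketch of that behavior (timings and class name are arbitrary):

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

public class AutoConnectDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Long> seen = new CopyOnWriteArrayList<>();

        // publish() turns the Flux into a ConnectableFlux; autoConnect(2)
        // subscribes to the source only once two subscribers have arrived
        Flux<Long> auto = Flux.interval(Duration.ofMillis(50))
                .publish()
                .autoConnect(2);

        auto.subscribe(seen::add);
        Thread.sleep(200);
        boolean silentBeforeSecond = seen.isEmpty(); // one subscriber: no ticks yet

        auto.subscribe(t -> { });                    // second subscriber connects the source
        Thread.sleep(300);
        boolean tickingAfterSecond = !seen.isEmpty();

        System.out.println("silent before 2nd subscriber: " + silentBeforeSecond);
        System.out.println("ticking after 2nd subscriber: " + tickingAfterSecond);
    }
}
```

Prefer refCount()/share() when the source should shut down with its subscribers, and autoConnect() when it should keep running regardless.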
map applies a synchronous 1-to-1 transformation. flatMap applies an asynchronous transformation that returns a publisher for each input and merges the inner results concurrently, with no ordering guarantee — it is the workhorse operator of reactive pipelines. concatMap does the same but preserves order by subscribing to each inner publisher sequentially.
// map — synchronous, 1-to-1
Flux.just("alice", "bob", "carol")
.map(String::toUpperCase) // ALICE, BOB, CAROL
.map(name -> new User(name))
.subscribe(System.out::println);
// flatMap — async, concurrent, order NOT guaranteed
Flux.just("alice", "bob", "carol")
.flatMap(name ->
userRepository.findByName(name) // returns Mono<User>
.subscribeOn(Schedulers.boundedElastic()))
.subscribe(user -> System.out.println("Found: " + user));
// concatMap — async, sequential, order guaranteed
Flux.just(1, 2, 3)
.concatMap(id -> fetchOrderById(id)) // fetches 1, then 2, then 3
.subscribe(System.out::println);
// flatMapMany — Mono to Flux (one-to-many expansion)
Mono.just("orderId-42")
.flatMapMany(id -> orderItemRepository.findByOrderId(id))
.subscribe(item -> System.out.println("Item: " + item));
Reactor provides a rich set of filtering operators. None of them block — each predicate is applied as items flow through the pipeline, and non-matching elements are dropped immediately.
Flux<Integer> source = Flux.range(1, 20);
// filter — keep items matching the predicate
source.filter(n -> n % 3 == 0)
.subscribe(System.out::println); // 3, 6, 9, 12, 15, 18
// take — limit to first N items then cancel upstream
source.take(5)
.subscribe(System.out::println); // 1, 2, 3, 4, 5
// takeWhile — take while predicate holds
source.takeWhile(n -> n < 8)
.subscribe(System.out::println); // 1, 2, 3, 4, 5, 6, 7
// skip — discard first N items
source.skip(15)
.subscribe(System.out::println); // 16, 17, 18, 19, 20
// distinct — deduplicate by natural equality
Flux.just("a", "b", "a", "c", "b")
.distinct()
.subscribe(System.out::println); // a, b, c
// distinctUntilChanged — deduplicate consecutive duplicates
Flux.just(1, 1, 2, 2, 3, 1)
.distinctUntilChanged()
.subscribe(System.out::println); // 1, 2, 3, 1
In a reactive pipeline an error terminates the sequence and propagates downstream. Reactor provides several strategies for recovering gracefully — returning a fallback value, switching to an alternative publisher, or retrying the source.
// onErrorReturn — substitute a default value on error
Mono.fromCallable(() -> riskyRemoteCall())
.onErrorReturn("default-value")
.subscribe(System.out::println);
// onErrorResume — switch to an alternative publisher
Flux.just("order-1", "order-2", "order-3")
.flatMap(id -> fetchOrder(id)
.onErrorResume(ex -> {
log.warn("Failed to fetch {}: {}", id, ex.getMessage());
return Mono.empty(); // skip this item on error
}))
.subscribe(System.out::println);
// onErrorMap — translate error type
Mono.fromCallable(() -> db.findById(id))
.onErrorMap(SQLException.class, ex ->
new DataAccessException("DB lookup failed", ex));
// retry — resubscribe on error (up to N times)
Mono.fromCallable(() -> unstableApiCall())
.retry(3)
.subscribe(System.out::println);
// retryWhen — exponential back-off retry
Mono.fromCallable(() -> unstableApiCall())
.retryWhen(Retry.backoff(3, Duration.ofMillis(200))
.maxBackoff(Duration.ofSeconds(5))
.filter(ex -> ex instanceof TimeoutException))
.subscribe(System.out::println);
// doOnError — side-effect logging without altering the error
Flux.just(1, 2, 3)
.map(n -> { if (n == 2) throw new RuntimeException("bad item"); return n; })
.doOnError(ex -> log.error("Stream error: {}", ex.getMessage()))
.onErrorReturn(-1)
.subscribe(System.out::println); // 1, -1
Backpressure is the mechanism that lets a slow subscriber signal to a fast producer to slow down — preventing out-of-memory errors when the producer emits faster than the consumer can process. Reactor implements the Reactive Streams specification, so backpressure is built into the protocol. Most operators respect it automatically, but when bridging to non-reactive sources you must choose a strategy explicitly.
// Default — subscriber requests unbounded (Long.MAX_VALUE)
Flux.range(1, 1_000_000)
.subscribe(System.out::println); // all items pushed as fast as possible
// BaseSubscriber — request items in controlled batches
Flux.range(1, 100)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // ask for first 10 items
}
@Override
protected void hookOnNext(Integer value) {
process(value);
if (value % 10 == 0) request(10); // ask for 10 more when done
}
});
// Overflow strategies for hot sources that can't be slowed down
Flux.interval(Duration.ofMillis(1)) // fast producer
.onBackpressureDrop(dropped ->
log.warn("Dropped: {}", dropped)) // discard excess items
.publishOn(Schedulers.boundedElastic())
.subscribe(n -> {
    try {
        Thread.sleep(10); // slow consumer (sleep throws a checked exception,
    } catch (InterruptedException e) { // so the lambda needs try/catch)
        Thread.currentThread().interrupt();
    }
});
// onBackpressureBuffer — buffer up to N items, dropping the oldest on overflow
Flux.interval(Duration.ofMillis(1))
.onBackpressureBuffer(256,
dropped -> log.warn("Buffer full, dropped: {}", dropped),
BufferOverflowStrategy.DROP_OLDEST)
.publishOn(Schedulers.single())
.subscribe(System.out::println);
A Reactor pipeline is lazy — nothing runs until you subscribe. In production Spring WebFlux code, the framework subscribes for you when you return a Mono or Flux from a controller or service. block() is available for tests or non-reactive boundaries, but must never be called on a Reactor thread or inside a reactive pipeline.
// subscribe() variants
Flux.just(1, 2, 3)
.subscribe(); // fire-and-forget
Flux.just(1, 2, 3)
.subscribe(System.out::println); // onNext only
Flux.just(1, 2, 3)
.subscribe(System.out::println,
Throwable::printStackTrace); // onNext + onError
Flux.just(1, 2, 3)
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("Done")); // + onComplete
// Disposable — cancel a subscription
Disposable sub = Flux.interval(Duration.ofMillis(100))
.subscribe(System.out::println);
Thread.sleep(500);
sub.dispose(); // stops the stream
// block() — only safe outside reactive threads (e.g. main, @Test)
String result = Mono.just("hello").map(String::toUpperCase).block();
// blockLast() — block until the Flux completes, return the last item
Integer last = Flux.range(1, 5).blockLast();
// collectList() — gather all Flux items into a Mono<List>
Mono<List<Integer>> all = Flux.range(1, 5).collectList();
Never call block() inside a reactive pipeline or on a Netty/Reactor event-loop thread: at best Reactor detects the misuse on threads it marks as non-blocking and throws IllegalStateException, and at worst the event loop deadlocks. If you need to integrate with blocking code, use Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()) to offload the blocking call to a thread pool intended for blocking work.
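In tests, the reactor-test artifact from the dependency section offers an alternative to block(): StepVerifier subscribes for you, asserts each signal, and fails the test on any mismatch or timeout. A minimal sketch:

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class StepVerifierDemo {
    public static void main(String[] args) {
        Flux<String> flux = Flux.just("alice", "bob")
                .map(String::toUpperCase);

        // verify* methods subscribe, check each signal, and return the
        // verification duration; any unexpected signal throws AssertionError
        Duration took = StepVerifier.create(flux)
                .expectNext("ALICE", "BOB") // assert the emissions in order
                .verifyComplete();          // assert the sequence completes

        // Virtual time — test interval-based code without real waiting
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(1)).take(2))
                .expectSubscription()
                .thenAwait(Duration.ofHours(2)) // advance the virtual clock
                .expectNext(0L, 1L)
                .verifyComplete();

        System.out.println("verified in " + took);
    }
}
```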