Contents
- Dependency
- StepVerifier Basics
- Asserting Errors
- Virtual Time — Testing Timer-Based Operators
- PublisherProbe — Verifying Branch Execution
- Testing with Context
- Testing WebFlux Controllers with WebTestClient
Add the following to your pom.xml. The reactor-test artifact is included automatically when you use spring-boot-starter-webflux with the test scope.
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
StepVerifier.create(publisher) subscribes to the publisher and enters an assertion chain. Each expectNext() call consumes and verifies one item. The chain must always end with a terminal assertion (verifyComplete(), verifyError(), etc.) which triggers the actual subscription and throws if anything doesn't match.
// Asserting Flux items and completion
StepVerifier.create(Flux.just("a", "b", "c"))
.expectNext("a")
.expectNext("b")
.expectNext("c")
.expectComplete()
.verify(); // triggers subscription and blocks until done
// Shorthand: expectNext accepts varargs
StepVerifier.create(Flux.just(1, 2, 3))
.expectNext(1, 2, 3)
.verifyComplete(); // expectComplete + verify in one call
// Asserting Mono
StepVerifier.create(Mono.just("hello").map(String::toUpperCase))
.expectNext("HELLO")
.verifyComplete();
// Asserting empty Mono
StepVerifier.create(Mono.empty())
.expectNextCount(0)
.verifyComplete();
// expectNextMatches — custom predicate per item
StepVerifier.create(userService.findAll())
.expectNextMatches(user -> user.getAge() >= 18)
.expectNextMatches(user -> user.getEmail().contains("@"))
.verifyComplete();
// assertNext — access the item for complex assertions (with AssertJ etc.)
StepVerifier.create(orderService.findById(42L))
.assertNext(order -> {
assertThat(order.getId()).isEqualTo(42L);
assertThat(order.getStatus()).isEqualTo(OrderStatus.CONFIRMED);
assertThat(order.getItems()).hasSize(3);
})
.verifyComplete();
// expectNextCount — verify N items without inspecting each
StepVerifier.create(Flux.range(1, 100))
.expectNextCount(100)
.verifyComplete();
When a publisher terminates with an error, use verifyError() or expectError() to assert the error type and message. A failed assertion in the StepVerifier chain surfaces as a JUnit test failure with a clear message.
// Verify any error
StepVerifier.create(Mono.error(new RuntimeException("oops")))
.verifyError();
// Verify specific error type
StepVerifier.create(Mono.error(new IllegalArgumentException("bad input")))
.verifyError(IllegalArgumentException.class);
// Verify error message
StepVerifier.create(Mono.error(new RuntimeException("connection refused")))
.expectErrorMessage("connection refused")
.verify();
// expectErrorMatches — custom predicate on the Throwable
StepVerifier.create(userService.findById(-1L))
.expectErrorMatches(ex ->
ex instanceof UserNotFoundException &&
ex.getMessage().contains("id=-1"))
.verify();
// assertNext then error — some items before the error
StepVerifier.create(
Flux.just("ok-1", "ok-2")
.concatWith(Mono.error(new RuntimeException("fail"))))
.expectNext("ok-1", "ok-2")
.verifyError(RuntimeException.class);
// onErrorResume fallback — verify the recovered value
StepVerifier.create(
Mono.error(new RuntimeException("down"))
.onErrorReturn("fallback"))
.expectNext("fallback")
.verifyComplete();
Operators like delayElements, timeout, interval, and retry with back-off rely on real wall-clock time. Waiting for a 5-second delay in a unit test is unacceptable. StepVerifier.withVirtualTime() replaces the real clock with a virtual one. You then call thenAwait(duration) to advance virtual time instantly — no real sleep involved.
// Test a Flux that emits every second — advance 5 seconds virtually
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(5))
.expectSubscription()
.thenAwait(Duration.ofSeconds(5)) // advance clock 5 seconds instantly
.expectNext(0L, 1L, 2L, 3L, 4L)
.verifyComplete();
// Test a timeout operator
StepVerifier.withVirtualTime(() ->
Mono.delay(Duration.ofSeconds(10)) // source is slow
.timeout(Duration.ofSeconds(3))) // but timeout is 3s
.expectSubscription()
.thenAwait(Duration.ofSeconds(3)) // advance past timeout
.verifyError(TimeoutException.class);
// Test retry with exponential back-off
AtomicInteger attempts = new AtomicInteger();
StepVerifier.withVirtualTime(() ->
Mono.fromCallable(() -> {
if (attempts.incrementAndGet() < 3) throw new RuntimeException("fail");
return "success";
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))))
.expectSubscription()
.thenAwait(Duration.ofSeconds(1)) // first retry after 1s
.thenAwait(Duration.ofSeconds(2)) // second retry after 2s
.expectNext("success")
.verifyComplete();
// Test delayElements
StepVerifier.withVirtualTime(() ->
Flux.just("a", "b", "c")
.delayElements(Duration.ofMinutes(1)))
.expectSubscription()
.expectNoEvent(Duration.ofMinutes(1)) // nothing emitted for 1 minute
.expectNext("a")
.thenAwait(Duration.ofMinutes(1))
.expectNext("b")
.thenAwait(Duration.ofMinutes(1))
.expectNext("c")
.verifyComplete();
Always pass a lambda (not a pre-built publisher) to withVirtualTime(). The lambda is called after the virtual scheduler is installed, so time-based operators subscribe to the virtual clock. If you pass a pre-built publisher, the real scheduler is already bound and virtual time won't work.
PublisherProbe wraps a publisher and records whether it was subscribed to, whether items were requested, and whether it was cancelled. This lets you verify which branch of a conditional pipeline was actually executed — something that is impossible with StepVerifier alone when both branches produce the same output type.
// Service with an if/else reactive branch
public Mono<String> process(Mono<Boolean> conditionMono,
Mono<String> primary,
Mono<String> fallback) {
return conditionMono.flatMap(condition ->
condition ? primary : fallback);
}
@Test
void shouldUseFallbackWhenConditionIsFalse() {
PublisherProbe<String> primaryProbe = PublisherProbe.of(Mono.just("primary"));
PublisherProbe<String> fallbackProbe = PublisherProbe.of(Mono.just("fallback"));
StepVerifier.create(
process(Mono.just(false), primaryProbe.mono(), fallbackProbe.mono()))
.expectNext("fallback")
.verifyComplete();
primaryProbe.assertWasNotSubscribed(); // primary branch was NOT used
fallbackProbe.assertWasSubscribed(); // fallback branch WAS used
}
@Test
void shouldUsePrimaryWhenConditionIsTrue() {
PublisherProbe<String> primaryProbe = PublisherProbe.of(Mono.just("primary"));
PublisherProbe<String> fallbackProbe = PublisherProbe.of(Mono.just("fallback"));
StepVerifier.create(
process(Mono.just(true), primaryProbe.mono(), fallbackProbe.mono()))
.expectNext("primary")
.verifyComplete();
primaryProbe.assertWasSubscribed();
fallbackProbe.assertWasNotSubscribed();
}
When your pipeline reads from Reactor Context (e.g., for trace IDs or user information), use StepVerifier's withInitialContext() or supply it via contextWrite in the publisher under test.
// Service that reads userId from Reactor Context
public Mono<String> greetCurrentUser() {
return Mono.deferContextual(ctx ->
Mono.just("Hello, " + ctx.get("userId") + "!"));
}
@Test
void greetsShouldUseUserIdFromContext() {
StepVerifier.create(
greetCurrentUser()
.contextWrite(Context.of("userId", "alice")))
.expectNext("Hello, alice!")
.verifyComplete();
}
WebTestClient is the reactive equivalent of MockMvc. It sends requests to your WebFlux controller and lets you assert the HTTP response — status, headers, and body — fluently. Use @WebFluxTest for slice tests that load only the web layer, or bind directly to a router function.
@WebFluxTest(OrderController.class)
class OrderControllerTest {
@Autowired
private WebTestClient webClient;
@MockBean
private OrderService orderService;
@Test
void getOrderById_returnsOrder() {
Order order = new Order(42L, "CONFIRMED", new BigDecimal("99.99"));
when(orderService.findById(42L)).thenReturn(Mono.just(order));
webClient.get().uri("/orders/42")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk()
.expectBody(Order.class)
.value(o -> {
assertThat(o.getId()).isEqualTo(42L);
assertThat(o.getStatus()).isEqualTo("CONFIRMED");
});
}
@Test
void getOrder_notFound_returns404() {
when(orderService.findById(99L))
.thenReturn(Mono.error(new OrderNotFoundException(99L)));
webClient.get().uri("/orders/99")
.exchange()
.expectStatus().isNotFound();
}
@Test
void streamOrders_returnsEventStream() {
Flux<Order> stream = Flux.just(new Order(1L), new Order(2L));
when(orderService.streamAll()).thenReturn(stream);
webClient.get().uri("/orders/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.expectBodyList(Order.class)
.hasSize(2);
}
}