
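Add the reactor-test dependency to your pom.xml with test scope. In a Spring Boot project the version is managed by Boot's dependency management, so no <version> element is needed. Note that spring-boot-starter-webflux does not pull reactor-test in transitively; Spring Initializr adds it as a separate test dependency when you select Spring Reactive Web.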

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>

StepVerifier.create(publisher) wraps the publisher and opens an expectation chain; it does not subscribe yet. Each expectNext() call declares one expected item, in order. The chain must end with a terminal verification (verifyComplete(), verifyError(), or expectComplete()/expectError() followed by verify()), which triggers the actual subscription, blocks until the sequence terminates, and throws an AssertionError if any expectation does not match.

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.test.StepVerifier;

    import static org.assertj.core.api.Assertions.assertThat;

    // Asserting Flux items and completion
    StepVerifier.create(Flux.just("a", "b", "c"))
        .expectNext("a")
        .expectNext("b")
        .expectNext("c")
        .expectComplete()
        .verify();                    // triggers subscription and blocks until done

    // Shorthand: expectNext accepts varargs
    StepVerifier.create(Flux.just(1, 2, 3))
        .expectNext(1, 2, 3)
        .verifyComplete();            // expectComplete + verify in one call

    // Asserting Mono
    StepVerifier.create(Mono.just("hello").map(String::toUpperCase))
        .expectNext("HELLO")
        .verifyComplete();

    // Asserting empty Mono
    StepVerifier.create(Mono.empty())
        .expectNextCount(0)
        .verifyComplete();

    // expectNextMatches: custom predicate per item
    // (this chain assumes findAll() emits exactly two users)
    StepVerifier.create(userService.findAll())
        .expectNextMatches(user -> user.getAge() >= 18)
        .expectNextMatches(user -> user.getEmail().contains("@"))
        .verifyComplete();

    // assertNext: access the item for complex assertions (with AssertJ etc.)
    StepVerifier.create(orderService.findById(42L))
        .assertNext(order -> {
            assertThat(order.getId()).isEqualTo(42L);
            assertThat(order.getStatus()).isEqualTo(OrderStatus.CONFIRMED);
            assertThat(order.getItems()).hasSize(3);
        })
        .verifyComplete();

    // expectNextCount: verify N items without inspecting each
    StepVerifier.create(Flux.range(1, 100))
        .expectNextCount(100)
        .verifyComplete();
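The point that nothing happens until a verify method is called can be checked directly. The following is a minimal sketch, assuming reactor-core and reactor-test are on the classpath; the class name LazyVerificationSketch and the flag are illustrative only.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class LazyVerificationSketch {

    /** Returns {subscribed before verify(), subscribed after verify()}. */
    public static boolean[] run() {
        AtomicBoolean subscribed = new AtomicBoolean(false);

        // Flip the flag the moment something subscribes to the source.
        Flux<String> source = Flux.just("a", "b")
                .doOnSubscribe(subscription -> subscribed.set(true));

        // Building the chain only records expectations.
        StepVerifier verifier = StepVerifier.create(source)
                .expectNext("a", "b")
                .expectComplete();

        boolean before = subscribed.get();

        // verify() subscribes, checks the expectations, and blocks until done.
        verifier.verify();

        boolean after = subscribed.get();
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("subscribed before verify(): " + result[0]);
        System.out.println("subscribed after verify():  " + result[1]);
    }
}
```

A practical consequence: a test that builds a chain but forgets the final verify call passes without checking anything.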

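When a publisher terminates with an error, assert the error type or message with expectError(...) followed by verify(), or with the verifyError(...) shorthand that combines the two. A failed expectation throws an AssertionError from the verify call, which JUnit reports as a test failure with a message naming the expectation that failed.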

    // Verify any error
    StepVerifier.create(Mono.error(new RuntimeException("oops")))
        .verifyError();

    // Verify specific error type
    StepVerifier.create(Mono.error(new IllegalArgumentException("bad input")))
        .verifyError(IllegalArgumentException.class);

    // Verify error message
    StepVerifier.create(Mono.error(new RuntimeException("connection refused")))
        .expectErrorMessage("connection refused")
        .verify();

    // expectErrorMatches: custom predicate on the Throwable
    StepVerifier.create(userService.findById(-1L))
        .expectErrorMatches(ex -> ex instanceof UserNotFoundException
            && ex.getMessage().contains("id=-1"))
        .verify();

    // Items, then an error: some items are emitted before the failure
    StepVerifier.create(
            Flux.just("ok-1", "ok-2")
                .concatWith(Mono.error(new RuntimeException("fail"))))
        .expectNext("ok-1", "ok-2")
        .verifyError(RuntimeException.class);

    // onErrorReturn fallback: verify the recovered value
    StepVerifier.create(
            Mono.<String>error(new RuntimeException("down"))
                .onErrorReturn("fallback"))
        .expectNext("fallback")
        .verifyComplete();
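To see how a mismatch surfaces, the sketch below runs a deliberately wrong expectation and catches the resulting AssertionError instead of letting JUnit report it. It assumes reactor-test on the classpath; FailedExpectationSketch is an illustrative name, and the exact message text depends on the reactor-test version, so it is printed rather than asserted.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class FailedExpectationSketch {

    /** Runs a deliberately wrong expectation; returns the failure message, or null if none was thrown. */
    public static String failureMessage() {
        try {
            StepVerifier.create(Flux.just("a", "b"))
                    .expectNext("a")
                    .expectNext("x")   // wrong on purpose: the source emits "b"
                    .verifyComplete();
            return null;
        } catch (AssertionError e) {
            // In a real test you would not catch this; JUnit reports it as a failure.
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage());
    }
}
```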

Operators like delayElements, timeout, interval, and retry with back-off rely on real wall-clock time. Waiting for a 5-second delay in a unit test is unacceptable. StepVerifier.withVirtualTime() replaces the real clock with a virtual one. You then call thenAwait(duration) to advance virtual time instantly — no real sleep involved.
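The idea behind a virtual clock can be illustrated without Reactor at all. The sketch below is a conceptual toy in plain Java, not Reactor's VirtualTimeScheduler: scheduled tasks sit in a queue ordered by due time, and advancing the clock simply runs every task whose due time has been reached, with no sleeping. All names in it are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class VirtualClockSketch {

    private static final class Task {
        final long dueNanos;
        final long seq;          // tie-breaker so equal due times run in scheduling order
        final Runnable action;

        Task(long dueNanos, long seq, Runnable action) {
            this.dueNanos = dueNanos;
            this.seq = seq;
            this.action = action;
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>(
            Comparator.<Task>comparingLong(t -> t.dueNanos).thenComparingLong(t -> t.seq));
    private long nowNanos = 0;
    private long counter = 0;

    /** Schedules an action relative to the current virtual time. */
    public void schedule(Duration delay, Runnable action) {
        queue.add(new Task(nowNanos + delay.toNanos(), counter++, action));
    }

    /** Moves virtual time forward and runs every task that has become due. */
    public void advanceBy(Duration amount) {
        long target = nowNanos + amount.toNanos();
        while (!queue.isEmpty() && queue.peek().dueNanos <= target) {
            Task next = queue.poll();
            nowNanos = next.dueNanos;   // jump straight to the task's due time
            next.action.run();
        }
        nowNanos = target;
    }

    public static List<String> demo() {
        VirtualClockSketch clock = new VirtualClockSketch();
        List<String> log = new ArrayList<>();
        clock.schedule(Duration.ofSeconds(5), () -> log.add("5s task"));
        clock.schedule(Duration.ofHours(1), () -> log.add("1h task"));

        clock.advanceBy(Duration.ofSeconds(5));   // returns immediately
        log.add("advanced 5s");
        clock.advanceBy(Duration.ofHours(1));     // also returns immediately
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy, thenAwait(duration) corresponds to advanceBy(duration): the test controls when time passes, so an hour-long delay costs no real time.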

    import java.time.Duration;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    import reactor.util.retry.Retry;

    // Test a Flux that emits every second: advance 5 seconds virtually
    StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(5))
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(5))        // advance clock 5 seconds instantly
        .expectNext(0L, 1L, 2L, 3L, 4L)
        .verifyComplete();

    // Test a timeout operator
    StepVerifier.withVirtualTime(() ->
            Mono.delay(Duration.ofSeconds(10))   // source is slow
                .timeout(Duration.ofSeconds(3))) // but timeout is 3s
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(3))        // advance past timeout
        .verifyError(TimeoutException.class);

    // Test retry with exponential back-off.
    // Retry.backoff adds random jitter by default; jitter(0.0) makes the
    // delays exactly 1s and 2s so the thenAwait calls below are deterministic.
    AtomicInteger attempts = new AtomicInteger();
    StepVerifier.withVirtualTime(() ->
            Mono.fromCallable(() -> {
                    if (attempts.incrementAndGet() < 3) throw new RuntimeException("fail");
                    return "success";
                })
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).jitter(0.0)))
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(1))        // first retry after 1s
        .thenAwait(Duration.ofSeconds(2))        // second retry after 2s
        .expectNext("success")
        .verifyComplete();

    // Test delayElements
    StepVerifier.withVirtualTime(() ->
            Flux.just("a", "b", "c")
                .delayElements(Duration.ofMinutes(1)))
        .expectSubscription()
        .expectNoEvent(Duration.ofMinutes(1))    // nothing emitted for 1 minute
        .expectNext("a")
        .thenAwait(Duration.ofMinutes(1))
        .expectNext("b")
        .thenAwait(Duration.ofMinutes(1))
        .expectNext("c")
        .verifyComplete();

Always build the publisher inside the supplier lambda passed to withVirtualTime(). The lambda is invoked after the virtual scheduler is installed, so time-based operators pick up the virtual clock. If the lambda merely returns a publisher that was assembled earlier (for example, one stored in a field or local variable), its operators are already bound to the real scheduler and virtual time has no effect.
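How far to advance the clock in a back-off test is simple arithmetic. Retry.backoff doubles the delay on each consecutive retry, starting from the minimum back-off, and by default also randomizes each delay with a jitter factor of 0.5; the delays are only exactly 1s, 2s, 4s and so on when jitter is set to 0. The sketch below computes that jitter-free schedule in plain Java. It is an illustration of the arithmetic under that assumption, not Reactor's code, and the class and method names are hypothetical.

```java
import java.time.Duration;

public class BackoffScheduleSketch {

    /**
     * Delay before the retry with the given 0-based index, assuming no jitter:
     * minBackoff * 2^retryIndex, capped at maxBackoff.
     * Intended for small indexes only (no overflow handling).
     */
    public static Duration delayBeforeRetry(int retryIndex, Duration minBackoff, Duration maxBackoff) {
        Duration delay = minBackoff.multipliedBy(1L << retryIndex);
        return delay.compareTo(maxBackoff) > 0 ? maxBackoff : delay;
    }

    /** Total virtual time that must elapse for the given number of retries to fire. */
    public static Duration totalDelay(int retries, Duration minBackoff, Duration maxBackoff) {
        Duration total = Duration.ZERO;
        for (int i = 0; i < retries; i++) {
            total = total.plus(delayBeforeRetry(i, minBackoff, maxBackoff));
        }
        return total;
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(1);
        Duration max = Duration.ofMinutes(1);
        // Two failed attempts followed by a success means two retries.
        System.out.println(totalDelay(2, min, max));
    }
}
```

For a source that fails twice and then succeeds, two retries fire, so the test has to advance virtual time by 1s + 2s = 3s in total before the value can arrive.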

PublisherProbe wraps a publisher and records whether it was subscribed to, whether items were requested, and whether it was cancelled. This lets you verify which branch of a conditional pipeline actually ran, which StepVerifier alone cannot tell you when both branches emit the same values.

    import reactor.test.publisher.PublisherProbe;

    // Service with an if/else reactive branch
    public Mono<String> process(Mono<Boolean> conditionMono,
                                Mono<String> primary,
                                Mono<String> fallback) {
        return conditionMono.flatMap(condition -> condition ? primary : fallback);
    }

    @Test
    void shouldUseFallbackWhenConditionIsFalse() {
        PublisherProbe<String> primaryProbe = PublisherProbe.of(Mono.just("primary"));
        PublisherProbe<String> fallbackProbe = PublisherProbe.of(Mono.just("fallback"));

        StepVerifier.create(
                process(Mono.just(false), primaryProbe.mono(), fallbackProbe.mono()))
            .expectNext("fallback")
            .verifyComplete();

        primaryProbe.assertWasNotSubscribed();   // primary branch was NOT used
        fallbackProbe.assertWasSubscribed();     // fallback branch WAS used
    }

    @Test
    void shouldUsePrimaryWhenConditionIsTrue() {
        PublisherProbe<String> primaryProbe = PublisherProbe.of(Mono.just("primary"));
        PublisherProbe<String> fallbackProbe = PublisherProbe.of(Mono.just("fallback"));

        StepVerifier.create(
                process(Mono.just(true), primaryProbe.mono(), fallbackProbe.mono()))
            .expectNext("primary")
            .verifyComplete();

        primaryProbe.assertWasSubscribed();
        fallbackProbe.assertWasNotSubscribed();
    }
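Besides the assertWas... methods, PublisherProbe exposes boolean query methods such as wasSubscribed() and wasRequested(), which is handy when you want to combine the result with your own assertions. The sketch below applies a probe to a switchIfEmpty fallback. It assumes reactor-test on the classpath; lookupOrDefault is a hypothetical method under test, not something from the examples above.

```java
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;

public class ProbeSwitchIfEmptySketch {

    // Hypothetical method under test: falls back only when the lookup is empty.
    static Mono<String> lookupOrDefault(Mono<String> lookup, Mono<String> fallback) {
        return lookup.switchIfEmpty(fallback);
    }

    /** Returns {fallback subscribed, fallback requested} when the lookup is empty. */
    public static boolean[] emptyLookup() {
        PublisherProbe<String> fallbackProbe = PublisherProbe.of(Mono.just("default"));

        StepVerifier.create(lookupOrDefault(Mono.empty(), fallbackProbe.mono()))
                .expectNext("default")
                .verifyComplete();

        return new boolean[] {fallbackProbe.wasSubscribed(), fallbackProbe.wasRequested()};
    }

    /** Returns whether the fallback was subscribed when the lookup has a value. */
    public static boolean presentLookup() {
        PublisherProbe<String> fallbackProbe = PublisherProbe.of(Mono.just("default"));

        StepVerifier.create(lookupOrDefault(Mono.just("found"), fallbackProbe.mono()))
                .expectNext("found")
                .verifyComplete();

        return fallbackProbe.wasSubscribed();
    }

    public static void main(String[] args) {
        boolean[] empty = emptyLookup();
        System.out.println("empty lookup: subscribed=" + empty[0] + ", requested=" + empty[1]);
        System.out.println("present lookup: subscribed=" + presentLookup());
    }
}
```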

When your pipeline reads from the Reactor Context (for example, trace IDs or user information), either pass an initial Context through StepVerifierOptions.withInitialContext() and StepVerifier.create(publisher, options), or attach it with contextWrite on the publisher under test.

    import reactor.util.context.Context;

    // Service that reads userId from Reactor Context
    public Mono<String> greetCurrentUser() {
        return Mono.deferContextual(ctx ->
            Mono.just("Hello, " + ctx.get("userId") + "!"));
    }

    @Test
    void greetsShouldUseUserIdFromContext() {
        StepVerifier.create(
                greetCurrentUser()
                    .contextWrite(Context.of("userId", "alice")))
            .expectNext("Hello, alice!")
            .verifyComplete();
    }
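The other option, supplying the Context from the verifier side, goes through StepVerifierOptions. The following is a minimal sketch assuming reactor-test on the classpath; the class name InitialContextSketch is illustrative, and greetCurrentUser is copied here as a static method only to keep the block self-contained.

```java
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;
import reactor.util.context.Context;

public class InitialContextSketch {

    // Same logic as the service method, reading "userId" from the Context.
    static Mono<String> greetCurrentUser() {
        return Mono.deferContextual(ctx ->
                Mono.just("Hello, " + ctx.get("userId") + "!"));
    }

    /** Throws AssertionError if the greeting does not use the Context value. */
    public static void verifyGreeting() {
        // The verifier's own subscriber carries this Context up the chain,
        // so the publisher under test needs no contextWrite call.
        StepVerifierOptions options = StepVerifierOptions.create()
                .withInitialContext(Context.of("userId", "alice"));

        StepVerifier.create(greetCurrentUser(), options)
                .expectNext("Hello, alice!")
                .verifyComplete();
    }

    public static void main(String[] args) {
        verifyGreeting();
        System.out.println("context was visible to the publisher");
    }
}
```

This variant leaves the publisher under test untouched, which can be preferable when the production code is expected to receive the Context from a downstream subscriber such as a web filter.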

WebTestClient is the reactive equivalent of MockMvc. It sends requests to your WebFlux controller and lets you assert the HTTP response — status, headers, and body — fluently. Use @WebFluxTest for slice tests that load only the web layer, or bind directly to a router function.

    @WebFluxTest(OrderController.class)
    class OrderControllerTest {

        @Autowired
        private WebTestClient webClient;

        // On Spring Boot 3.4+, @MockitoBean replaces the deprecated @MockBean.
        @MockBean
        private OrderService orderService;

        @Test
        void getOrderById_returnsOrder() {
            Order order = new Order(42L, "CONFIRMED", new BigDecimal("99.99"));
            when(orderService.findById(42L)).thenReturn(Mono.just(order));

            webClient.get().uri("/orders/42")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBody(Order.class)
                .value(o -> {
                    assertThat(o.getId()).isEqualTo(42L);
                    assertThat(o.getStatus()).isEqualTo("CONFIRMED");
                });
        }

        @Test
        void getOrder_notFound_returns404() {
            // Assumes OrderNotFoundException is mapped to 404, e.g. via
            // @ResponseStatus or an exception handler.
            when(orderService.findById(99L))
                .thenReturn(Mono.error(new OrderNotFoundException(99L)));

            webClient.get().uri("/orders/99")
                .exchange()
                .expectStatus().isNotFound();
        }

        @Test
        void streamOrders_returnsEventStream() {
            Flux<Order> stream = Flux.just(new Order(1L), new Order(2L));
            when(orderService.streamAll()).thenReturn(stream);

            // expectBodyList collects the whole stream, so the source must be finite.
            webClient.get().uri("/orders/stream")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(Order.class)
                .hasSize(2);
        }
    }
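Binding directly to a router function needs no Spring context at all. The sketch below assumes spring-webflux and spring-test are on the classpath; the /ping route and the class name RouterFunctionClientSketch are hypothetical and exist only to keep the example self-contained.

```java
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;

public class RouterFunctionClientSketch {

    // Hypothetical route used only for this illustration.
    static RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route(GET("/ping"),
                request -> ServerResponse.ok().bodyValue("pong"));
    }

    /** Throws AssertionError if the route does not answer 200 with body "pong". */
    public static void verifyPing() {
        // No server is started: requests are dispatched to the router function in-process.
        WebTestClient client = WebTestClient.bindToRouterFunction(routes()).build();

        client.get().uri("/ping")
                .exchange()
                .expectStatus().isOk()
                .expectBody(String.class).isEqualTo("pong");
    }

    public static void main(String[] args) {
        verifyPing();
        System.out.println("router function answered as expected");
    }
}
```

Compared with @WebFluxTest, this style trades auto-configuration (filters, exception handlers, security) for faster startup, so it suits tests of the routing and handler logic in isolation.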