Contents

<!-- pom.xml -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- PostgreSQL R2DBC driver -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>
<!-- Schema migrations: Flyway/Liquibase connect over JDBC (not R2DBC), so they need a JDBC URL and a JDBC driver -->
<dependency>
  <groupId>org.flywaydb</groupId>
  <artifactId>flyway-core</artifactId>
</dependency>
<!-- NOTE(review): Flyway 10+ ships PostgreSQL support separately (org.flywaydb:flyway-database-postgresql) — add it if your Flyway version requires it -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId> <!-- for Flyway JDBC access -->
</dependency>
<!-- PostgreSQL JDBC driver: needed to open the jdbc:postgresql:// URL that Flyway uses -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <scope>runtime</scope>
</dependency>

# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: app
    password: secret
    pool:
      initial-size: 5
      max-size: 20
      max-idle-time: 30m
  # Flyway runs migrations over JDBC. When an R2DBC ConnectionFactory is present, Spring Boot's
  # DataSource auto-configuration backs off, so spring.datasource.* is not used; give Flyway its
  # own JDBC connection settings instead.
  flyway:
    enabled: true
    url: jdbc:postgresql://localhost:5432/mydb
    user: app
    password: secret

R2DBC entities are simple POJOs — no JPA annotations, no @Entity. Use Spring Data's annotations for mapping.

import org.springframework.data.annotation.*;
import org.springframework.data.relational.core.mapping.*;

/**
 * Plain entity mapped to the {@code orders} table through Spring Data annotations.
 *
 * <p>Fields without an explicit {@code @Column} use the field name as the column name.
 * Other snippets construct this class positionally in declaration order
 * (id, customerId, status, totalAmount, createdAt, updatedAt, version), so keep the
 * field order stable if the constructor is generated.
 */
@Table("orders")
public class Order {
    /** Primary key. */
    @Id private Long id;

    /** Stored in column {@code customer_id}. */
    @Column("customer_id") private Long customerId;

    private String status; // maps to column "status"

    /** Stored in column {@code total_amount}. */
    @Column("total_amount") private BigDecimal totalAmount;

    /** Creation timestamp, marked for auditing. */
    @CreatedDate private Instant createdAt;

    /** Last-modification timestamp, marked for auditing. */
    @LastModifiedDate private Instant updatedAt;

    @Version // optimistic locking
    private Long version;

    // Standard constructors, getters, setters (or use @Data / record)
}

// Spring Data R2DBC does NOT support @OneToMany / @ManyToOne — use DatabaseClient for joins
// Each entity maps to exactly one table; relationships must be loaded separately
R2DBC intentionally has no lazy-loading, no cascades, and no relationship mappings (@OneToMany etc.). It is a lightweight SQL mapping layer. For complex object graphs, load related entities in separate reactive queries and zip/flatMap them together.
import org.springframework.data.r2dbc.repository.R2dbcRepository; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface OrderRepository extends R2dbcRepository<Order, Long> { // Query derivation — Spring generates the SQL Flux<Order> findByCustomerId(Long customerId); Flux<Order> findByStatusAndTotalAmountGreaterThan(String status, BigDecimal minAmount); Mono<Order> findFirstByCustomerIdOrderByCreatedAtDesc(Long customerId); Flux<Order> findByCreatedAtBetween(Instant from, Instant to); Mono<Long> countByStatus(String status); Mono<Void> deleteByCustomerIdAndStatus(Long customerId, String status); } // All methods return Mono or Flux — use them in WebFlux handlers @RestController @RequiredArgsConstructor @RequestMapping("/orders") public class OrderController { private final OrderRepository orderRepo; @GetMapping("/{id}") public Mono<Order> getOrder(@PathVariable Long id) { return orderRepo.findById(id) .switchIfEmpty(Mono.error(new OrderNotFoundException(id))); } @GetMapping("/customer/{customerId}") public Flux<Order> getCustomerOrders(@PathVariable Long customerId) { return orderRepo.findByCustomerId(customerId); } @PostMapping @ResponseStatus(HttpStatus.CREATED) public Mono<Order> createOrder(@RequestBody @Validated CreateOrderRequest req) { Order order = new Order(null, req.customerId(), "NEW", req.total(), null, null, null); return orderRepo.save(order); } } import org.springframework.data.r2dbc.repository.Query; public interface OrderRepository extends R2dbcRepository<Order, Long> { // Native SQL with named parameters @Query("SELECT * FROM orders WHERE status = :status ORDER BY created_at DESC LIMIT :limit") Flux<Order> findRecentByStatus(String status, int limit); // Modifying query (UPDATE / DELETE) — must annotate with @Modifying @Modifying @Query("UPDATE orders SET status = :newStatus WHERE id = :id AND status = :currentStatus") Mono<Integer> tryStatusTransition(Long id, String currentStatus, String newStatus); // 
Projection — map to a simpler interface/record @Query("SELECT id, status, total_amount FROM orders WHERE customer_id = :customerId") Flux<OrderSummary> findSummariesByCustomer(Long customerId); // Pagination Flux<Order> findByStatus(String status, Pageable pageable); } // OrderSummary projection interface public interface OrderSummary { Long getId(); String getStatus(); BigDecimal getTotalAmount(); }

DatabaseClient gives you direct, low-level reactive SQL execution — useful for complex joins, stored procedures, batch operations, or anything that doesn't fit query derivation.

import org.springframework.r2dbc.core.DatabaseClient; @Repository @RequiredArgsConstructor public class OrderDashboardRepository { private final DatabaseClient db; // Complex join — manual mapping public Flux<OrderWithCustomer> findOrdersWithCustomer(String status) { return db.sql(""" SELECT o.id, o.status, o.total_amount, c.name AS customer_name, c.email AS customer_email FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.status = :status ORDER BY o.created_at DESC """) .bind("status", status) .map((row, meta) -> new OrderWithCustomer( row.get("id", Long.class), row.get("status", String.class), row.get("total_amount", BigDecimal.class), row.get("customer_name", String.class), row.get("customer_email", String.class))) .all(); } // Aggregate query public Mono<OrderStats> getStats() { return db.sql("SELECT COUNT(*) AS total, SUM(total_amount) AS revenue FROM orders") .map((row, meta) -> new OrderStats( row.get("total", Long.class), row.get("revenue", BigDecimal.class))) .one(); } // Batch insert public Mono<Void> bulkInsert(List<Order> orders) { return Flux.fromIterable(orders) .flatMap(o -> db.sql("INSERT INTO orders(customer_id, status, total_amount) VALUES(:cid,:s,:t)") .bind("cid", o.getCustomerId()) .bind("s", o.getStatus()) .bind("t", o.getTotalAmount()) .fetch().rowsUpdated()) .then(); } }
// @Transactional works reactively when the return type is Mono/Flux @Service @RequiredArgsConstructor public class OrderService { private final OrderRepository orderRepo; private final InventoryRepository inventoryRepo; private final ReactiveTransactionManager txManager; // auto-configured @Transactional // reactive transaction — commits on Mono completion, rolls back on error public Mono<Order> placeOrder(CreateOrderRequest req) { return inventoryRepo.reserveStock(req.productId(), req.quantity()) .filter(reserved -> reserved) .switchIfEmpty(Mono.error(new InsufficientStockException())) .flatMap(__ -> orderRepo.save(new Order(req))) .flatMap(order -> inventoryRepo.deductStock(req.productId(), req.quantity()) .thenReturn(order)); // If any step throws, the whole transaction rolls back } // Programmatic transaction using TransactionalOperator public Mono<Order> placeOrderProgrammatic(CreateOrderRequest req) { TransactionalOperator txOp = TransactionalOperator.create(txManager); Mono<Order> work = inventoryRepo.reserveStock(req.productId(), req.quantity()) .flatMap(__ -> orderRepo.save(new Order(req))); return txOp.transactional(work); } } Never use @Transactional from a non-reactive context on a reactive method. The transaction context is stored in the Reactor Context, not ThreadLocal. Mixing reactive and blocking transaction management leads to silent transaction boundary violations. 
// Enable auditing @Configuration @EnableR2dbcAuditing public class R2dbcConfig { } // @CreatedDate and @LastModifiedDate on entity fields are populated automatically // (shown in the Order entity above) // Pagination — returns Flux (R2DBC has no Page wrapper like JPA) public interface OrderRepository extends R2dbcRepository<Order, Long> { Flux<Order> findByStatus(String status, Pageable pageable); Mono<Long> countByStatus(String status); } // Service layer — build a page manually public Mono<Map<String, Object>> getOrderPage(String status, int page, int size) { Pageable pageable = PageRequest.of(page, size, Sort.by("createdAt").descending()); return Mono.zip( orderRepo.findByStatus(status, pageable).collectList(), orderRepo.countByStatus(status) ).map(tuple -> Map.of( "content", tuple.getT1(), "total", tuple.getT2(), "page", page, "size", size, "totalPages", (int) Math.ceil((double) tuple.getT2() / size) )); } // @DataR2dbcTest — sliced test for R2DBC layer only @DataR2dbcTest @Testcontainers @Import(FlywayMigrationConfig.class) // run migrations before tests class OrderRepositoryTest { @Container @ServiceConnection static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16-alpine"); @Autowired OrderRepository orderRepo; @Autowired DatabaseClient db; @Test void save_and_findById() { StepVerifier.create( orderRepo.save(new Order(null, 1L, "NEW", BigDecimal.TEN, null, null, null)) .flatMap(saved -> orderRepo.findById(saved.getId()))) .assertNext(o -> { assertThat(o.getStatus()).isEqualTo("NEW"); assertThat(o.getCustomerId()).isEqualTo(1L); }) .verifyComplete(); } @Test void findByStatus_returnsMatchingOrders() { // Insert test data reactively Flux<Order> setup = orderRepo.saveAll(List.of( new Order(null, 1L, "PENDING", BigDecimal.ONE, null, null, null), new Order(null, 2L, "PENDING", BigDecimal.TEN, null, null, null), new Order(null, 3L, "SHIPPED", BigDecimal.TEN, null, null, null) )); StepVerifier.create( 
setup.thenMany(orderRepo.findByStatus("PENDING"))) .expectNextCount(2) .verifyComplete(); } @BeforeEach void cleanUp() { orderRepo.deleteAll().block(); // allowed in tests — not in production handlers } } Use @DataR2dbcTest for repository tests — it loads only the R2DBC slice (repositories, DatabaseClient, converters). Pair with Testcontainers and @ServiceConnection for a real PostgreSQL/MySQL database without any manual connection property wiring.