Contents
- Dependencies & Configuration
- Entity Mapping
- R2dbcRepository — CRUD & Query Derivation
- Custom @Query Methods
- DatabaseClient — Fluent SQL
- Reactive Transactions
- Auditing & Pagination
- Testing with R2DBC & Testcontainers
<!-- pom.xml -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- PostgreSQL R2DBC driver: used by the application itself at runtime -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>
<!-- Schema migrations: Flyway/Liquibase connect over JDBC, not R2DBC, so they need a JDBC URL and driver -->
<dependency>
  <groupId>org.flywaydb</groupId>
  <artifactId>flyway-core</artifactId>
</dependency>
<!-- NOTE: Flyway 10+ ships database support as separate modules; there, also add org.flywaydb:flyway-database-postgresql -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId> <!-- for Flyway JDBC access -->
</dependency>
<!-- PostgreSQL JDBC driver: required for Flyway to open the jdbc:postgresql:// connection -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <scope>runtime</scope>
</dependency>
# application.yml
spring:
  # Reactive connection used by repositories and DatabaseClient
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: app
    password: secret
    pool:
      initial-size: 5
      max-size: 20
      max-idle-time: 30m
  # Flyway migrates over JDBC. When an R2DBC ConnectionFactory is present, Spring Boot's
  # DataSource auto-configuration backs off, so spring.datasource.* would be ignored.
  # Give Flyway its own JDBC connection settings instead.
  flyway:
    enabled: true
    url: jdbc:postgresql://localhost:5432/mydb
    user: app
    password: secret
R2DBC entities are simple POJOs — no JPA annotations, no @Entity. Use Spring Data's annotations for mapping.
import org.springframework.data.annotation.*;
import org.springframework.data.relational.core.mapping.*;
/**
 * Entity for the "orders" table, mapped with Spring Data annotations rather than JPA.
 * Holds only scalar columns; there are no relationship fields.
 */
@Table("orders")
public class Order {
// Primary key.
@Id
private Long id;
// Owning customer, stored in column "customer_id".
@Column("customer_id")
private Long customerId;
private String status; // maps to column "status"
// Order total, stored in column "total_amount".
@Column("total_amount")
private BigDecimal totalAmount;
// Audit timestamps. They carry no @Column, while the SQL elsewhere in this guide uses created_at —
// presumably resolved by the default naming strategy; confirm.
@CreatedDate
private Instant createdAt;
@LastModifiedDate
private Instant updatedAt;
@Version // optimistic locking
private Long version;
// Standard constructors, getters, setters (or use @Data / record)
// NOTE(review): other snippets call a 7-argument constructor in field declaration order and
// new Order(CreateOrderRequest) — confirm both exist before reordering or adding fields.
}
// R2DBC does NOT support @OneToMany / @ManyToOne — use DatabaseClient for joins
// Each entity maps to exactly one table; relationships must be loaded separately
Spring Data R2DBC intentionally has no lazy loading, no cascades, and no relationship mappings (@OneToMany etc.). It is a lightweight SQL mapping layer. For complex object graphs, load related entities in separate reactive queries and combine them with zip/flatMap.
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Reactive repository for {@link Order}. Every method is a derived query: Spring Data
 * generates the SQL from the method name, and results are exposed as {@code Mono}/{@code Flux}.
 */
public interface OrderRepository extends R2dbcRepository<Order, Long> {

    /** Emits every order belonging to the given customer. */
    Flux<Order> findByCustomerId(Long customerId);

    /** Emits every order with the given status, unpaged. */
    Flux<Order> findByStatus(String status);

    /** Emits orders with the given status whose total amount is strictly greater than {@code minAmount}. */
    Flux<Order> findByStatusAndTotalAmountGreaterThan(String status, BigDecimal minAmount);

    /** Emits the customer's most recently created order, or completes empty if there is none. */
    Mono<Order> findFirstByCustomerIdOrderByCreatedAtDesc(Long customerId);

    /** Emits orders whose creation timestamp lies between {@code from} and {@code to}. */
    Flux<Order> findByCreatedAtBetween(Instant from, Instant to);

    /** Emits the number of orders with the given status. */
    Mono<Long> countByStatus(String status);

    /** Deletes the customer's orders that are in the given status; completes when done. */
    Mono<Void> deleteByCustomerIdAndStatus(Long customerId, String status);
}
// All methods return Mono or Flux — use them in WebFlux handlers
@RestController
@RequiredArgsConstructor
@RequestMapping("/orders")
public class OrderController {
private final OrderRepository orderRepo;
@GetMapping("/{id}")
public Mono<Order> getOrder(@PathVariable Long id) {
return orderRepo.findById(id)
.switchIfEmpty(Mono.error(new OrderNotFoundException(id)));
}
@GetMapping("/customer/{customerId}")
public Flux<Order> getCustomerOrders(@PathVariable Long customerId) {
return orderRepo.findByCustomerId(customerId);
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<Order> createOrder(@RequestBody @Validated CreateOrderRequest req) {
Order order = new Order(null, req.customerId(), "NEW", req.total(), null, null, null);
return orderRepo.save(order);
}
}
import org.springframework.data.r2dbc.repository.Query;
/**
 * Repository methods backed by hand-written SQL via {@code @Query}, plus one paged derived query.
 */
public interface OrderRepository extends R2dbcRepository<Order, Long> {
// Native SQL with named parameters (:status, :limit correspond to the method parameters of the same name).
// Returns orders in the given status, newest first, capped at `limit` rows.
@Query("SELECT * FROM orders WHERE status = :status ORDER BY created_at DESC LIMIT :limit")
Flux<Order> findRecentByStatus(String status, int limit);
// Modifying query (UPDATE / DELETE) — must annotate with @Modifying.
// The WHERE clause also matches on the current status, so the row is only updated while it is still in that state.
// Presumably the Integer result is the number of rows updated (0 when the guard did not match) — confirm.
@Modifying
@Query("UPDATE orders SET status = :newStatus WHERE id = :id AND status = :currentStatus")
Mono<Integer> tryStatusTransition(Long id, String currentStatus, String newStatus);
// Projection — map to a simpler interface/record; only id, status and total_amount are selected.
@Query("SELECT id, status, total_amount FROM orders WHERE customer_id = :customerId")
Flux<OrderSummary> findSummariesByCustomer(Long customerId);
// Pagination: derived query that takes a Pageable and returns the matching orders as a Flux.
Flux<Order> findByStatus(String status, Pageable pageable);
}
// OrderSummary projection interface
/**
 * Projection exposing a subset of an order: id, status and total amount.
 * It is the element type returned by findSummariesByCustomer.
 */
public interface OrderSummary {
// Order primary key.
Long getId();
// Order status value.
String getStatus();
// Order total; the query behind this projection selects it from the total_amount column.
BigDecimal getTotalAmount();
}
DatabaseClient gives you direct, low-level reactive SQL execution — useful for complex joins, stored procedures, batch operations, or anything that doesn't fit query derivation.
import org.springframework.r2dbc.core.DatabaseClient;
/**
 * Hand-written reactive SQL for dashboard-style reads and bulk writes that do not fit
 * query derivation. All statements go through {@link DatabaseClient} with named bind
 * parameters; rows are mapped manually.
 */
@Repository
@RequiredArgsConstructor
public class OrderDashboardRepository {

    private final DatabaseClient db;

    /**
     * Joins orders with their customers and maps each row by hand.
     *
     * @param status order status to filter on
     * @return matching orders with customer name and email, newest first
     */
    public Flux<OrderWithCustomer> findOrdersWithCustomer(String status) {
        return db.sql("""
                SELECT o.id, o.status, o.total_amount,
                c.name AS customer_name, c.email AS customer_email
                FROM orders o
                JOIN customers c ON o.customer_id = c.id
                WHERE o.status = :status
                ORDER BY o.created_at DESC
                """)
                .bind("status", status)
                .map((row, meta) -> new OrderWithCustomer(
                        row.get("id", Long.class),
                        row.get("status", String.class),
                        row.get("total_amount", BigDecimal.class),
                        row.get("customer_name", String.class),
                        row.get("customer_email", String.class)))
                .all();
    }

    /**
     * Computes the order count and total revenue across all orders.
     *
     * @return a single {@code OrderStats}; revenue is zero (not null) when the table is empty
     */
    public Mono<OrderStats> getStats() {
        // SUM over zero rows yields SQL NULL; COALESCE keeps revenue a usable number for callers.
        return db.sql("SELECT COUNT(*) AS total, COALESCE(SUM(total_amount), 0) AS revenue FROM orders")
                .map((row, meta) -> new OrderStats(
                        row.get("total", Long.class),
                        row.get("revenue", BigDecimal.class)))
                .one();
    }

    /**
     * Inserts the given orders, issuing one INSERT statement per order.
     *
     * <p>This is not a single batched statement, and this method declares no transaction
     * of its own: if one insert fails, earlier ones are only undone when the caller
     * subscribes inside a transaction.
     *
     * @param orders orders to insert; only customer id, status and total amount are written.
     *               NOTE(review): bind(...) is given the getter values directly — assumes they
     *               are non-null; confirm or switch to bindNull for optional columns.
     * @return completes when every insert has finished; an empty list completes immediately
     */
    public Mono<Void> bulkInsert(List<Order> orders) {
        return Flux.fromIterable(orders)
                .flatMap(o -> db.sql("INSERT INTO orders(customer_id, status, total_amount) VALUES(:cid,:s,:t)")
                        .bind("cid", o.getCustomerId())
                        .bind("s", o.getStatus())
                        .bind("t", o.getTotalAmount())
                        .fetch().rowsUpdated())
                .then();
    }
}
// @Transactional works reactively when the return type is Mono/Flux
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepo;
private final InventoryRepository inventoryRepo;
private final ReactiveTransactionManager txManager; // auto-configured
@Transactional // reactive transaction — commits on Mono completion, rolls back on error
public Mono<Order> placeOrder(CreateOrderRequest req) {
return inventoryRepo.reserveStock(req.productId(), req.quantity())
.filter(reserved -> reserved)
.switchIfEmpty(Mono.error(new InsufficientStockException()))
.flatMap(__ -> orderRepo.save(new Order(req)))
.flatMap(order -> inventoryRepo.deductStock(req.productId(), req.quantity())
.thenReturn(order));
// If any step throws, the whole transaction rolls back
}
// Programmatic transaction using TransactionalOperator
public Mono<Order> placeOrderProgrammatic(CreateOrderRequest req) {
TransactionalOperator txOp = TransactionalOperator.create(txManager);
Mono<Order> work = inventoryRepo.reserveStock(req.productId(), req.quantity())
.flatMap(__ -> orderRepo.save(new Order(req)));
return txOp.transactional(work);
}
}
Never call a reactive @Transactional method from blocking code and expect a thread-bound transaction to apply to it. Reactive transactions are propagated through the Reactor Context, not a ThreadLocal, so mixing reactive and blocking (ThreadLocal-based) transaction management silently breaks transaction boundaries.
// Enable auditing
/**
 * Configuration class with no members; it exists only to carry {@code @EnableR2dbcAuditing},
 * which switches on population of the entity's audit fields.
 */
@Configuration
@EnableR2dbcAuditing
public class R2dbcConfig { }
// @CreatedDate and @LastModifiedDate on entity fields are populated automatically
// (shown in the Order entity above)
// Pagination — repository methods return Flux; reactive repositories cannot return Page<T> as JPA ones do, so assemble the page yourself
/**
 * The two queries needed to build a page by hand: one page of content and the matching total.
 */
public interface OrderRepository extends R2dbcRepository<Order, Long> {
// Content query; the Pageable argument carries page number, page size and sort order.
Flux<Order> findByStatus(String status, Pageable pageable);
// Total number of orders with the status, used to derive the total page count.
Mono<Long> countByStatus(String status);
}
// Service layer — build a page manually
/**
 * Assembles one page of orders for a status as a plain map.
 *
 * <p>The content query and the count query are combined with {@code Mono.zip}; the result
 * map carries the keys {@code content}, {@code total}, {@code page}, {@code size} and
 * {@code totalPages}.
 *
 * @param status order status to filter on
 * @param page   zero-based page index
 * @param size   number of orders per page
 * @return a map describing the requested page, sorted by {@code createdAt} descending
 */
public Mono<Map<String, Object>> getOrderPage(String status, int page, int size) {
    Sort newestFirst = Sort.by("createdAt").descending();
    Pageable pageRequest = PageRequest.of(page, size, newestFirst);

    var pageContent = orderRepo.findByStatus(status, pageRequest).collectList();
    var matchingCount = orderRepo.countByStatus(status);

    return Mono.zip(pageContent, matchingCount).map(result -> {
        Long total = result.getT2();
        // Round up so a partially filled last page still counts as a page.
        int totalPages = (int) Math.ceil((double) total / size);
        return Map.of(
                "content", result.getT1(),
                "total", total,
                "page", page,
                "size", size,
                "totalPages", totalPages);
    });
}
// @DataR2dbcTest — sliced test for R2DBC layer only
/**
 * Repository slice test running against a PostgreSQL Testcontainer.
 * The orders table is emptied before each test so cases do not see each other's rows.
 */
@DataR2dbcTest
@Testcontainers
@Import(FlywayMigrationConfig.class) // run migrations before tests
class OrderRepositoryTest {

    @Container
    @ServiceConnection
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16-alpine");

    @Autowired OrderRepository orderRepo;
    @Autowired DatabaseClient db;

    /** Starts every test from an empty table. */
    @BeforeEach
    void cleanUp() {
        // Blocking is acceptable in test setup — never in production handlers.
        orderRepo.deleteAll().block();
    }

    /** A saved order can be read back by its generated id with the same status and customer. */
    @Test
    void save_and_findById() {
        var roundTrip = orderRepo.save(unsavedOrder(1L, "NEW", BigDecimal.TEN))
                .flatMap(persisted -> orderRepo.findById(persisted.getId()));

        StepVerifier.create(roundTrip)
                .assertNext(found -> {
                    assertThat(found.getStatus()).isEqualTo("NEW");
                    assertThat(found.getCustomerId()).isEqualTo(1L);
                })
                .verifyComplete();
    }

    /** Of two PENDING orders and one SHIPPED order, only the two PENDING ones are returned. */
    @Test
    void findByStatus_returnsMatchingOrders() {
        // Fixture rows are inserted reactively; the query only runs after the inserts complete.
        Flux<Order> inserted = orderRepo.saveAll(List.of(
                unsavedOrder(1L, "PENDING", BigDecimal.ONE),
                unsavedOrder(2L, "PENDING", BigDecimal.TEN),
                unsavedOrder(3L, "SHIPPED", BigDecimal.TEN)));

        StepVerifier.create(inserted.thenMany(orderRepo.findByStatus("PENDING")))
                .expectNextCount(2)
                .verifyComplete();
    }

    /** Builds a not-yet-persisted order; id, audit timestamps and version are left null. */
    private static Order unsavedOrder(Long customerId, String status, BigDecimal total) {
        return new Order(null, customerId, status, total, null, null, null);
    }
}
Use @DataR2dbcTest for repository tests — it loads only the R2DBC slice (repositories, DatabaseClient, converters). Pair with Testcontainers and @ServiceConnection for a real PostgreSQL/MySQL database without any manual connection property wiring.