Contents
- Core Concepts
- Domain Events
- Aggregate — Applying Events to Rebuild State
- Event Store
- Command Handler
- Read-Model Projection
- Query Side — REST Controller
- When to Use CQRS + Event Sourcing
| Term | Definition |
|---|---|
| Command | An intent to change state — e.g. PlaceOrderCommand. Has no return value (or returns an ID). |
| Event | A fact that happened — immutable, past tense — e.g. OrderPlacedEvent. Stored permanently. |
| Aggregate | The write-side entity. Validates commands and emits events. State is rebuilt by replaying its events. |
| Event Store | Append-only database of events ordered by aggregate ID + version number. |
| Projection | Listens for events and maintains a denormalised read model optimised for queries. |
| Query | Reads from the projection (read model) — never touches the event store directly. |
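Each domain event is an immutable record that implements a shared DomainEvent interface. Order events sit under a sealed OrderEvent interface, so the compiler can check that a switch over them covers every event type. The code in this article assumes Java 21 or later (records, sealed types, and pattern matching for switch).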
import java.math.BigDecimal;
import java.time.Instant;
// Base marker interface
public interface DomainEvent {
    String aggregateId();
    Instant occurredAt();
}
// Concrete events: the sealed hierarchy lets the compiler check a switch over OrderEvent for exhaustiveness.
// Every type named in the permits clause must exist and implement OrderEvent.
public sealed interface OrderEvent extends DomainEvent
        permits OrderPlacedEvent, OrderConfirmedEvent, OrderCancelledEvent {}
public record OrderPlacedEvent(
        String aggregateId, Instant occurredAt,
        String customerId, BigDecimal totalAmount) implements OrderEvent {}
public record OrderConfirmedEvent(
        String aggregateId, Instant occurredAt,
        String paymentTransactionId) implements OrderEvent {}
public record OrderCancelledEvent(
        String aggregateId, Instant occurredAt,
        String reason) implements OrderEvent {}
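The exhaustiveness claim in the comment above can be sketched in a single file. This is a minimal illustration, assuming Java 21 or later; the wrapper class `SealedEventsSketch`, the shortened record names, and the `describe` method are hypothetical and exist only to keep the example self-contained.

```java
import java.math.BigDecimal;
import java.time.Instant;

// Hypothetical wrapper so the sketch compiles as one file.
final class SealedEventsSketch {

    sealed interface OrderEvent permits OrderPlaced, OrderConfirmed, OrderCancelled {
        String aggregateId();
        Instant occurredAt();
    }

    record OrderPlaced(String aggregateId, Instant occurredAt,
                       String customerId, BigDecimal totalAmount) implements OrderEvent {}

    record OrderConfirmed(String aggregateId, Instant occurredAt,
                          String paymentTransactionId) implements OrderEvent {}

    record OrderCancelled(String aggregateId, Instant occurredAt,
                          String reason) implements OrderEvent {}

    // No default branch: if a new permitted type is added to OrderEvent
    // without a matching case, this switch stops compiling.
    static String describe(OrderEvent event) {
        return switch (event) {
            case OrderPlaced e    -> "placed by " + e.customerId() + " for " + e.totalAmount();
            case OrderConfirmed e -> "confirmed, payment " + e.paymentTransactionId();
            case OrderCancelled e -> "cancelled: " + e.reason();
        };
    }
}
```

Leaving out the default branch is the point of the design: a forgotten event type becomes a compile error instead of a silently ignored event.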
The aggregate validates incoming commands against business rules, emits events (but doesn't persist them directly), and applies those events to update its own state via apply() methods.
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class OrderAggregate {
    private String id;
    private String customerId;
    private OrderStatus status;          // null until an OrderPlacedEvent has been applied
    private BigDecimal totalAmount;
    private int version = 0;             // number of events applied so far
    private final List<DomainEvent> pendingEvents = new ArrayList<>();
    // ── Reconstitute from event history ──────────────────────────────
    public static OrderAggregate reconstitute(List<DomainEvent> history) {
        OrderAggregate agg = new OrderAggregate();
        history.forEach(agg::apply);     // replay only; nothing is added to pendingEvents
        return agg;
    }
    // ── Command handlers ──────────────────────────────────────────────
    public void handle(PlaceOrderCommand cmd) {
        if (status != null) throw new IllegalStateException("Order already exists");
        raise(new OrderPlacedEvent(cmd.orderId(), Instant.now(),
                cmd.customerId(), cmd.totalAmount()));
    }
    public void handle(ConfirmOrderCommand cmd) {
        if (status != OrderStatus.PENDING)
            throw new IllegalStateException("Only PENDING orders can be confirmed");
        raise(new OrderConfirmedEvent(id, Instant.now(), cmd.transactionId()));
    }
    public void handle(CancelOrderCommand cmd) {
        // Without this guard an order that was never placed would emit an event with a null id
        if (status == null)
            throw new IllegalStateException("Order does not exist");
        if (status == OrderStatus.CONFIRMED)
            throw new IllegalStateException("Cannot cancel a confirmed order");
        if (status == OrderStatus.CANCELLED)
            throw new IllegalStateException("Order is already cancelled");
        raise(new OrderCancelledEvent(id, Instant.now(), cmd.reason()));
    }
    // ── Event application: pure state mutation, no side effects ──────
    private void apply(DomainEvent event) {
        switch (event) {
            case OrderPlacedEvent e -> {
                this.id = e.aggregateId();
                this.customerId = e.customerId();
                this.totalAmount = e.totalAmount();
                this.status = OrderStatus.PENDING;
            }
            case OrderConfirmedEvent e -> this.status = OrderStatus.CONFIRMED;
            case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED;
            default -> {}                // unknown event types still advance the version
        }
        this.version++;
    }
    // Applies a new event immediately and queues it for persistence by the caller
    private void raise(DomainEvent event) {
        apply(event);
        pendingEvents.add(event);
    }
    public List<DomainEvent> getPendingEvents() { return List.copyOf(pendingEvents); }
    public void clearPendingEvents() { pendingEvents.clear(); }
    public int getVersion() { return version; }
}
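Rebuilding state by replaying events is, in effect, a left fold over the history. The sketch below shows that idea in isolation with an immutable state record rather than the mutable aggregate above. It assumes Java 21 or later; `ReplaySketch`, `State`, `evolve`, and the reduced event records are hypothetical names chosen for illustration.

```java
import java.util.List;

// Hypothetical wrapper so the sketch compiles as one file.
final class ReplaySketch {

    enum Status { NONE, PENDING, CONFIRMED, CANCELLED }

    sealed interface Event permits Placed, Confirmed, Cancelled {}
    record Placed(String orderId) implements Event {}
    record Confirmed(String orderId) implements Event {}
    record Cancelled(String orderId) implements Event {}

    // Immutable snapshot of the aggregate after `version` events.
    record State(Status status, int version) {
        static final State EMPTY = new State(Status.NONE, 0);
    }

    // One step of the fold: previous state + event = next state.
    static State evolve(State state, Event event) {
        Status next = switch (event) {
            case Placed p    -> Status.PENDING;
            case Confirmed c -> Status.CONFIRMED;
            case Cancelled c -> Status.CANCELLED;
        };
        return new State(next, state.version() + 1);
    }

    // Replaying the same history always yields the same state.
    static State replay(List<Event> history) {
        State state = State.EMPTY;
        for (Event event : history) {
            state = evolve(state, event);
        }
        return state;
    }
}
```

Because `evolve` has no side effects, replaying a prefix of the history gives the state the order had at that point, which is what makes snapshots and historical queries possible.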
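The event store is a single append-only table. The unique constraint on (aggregate_id, version) provides optimistic concurrency: if two writers try to append the same version for one aggregate, the second insert fails. The DDL below uses PostgreSQL types (BIGSERIAL, JSONB, TIMESTAMPTZ); review it before running it in production.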
CREATE TABLE event_store (
    id             BIGSERIAL    PRIMARY KEY,
    aggregate_id   VARCHAR(100) NOT NULL,
    aggregate_type VARCHAR(100) NOT NULL,
    event_type     VARCHAR(200) NOT NULL,
    payload        JSONB        NOT NULL,
    version        INT          NOT NULL,
    occurred_at    TIMESTAMPTZ  NOT NULL,
    UNIQUE (aggregate_id, version) -- optimistic concurrency
);
-- PostgreSQL backs the UNIQUE constraint with an index on (aggregate_id, version),
-- so loading one aggregate's events in version order needs no separate index.
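import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.sql.Timestamp;
import java.util.List;
@Service
public class EventStore {
    private final JdbcTemplate jdbc;
    // The mapper must handle java.time types and records (Spring Boot's auto-configured mapper does)
    private final ObjectMapper mapper;
    public EventStore(JdbcTemplate jdbc, ObjectMapper mapper) {
        this.jdbc = jdbc;
        this.mapper = mapper;
    }
    // Writes events at versions expectedVersion + 1, + 2, ...
    // OptimisticConcurrencyException is an application-defined unchecked exception (not shown).
    public void append(String aggregateId, String aggregateType,
                       List<DomainEvent> events, int expectedVersion) {
        int version = expectedVersion;
        for (DomainEvent event : events) {
            try {
                jdbc.update("""
                        INSERT INTO event_store
                        (aggregate_id, aggregate_type, event_type, payload, version, occurred_at)
                        VALUES (?, ?, ?, ?::jsonb, ?, ?)
                        """,
                        aggregateId, aggregateType, event.getClass().getName(),
                        mapper.writeValueAsString(event), ++version,
                        // Convert explicitly: not every JDBC driver accepts Instant as a bind parameter
                        Timestamp.from(event.occurredAt()));
            } catch (DuplicateKeyException e) {
                // Another writer already stored this version for the aggregate
                throw new OptimisticConcurrencyException(
                        "Concurrent modification of aggregate " + aggregateId);
            } catch (JsonProcessingException e) {
                // writeValueAsString throws a checked exception that must be handled
                throw new IllegalStateException("Cannot serialise " + event.getClass().getName(), e);
            }
        }
    }
    public List<DomainEvent> load(String aggregateId) {
        return jdbc.query(
                "SELECT event_type, payload FROM event_store WHERE aggregate_id = ? ORDER BY version",
                (rs, i) -> deserialize(rs.getString("event_type"), rs.getString("payload")),
                aggregateId);
    }
    // event_type holds the fully qualified class name, so renaming an event class breaks old rows
    private DomainEvent deserialize(String type, String json) {
        try {
            return (DomainEvent) mapper.readValue(json, Class.forName(type));
        } catch (Exception e) {
            throw new IllegalStateException("Cannot deserialise event of type " + type, e);
        }
    }
}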
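The optimistic concurrency check that the UNIQUE (aggregate_id, version) constraint enforces can be illustrated without a database. The following is a minimal in-memory sketch, not a replacement for the JDBC store: `InMemoryEventStoreSketch` and `ConcurrencyConflict` are hypothetical names, events are plain objects, and the stream length stands in for the version column.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the event_store table.
final class InMemoryEventStoreSketch {

    static final class ConcurrencyConflict extends RuntimeException {
        ConcurrencyConflict(String message) { super(message); }
    }

    // One ordered event stream per aggregate; stream size == current version.
    private final Map<String, List<Object>> streams = new HashMap<>();

    // Appends only if the stream is still at expectedVersion. Otherwise another
    // writer appended first and the caller has to reload and decide again.
    synchronized void append(String aggregateId, List<?> events, int expectedVersion) {
        List<Object> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new ConcurrencyConflict("Expected version " + expectedVersion
                    + " but stream is at version " + stream.size());
        }
        stream.addAll(events);
    }

    // Returns an immutable copy of the stream in append order.
    synchronized List<Object> load(String aggregateId) {
        return List.copyOf(streams.getOrDefault(aggregateId, List.of()));
    }
}
```

Two writers that both loaded the aggregate at version 1 will both pass `expectedVersion = 1`; the first append succeeds and the second is rejected, which mirrors the duplicate-key failure in the SQL version.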
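The command handler is the application service on the write side. For an existing order it loads the event history, rebuilds the aggregate, passes it the command, appends the resulting events using the version it loaded as the expected version, and then publishes the events to projections.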
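import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Service
@Transactional
public class OrderCommandHandler {
    private final EventStore eventStore;
    private final ApplicationEventPublisher eventPublisher;
    public OrderCommandHandler(EventStore eventStore, ApplicationEventPublisher eventPublisher) {
        this.eventStore = eventStore;
        this.eventPublisher = eventPublisher;
    }
    public String handle(PlaceOrderCommand cmd) {
        OrderAggregate order = new OrderAggregate();
        order.handle(cmd);
        // A new aggregate has no events yet, so the expected version is 0
        eventStore.append(cmd.orderId(), "Order", order.getPendingEvents(), 0);
        // Publish for projections (can also be done via Outbox pattern)
        order.getPendingEvents().forEach(eventPublisher::publishEvent);
        order.clearPendingEvents();
        return cmd.orderId();
    }
    public void handle(ConfirmOrderCommand cmd) {
        List<DomainEvent> history = eventStore.load(cmd.orderId());
        OrderAggregate order = OrderAggregate.reconstitute(history);
        // Capture the version before handling: handle() applies new events and increments it
        int currentVersion = order.getVersion();
        order.handle(cmd);
        eventStore.append(cmd.orderId(), "Order", order.getPendingEvents(), currentVersion);
        order.getPendingEvents().forEach(eventPublisher::publishEvent);
        order.clearPendingEvents();
    }
}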
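When the append fails with a concurrency conflict, a common response is to repeat the whole load, handle, append cycle against the fresh history. The helper below is a hedged sketch of that idea rather than part of the handler above: `RetrySketch`, `withRetry`, and `ConflictException` are hypothetical, with `ConflictException` standing in for the store's OptimisticConcurrencyException.

```java
import java.util.function.Supplier;

// Hypothetical retry helper for commands that lose an optimistic concurrency race.
final class RetrySketch {

    // Stand-in for the exception the event store throws on a version conflict.
    static final class ConflictException extends RuntimeException {}

    // Runs `attempt` up to maxAttempts times. Each attempt is expected to reload
    // the aggregate, so business rules are re-checked against the latest state.
    static <T> T withRetry(int maxAttempts, Supplier<T> attempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ConflictException last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ConflictException e) {
                last = e; // lost the race; try again with a fresh load
            }
        }
        throw last; // every attempt conflicted
    }
}
```

With a transactional handler, the retry would need to wrap the call from outside the transaction, since a failed insert typically leaves the current database transaction unusable. A retried command can also legitimately fail on the second attempt, for example when the competing writer already confirmed or cancelled the order.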
The projection listens for domain events (via Spring's @EventListener or Kafka) and updates a denormalised read table optimised for queries.
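import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
public class OrderSummaryProjection {
    private final OrderSummaryRepository repo;
    public OrderSummaryProjection(OrderSummaryRepository repo) {
        this.repo = repo;
    }
    // Creates the read-model row when an order is placed
    @EventListener
    @Transactional
    public void on(OrderPlacedEvent event) {
        repo.save(new OrderSummary(
                event.aggregateId(), event.customerId(),
                event.totalAmount(), "PENDING", event.occurredAt()));
    }
    @EventListener
    @Transactional
    public void on(OrderConfirmedEvent event) {
        repo.findById(event.aggregateId()).ifPresent(summary -> {
            summary.setStatus("CONFIRMED");
            summary.setUpdatedAt(event.occurredAt());
            repo.save(summary);
        });
    }
    @EventListener
    @Transactional
    public void on(OrderCancelledEvent event) {
        repo.findById(event.aggregateId()).ifPresent(summary -> {
            summary.setStatus("CANCELLED");
            summary.setCancellationReason(event.reason());
            summary.setUpdatedAt(event.occurredAt()); // keep the timestamp consistent with the confirm handler
            repo.save(summary);
        });
    }
}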
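Because a projection is derived entirely from events, it can be dropped and rebuilt by replaying the event history in order. The sketch below shows this with an in-memory map in place of the read table. It assumes Java 21 or later, and `ProjectionRebuildSketch`, its reduced event records, `rebuild`, and `view` are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory projection: order id -> current status.
final class ProjectionRebuildSketch {

    sealed interface Event permits Placed, Confirmed, Cancelled {
        String orderId();
    }
    record Placed(String orderId, String customerId) implements Event {}
    record Confirmed(String orderId) implements Event {}
    record Cancelled(String orderId, String reason) implements Event {}

    private final Map<String, String> statusByOrder = new LinkedHashMap<>();

    // Same role as the on(...) listeners: fold one event into the read model.
    void on(Event event) {
        switch (event) {
            case Placed e    -> statusByOrder.put(e.orderId(), "PENDING");
            case Confirmed e -> statusByOrder.computeIfPresent(e.orderId(), (id, s) -> "CONFIRMED");
            case Cancelled e -> statusByOrder.computeIfPresent(e.orderId(), (id, s) -> "CANCELLED");
        }
    }

    // Discards the current read model and replays every event in order.
    void rebuild(List<Event> allEventsInOrder) {
        statusByOrder.clear();
        allEventsInOrder.forEach(this::on);
    }

    Map<String, String> view() {
        return Map.copyOf(statusByOrder);
    }
}
```

The same mechanism lets a new read model be introduced later: a new projection can be fed the full history before it starts listening for live events.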
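Queries are served directly from the read model by one controller, and a separate controller accepts commands and passes them to the command handler. The POST endpoint returns 202 Accepted with the new order ID, which suits a read model that may be updated after the command returns (for example when events are delivered through Kafka).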
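import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/api/orders")
public class OrderQueryController {
    private final OrderSummaryRepository repo;
    public OrderQueryController(OrderSummaryRepository repo) {
        this.repo = repo;
    }
    // Reads from the read model: fast, denormalised, no event replay needed
    @GetMapping("/{id}")
    public OrderSummary getOrder(@PathVariable String id) {
        return repo.findById(id).orElseThrow(() -> new OrderNotFoundException(id));
    }
    @GetMapping
    public Page<OrderSummary> listOrders(
            @RequestParam(defaultValue = "PENDING") String status, Pageable pageable) {
        return repo.findByStatus(status, pageable);
    }
}
@RestController
@RequestMapping("/api/orders")
public class OrderCommandController {
    private final OrderCommandHandler handler;
    public OrderCommandController(OrderCommandHandler handler) {
        this.handler = handler;
    }
    // The server generates the order ID, so the client learns it from the response body
    @PostMapping
    public ResponseEntity<Map<String, String>> placeOrder(@RequestBody PlaceOrderRequest req) {
        String orderId = handler.handle(new PlaceOrderCommand(
                UUID.randomUUID().toString(), req.customerId(), req.totalAmount()));
        return ResponseEntity.accepted().body(Map.of("orderId", orderId));
    }
}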
- Use it when you need a full audit trail, the ability to replay history, or time-travel queries.
- Use it when read and write workloads are asymmetric and need independent scaling.
- Avoid it for simple CRUD applications — the complexity is rarely justified.
- Avoid it when the team is new to the pattern — start with CQRS alone (separate read/write models without event sourcing) and add event sourcing when you actually need it.
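The time-travel queries mentioned in the first bullet follow from keeping every event with its timestamp: the state at a past moment is the result of replaying only the events that had occurred by then. A minimal sketch, assuming the history is already ordered by version; `TimeTravelSketch`, `StatusChanged`, and `statusAsOf` are hypothetical names, and the single generic event type is a simplification of the order events used earlier.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical sketch of a point-in-time query over an event history.
final class TimeTravelSketch {

    // Simplified event: each one records the status the order moved to.
    record StatusChanged(String orderId, String newStatus, Instant occurredAt) {}

    // Returns the status the order had at `pointInTime`, or "NONE" if it
    // had not been placed yet. Assumes `history` is ordered by version.
    static String statusAsOf(List<StatusChanged> history, Instant pointInTime) {
        String status = "NONE";
        for (StatusChanged event : history) {
            if (event.occurredAt().isAfter(pointInTime)) {
                break; // everything from here on happened later
            }
            status = event.newStatus();
        }
        return status;
    }
}
```

A CRUD table that overwrites the status column cannot answer this question without a separate audit log, which is the trade-off the bullets above describe.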