Contents

Term — Definition
Command — An intent to change state, e.g. PlaceOrderCommand. Has no return value (or returns only an ID).
Event — A fact that has happened: immutable and named in the past tense, e.g. OrderPlacedEvent. Stored permanently.
Aggregate — The write-side entity. Validates commands and emits events. Its state is rebuilt by replaying its events.
Event Store — Append-only database of events, ordered by aggregate ID + version number.
Projection — Listens for events and maintains a denormalised read model optimised for queries.
Query — Reads from the projection (read model); never touches the event store directly.

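The types below define the domain events: a base DomainEvent interface and a sealed OrderEvent hierarchy whose concrete events are immutable records. Key points are highlighted in the inline comments.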

// Base marker interface public interface DomainEvent { String aggregateId(); Instant occurredAt(); } // Concrete events — sealed hierarchy gives exhaustive switch in projections public sealed interface OrderEvent extends DomainEvent permits OrderPlacedEvent, OrderConfirmedEvent, OrderCancelledEvent, OrderItemAddedEvent {} public record OrderPlacedEvent( String aggregateId, Instant occurredAt, String customerId, BigDecimal totalAmount) implements OrderEvent {} public record OrderConfirmedEvent( String aggregateId, Instant occurredAt, String paymentTransactionId) implements OrderEvent {} public record OrderCancelledEvent( String aggregateId, Instant occurredAt, String reason) implements OrderEvent {}

The aggregate validates incoming commands against business rules, emits events (but doesn't persist them directly), and applies those events to update its own state via apply() methods.

public class OrderAggregate { private String id; private String customerId; private OrderStatus status; private BigDecimal totalAmount; private int version = 0; private final List<DomainEvent> pendingEvents = new ArrayList<>(); // ── Reconstitute from event history ────────────────────────────── public static OrderAggregate reconstitute(List<DomainEvent> history) { OrderAggregate agg = new OrderAggregate(); history.forEach(agg::apply); return agg; } // ── Command handlers ────────────────────────────────────────────── public void handle(PlaceOrderCommand cmd) { if (status != null) throw new IllegalStateException("Order already exists"); raise(new OrderPlacedEvent(cmd.orderId(), Instant.now(), cmd.customerId(), cmd.totalAmount())); } public void handle(ConfirmOrderCommand cmd) { if (status != OrderStatus.PENDING) throw new IllegalStateException("Only PENDING orders can be confirmed"); raise(new OrderConfirmedEvent(id, Instant.now(), cmd.transactionId())); } public void handle(CancelOrderCommand cmd) { if (status == OrderStatus.CONFIRMED) throw new IllegalStateException("Cannot cancel a confirmed order"); raise(new OrderCancelledEvent(id, Instant.now(), cmd.reason())); } // ── Event application — pure state mutation, no side effects ───── private void apply(DomainEvent event) { switch (event) { case OrderPlacedEvent e -> { this.id = e.aggregateId(); this.customerId = e.customerId(); this.totalAmount = e.totalAmount(); this.status = OrderStatus.PENDING; } case OrderConfirmedEvent e -> this.status = OrderStatus.CONFIRMED; case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED; default -> {} } this.version++; } private void raise(DomainEvent event) { apply(event); pendingEvents.add(event); } public List<DomainEvent> getPendingEvents() { return List.copyOf(pendingEvents); } public void clearPendingEvents() { pendingEvents.clear(); } public int getVersion() { return version; } }

Execute the SQL statements below against your target database. Review each statement before running in production.

CREATE TABLE event_store ( id BIGSERIAL PRIMARY KEY, aggregate_id VARCHAR(100) NOT NULL, aggregate_type VARCHAR(100) NOT NULL, event_type VARCHAR(200) NOT NULL, payload JSONB NOT NULL, version INT NOT NULL, occurred_at TIMESTAMPTZ NOT NULL, UNIQUE (aggregate_id, version) -- optimistic concurrency ); CREATE INDEX idx_es_aggregate ON event_store(aggregate_id, version); @Service public class EventStore { private final JdbcTemplate jdbc; private final ObjectMapper mapper; public void append(String aggregateId, String aggregateType, List<DomainEvent> events, int expectedVersion) { int version = expectedVersion; for (DomainEvent event : events) { try { jdbc.update(""" INSERT INTO event_store (aggregate_id, aggregate_type, event_type, payload, version, occurred_at) VALUES (?, ?, ?, ?::jsonb, ?, ?) """, aggregateId, aggregateType, event.getClass().getName(), mapper.writeValueAsString(event), ++version, event.occurredAt()); } catch (DuplicateKeyException e) { throw new OptimisticConcurrencyException( "Concurrent modification of aggregate " + aggregateId); } } } public List<DomainEvent> load(String aggregateId) { return jdbc.query( "SELECT event_type, payload FROM event_store WHERE aggregate_id = ? ORDER BY version", (rs, i) -> deserialize(rs.getString("event_type"), rs.getString("payload")), aggregateId); } private DomainEvent deserialize(String type, String json) { try { return (DomainEvent) mapper.readValue(json, Class.forName(type)); } catch (Exception e) { throw new RuntimeException(e); } } }

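The command handler below ties the pieces together: it loads (or creates) the aggregate, lets it handle the command, appends the resulting events to the event store with the expected version, and publishes them for the projections. Key points are highlighted in the inline comments.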

@Service @Transactional public class OrderCommandHandler { private final EventStore eventStore; private final ApplicationEventPublisher eventPublisher; public String handle(PlaceOrderCommand cmd) { OrderAggregate order = new OrderAggregate(); order.handle(cmd); eventStore.append(cmd.orderId(), "Order", order.getPendingEvents(), 0); // Publish for projections (can also be done via Outbox pattern) order.getPendingEvents().forEach(eventPublisher::publishEvent); order.clearPendingEvents(); return cmd.orderId(); } public void handle(ConfirmOrderCommand cmd) { List<DomainEvent> history = eventStore.load(cmd.orderId()); OrderAggregate order = OrderAggregate.reconstitute(history); int currentVersion = order.getVersion(); order.handle(cmd); eventStore.append(cmd.orderId(), "Order", order.getPendingEvents(), currentVersion); order.getPendingEvents().forEach(eventPublisher::publishEvent); order.clearPendingEvents(); } }

The projection listens for domain events (via Spring's @EventListener or Kafka) and updates a denormalised read table optimised for queries.

/**
 * Keeps the {@code OrderSummary} read model in step with order events.
 *
 * <p>Each listener handles one event type and writes through {@code OrderSummaryRepository}.
 */
@Component
public class OrderSummaryProjection {

    private final OrderSummaryRepository repo;

    /** Constructor injection; the final field cannot be initialised any other way. */
    public OrderSummaryProjection(OrderSummaryRepository repo) {
        this.repo = repo;
    }

    /** Creates the summary row for a newly placed order with status {@code "PENDING"}. */
    @EventListener
    @Transactional
    public void on(OrderPlacedEvent event) {
        OrderSummary summary = new OrderSummary(
                event.aggregateId(),
                event.customerId(),
                event.totalAmount(),
                "PENDING",
                event.occurredAt());
        repo.save(summary);
    }

    /**
     * Marks the summary as {@code "CONFIRMED"}.
     * Does nothing when no summary exists for the aggregate.
     */
    @EventListener
    @Transactional
    public void on(OrderConfirmedEvent event) {
        repo.findById(event.aggregateId()).ifPresent(summary -> {
            summary.setStatus("CONFIRMED");
            summary.setUpdatedAt(event.occurredAt());
            repo.save(summary);
        });
    }

    /**
     * Marks the summary as {@code "CANCELLED"} and records the reason.
     * Does nothing when no summary exists for the aggregate.
     */
    @EventListener
    @Transactional
    public void on(OrderCancelledEvent event) {
        repo.findById(event.aggregateId()).ifPresent(summary -> {
            summary.setStatus("CANCELLED");
            summary.setCancellationReason(event.reason());
            // Keep the timestamp in step with the status change, as the confirm handler does.
            summary.setUpdatedAt(event.occurredAt());
            repo.save(summary);
        });
    }
}

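The two controllers below expose both sides over HTTP: the query controller reads only from the read model, while the command controller turns a request into a command and hands it to the command handler. Key points are highlighted in the inline comments.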

@RestController @RequestMapping("/api/orders") public class OrderQueryController { private final OrderSummaryRepository repo; // Reads from the read model — fast, denormalised, no event replay needed @GetMapping("/{id}") public OrderSummary getOrder(@PathVariable String id) { return repo.findById(id).orElseThrow(() -> new OrderNotFoundException(id)); } @GetMapping public Page<OrderSummary> listOrders( @RequestParam(defaultValue = "PENDING") String status, Pageable pageable) { return repo.findByStatus(status, pageable); } } @RestController @RequestMapping("/api/orders") public class OrderCommandController { private final OrderCommandHandler handler; @PostMapping public ResponseEntity<Map<String, String>> placeOrder(@RequestBody PlaceOrderRequest req) { String orderId = handler.handle(new PlaceOrderCommand( UUID.randomUUID().toString(), req.customerId(), req.totalAmount())); return ResponseEntity.accepted().body(Map.of("orderId", orderId)); } }