Contents

Name your state store explicitly so it can be looked up later via KafkaStreams.store().

@Configuration @EnableKafkaStreams public class OrderCountTopology { public static final String ORDER_COUNT_STORE = "order-count-store"; @Bean public KStream<String, OrderEvent> orderCountStream(StreamsBuilder builder) { // Aggregate order counts by customerId into a named state store KTable<String, Long> orderCounts = builder .stream("order.created", Consumed.with(Serdes.String(), orderEventSerde())) .groupBy((key, event) -> event.customerId(), Grouped.with(Serdes.String(), orderEventSerde())) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_COUNT_STORE) .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())); // Also stream the results to an output topic orderCounts.toStream().to("order.counts", Produced.with(Serdes.String(), Serdes.Long())); return builder.stream("order.created"); // returned for bean wiring } }

Inject KafkaStreams and call store() with the store name and type. The store only becomes queryable after the application has reached the RUNNING state — calling store() earlier (or during a rebalance) throws InvalidStateStoreException, so callers should be prepared to retry.

import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @Service public class OrderCountQueryService { private final KafkaStreams kafkaStreams; // Query count for a single customer — local only public Long getOrderCount(String customerId) { ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store( StoreQueryParameters.fromNameAndType( ORDER_COUNT_STORE, QueryableStoreTypes.keyValueStore() ) ); Long count = store.get(customerId); return count != null ? count : 0L; } // Iterate all entries in the local store shard public Map<String, Long> getAllLocalCounts() { ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store( StoreQueryParameters.fromNameAndType(ORDER_COUNT_STORE, QueryableStoreTypes.keyValueStore()) ); Map<String, Long> result = new LinkedHashMap<>(); try (var iter = store.all()) { iter.forEachRemaining(kv -> result.put(kv.key, kv.value)); } return result; } }

The REST controller below routes each query to the node that owns the key: local keys are read directly from the local shard, while keys owned by another instance are proxied over HTTP to that instance's /local endpoint.

@RestController @RequestMapping("/api/orders/stats") public class OrderStatsController { private final OrderCountQueryService queryService; private final StoreMetadataService metadataService; private final WebClient webClient; // Query for a specific customer — may proxy to another node @GetMapping("/customer/{customerId}/count") public ResponseEntity<Long> getCustomerOrderCount(@PathVariable String customerId) { // Find which node owns this key HostInfo owner = metadataService.getOwnerForKey(ORDER_COUNT_STORE, customerId); String localHost = metadataService.getLocalHost(); if (owner.host().equals(localHost) && owner.port() == metadataService.getLocalPort()) { // Key is local — query directly return ResponseEntity.ok(queryService.getOrderCount(customerId)); } else { // Key is on another node — proxy the request Long count = webClient.get() .uri("http://{}:{}/api/orders/stats/customer/{}/count/local", owner.host(), owner.port(), customerId) .retrieve() .bodyToMono(Long.class) .block(); return ResponseEntity.ok(count); } } // Local-only endpoint called by other nodes when proxying @GetMapping("/customer/{customerId}/count/local") public Long getLocalCount(@PathVariable String customerId) { return queryService.getOrderCount(customerId); } // Aggregate across all nodes — scatter/gather @GetMapping("/all") public Map<String, Long> getAllCounts() { return metadataService.getAllHosts(ORDER_COUNT_STORE).stream() .flatMap(host -> fetchFromHost(host).entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } }

Kafka Streams tracks which partitions (and thus which key ranges) each node owns. Use KafkaStreams.queryMetadataForKey() and streamsMetadataForStore() to discover node locations.

@Service public class StoreMetadataService { private final KafkaStreams kafkaStreams; @Value("${server.host:localhost}") private String localHost; @Value("${server.port:8080}") private int localPort; // Find which node (host:port) owns a specific key public HostInfo getOwnerForKey(String storeName, String key) { KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey( storeName, key, Serdes.String().serializer()); if (metadata == null || metadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) { throw new StoreNotAvailableException("Store " + storeName + " not available"); } return metadata.activeHost(); } // Get all nodes hosting a given store (for scatter/gather queries) public List<HostInfo> getAllHosts(String storeName) { return kafkaStreams.streamsMetadataForStore(storeName).stream() .map(StreamsMetadata::hostInfo) .distinct() .collect(Collectors.toList()); } public String getLocalHost() { return localHost; } public int getLocalPort() { return localPort; } }

The helper below — a private method of the controller shown earlier — fetches the local shard from each node (skipping HTTP for the local instance) so the public /all endpoint can merge the results.

// Scatter/gather — fetch local shard from each node and merge results private Map<String, Long> fetchFromHost(HostInfo host) { if (host.host().equals(localHost) && host.port() == localPort) { return queryService.getAllLocalCounts(); // local — no HTTP } // Remote — proxy via WebClient return webClient.get() .uri("http://{}:{}/api/orders/stats/all/local", host.host(), host.port()) .retrieve() .bodyToMono(new ParameterizedTypeReference<Map<String, Long>>() {}) .block(Duration.ofSeconds(5)); } Always expose a separate /local endpoint that reads only from the local store shard. The public endpoint does the scatter/gather orchestration. This avoids circular proxy loops.

Window stores aggregate over time windows. Query with a time range to retrieve windowed results for a key.

// Topology — count orders per customer per 1-hour tumbling window KTable<Windowed<String>, Long> windowedCounts = builder .stream("order.created", Consumed.with(Serdes.String(), orderEventSerde())) .groupBy((k, v) -> v.customerId(), Grouped.with(Serdes.String(), orderEventSerde())) .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))) .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("order-window-store") .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())); // Query — get order count for a customer in a specific 1-hour window public List<WindowedCount> getWindowedCounts(String customerId, Instant from, Instant to) { ReadOnlyWindowStore<String, Long> windowStore = kafkaStreams.store( StoreQueryParameters.fromNameAndType("order-window-store", QueryableStoreTypes.windowStore()) ); List<WindowedCount> results = new ArrayList<>(); try (var iter = windowStore.fetch(customerId, from, to)) { iter.forEachRemaining(kv -> results.add(new WindowedCount(kv.key.window().startTime(), kv.key.window().endTime(), kv.value))); } return results; } public record WindowedCount(Instant windowStart, Instant windowEnd, Long count) {}

Kafka Streams needs to know its own host:port so that other nodes can proxy to it. Set this in application.properties or via StreamsConfig.

# application.yml spring: kafka: streams: application-id: order-aggregation-service properties: # Required for interactive queries — tells other nodes where to reach this instance application.server: ${HOSTNAME:localhost}:${server.port:8080} # RocksDB state stores are persisted here state.dir: /tmp/kafka-streams // Alternatively configure programmatically @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration streamsConfig() { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregation-service"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, System.getenv().getOrDefault("HOSTNAME", "localhost") + ":8080"); props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); return new KafkaStreamsConfiguration(props); }