Contents
- Topology with a Named State Store
- Querying a Local KeyValueStore
- Exposing Store Data via REST
- Store Metadata & Multi-Node Discovery
- Proxying to Remote Nodes
- Querying a WindowStore
- application.server Configuration
Name your state store explicitly so it can be looked up later via KafkaStreams.store().
@Configuration
@EnableKafkaStreams
public class OrderCountTopology {

    public static final String ORDER_COUNT_STORE = "order-count-store";

    /**
     * Builds the order-count topology: consumes {@code order.created}, counts
     * orders per customer into a named state store, and publishes the counts
     * to {@code order.counts}.
     *
     * @param builder the auto-configured {@link StreamsBuilder}
     * @return the source stream (exposed as a bean so the topology is registered)
     */
    @Bean
    public KStream<String, OrderEvent> orderCountStream(StreamsBuilder builder) {
        // Consume the source topic exactly once. Calling builder.stream() a
        // second time for the same topic (as the original return statement did)
        // registers a duplicate source and fails with a TopologyException.
        KStream<String, OrderEvent> orders = builder
                .stream("order.created", Consumed.with(Serdes.String(), orderEventSerde()));

        // Aggregate order counts by customerId into a NAMED state store so it
        // can later be looked up via KafkaStreams.store(ORDER_COUNT_STORE, ...).
        KTable<String, Long> orderCounts = orders
                .groupBy((key, event) -> event.customerId(),
                        Grouped.with(Serdes.String(), orderEventSerde()))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_COUNT_STORE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));

        // Also stream the results to an output topic for downstream consumers.
        orderCounts.toStream().to("order.counts", Produced.with(Serdes.String(), Serdes.Long()));

        return orders; // returned for bean wiring
    }
}
Inject KafkaStreams and call store() with the store name and type. The store is only available after the application has reached RUNNING state.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@Service
public class OrderCountQueryService {

    private final KafkaStreams kafkaStreams;

    // Constructor injection — the final field was previously never initialized,
    // which does not compile without Lombok (not used in this file).
    public OrderCountQueryService(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    /**
     * Obtains the read-only view of the count store.
     * NOTE(review): throws InvalidStateStoreException until the Streams app
     * reaches RUNNING state — callers should expect that during startup/rebalance.
     */
    private ReadOnlyKeyValueStore<String, Long> countStore() {
        return kafkaStreams.store(
                StoreQueryParameters.fromNameAndType(
                        ORDER_COUNT_STORE,
                        QueryableStoreTypes.keyValueStore()));
    }

    /**
     * Returns the order count for a single customer, or 0 if absent.
     * Queries the LOCAL shard only — use the metadata service to route first.
     */
    public Long getOrderCount(String customerId) {
        Long count = countStore().get(customerId);
        return count != null ? count : 0L;
    }

    /** Iterates all entries in the local store shard (this node's partitions only). */
    public Map<String, Long> getAllLocalCounts() {
        Map<String, Long> result = new LinkedHashMap<>();
        // The iterator holds an underlying (e.g. RocksDB) resource — must be closed.
        try (var iter = countStore().all()) {
            iter.forEachRemaining(kv -> result.put(kv.key, kv.value));
        }
        return result;
    }
}
The class below shows the implementation. Key points are highlighted in the inline comments.
@RestController
@RequestMapping("/api/orders/stats")
public class OrderStatsController {

    private final OrderCountQueryService queryService;
    private final StoreMetadataService metadataService;
    private final WebClient webClient;

    // Constructor injection — final fields were previously never initialized.
    public OrderStatsController(OrderCountQueryService queryService,
                                StoreMetadataService metadataService,
                                WebClient webClient) {
        this.queryService = queryService;
        this.metadataService = metadataService;
        this.webClient = webClient;
    }

    /**
     * Query for a specific customer — answers locally when this node owns the
     * key's partition, otherwise proxies to the owning node's /local endpoint.
     */
    @GetMapping("/customer/{customerId}/count")
    public ResponseEntity<Long> getCustomerOrderCount(@PathVariable String customerId) {
        // Find which node owns this key.
        HostInfo owner = metadataService.getOwnerForKey(ORDER_COUNT_STORE, customerId);
        if (owner.host().equals(metadataService.getLocalHost())
                && owner.port() == metadataService.getLocalPort()) {
            // Key is local — query the store directly, no HTTP hop.
            return ResponseEntity.ok(queryService.getOrderCount(customerId));
        }
        // Key is on another node — proxy the request. Spring URI templates use
        // {name} placeholders (expanded positionally); bare "{}" is not valid.
        Long count = webClient.get()
                .uri("http://{host}:{port}/api/orders/stats/customer/{id}/count/local",
                        owner.host(), owner.port(), customerId)
                .retrieve()
                .bodyToMono(Long.class)
                .block();
        return ResponseEntity.ok(count);
    }

    /** Local-only endpoint called by other nodes when proxying a single-key query. */
    @GetMapping("/customer/{customerId}/count/local")
    public Long getLocalCount(@PathVariable String customerId) {
        return queryService.getOrderCount(customerId);
    }

    /** Aggregate across all nodes — scatter/gather over every host owning the store. */
    @GetMapping("/all")
    public Map<String, Long> getAllCounts() {
        return metadataService.getAllHosts(ORDER_COUNT_STORE).stream()
                .flatMap(host -> fetchFromHost(host).entrySet().stream())
                // Merge function guards against a key briefly appearing on two
                // nodes during a rebalance (toMap without it would throw).
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum));
    }

    /**
     * Local-only shard dump called by other nodes during scatter/gather.
     * This endpoint was referenced by fetchFromHost() but never defined.
     */
    @GetMapping("/all/local")
    public Map<String, Long> getAllLocalCounts() {
        return queryService.getAllLocalCounts();
    }
}
Kafka Streams tracks which partitions (and thus which key ranges) each node owns. Use KafkaStreams.queryMetadataForKey() and streamsMetadataForStore() to discover node locations.
@Service
public class StoreMetadataService {

    private final KafkaStreams kafkaStreams;

    @Value("${server.host:localhost}")
    private String localHost;

    @Value("${server.port:8080}")
    private int localPort;

    // Constructor injection — the final field was previously never initialized.
    public StoreMetadataService(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    /**
     * Finds which node (host:port) actively hosts the partition owning a key.
     *
     * @throws StoreNotAvailableException if metadata is not yet available
     *         (e.g. during startup or while a rebalance is in progress)
     */
    public HostInfo getOwnerForKey(String storeName, String key) {
        KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(
                storeName, key, Serdes.String().serializer());
        if (metadata == null || metadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
            throw new StoreNotAvailableException("Store " + storeName + " not available");
        }
        return metadata.activeHost();
    }

    /** Returns all distinct nodes hosting a given store (for scatter/gather queries). */
    public List<HostInfo> getAllHosts(String storeName) {
        return kafkaStreams.streamsMetadataForStore(storeName).stream()
                .map(StreamsMetadata::hostInfo)
                .distinct()
                .collect(Collectors.toList());
    }

    public String getLocalHost() { return localHost; }

    public int getLocalPort() { return localPort; }
}
The helper method below implements the per-node fetch used by the scatter/gather endpoint. Key points are highlighted in the inline comments.
/**
 * Scatter/gather helper — fetches one node's local shard and lets the caller
 * merge results. Answers from the local store when the target host is this
 * instance; otherwise calls the remote node's /all/local endpoint.
 */
private Map<String, Long> fetchFromHost(HostInfo host) {
    if (host.host().equals(localHost) && host.port() == localPort) {
        return queryService.getAllLocalCounts(); // local — no HTTP
    }
    // Remote — proxy via WebClient. Spring URI templates use {name}
    // placeholders (expanded positionally); bare "{}" is not valid and
    // would produce a malformed URI.
    return webClient.get()
            .uri("http://{host}:{port}/api/orders/stats/all/local", host.host(), host.port())
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<Map<String, Long>>() {})
            .block(Duration.ofSeconds(5)); // bounded wait so a dead node can't hang the gather
}
Always expose a separate /local endpoint that reads only from the local store shard. The public endpoint does the scatter/gather orchestration. This avoids circular proxy loops.
Window stores aggregate over time windows. Query with a time range to retrieve windowed results for a key.
// Topology — order counts per customer in fixed (tumbling) 1-hour windows,
// with no grace period for late-arriving records.
TimeWindows hourlyWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1));
KTable<Windowed<String>, Long> windowedCounts = builder
        .stream("order.created", Consumed.with(Serdes.String(), orderEventSerde()))
        .groupBy((orderId, event) -> event.customerId(),
                Grouped.with(Serdes.String(), orderEventSerde()))
        .windowedBy(hourlyWindow)
        // Materialize under an explicit name so the window store is queryable.
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("order-window-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
/**
 * Query — order counts for one customer across every 1-hour window whose
 * start time falls within [from, to].
 *
 * @param customerId key to look up
 * @param from       inclusive lower bound on window start time
 * @param to         inclusive upper bound on window start time
 * @return one entry per window, carrying the window bounds and the count
 */
public List<WindowedCount> getWindowedCounts(String customerId,
        Instant from, Instant to) {
    ReadOnlyWindowStore<String, Long> windowStore = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType("order-window-store",
                    QueryableStoreTypes.windowStore())
    );
    List<WindowedCount> results = new ArrayList<>();
    // BUG FIX: the single-key fetch(key, from, to) returns a
    // WindowStoreIterator<Long> whose KeyValue.key is the raw window-start
    // timestamp (Long) — it has no window() accessor. The key-RANGE overload
    // fetch(keyFrom, keyTo, from, to) yields Windowed<String> keys, which is
    // what the result-mapping below requires.
    try (KeyValueIterator<Windowed<String>, Long> iter =
            windowStore.fetch(customerId, customerId, from, to)) {
        iter.forEachRemaining(kv ->
                results.add(new WindowedCount(kv.key.window().startTime(),
                        kv.key.window().endTime(), kv.value)));
    }
    return results;
}
public record WindowedCount(Instant windowStart, Instant windowEnd, Long count) {}
Kafka Streams needs to know its own host:port so that other nodes can proxy to it. Set this in application.yml (shown below; application.properties works equivalently) or programmatically via StreamsConfig.
# application.yml
spring:
  kafka:
    streams:
      # Must be unique per Streams app — doubles as the consumer group id
      # and prefixes internal topic / state-store names.
      application-id: order-aggregation-service
      properties:
        # Required for interactive queries — tells other nodes where to reach this instance
        application.server: ${HOSTNAME:localhost}:${server.port:8080}
        # RocksDB state stores are persisted here
        state.dir: /tmp/kafka-streams
// Alternatively configure programmatically
/**
 * Programmatic equivalent of the YAML configuration above.
 * The port is injected from {@code server.port} (default 8080) rather than
 * hard-coded, keeping it consistent with the property-file variant.
 */
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration streamsConfig(
        @Value("${server.port:8080}") int serverPort) {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregation-service");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    // application.server must match the HTTP port this instance serves on,
    // otherwise remote nodes will proxy interactive queries to the wrong port.
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
            System.getenv().getOrDefault("HOSTNAME", "localhost") + ":" + serverPort);
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
    return new KafkaStreamsConfiguration(props);
}