Stateless operations such as filter() and mapValues() process each record in isolation — the output depends only on the single record being processed. Stateful operations, on the other hand, accumulate information across many records and can answer questions like "how many page views has user X had in the last five minutes?" or "what is the running total revenue per product category?".
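The distinction can be sketched in plain Java, with no Kafka at all (a deliberately simplified illustration: Kafka Streams keeps this kind of state in a fault-tolerant store, not an ordinary HashMap):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StatefulSketch {
    public static void main(String[] args) {
        List<String> userIds = List.of("alice", "bob", "alice", "alice");

        // Stateless: each record is transformed in isolation
        List<String> upper = userIds.stream()
            .map(String::toUpperCase)
            .collect(Collectors.toList());

        // Stateful: a running count must accumulate across records
        Map<String, Long> viewCounts = new HashMap<>();
        for (String userId : userIds) {
            viewCounts.merge(userId, 1L, Long::sum);
        }

        System.out.println(upper);                   // [ALICE, BOB, ALICE, ALICE]
        System.out.println(viewCounts.get("alice")); // 3
    }
}
```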

State Stores and RocksDB

Every stateful operation in Kafka Streams creates a state store — a local, embedded key-value store maintained on the same JVM as the stream task. By default Kafka Streams uses RocksDB as the storage engine, which keeps the active working set in memory and spills larger data to disk. The state directory defaults to /tmp/kafka-streams and should be changed to a stable, fast disk in production.

Changelog Topics for Fault Tolerance

Every persistent state store is backed by a changelog topic — a compacted Kafka topic that records every mutation to the store. If a task crashes or is migrated to another instance, Kafka Streams restores the state store by replaying the changelog before resuming processing. This gives stateful applications the same durability guarantees as Kafka itself.

The changelog topic name follows the pattern <application.id>-<store-name>-changelog. Do not delete these topics manually — they are the source of truth for state recovery.
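Because the pattern is mechanical, resolving a store's changelog topic name is simple string assembly. This sketch is illustrative only; Kafka Streams derives the name internally:

```java
public class ChangelogTopicName {
    // <application.id>-<store-name>-changelog, as described above
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        System.out.println(changelogTopic("page-view-count-app", "page-view-count-store"));
        // page-view-count-app-page-view-count-store-changelog
    }
}
```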
| Characteristic | Stateless | Stateful |
|---|---|---|
| Memory / disk usage | None (beyond processing buffer) | State store on local disk (RocksDB) |
| Fault tolerance | Replay input topic on restart | Restore changelog topic into state store |
| Scalability | Any partition can process any record | A given key must always route to the same partition/task |
| Typical output | KStream | KTable (or windowed KTable) |

Every stateful aggregation starts with a grouping step that tells Kafka Streams which key to aggregate on. Two methods exist, and choosing incorrectly can silently introduce correctness issues or unnecessary performance overhead.

groupByKey()

groupByKey() groups records by their existing message key. Because the key does not change, Kafka Streams knows that the data is already correctly partitioned and does not trigger a repartition. This is the preferred method whenever the current key is already the grouping dimension.

```java
KStream<String, String> pageViews = builder.stream(
    "page-views",
    Consumed.with(Serdes.String(), Serdes.String())
);

// Key is already userId — no repartition needed
KGroupedStream<String, String> groupedByUser = pageViews.groupByKey();
```
groupBy()

groupBy() accepts a KeyValueMapper to derive a new key from the record. Because the key changes, Kafka Streams must repartition the stream to ensure all records with the same new key land in the same task. This repartition is implemented internally by writing to an intermediate Kafka topic (named <application.id>-<name>-repartition).

```java
// Repartition by page URL extracted from the value JSON
KGroupedStream<String, String> groupedByPage = pageViews
    .groupBy(
        (userId, eventJson) -> extractField(eventJson, "page"),
        Grouped.with(Serdes.String(), Serdes.String())
    );
```

Every key-changing groupBy() call creates an additional internal Kafka topic and doubles the broker I/O for that portion of the topology. Prefer groupByKey() when the record key is already correct. If you must re-key, selectKey() followed by groupByKey() still repartitions once, but it lets multiple downstream operations reuse the same re-keyed stream instead of each creating its own repartition topic.
Grouped Configuration

Both grouping methods accept an optional Grouped parameter that configures the SerDes for the grouped stream and optionally assigns a name to the repartition topic. Naming the repartition topic makes topology debugging and monitoring much easier.

```java
// Named repartition topic and explicit SerDes
KGroupedStream<String, PageView> grouped = pageViews
    .mapValues(json -> parsePageView(json))
    .groupBy(
        (key, pv) -> pv.getPage(),
        Grouped.<String, PageView>as("by-page-repartition")
            .withKeySerde(Serdes.String())
            .withValueSerde(pageViewSerde)
    );
```

If you omit explicit SerDes in Grouped, Kafka Streams falls back to the application-level defaults configured via StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG.
count()

count() is the simplest stateful aggregation — it increments a Long counter for every record in the grouped stream. The result is a KTable<K, Long> where each entry holds the current running count for its key.

```java
// Count page views per user (key = userId)
KTable<String, Long> viewCountPerUser = pageViews
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .count(Materialized.as("page-view-count-store"));

// Publish counts downstream
viewCountPerUser
    .toStream()
    .to("page-view-counts", Produced.with(Serdes.String(), Serdes.Long()));
```

The Materialized.as("page-view-count-store") argument names the state store so it can be queried later via the Interactive Queries API. Omitting it causes Kafka Streams to auto-generate an internal name that changes with topology changes.

KTable Result Semantics

A KTable is a changelog stream — when you call toStream() on a KTable you get a stream of update records, not raw events. Each record emitted represents the new current value for a key after the latest aggregation update. Downstream consumers reading from a KTable-backed topic should treat records as upserts.
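A downstream consumer can materialize such an update stream by applying each record as an upsert and treating a null value as a delete. This is a minimal sketch of the semantics, not the Kafka Streams implementation:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertSketch {
    public static void main(String[] args) {
        // An update stream: each entry carries the NEW current value for its key;
        // a null value is a tombstone (delete)
        List<Map.Entry<String, Long>> updates = List.of(
            new SimpleEntry<>("alice", 1L),
            new SimpleEntry<>("alice", 2L),  // upsert: replaces the earlier 1
            new SimpleEntry<>("bob", 1L),
            new SimpleEntry<>("bob", null)   // tombstone: removes bob
        );

        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> update : updates) {
            if (update.getValue() == null) {
                table.remove(update.getKey());
            } else {
                table.put(update.getKey(), update.getValue());
            }
        }

        System.out.println(table); // {alice=2}
    }
}
```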

reduce()

reduce() is a special form of aggregation where the accumulator and the input share the same type. You supply a Reducer — a two-argument function that merges a new value into the running aggregate. A common pattern is keeping the latest value, or summing numeric values.
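Stripped of Kafka, the per-key semantics of reduce() are a fold. This hypothetical, Kafka-free sketch shows how a two-argument function of a single type merges each new value into the running aggregate:

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceSketch {
    // reduce() semantics for one key: fold each new value into the
    // running aggregate; input and aggregate share the SAME type
    static Long fold(List<Long> values, BinaryOperator<Long> reducer) {
        Long aggregate = null;
        for (Long v : values) {
            aggregate = (aggregate == null) ? v : reducer.apply(aggregate, v);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        List<Long> readings = List.of(3L, 9L, 4L);
        System.out.println(fold(readings, Math::max)); // 9 — running maximum
        System.out.println(fold(readings, Long::sum)); // 16 — running sum
    }
}
```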

```java
KStream<String, Long> sensorReadings = builder.stream(
    "sensor-readings",
    Consumed.with(Serdes.String(), Serdes.Long())
);

// Running maximum reading per sensor
KTable<String, Long> maxReading = sensorReadings
    .groupByKey()
    .reduce(
        (currentMax, newValue) -> Math.max(currentMax, newValue),
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("max-sensor-store")
            .withValueSerde(Serdes.Long())
    );

// Running sum of revenues per merchant
KStream<String, Double> sales = builder.stream(
    "sales-events",
    Consumed.with(Serdes.String(), Serdes.Double())
);

KTable<String, Double> runningRevenue = sales
    .groupByKey()
    .reduce(
        Double::sum,
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("revenue-store")
            .withValueSerde(Serdes.Double())
    );
```

reduce() on a KGroupedStream silently drops records with null values, so it cannot process tombstones, and its result type is fixed to the input type. If you need to handle deletions or a different accumulator type, use aggregate() instead.

aggregate()

aggregate() is the most flexible stateful operation. Unlike reduce(), the accumulator can be a completely different type from the input value — which makes it ideal for building summaries, histograms, custom POJOs, and any aggregation that requires intermediate state richer than a single number.

Method Signature
```java
// On a KGroupedStream<K, V>:
KTable<K, VR> aggregate(
    Initializer<VR> initializer,       // () -> new VR instance
    Aggregator<K, V, VR> aggregator,   // (key, value, aggregate) -> updated VR
    Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized
)
```
Running Sum with aggregate()
```java
KStream<String, String> orderEvents = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), Serdes.String())
);

// Accumulate total order amount per customerId
KTable<String, Double> totalPerCustomer = orderEvents
    .mapValues(json -> parseOrderAmount(json))  // KStream<String, Double>
    .groupByKey()
    .aggregate(
        () -> 0.0,                              // initializer
        (customerId, amount, runningTotal) ->   // aggregator
            runningTotal + amount,
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-total-store")
            .withValueSerde(Serdes.Double())
    );
```
Custom POJO Aggregator

When you need to track multiple statistics in a single pass, model the accumulator as a POJO and register a custom Serde for it. The example below tracks both event count and total amount per customer in a single state store entry.

```java
// --- Accumulator POJO ---
public class OrderStats {
    public long count;
    public double totalAmount;
    public double maxAmount;

    public OrderStats() {} // required for deserialization

    public OrderStats update(double amount) {
        this.count++;
        this.totalAmount += amount;
        this.maxAmount = Math.max(this.maxAmount, amount);
        return this;
    }
}

// --- Topology ---
Serde<OrderStats> orderStatsSerde = /* JSON or Avro serde */;

KTable<String, OrderStats> statsPerCustomer = orderEvents
    .mapValues(json -> parseOrderAmount(json))
    .groupByKey()
    .aggregate(
        OrderStats::new,                                     // initializer
        (customerId, amount, stats) -> stats.update(amount), // aggregator
        Materialized.<String, OrderStats, KeyValueStore<Bytes, byte[]>>as("order-stats-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(orderStatsSerde)
    );
```
Named State Stores

Giving state stores explicit names via Materialized.as("my-store-name") has several important benefits: the store can be queried by name through the Interactive Queries API; the backing changelog topic name (<application.id>-<store-name>-changelog) stays stable across topology changes, so existing state survives redeploys; and internal topic and store names become human-readable, which simplifies monitoring and debugging.

Store names must be unique within an application. If two operations share the same name, the topology build step throws a TopologyException at startup.

A KTable is more than just an aggregation output — it supports its own set of transformation and join operations. KTable operations propagate changes downstream: when a key's value changes, a new update record is emitted to downstream operators.

filter() and mapValues()
```java
KTable<String, Long> allCounts = pageViews
    .groupByKey()
    .count(Materialized.as("all-counts"));

// filter — emit update records only when the predicate is true.
// Records that no longer match emit a tombstone (null value)
KTable<String, Long> activeCounts = allCounts
    .filter((userId, count) -> count > 0);

// mapValues — transform the value type; key is unchanged
KTable<String, String> countLabel = allCounts
    .mapValues(count -> "views=" + count);
```
toStream()

toStream() converts a KTable back into a KStream so that individual update records can be routed to Kafka output topics, joined with other streams, or passed to stateless operations. Each record in the resulting KStream represents one state-change event, not the full current table.

```java
allCounts
    .toStream() // KStream<String, Long>
    .peek((k, v) -> log.info("Updated count for {}: {}", k, v))
    .to("user-view-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
```
Joining Two KTables

KTable-KTable joins are equi-joins on the key. Kafka Streams evaluates the join lazily — whenever either side emits an update, a new joined record is produced using the current value from the other side's state store.

```java
// KTable of user profiles (key = userId)
KTable<String, String> userProfiles = builder.table(
    "user-profiles",
    Consumed.with(Serdes.String(), Serdes.String())
);

// KTable of page-view counts (key = userId)
KTable<String, Long> viewCounts = pageViews
    .groupByKey()
    .count(Materialized.as("view-count-join-store"));

// Inner join: both keys must exist
KTable<String, String> enrichedCounts = viewCounts.join(
    userProfiles,
    (count, profile) -> "user=" + extractName(profile) + " views=" + count
);

// Left join: emit result even when profile is absent (null right side)
KTable<String, String> leftJoined = viewCounts.leftJoin(
    userProfiles,
    (count, profile) -> profile != null
        ? extractName(profile) + ":" + count
        : "unknown:" + count
);
```

For a KTable-KTable join to work correctly, both tables must be co-partitioned — same number of partitions and the same partitioning strategy. Kafka Streams verifies that the partition counts match when tasks are assigned and fails with an exception if they differ; ensuring both sides use the same partitioning logic is the application's responsibility.
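The lazy evaluation can be modelled with two in-memory maps: an update on either side is stored, then joined against the other side's current value. This is a simplified single-partition sketch, not the RocksDB-backed implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class TableJoinSketch {
    static final Map<String, Long> viewCounts = new HashMap<>();
    static final Map<String, String> profiles = new HashMap<>();

    // Inner-join semantics: emit only when BOTH sides have a value for the key
    static String onCountUpdate(String userId, long count) {
        viewCounts.put(userId, count);
        String profile = profiles.get(userId);
        return profile == null ? null : profile + " views=" + count;
    }

    static String onProfileUpdate(String userId, String profile) {
        profiles.put(userId, profile);
        Long count = viewCounts.get(userId);
        return count == null ? null : profile + " views=" + count;
    }

    public static void main(String[] args) {
        System.out.println(onCountUpdate("alice", 1));         // null: no profile yet
        System.out.println(onProfileUpdate("alice", "Alice")); // Alice views=1
        System.out.println(onCountUpdate("alice", 2));         // Alice views=2
    }
}
```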
Suppression with Suppressed.untilWindowCloses()

By default, a windowed KTable emits an update record every time any record falls into a window — potentially generating many intermediate results per window. suppress() with Suppressed.untilWindowCloses() holds back all intermediate updates for a window and emits only the final result after the window closes (when the grace period has elapsed). This is essential for downstream consumers that need exactly one result per window.

```java
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import java.time.Duration;

KTable<Windowed<String>, Long> windowedCounts = pageViews
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(
        Duration.ofMinutes(5),
        Duration.ofSeconds(30)
    ))
    .count(Materialized.as("suppressed-window-store"));

// Emit exactly one record per window per key, after the window closes
KTable<Windowed<String>, Long> finalCounts = windowedCounts
    .suppress(
        Suppressed.untilWindowCloses(
            BufferConfig.unbounded() // buffer all pending updates in memory
        )
    );

finalCounts
    .toStream()
    .map((wk, count) -> KeyValue.pair(wk.key(), count))
    .to("final-window-counts", Produced.with(Serdes.String(), Serdes.Long()));
```

BufferConfig.unbounded() can consume significant heap memory if windows are large or record volume is high. The suppression buffer is held entirely in memory, and untilWindowCloses() requires a strict buffer: to set a bound, use BufferConfig.maxBytes(...).shutDownWhenFull() or BufferConfig.maxRecords(...).shutDownWhenFull(), which stops the application rather than emit a premature, non-final result when the buffer fills.
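The buffering behaviour itself is easy to model: hold the latest update per window, and emit only when the window closes. A minimal, Kafka-free sketch of the idea:

```java
import java.util.HashMap;
import java.util.Map;

public class SuppressionSketch {
    // Latest (still-suppressed) count per window start; flushed only on close
    static final Map<Long, Long> buffer = new HashMap<>();

    static void onWindowUpdate(long windowStartMs, long count) {
        buffer.put(windowStartMs, count); // a newer update replaces the older one
    }

    static Long onWindowClose(long windowStartMs) {
        return buffer.remove(windowStartMs); // emit exactly one final result
    }

    public static void main(String[] args) {
        onWindowUpdate(0L, 1);
        onWindowUpdate(0L, 2);
        onWindowUpdate(0L, 3);                 // intermediate results are withheld
        System.out.println(onWindowClose(0L)); // 3, the single final emission
    }
}
```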

Windowing divides an infinite stream into finite, time-bounded buckets so that stateful aggregations can be scoped to a particular time period. Kafka Streams supports three window types, each with distinct semantics.

| Window Type | Size | Overlap | Boundary |
|---|---|---|---|
| Tumbling | Fixed | None — each record belongs to exactly one window | Aligned to the Unix epoch (e.g. 00:00–00:05, 00:05–00:10) |
| Hopping | Fixed | Yes — a record can belong to multiple windows | Determined by window size and advance interval |
| Session | Variable | None — determined by inactivity gap | Starts on first event; closes after inactivity gap expires |
Tumbling Windows

A tumbling window has a fixed size and no overlap. Records are assigned to the window whose [start, start + size) interval contains the record's timestamp. Use TimeWindows.ofSizeWithNoGrace() when late arrivals are not a concern, or TimeWindows.ofSizeAndGrace() to allow a grace period for out-of-order events.

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// Tumbling window of 5 minutes — no grace period (drop late arrivals)
KTable<Windowed<String>, Long> tumblingCount = pageViews
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("tumbling-view-counts"));

// Tumbling window of 5 minutes — 30-second grace period
KTable<Windowed<String>, Long> tumblingWithGrace = pageViews
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(
        Duration.ofMinutes(5),
        Duration.ofSeconds(30)
    ))
    .count(Materialized.as("tumbling-grace-counts"));
```
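Window assignment for tumbling windows is pure arithmetic: the window start is the timestamp rounded down to a multiple of the window size, since Kafka Streams aligns tumbling windows to the Unix epoch. A small self-contained sketch:

```java
import java.time.Duration;

public class TumblingAssignment {
    // A record belongs to the single window [start, start + size) whose start
    // is its timestamp rounded down to a multiple of the window size
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis(); // 300000 ms

        long start = windowStart(1_060_000L, size);
        System.out.println(start);        // 900000  — window start
        System.out.println(start + size); // 1200000 — exclusive window end
    }
}
```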
Hopping Windows

A hopping window has a fixed size but advances in steps smaller than the window size, so windows overlap. A single record is included in ceil(size / advanceBy) windows simultaneously, which means the input data is "amplified" in downstream output. Use hopping windows for moving averages or rolling statistics.

```java
import org.apache.kafka.streams.kstream.TimeWindows;

// 10-minute window advancing every 2 minutes → each record appears in 5 windows
KTable<Windowed<String>, Long> hoppingCount = pageViews
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(10),
            Duration.ofSeconds(30)
        ).advanceBy(Duration.ofMinutes(2))
    )
    .count(Materialized.as("hopping-view-counts"));
```
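The ceil(size / advanceBy) amplification can be verified by enumerating the windows that contain a given timestamp. A simplified sketch assuming epoch-aligned window starts at multiples of the advance interval:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingAssignment {
    // All hopping windows [start, start + size) that contain the timestamp;
    // window starts are multiples of the advance interval
    static List<Long> windowStarts(long tsMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = tsMs - (tsMs % advanceMs); // latest window containing tsMs
        for (long start = lastStart; start > tsMs - sizeMs; start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute window, 2-minute advance → ceil(600000 / 120000) = 5 windows
        List<Long> starts = windowStarts(600_000L, 600_000L, 120_000L);
        System.out.println(starts.size()); // 5
        System.out.println(starts);        // [600000, 480000, 360000, 240000, 120000]
    }
}
```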
Session Windows

Session windows group records by activity gaps rather than wall-clock intervals. A session stays open as long as new events arrive within the inactivity gap. When two sessions are close enough that a new event bridges them, Kafka Streams merges the sessions (and their accumulated state). Session windows are ideal for modelling user browsing sessions, API call sequences, or any activity where natural pauses define boundaries.

```java
import org.apache.kafka.streams.kstream.SessionWindows;

// Session window with 5-minute inactivity gap, no grace period
KTable<Windowed<String>, Long> sessionCount = pageViews
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("session-view-counts"));

// Session window with inactivity gap AND grace period
KTable<Windowed<String>, Long> sessionWithGrace = pageViews
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(
        Duration.ofMinutes(5),
        Duration.ofSeconds(60)
    ))
    .count(Materialized.as("session-grace-counts"));
```
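The inactivity-gap rule can be sketched for a single key whose events arrive in timestamp order: a new session begins whenever the gap to the previous event exceeds the configured inactivity gap. (Real session windows also merge sessions when out-of-order arrivals bridge them; this simplified sketch ignores that.)

```java
import java.util.List;

public class SessionSketch {
    // Number of sessions for one key, given timestamps sorted ascending
    static int countSessions(List<Long> sortedTsMs, long inactivityGapMs) {
        if (sortedTsMs.isEmpty()) return 0;
        int sessions = 1;
        for (int i = 1; i < sortedTsMs.size(); i++) {
            // A gap larger than the inactivity gap closes the session
            if (sortedTsMs.get(i) - sortedTsMs.get(i - 1) > inactivityGapMs) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gap = 5 * 60 * 1000L; // 5-minute inactivity gap
        // Events at 0s, 60s, 120s, then a second burst 20 minutes in
        List<Long> ts = List.of(0L, 60_000L, 120_000L, 1_320_000L, 1_380_000L);
        System.out.println(countSessions(ts, gap)); // 2
    }
}
```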
Reading Windowed Results — Windowed<K>

The key of a windowed KTable is Windowed<K>, which wraps the original key and the window's time boundaries. When routing results to a Kafka topic you typically extract the inner key and/or encode the window boundaries into the value.

```java
tumblingCount
    .toStream()
    .map((windowedKey, count) -> {
        String userId = windowedKey.key();
        long windowStart = windowedKey.window().start(); // epoch ms
        long windowEnd = windowedKey.window().end();
        String outKey = userId + "@" + windowStart + "-" + windowEnd;
        return KeyValue.pair(outKey, count);
    })
    .to("windowed-counts-output", Produced.with(Serdes.String(), Serdes.Long()));
```

Window boundaries use event time, not wall-clock time. The timestamp of a record is determined by the TimestampExtractor configured on the stream (default: the Kafka record timestamp). Incorrect timestamps cause records to be placed in wrong windows or dropped if they fall outside the retention period.

Interactive Queries allow you to query state stores from outside the topology — for example, from a REST endpoint — without reading back from a Kafka topic. This turns a Kafka Streams application into a queryable, distributed key-value service over its own aggregated state.

Querying a Key-Value Store
```java
// 1. Get a handle to the store (call this after streams.start())
ReadOnlyKeyValueStore<String, Long> kvStore = kafkaStreams.store(
    StoreQueryParameters.fromNameAndType(
        "page-view-count-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// 2. Point lookup — O(1)
Long countForUser = kvStore.get("user-123"); // null if key not found

// 3. Range scan — returns all keys in [from, to]
KeyValueIterator<String, Long> range = kvStore.range("user-100", "user-200");
while (range.hasNext()) {
    KeyValue<String, Long> entry = range.next();
    System.out.println(entry.key + " -> " + entry.value);
}
range.close(); // always close iterators to release resources

// 4. Full scan
KeyValueIterator<String, Long> all = kvStore.all();
while (all.hasNext()) { /* process */ }
all.close();
```
Querying a Windowed Store
```java
ReadOnlyWindowStore<String, Long> windowStore = kafkaStreams.store(
    StoreQueryParameters.fromNameAndType(
        "tumbling-view-counts",
        QueryableStoreTypes.windowStore()
    )
);

// Fetch all windows for a specific key across a time range
Instant from = Instant.now().minus(Duration.ofHours(1));
Instant to = Instant.now();

WindowStoreIterator<Long> windowIter = windowStore.fetch("user-123", from, to);
while (windowIter.hasNext()) {
    KeyValue<Long, Long> windowEntry = windowIter.next();
    long windowStartMs = windowEntry.key;
    long count = windowEntry.value;
    System.out.printf("Window starting at %d: %d views%n", windowStartMs, count);
}
windowIter.close();

// Fetch all keys within a time range (range query across keys)
KeyValueIterator<Windowed<String>, Long> rangeIter = windowStore.fetchAll(from, to);
while (rangeIter.hasNext()) {
    KeyValue<Windowed<String>, Long> entry = rangeIter.next();
    System.out.println(entry.key.key()
        + " [" + entry.key.window().start() + "] = " + entry.value);
}
rangeIter.close();
```
Querying Across Multiple Instances (RPC)

State is partition-local. If your application runs as multiple instances (e.g., in a Kubernetes Deployment), a given key's state lives on only one instance. The Interactive Queries API provides metadata to locate which instance owns a given key.
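The reason a key's state lives on exactly one instance is that the producer's partitioner is deterministic: the same key always hashes to the same partition, and each partition's task runs on one instance. The sketch below uses a simplified stand-in hash — Kafka's default partitioner actually applies murmur2 to the serialized key bytes, not String.hashCode():

```java
public class KeyRoutingSketch {
    // Simplified stand-in: Kafka's default partitioner uses murmur2 over the
    // serialized key, not String.hashCode(); the determinism is what matters
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-123", 6);
        int p2 = partitionFor("user-123", 6);
        // A given key always routes to the same partition, hence the same task
        System.out.println(p1 == p2); // true
    }
}
```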

```java
// Advertise the instance's host:port so other instances can reach it
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "this-host:8080");

// ---

// Find which instance owns "user-123" in the given store
KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(
    "page-view-count-store",
    "user-123",
    new StringSerializer()
);
HostInfo activeHost = metadata.activeHost();

if (activeHost.equals(thisInstanceHostInfo)) {
    // local — query directly
    Long count = kvStore.get("user-123");
} else {
    // remote — forward the HTTP request to the owning instance
    String url = "http://" + activeHost.host() + ":" + activeHost.port()
        + "/counts/user-123";
    Long count = httpClient.get(url, Long.class);
}
```

Set application.server to the externally reachable host and port of each Kafka Streams instance. Without this config, queryMetadataForKey() cannot route requests across instances and returns KeyQueryMetadata.NOT_AVAILABLE.
Exposing State via a REST Endpoint
```java
// Minimal example using a simple HTTP server (e.g., Javalin / Micronaut)
get("/counts/{userId}", ctx -> {
    String userId = ctx.pathParam("userId");
    KeyQueryMetadata meta = kafkaStreams.queryMetadataForKey(
        "page-view-count-store", userId, new StringSerializer());

    if (meta.activeHost().equals(LOCAL_HOST_INFO)) {
        ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                "page-view-count-store",
                QueryableStoreTypes.keyValueStore()
            )
        );
        ctx.json(Map.of("userId", userId, "count", store.get(userId)));
    } else {
        // Proxy to the owning instance
        ctx.redirect("http://" + meta.activeHost().host() + ":"
            + meta.activeHost().port() + "/counts/" + userId);
    }
});
```
State Directory Configuration

The default state directory /tmp/kafka-streams is volatile and may be cleared on OS restarts, forcing full state restoration from changelog topics on every redeploy. Always configure a stable path on a persistent, fast disk.

```java
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/my-app");
```

In containerised environments (Docker / Kubernetes), mount a PersistentVolumeClaim at the state directory path so that state survives pod restarts. Without a persistent volume, each restart triggers a full state restore from Kafka, which can take several minutes for large stores and causes processing lag during the restore window.

Standby Replicas

When a Kafka Streams instance fails, Kafka re-assigns its tasks to remaining instances. Those instances must first restore the state from the changelog topic — a process that can be slow for large stores. Standby replicas solve this by maintaining warm, partially-restored copies of state on other instances so that failover is nearly instantaneous.

```java
// Keep 1 warm standby copy of every state store partition
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
```
State Restoration on Restart

When a Kafka Streams application starts and detects a local state store, it compares the local store's offset with the latest changelog topic offset. If there is a gap, it replays changelog records to bring the store up to date before the task resumes processing new input records. During restoration, interactive queries are served from the store in its partially-restored state (or blocked, depending on configuration).

```java
kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName,
                               long startOffset, long endOffset) {
        log.info("Restoring store {} partition {} from offset {} to {}",
            storeName, tp.partition(), startOffset, endOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName,
                                long batchEndOffset, long numRestored) {
        log.debug("Restored {} records for store {}", numRestored, storeName);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName, long totalRestored) {
        log.info("Restoration complete for store {}. Total records: {}",
            storeName, totalRestored);
    }
});

kafkaStreams.start();
```
RocksDB Tuning

For large state stores, RocksDB's default configuration may not be optimal. Kafka Streams allows you to inject a custom RocksDBConfigSetter to tune block cache size, write buffer count, compression, and more.

```java
public class CustomRocksDBConfig implements RocksDBConfigSetter {

    // 256 MB shared block cache
    private static final org.rocksdb.Cache blockCache =
        new org.rocksdb.LRUCache(256 * 1024 * 1024L);

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(blockCache);
        tableConfig.setBlockSize(16 * 1024);           // 16 KB block size
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(3);
        options.setWriteBufferSize(64 * 1024 * 1024L); // 64 MB write buffer
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
    }

    @Override
    public void close(String storeName, Options options) {
        // release native resources if any were allocated per store
    }
}

// Register the custom config setter
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
```
| Config | Default | Recommendation |
|---|---|---|
| state.dir | /tmp/kafka-streams | Stable, persistent, fast SSD path |
| num.standby.replicas | 0 | 1 for production; 2 for critical paths |
| rocksdb.config.setter | None | Custom setter when state store exceeds a few GB |
| commit.interval.ms | 30000 | Lower (e.g., 5000) to shorten the reprocessing window after a crash, at the cost of more I/O |
| restore.consumer.max.poll.records | 1000 | Increase to 10000+ to speed up state restoration |
| application.server | Empty | Set to host:port to enable cross-instance interactive queries |

This complete, runnable example ties together all the concepts covered in this article. It consumes page-view events from a Kafka topic, groups them by user ID, counts views in 5-minute tumbling windows with a 30-second grace period, suppresses intermediate results until each window closes, and exposes the final counts via a simple HTTP endpoint backed by interactive queries.

Event Model
```java
// PageViewEvent.java
public class PageViewEvent {
    public String userId;
    public String page;
    public long timestampMs;

    public PageViewEvent() {}

    public PageViewEvent(String userId, String page, long timestampMs) {
        this.userId = userId;
        this.page = page;
        this.timestampMs = timestampMs;
    }
}
```
Topology Definition
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
import java.time.Duration;
import java.util.Properties;

public class PageViewCountApp {

    static final String INPUT_TOPIC = "page-views";
    static final String OUTPUT_TOPIC = "page-view-window-counts";
    static final String STORE_NAME = "page-view-window-store";

    public static Topology buildTopology(Serde<PageViewEvent> eventSerde) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, PageViewEvent> rawViews = builder.stream(
            INPUT_TOPIC,
            Consumed.with(Serdes.String(), eventSerde)
                .withTimestampExtractor(
                    (record, previousTimestamp) ->
                        ((PageViewEvent) record.value()).timestampMs
                )
        );

        KTable<Windowed<String>, Long> windowedCounts = rawViews
            // key is already userId — no repartition
            .groupByKey(Grouped.with(Serdes.String(), eventSerde))
            // 5-minute tumbling window, 30-second grace for late arrivals
            .windowedBy(TimeWindows.ofSizeAndGrace(
                Duration.ofMinutes(5),
                Duration.ofSeconds(30)
            ))
            // count views per user per window, backed by a named state store
            .count(Materialized.as(STORE_NAME));

        // Suppress intermediate updates — emit only the final count per window
        windowedCounts
            .suppress(Suppressed.untilWindowCloses(
                Suppressed.BufferConfig.unbounded()
            ))
            .toStream()
            .map((windowedKey, count) -> {
                String userId = windowedKey.key();
                long windowStart = windowedKey.window().start();
                long windowEnd = windowedKey.window().end();
                // Encode window boundaries into the key for easy parsing downstream
                String outKey = userId + "|" + windowStart + "|" + windowEnd;
                return KeyValue.pair(outKey, count);
            })
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```
Application Bootstrap
```java
public class PageViewCountMain {

    public static void main(String[] args) {
        Serde<PageViewEvent> eventSerde = buildJsonSerde(PageViewEvent.class);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());

        // Stable state directory (mount a PVC in Kubernetes)
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/page-view-app");
        // One standby replica for fast failover
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Advertise this instance for cross-instance interactive queries
        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
            System.getenv("POD_IP") + ":8080");
        // Lower commit interval to shorten the reprocessing window after a crash
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5_000);

        Topology topology = PageViewCountApp.buildTopology(eventSerde);
        KafkaStreams streams = new KafkaStreams(topology, props);

        // Log state restoration progress
        streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());

        // Graceful shutdown on SIGTERM
        Runtime.getRuntime().addShutdownHook(
            new Thread(() -> streams.close(Duration.ofSeconds(20)))
        );

        streams.start();

        // Block until RUNNING before serving HTTP queries
        waitForState(streams, KafkaStreams.State.RUNNING);

        // Start the HTTP server for interactive queries (port 8080)
        startHttpServer(streams);
    }

    private static void waitForState(KafkaStreams streams, KafkaStreams.State target) {
        while (streams.state() != target) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```
Interactive Query REST Handler
```java
// GET /counts/{userId}?from=<epochMs>&to=<epochMs>
get("/counts/:userId", (req, res) -> {
    String userId = req.params("userId");
    Instant from = Instant.ofEpochMilli(Long.parseLong(
        req.queryParamOrDefault("from",
            String.valueOf(System.currentTimeMillis() - 3_600_000L))));
    Instant to = Instant.ofEpochMilli(Long.parseLong(
        req.queryParamOrDefault("to",
            String.valueOf(System.currentTimeMillis()))));

    // Locate the instance that owns this key
    KeyQueryMetadata meta = streams.queryMetadataForKey(
        PageViewCountApp.STORE_NAME, userId, new StringSerializer());

    if (!meta.activeHost().equals(LOCAL_HOST_INFO)) {
        res.redirect("http://" + meta.activeHost().host() + ":"
            + meta.activeHost().port() + req.uri() + "?" + req.queryString());
        return null;
    }

    ReadOnlyWindowStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType(
            PageViewCountApp.STORE_NAME,
            QueryableStoreTypes.windowStore()
        )
    );

    List<Map<String, Object>> results = new ArrayList<>();
    try (WindowStoreIterator<Long> it = store.fetch(userId, from, to)) {
        while (it.hasNext()) {
            KeyValue<Long, Long> entry = it.next();
            results.add(Map.of(
                "windowStartMs", entry.key,
                "windowEndMs", entry.key + Duration.ofMinutes(5).toMillis(),
                "userId", userId,
                "views", entry.value
            ));
        }
    }

    res.type("application/json");
    return objectMapper.writeValueAsString(results);
});
```
Sample Input and Expected Output

Given these events on the page-views topic (key = userId, all falling in the tumbling window [900000, 1200000), i.e. minutes 15–20 after epoch):

```java
// Produced to "page-views" topic
// Key: "alice"  Value: PageViewEvent("alice", "/home",    timestampMs=1_000_000)
// Key: "alice"  Value: PageViewEvent("alice", "/docs",    timestampMs=1_060_000)
// Key: "alice"  Value: PageViewEvent("alice", "/pricing", timestampMs=1_180_000)
// Key: "bob"    Value: PageViewEvent("bob",   "/home",    timestampMs=1_020_000)

// After the window closes (grace period elapsed), the output topic receives:
// Key: "alice|900000|1200000"  Value: 3
// Key: "bob|900000|1200000"    Value: 1

// Interactive query — HTTP GET /counts/alice?from=0&to=1200000
// Response:
// [{"windowStartMs":900000,"windowEndMs":1200000,"userId":"alice","views":3}]
```

In this example, timestamps are in milliseconds from epoch. The 5-minute tumbling window [900000, 1200000) covers minutes 15–20 after epoch, which is where all four event timestamps fall. In real deployments, use actual Unix epoch milliseconds from the event payload or the Kafka record timestamp.