Contents
- Why Stateful Operations
- groupByKey & groupBy
- count() & reduce()
- aggregate()
- KTable Operations
- Tumbling, Hopping & Session Windows
- State Store Access (Interactive Queries)
- Production Tips
- Full Example — Page-View Count per User per 5-Minute Window
Why Stateful Operations
Stateless operations such as filter() and mapValues() process each record in isolation — the output depends
only on the single record being processed. Stateful operations, on the other hand, accumulate information across many records
and can answer questions like "how many page views has user X had in the last five minutes?" or "what is the running total revenue
per product category?".
- Stateless operations — filter, map, flatMap, peek, branch. No memory of previous records. No state store created.
- Stateful operations — count, reduce, aggregate, joins, and windowed variants. Each requires a backing state store.
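The difference can be sketched in a few lines of plain Java (an illustrative model, not the Kafka Streams API; the class name is made up): a stateless filter needs only the current record, while a stateful count must keep a map that survives across records — exactly the role a state store plays.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatefulVsStateless {
    // Stateless: the decision depends only on the current record.
    static boolean keep(String value) {
        return !value.isEmpty();
    }

    // Stateful: the running count per key must survive across records,
    // which is what a Kafka Streams state store provides.
    static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByKey(List.of("alice", "bob", "alice"));
        System.out.println(counts.get("alice")); // 2
    }
}
```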
State Stores and RocksDB
Every stateful operation in Kafka Streams creates a state store — a local, embedded key-value store maintained
on the same JVM as the stream task. By default Kafka Streams uses RocksDB as the storage engine, which keeps
the active working set in memory and spills larger data to disk. The state directory defaults to /tmp/kafka-streams
and should be changed to a stable, fast disk in production.
- Each state store is partition-local: a task that owns partition 3 of the input topic owns the corresponding slice of the state store.
- Kafka Streams provides in-memory store variants via Materialized.as(...).withLoggingDisabled() when persistence is not needed.
- Named stores (e.g., Materialized.as("my-store")) can be queried at runtime via the Interactive Queries API.
Changelog Topics for Fault Tolerance
Every persistent state store is backed by a changelog topic — a compacted Kafka topic that records every mutation
to the store. If a task crashes or is migrated to another instance, Kafka Streams restores the state store by replaying the
changelog before resuming processing. This gives stateful applications the same durability guarantees as Kafka itself.
The changelog topic name follows the pattern <application.id>-<store-name>-changelog. Do not delete these topics manually — they are the source of truth for state recovery.
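Applying the naming pattern above to the examples later in this article (the helper method is hypothetical, introduced only to spell out the pattern):

```java
public class ChangelogNames {
    // Hypothetical helper that applies the naming pattern from the text:
    // <application.id>-<store-name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // For application.id = "page-view-count-app" and a store named
        // "page-view-count-store", the changelog topic is:
        System.out.println(
            changelogTopic("page-view-count-app", "page-view-count-store"));
        // page-view-count-app-page-view-count-store-changelog
    }
}
```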
| Characteristic | Stateless | Stateful |
| --- | --- | --- |
| Memory / disk usage | None (beyond processing buffer) | State store on local disk (RocksDB) |
| Fault tolerance | Replay input topic on restart | Restore changelog topic into state store |
| Scalability | Any partition can process any record | A given key must always route to the same partition/task |
| Typical output | KStream | KTable (or windowed KTable) |
groupByKey & groupBy
Every stateful aggregation starts with a grouping step that tells Kafka Streams which key to aggregate on.
Two methods exist, and choosing incorrectly can silently introduce correctness issues or unnecessary performance overhead.
groupByKey()
groupByKey() groups records by their existing message key. Because the key does not change, Kafka Streams knows
that the data is already correctly partitioned and does not trigger a repartition. This is the preferred method
whenever the current key is already the grouping dimension.
KStream<String, String> pageViews = builder.stream(
"page-views",
Consumed.with(Serdes.String(), Serdes.String())
);
// Key is already userId — no repartition needed
KGroupedStream<String, String> groupedByUser = pageViews.groupByKey();
groupBy()
groupBy() accepts a KeyValueMapper to derive a new key from the record. Because the key changes, Kafka Streams
must repartition the stream to ensure all records with the same new key land in the same task. This repartition
is implemented internally by writing to an intermediate Kafka topic (named <application.id>-<name>-repartition).
// Repartition by page URL extracted from the value JSON
KGroupedStream<String, String> groupedByPage = pageViews
.groupBy(
(userId, eventJson) -> extractField(eventJson, "page"),
Grouped.with(Serdes.String(), Serdes.String())
);
Every key-changing groupBy() call creates an additional internal Kafka topic and doubles the broker I/O for that portion of the topology. Prefer groupByKey() whenever the existing key is already the grouping dimension; note that selectKey() followed by groupByKey() still triggers a repartition, because the key has changed.
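To see why a changed key forces a repartition, here is a simplified routing model in plain Java (the real producer hashes the serialized key with murmur2, but any deterministic hash illustrates the point): records are routed by hash(key) mod partitionCount, so a record with a new key generally belongs on a different partition — and therefore on a different task — than the one currently holding it.

```java
public class KeyRouting {
    // Simplified model of Kafka's default partitioner: a deterministic
    // hash of the key, modulo the partition count, picks the partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String oldKey = "user-42";   // the record's current key
        String newKey = "/pricing";  // a key derived by groupBy()
        // The derived key usually maps to a different partition, so the
        // record must be shuffled through a repartition topic before any
        // task can aggregate it under the new key.
        System.out.println(partitionFor(oldKey, numPartitions));
        System.out.println(partitionFor(newKey, numPartitions));
    }
}
```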
Grouped and Serialized Configuration
Both grouping methods accept an optional Grouped parameter that configures the SerDes for the grouped stream and
optionally assigns a name to the repartition topic. Naming the repartition topic makes topology debugging and monitoring much
easier.
// Named repartition topic and explicit SerDes
KGroupedStream<String, PageView> grouped = pageViews
.mapValues(json -> parsePageView(json))
.groupBy(
(key, pv) -> pv.getPage(),
Grouped.<String, PageView>as("by-page-repartition")
.withKeySerde(Serdes.String())
.withValueSerde(pageViewSerde)
);
If you omit explicit SerDes in Grouped, Kafka Streams falls back to the application-level defaults configured via StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG.
count() & reduce()
count()
count() is the simplest stateful aggregation — it increments a Long counter for every record in the grouped stream.
The result is a KTable<K, Long> where each entry holds the current running count for its key.
// Count page views per user (key = userId)
KTable<String, Long> viewCountPerUser = pageViews
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.count(Materialized.as("page-view-count-store"));
// Publish counts downstream
viewCountPerUser
.toStream()
.to("page-view-counts", Produced.with(Serdes.String(), Serdes.Long()));
The Materialized.as("page-view-count-store") argument names the state store so it can be queried later via the Interactive
Queries API. Omitting it causes Kafka Streams to auto-generate an internal name that changes with topology changes.
KTable Result Semantics
A KTable is a changelog stream — when you call toStream() on a KTable you get a stream of
update records, not raw events. Each record emitted represents the new current value for a key after the latest
aggregation update. Downstream consumers reading from a KTable-backed topic should treat records as upserts.
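The upsert contract can be modelled in plain Java (an illustrative sketch, not the Kafka Streams API; the class name is made up): a consumer of a KTable-backed topic maintains a map, where each record replaces the previous value for its key and a null value (tombstone) deletes it.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertView {
    private final Map<String, Long> table = new HashMap<>();

    // Each changelog record is an upsert: the value replaces whatever was
    // there before, and a null value (tombstone) deletes the key.
    void apply(String key, Long value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    Long get(String key) {
        return table.get(key);
    }

    public static void main(String[] args) {
        UpsertView view = new UpsertView();
        view.apply("alice", 1L);
        view.apply("alice", 3L);   // the newer update wins
        view.apply("bob", 7L);
        view.apply("bob", null);   // tombstone removes the key
        System.out.println(view.get("alice")); // 3
        System.out.println(view.get("bob"));   // null
    }
}
```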
reduce()
reduce() is a special form of aggregation where the accumulator and the input share the same type. You supply a
Reducer — a two-argument function that merges a new value into the running aggregate. A common pattern is keeping
the latest value, or summing numeric values.
KStream<String, Long> sensorReadings = builder.stream(
"sensor-readings",
Consumed.with(Serdes.String(), Serdes.Long())
);
// Running maximum reading per sensor
KTable<String, Long> maxReading = sensorReadings
.groupByKey()
.reduce(
(currentMax, newValue) -> Math.max(currentMax, newValue),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>
as("max-sensor-store")
.withValueSerde(Serdes.Long())
);
// Running sum of revenues per merchant
KStream<String, Double> sales = builder.stream(
"sales-events",
Consumed.with(Serdes.String(), Serdes.Double())
);
KTable<String, Double> runningRevenue = sales
.groupByKey()
.reduce(
Double::sum,
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>
as("revenue-store")
.withValueSerde(Serdes.Double())
);
reduce() on a grouped stream silently drops records with null values (tombstones), and its accumulator must have the same type as the input. If you need to handle deletions or a different accumulator type, use aggregate() instead.
aggregate()
aggregate() is the most flexible stateful operation. Unlike reduce(), the accumulator can be a completely
different type from the input value — which makes it ideal for building summaries, histograms, custom POJOs, and any aggregation
that requires intermediate state richer than a single number.
Method Signature
// On a KGroupedStream<K, V>:
KTable<K, VR> aggregate(
Initializer<VR> initializer, // () -> new VR instance
Aggregator<K, V, VR> aggregator, // (key, value, aggregate) -> updated VR
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized
)
Running Sum with aggregate()
KStream<String, String> orderEvents = builder.stream(
"orders",
Consumed.with(Serdes.String(), Serdes.String())
);
// Accumulate total order amount per customerId
KTable<String, Double> totalPerCustomer = orderEvents
.mapValues(json -> parseOrderAmount(json)) // KStream<String, Double>
.groupByKey()
.aggregate(
() -> 0.0, // initializer
(customerId, amount, runningTotal) -> // aggregator
runningTotal + amount,
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>
as("customer-total-store")
.withValueSerde(Serdes.Double())
);
Custom POJO Aggregator
When you need to track multiple statistics in a single pass, model the accumulator as a POJO and register a custom Serde for it.
The example below tracks both event count and total amount per customer in a single state store entry.
// --- Accumulator POJO ---
public class OrderStats {
public long count;
public double totalAmount;
public double maxAmount;
public OrderStats() {} // required for deserialization
public OrderStats update(double amount) {
this.count++;
this.totalAmount += amount;
this.maxAmount = Math.max(this.maxAmount, amount);
return this;
}
}
// --- Topology ---
Serde<OrderStats> orderStatsSerde = /* JSON or Avro serde */;
KTable<String, OrderStats> statsPerCustomer = orderEvents
.mapValues(json -> parseOrderAmount(json))
.groupByKey()
.aggregate(
OrderStats::new, // initializer
(customerId, amount, stats) -> stats.update(amount), // aggregator
Materialized.<String, OrderStats, KeyValueStore<Bytes, byte[]>>
as("order-stats-store")
.withKeySerde(Serdes.String())
.withValueSerde(orderStatsSerde)
);
Named State Stores
Giving state stores explicit names via Materialized.as("my-store-name") has several important benefits:
- The store name becomes stable across topology restarts — the changelog topic keeps the same name, so state is not lost if you redeploy.
- Named stores can be looked up at runtime via KafkaStreams.store(StoreQueryParameters.fromNameAndType(...)) for interactive queries.
- Monitoring tools (JMX, Prometheus) expose per-store metrics using the store name.
- Unnamed stores get auto-generated names that change when you modify your topology, which can orphan the old changelog topic and force full state restoration.
Store names must be unique within an application. If two operations share the same name, the topology build step throws a TopologyException at startup.
KTable Operations
A KTable is more than just an aggregation output — it supports its own set of transformation and join operations.
KTable operations propagate changes downstream: when a key's value changes, a new update record is emitted to downstream operators.
filter() and mapValues()
KTable<String, Long> allCounts = pageViews
.groupByKey()
.count(Materialized.as("all-counts"));
// filter — emit update records only when predicate is true
// Records that no longer match emit a tombstone (null value)
KTable<String, Long> activeCounts = allCounts
.filter((userId, count) -> count > 0);
// mapValues — transform the value type; key is unchanged
KTable<String, String> countLabel = allCounts
.mapValues(count -> "views=" + count);
toStream()
toStream() converts a KTable back into a KStream so that individual update records can be routed to Kafka output topics,
joined with other streams, or passed to stateless operations. Each record in the resulting KStream represents one state-change
event, not the full current table.
allCounts
.toStream() // KStream<String, Long>
.peek((k, v) -> log.info("Updated count for {}: {}", k, v))
.to("user-view-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
Joining Two KTables
KTable-KTable joins are equi-joins on the key. Kafka Streams evaluates the join lazily — whenever either side
emits an update, a new joined record is produced using the current value from the other side's state store.
// KTable of user profiles (key = userId)
KTable<String, String> userProfiles = builder.table(
"user-profiles",
Consumed.with(Serdes.String(), Serdes.String())
);
// KTable of page-view counts (key = userId)
KTable<String, Long> viewCounts = pageViews
.groupByKey()
.count(Materialized.as("view-count-join-store"));
// Inner join: both keys must exist
KTable<String, String> enrichedCounts = viewCounts.join(
userProfiles,
(count, profile) -> "user=" + extractName(profile) + " views=" + count
);
// Left join: emit result even when profile is absent (null right side)
KTable<String, String> leftJoined = viewCounts.leftJoin(
userProfiles,
(count, profile) -> profile != null
? extractName(profile) + ":" + count
: "unknown:" + count
);
For a KTable-KTable join to work correctly, both tables must be co-partitioned: same number of partitions and the same partitioning strategy for their keys. Kafka Streams verifies that the partition counts match when it creates tasks and fails fast if they differ (it cannot verify the partitioning strategy itself).
Suppression with Suppressed.untilWindowCloses()
By default, a windowed KTable emits an update record every time any record falls into a window — potentially generating many
intermediate results per window. suppress() with Suppressed.untilWindowCloses() holds back all intermediate
updates for a window and emits only the final result after the window closes (when the grace period has elapsed).
This is essential for downstream consumers that need exactly one result per window.
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import java.time.Duration;
KTable<Windowed<String>, Long> windowedCounts = pageViews
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5),
Duration.ofSeconds(30)
))
.count(Materialized.as("suppressed-window-store"));
// Emit exactly one record per window per key, after the window closes
KTable<Windowed<String>, Long> finalCounts = windowedCounts
.suppress(
Suppressed.untilWindowCloses(
BufferConfig.unbounded() // buffer all pending updates in memory
)
);
finalCounts
.toStream()
.map((wk, count) -> KeyValue.pair(wk.key(), count))
.to("final-window-counts", Produced.with(Serdes.String(), Serdes.Long()));
BufferConfig.unbounded() can consume significant heap memory if windows are large or record volume is high. Use BufferConfig.maxBytes() or BufferConfig.maxRecords() to set a bound; note that untilWindowCloses() requires a strict buffer config (shutDownWhenFull()), so when a bounded buffer fills, the application shuts down rather than emitting early.
Tumbling, Hopping & Session Windows
Windowing divides an infinite stream into finite, time-bounded buckets so that stateful aggregations can be scoped to a
particular time period. Kafka Streams supports three window types, each with distinct semantics.
| Window Type | Size | Overlap | Boundary |
| --- | --- | --- | --- |
| Tumbling | Fixed | None — each record belongs to exactly one window | Epoch-aligned in event time (e.g. 00:00–00:05, 00:05–00:10) |
| Hopping | Fixed | Yes — a record can belong to multiple windows | Determined by window size and advance interval |
| Session | Variable | None — determined by inactivity gap | Starts on first event; closes after inactivity gap expires |
Tumbling Windows
A tumbling window has a fixed size and no overlap. Records are assigned to the window whose [start, start + size)
interval contains the record's timestamp. Use TimeWindows.ofSizeWithNoGrace() when late arrivals are not a concern,
or TimeWindows.ofSizeAndGrace() to allow a grace period for out-of-order events.
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;
// Tumbling window of 5 minutes — no grace period (drop late arrivals)
KTable<Windowed<String>, Long> tumblingCount = pageViews
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("tumbling-view-counts"));
// Tumbling window of 5 minutes — 30-second grace period
KTable<Windowed<String>, Long> tumblingWithGrace = pageViews
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5),
Duration.ofSeconds(30)
))
.count(Materialized.as("tumbling-grace-counts"));
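The assignment rule from the [start, start + size) interval can be written out directly in plain Java (an illustrative sketch, not the Kafka Streams API): the containing window starts at the largest multiple of the window size that does not exceed the record's timestamp.

```java
public class TumblingAssignment {
    // The tumbling window of size `sizeMs` containing `timestampMs` starts
    // at the largest multiple of sizeMs not greater than the timestamp;
    // its interval is [start, start + sizeMs).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L; // 300 000 ms
        // Both timestamps fall into the window [900000, 1200000)
        System.out.println(windowStart(1_000_000L, fiveMinutes)); // 900000
        System.out.println(windowStart(1_180_000L, fiveMinutes)); // 900000
        // A timestamp exactly on a boundary starts a new window
        System.out.println(windowStart(1_500_000L, fiveMinutes)); // 1500000
    }
}
```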
Hopping Windows
A hopping window has a fixed size but advances in steps smaller than the window size, so windows overlap.
A single record is included in ceil(size / advanceBy) windows simultaneously, which means the input data is
"amplified" in downstream output. Use hopping windows for moving averages or rolling statistics.
import org.apache.kafka.streams.kstream.TimeWindows;
// 10-minute window advancing every 2 minutes → each record appears in 5 windows
KTable<Windowed<String>, Long> hoppingCount = pageViews
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10),
Duration.ofSeconds(30)
).advanceBy(Duration.ofMinutes(2))
)
.count(Materialized.as("hopping-view-counts"));
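The multi-window membership can also be sketched in plain Java (illustrative only, not the Kafka Streams API): window starts are multiples of the advance interval, and a record belongs to every window whose [start, start + size) range covers its timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingAssignment {
    // All hopping-window starts whose [start, start + sizeMs) interval
    // contains the timestamp; assumes non-negative event time.
    static List<Long> windowStarts(long tsMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = tsMs - (tsMs % advanceMs); // latest possible start
        for (long start = lastStart;
             start >= 0 && start > tsMs - sizeMs;
             start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute window advancing every 2 minutes: 5 windows per record
        List<Long> starts = windowStarts(1_000_000L, 600_000L, 120_000L);
        System.out.println(starts.size()); // 5
        System.out.println(starts);        // [960000, 840000, 720000, 600000, 480000]
    }
}
```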
Session Windows
Session windows group records by activity gaps rather than wall-clock intervals. A session stays open as long as new events
arrive within the inactivity gap. When two sessions are close enough that a new event bridges them, Kafka Streams
merges the sessions (and their accumulated state). Session windows are ideal for modelling user browsing
sessions, API call sequences, or any activity where natural pauses define boundaries.
import org.apache.kafka.streams.kstream.SessionWindows;
// Session window with 5-minute inactivity gap, no grace period
KTable<Windowed<String>, Long> sessionCount = pageViews
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("session-view-counts"));
// Session window with inactivity gap AND grace period
KTable<Windowed<String>, Long> sessionWithGrace = pageViews
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofMinutes(5),
Duration.ofSeconds(60)
))
.count(Materialized.as("session-grace-counts"));
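Session boundaries can be modelled in plain Java as well (an illustrative sketch, not the Kafka Streams API): given event timestamps in sorted order, a new session starts wherever the gap to the previous event exceeds the inactivity gap. Kafka Streams does this incrementally and merges sessions when a late event bridges two of them; sorting first yields the same end result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionAssignment {
    // Split sorted, non-empty event timestamps into [start, end] sessions:
    // a new session begins whenever the gap to the previous event exceeds
    // inactivityGapMs.
    static List<long[]> sessions(long[] sortedTs, long inactivityGapMs) {
        List<long[]> result = new ArrayList<>();
        long start = sortedTs[0];
        long end = sortedTs[0];
        for (int i = 1; i < sortedTs.length; i++) {
            if (sortedTs[i] - end > inactivityGapMs) {
                result.add(new long[]{start, end});
                start = sortedTs[i];
            }
            end = sortedTs[i];
        }
        result.add(new long[]{start, end});
        return result;
    }

    public static void main(String[] args) {
        long gap = 5 * 60 * 1000L; // 5-minute inactivity gap
        long[] ts = {0, 60_000, 120_000, 1_000_000, 1_030_000};
        for (long[] s : sessions(ts, gap)) {
            System.out.println(Arrays.toString(s));
        }
        // Two sessions: [0, 120000] and [1000000, 1030000]
    }
}
```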
Reading Windowed Results — Windowed<K>
The key of a windowed KTable is Windowed<K>, which wraps the original key and the window's time boundaries.
When routing results to a Kafka topic you typically extract the inner key and/or encode the window boundaries into the value.
tumblingCount
.toStream()
.map((windowedKey, count) -> {
String userId = windowedKey.key();
long windowStart = windowedKey.window().start(); // epoch ms
long windowEnd = windowedKey.window().end();
String outKey = userId + "@" + windowStart + "-" + windowEnd;
return KeyValue.pair(outKey, count);
})
.to("windowed-counts-output", Produced.with(Serdes.String(), Serdes.Long()));
Window boundaries use event time, not wall-clock time. The timestamp of a record is determined by the TimestampExtractor configured on the stream (default: the Kafka record timestamp). Incorrect timestamps cause records to be placed in wrong windows or dropped if they fall outside the retention period.
State Store Access (Interactive Queries)
Interactive Queries allow you to query state stores from outside the topology — for example, from a REST endpoint — without
reading back from a Kafka topic. This turns a Kafka Streams application into a queryable, distributed key-value service
over its own aggregated state.
Querying a Key-Value Store
// 1. Get a handle to the store (call this after streams.start())
ReadOnlyKeyValueStore<String, Long> kvStore = kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"page-view-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// 2. Point lookup — O(1)
Long countForUser = kvStore.get("user-123"); // null if key not found
// 3. Range scan — returns all keys in [from, to]
KeyValueIterator<String, Long> range = kvStore.range("user-100", "user-200");
while (range.hasNext()) {
KeyValue<String, Long> entry = range.next();
System.out.println(entry.key + " -> " + entry.value);
}
range.close(); // always close iterators to release resources
// 4. Full scan
KeyValueIterator<String, Long> all = kvStore.all();
while (all.hasNext()) { /* process */ }
all.close();
Querying a Windowed Store
ReadOnlyWindowStore<String, Long> windowStore = kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"tumbling-view-counts",
QueryableStoreTypes.windowStore()
)
);
// Fetch all windows for a specific key across a time range
Instant from = Instant.now().minus(Duration.ofHours(1));
Instant to = Instant.now();
WindowStoreIterator<Long> windowIter = windowStore.fetch("user-123", from, to);
while (windowIter.hasNext()) {
KeyValue<Long, Long> windowEntry = windowIter.next();
long windowStartMs = windowEntry.key;
long count = windowEntry.value;
System.out.printf("Window starting at %d: %d views%n", windowStartMs, count);
}
windowIter.close();
// Fetch all keys within a time range (range query across keys)
KeyValueIterator<Windowed<String>, Long> rangeIter =
windowStore.fetchAll(from, to);
while (rangeIter.hasNext()) {
KeyValue<Windowed<String>, Long> entry = rangeIter.next();
System.out.println(entry.key.key() + " [" +
entry.key.window().start() + "] = " + entry.value);
}
rangeIter.close();
Querying Across Multiple Instances (RPC)
State is partition-local. If your application runs as multiple instances (e.g., in a Kubernetes Deployment), a given key's
state lives on only one instance. The Interactive Queries API provides metadata to locate which instance owns a given key.
// Advertise the instance's host:port so other instances can reach it
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "this-host:8080");
// ---
// Find which instance owns "user-123" in the given store
KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(
"page-view-count-store",
"user-123",
new StringSerializer()
);
HostInfo activeHost = metadata.activeHost();
if (activeHost.equals(thisInstanceHostInfo)) {
// local — query directly
Long count = kvStore.get("user-123");
} else {
// remote — forward the HTTP request to the owning instance
String url = "http://" + activeHost.host() + ":" + activeHost.port()
+ "/counts/user-123";
Long count = httpClient.get(url, Long.class);
}
Set application.server to the externally reachable host and port of each Kafka Streams instance. Without this config, queryMetadataForKey() cannot route requests across instances and returns HostInfo.unavailable().
Exposing State via a REST Endpoint
// Minimal example using a simple HTTP server (e.g., Javalin / Micronaut)
get("/counts/{userId}", ctx -> {
String userId = ctx.pathParam("userId");
KeyQueryMetadata meta = kafkaStreams.queryMetadataForKey(
"page-view-count-store", userId, new StringSerializer());
if (meta.activeHost().equals(LOCAL_HOST_INFO)) {
ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"page-view-count-store",
QueryableStoreTypes.keyValueStore()
)
);
ctx.json(Map.of("userId", userId, "count", store.get(userId)));
} else {
// Proxy to the owning instance
ctx.redirect("http://" + meta.activeHost().host()
+ ":" + meta.activeHost().port() + "/counts/" + userId);
}
});
Production Tips
State Directory Configuration
The default state directory /tmp/kafka-streams is volatile and may be cleared on OS restarts, forcing full state
restoration from changelog topics on every redeploy. Always configure a stable path on a persistent, fast disk.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/my-app");
In containerised environments (Docker / Kubernetes), mount a PersistentVolumeClaim at the state directory path so that
state survives pod restarts. Without a persistent volume, each restart triggers a full state restore from Kafka, which can
take several minutes for large stores and causes processing lag during the restore window.
Standby Replicas
When a Kafka Streams instance fails, Kafka re-assigns its tasks to remaining instances. Those instances must first restore
the state from the changelog topic — a process that can be slow for large stores. Standby replicas solve this
by maintaining warm, partially-restored copies of state on other instances so that failover is nearly instantaneous.
// Keep 1 warm standby copy of every state store partition
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
- Each standby replica continuously catches up with the changelog topic for its assigned partitions without actually processing input records.
- On failover, the standby is promoted to active and needs to catch up only on changes since its last update (usually milliseconds).
- Standby replicas consume extra broker and application resources proportional to state store size. One standby per partition is usually sufficient.
State Restoration on Restart
When a Kafka Streams application starts and detects a local state store, it compares the local store's offset with the
latest changelog topic offset. If there is a gap, it replays changelog records to bring the store up to date before the
task resumes processing new input records. During restoration, interactive queries are served from the store in its
partially-restored state (or blocked, depending on configuration).
- Register a StateRestoreListener to monitor restoration progress and emit metrics.
- Use KafkaStreams.state() to wait for State.RUNNING before serving interactive queries to avoid stale reads.
- Set restore.consumer.max.poll.records to a higher value to speed up bulk restoration.
kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
@Override
public void onRestoreStart(TopicPartition tp, String storeName,
long startOffset, long endOffset) {
log.info("Restoring store {} partition {} from offset {} to {}",
storeName, tp.partition(), startOffset, endOffset);
}
@Override
public void onBatchRestored(TopicPartition tp, String storeName,
long batchEndOffset, long numRestored) {
log.debug("Restored {} records for store {}", numRestored, storeName);
}
@Override
public void onRestoreEnd(TopicPartition tp, String storeName,
long totalRestored) {
log.info("Restoration complete for store {}. Total records: {}",
storeName, totalRestored);
}
});
kafkaStreams.start();
RocksDB Tuning
For large state stores, RocksDB's default configuration may not be optimal. Kafka Streams allows you to inject a custom
RocksDBConfigSetter to tune block cache size, write buffer count, compression, and more.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
private static final org.rocksdb.Cache blockCache =
new org.rocksdb.LRUCache(256 * 1024 * 1024L); // 256 MB shared block cache
@Override
public void setConfig(String storeName, Options options,
Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCache(blockCache);
tableConfig.setBlockSize(16 * 1024); // 16 KB block size
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
options.setMaxWriteBufferNumber(3);
options.setWriteBufferSize(64 * 1024 * 1024L); // 64 MB write buffer
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
}
@Override
public void close(String storeName, Options options) {
// release native resources if any were allocated per store
}
}
// Register the custom config setter
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
CustomRocksDBConfig.class);
| Config | Default | Recommendation |
| --- | --- | --- |
| state.dir | /tmp/kafka-streams | Stable, persistent, fast SSD path |
| num.standby.replicas | 0 | 1 for production; 2 for critical paths |
| rocksdb.config.setter | None | Custom setter when state store exceeds a few GB |
| commit.interval.ms | 30000 | Lower (e.g., 5000) to shrink the reprocessing window after a crash, at the cost of I/O |
| restore.consumer.max.poll.records | 1000 | Increase to 10000+ to speed up state restoration |
| application.server | Empty | Set to host:port to enable cross-instance interactive queries |
Full Example — Page-View Count per User per 5-Minute Window
This complete, runnable example ties together all the concepts covered in this article. It consumes page-view events from
a Kafka topic, groups them by user ID, counts views in 5-minute tumbling windows with a 30-second grace period, suppresses
intermediate results until each window closes, and exposes the final counts via a simple HTTP endpoint backed by interactive
queries.
Event Model
// PageViewEvent.java
public class PageViewEvent {
public String userId;
public String page;
public long timestampMs;
public PageViewEvent() {}
public PageViewEvent(String userId, String page, long timestampMs) {
this.userId = userId;
this.page = page;
this.timestampMs = timestampMs;
}
}
Topology Definition
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
import java.time.Duration;
import java.util.Properties;
public class PageViewCountApp {
static final String INPUT_TOPIC = "page-views";
static final String OUTPUT_TOPIC = "page-view-window-counts";
static final String STORE_NAME = "page-view-window-store";
public static Topology buildTopology(Serde<PageViewEvent> eventSerde) {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, PageViewEvent> rawViews = builder.stream(
INPUT_TOPIC,
Consumed.with(Serdes.String(), eventSerde)
.withTimestampExtractor(
(record, previousTimestamp) ->
((PageViewEvent) record.value()).timestampMs
)
);
KTable<Windowed<String>, Long> windowedCounts = rawViews
// key is already userId — no repartition
.groupByKey(Grouped.with(Serdes.String(), eventSerde))
// 5-minute tumbling window, 30-second grace for late arrivals
.windowedBy(TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5),
Duration.ofSeconds(30)
))
// count views per user per window, backed by a named state store
.count(Materialized.as(STORE_NAME));
// Suppress intermediate updates — emit only the final count per window
windowedCounts
.suppress(Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()
))
.toStream()
.map((windowedKey, count) -> {
String userId = windowedKey.key();
long windowStart = windowedKey.window().start();
long windowEnd = windowedKey.window().end();
// Encode window boundaries into the key for easy parsing downstream
String outKey = userId + "|" + windowStart + "|" + windowEnd;
return KeyValue.pair(outKey, count);
})
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
}
Application Bootstrap
public class PageViewCountMain {
public static void main(String[] args) {
Serde<PageViewEvent> eventSerde = buildJsonSerde(PageViewEvent.class);
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"page-view-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
// Stable state directory (mount a PVC in Kubernetes)
props.put(StreamsConfig.STATE_DIR_CONFIG,
"/var/lib/kafka-streams/page-view-app");
// One standby replica for fast failover
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// Advertise this instance for cross-instance interactive queries
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
System.getenv("POD_IP") + ":8080");
// Shorter commit interval shrinks the reprocessing window after a crash
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5_000);
Topology topology = PageViewCountApp.buildTopology(eventSerde);
KafkaStreams streams = new KafkaStreams(topology, props);
// Log state restoration progress
streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());
// Graceful shutdown on SIGTERM
Runtime.getRuntime().addShutdownHook(
new Thread(() -> streams.close(Duration.ofSeconds(20)))
);
streams.start();
// Block until RUNNING before serving HTTP queries
waitForState(streams, KafkaStreams.State.RUNNING);
// Start the HTTP server for interactive queries (port 8080)
startHttpServer(streams);
}
private static void waitForState(KafkaStreams streams,
KafkaStreams.State target) {
while (streams.state() != target) {
try { Thread.sleep(100); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
}
Interactive Query REST Handler
// GET /counts/{userId}?from=<epochMs>&to=<epochMs>
get("/counts/:userId", (req, res) -> {
String userId = req.params("userId");
Instant from = Instant.ofEpochMilli(Long.parseLong(
req.queryParamOrDefault("from",
String.valueOf(System.currentTimeMillis() - 3_600_000L))));
Instant to = Instant.ofEpochMilli(Long.parseLong(
req.queryParamOrDefault("to",
String.valueOf(System.currentTimeMillis()))));
// Locate the instance that owns this key
KeyQueryMetadata meta = streams.queryMetadataForKey(
PageViewCountApp.STORE_NAME, userId, new StringSerializer());
if (!meta.activeHost().equals(LOCAL_HOST_INFO)) {
res.redirect("http://" + meta.activeHost().host()
+ ":" + meta.activeHost().port()
+ req.uri() + "?" + req.queryString());
return null;
}
ReadOnlyWindowStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
PageViewCountApp.STORE_NAME,
QueryableStoreTypes.windowStore()
)
);
List<Map<String, Object>> results = new ArrayList<>();
try (WindowStoreIterator<Long> it = store.fetch(userId, from, to)) {
while (it.hasNext()) {
KeyValue<Long, Long> entry = it.next();
results.add(Map.of(
"windowStartMs", entry.key,
"windowEndMs", entry.key + Duration.ofMinutes(5).toMillis(),
"userId", userId,
"views", entry.value
));
}
}
res.type("application/json");
return objectMapper.writeValueAsString(results);
});
Sample Input and Expected Output
Given these events on the page-views topic (key = userId, all within the 00:00–00:05 window):
// Produced to "page-views" topic
// Key: "alice" Value: PageViewEvent("alice", "/home", timestampMs=1_000_000)
// Key: "alice" Value: PageViewEvent("alice", "/docs", timestampMs=1_060_000)
// Key: "alice" Value: PageViewEvent("alice", "/pricing", timestampMs=1_180_000)
// Key: "bob" Value: PageViewEvent("bob", "/home", timestampMs=1_020_000)
// After the window closes (grace period elapsed), the output topic receives:
// Key: "alice|0|300000" Value: 3
// Key: "bob|0|300000" Value: 1
// Interactive query — HTTP GET /counts/alice?from=0&to=300000
// Response:
// [{"windowStartMs":0,"windowEndMs":300000,"userId":"alice","views":3}]
In this example, timestamps are in milliseconds from epoch. The window [0, 300000) corresponds to the first 5 minutes after epoch. In real deployments, use actual Unix epoch milliseconds from the event payload or the Kafka record timestamp.