
Kafka Streams supports five categories of joins. The table below summarises the co-partitioning requirement, whether a time window is involved, and the cardinality of the output.

| Join Type | Left | Right | Co-partitioning Required | Window | Output Cardinality |
|---|---|---|---|---|---|
| KStream-KStream | KStream | KStream | Yes — same key, same partition count, same Serdes | Required (JoinWindows) | KStream — one output per matching pair within the window |
| KStream-KTable | KStream | KTable | Yes — same key, same partition count | None — point-in-time lookup | KStream — one output per stream record (right value is null on a leftJoin miss) |
| KStream-GlobalKTable | KStream | GlobalKTable | No — fully replicated | None — point-in-time lookup | KStream — one output per stream record; supports foreign-key extractor |
| KTable-KTable (equi) | KTable | KTable | Yes — same key, same partition count | None | KTable — updated whenever either side changes |
| KTable-KTable (FK) | KTable | KTable | No — internal repartitioning handled automatically | None | KTable — updated on left changes; right changes propagate too |
All joins that require co-partitioning are validated when the application starts: if the partition counts of the two input topics differ, the partition assignor throws a TopologyException during task assignment (the counts are only known once the brokers are queried, so this cannot be caught at topology build time). Fix it by calling repartition() or by creating the topics with matching partition counts.

The data-flow model is simple: streams carry every event independently; tables carry only the latest value per key (upsert semantics). A join between a stream and a table enriches each stream record with the current table state at lookup time.
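To make the upsert distinction concrete, here is a minimal plain-Java sketch with no Kafka dependency — the class and method names are illustrative, not part of any Kafka API. A stream retains every event; a table keeps only the latest value per key:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamVsTableDemo {

    // Stream semantics: every record is retained, in arrival order.
    static List<String> asStream(List<String[]> records) {
        List<String> out = new ArrayList<>();
        for (String[] r : records) out.add(r[0] + "=" + r[1]);
        return out;
    }

    // Table semantics: upsert — a later record for the same key replaces the earlier one.
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String[] r : records) out.put(r[0], r[1]);
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[]{"alice", "tier=GOLD"},
            new String[]{"bob",   "tier=SILVER"},
            new String[]{"alice", "tier=PLATINUM"}); // later update for alice

        System.out.println(asStream(records)); // three events survive
        System.out.println(asTable(records));  // two rows; alice holds the latest value
    }
}
```

A stream-table join therefore sees whatever `asTable` would hold at the moment each stream record arrives.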

A KStream-KStream join correlates two event streams by key within a time window. Both topics must be co-partitioned. Every record on either side is matched against all records on the other side that fall within the JoinWindows interval. The result is a new KStream.

The window is symmetric by default: a record on the left matches records on the right that arrived up to windowSize milliseconds before or after. Use JoinWindows.ofTimeDifferenceWithNoGrace() for strict windows or ofTimeDifferenceAndGrace() to tolerate late arrivals.
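The matching rule can be sketched in plain Java with no Kafka dependency (class and method names here are illustrative only): a left record at time tL matches a right record at tR when tL - before <= tR <= tL + after, where the symmetric default sets before == after == windowSize. (JoinWindows also exposes before() and after() for asymmetric bounds.)

```java
public class JoinWindowDemo {

    // True when the right-side record falls inside the left record's window.
    static boolean matches(long tLeft, long tRight, long beforeMs, long afterMs) {
        return tRight >= tLeft - beforeMs && tRight <= tLeft + afterMs;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        // Symmetric 5-minute window: a payment 4 min after the order matches.
        System.out.println(matches(0, 4 * 60 * 1000L, fiveMin, fiveMin)); // true
        // A payment 6 min after the order falls outside the window.
        System.out.println(matches(0, 6 * 60 * 1000L, fiveMin, fiveMin)); // false
        // Asymmetric window (before = 0): payments BEFORE the order never match.
        System.out.println(matches(0, -1_000L, 0, fiveMin)); // false
    }
}
```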

The example below enriches orders with matching payments that arrive within 5 minutes of the order.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;

// --- Domain objects (assume JSON serialisation via custom Serde) ---
// Order { orderId, customerId, amount }
// Payment { orderId, paymentRef, status }
// EnrichedOrder { orderId, customerId, amount, paymentRef, status }

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-payments-join");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

// Both topics must have the same number of partitions.
// Key = orderId on both sides — co-partitioning is satisfied.
KStream<String, Order> orders =
    builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
KStream<String, Payment> payments =
    builder.stream("payments", Consumed.with(Serdes.String(), paymentSerde));

// Inner join: emit only when both order AND payment exist within the 5-min window
KStream<String, EnrichedOrder> enriched = orders.join(
    payments,
    (order, payment) -> new EnrichedOrder(
        order.getOrderId(), order.getCustomerId(), order.getAmount(),
        payment.getPaymentRef(), payment.getStatus()),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));

// Left join variant: emit the order even when the payment has not arrived yet
KStream<String, EnrichedOrder> enrichedLeft = orders.leftJoin(
    payments,
    (order, payment) -> {
        String ref = payment != null ? payment.getPaymentRef() : "PENDING";
        String status = payment != null ? payment.getStatus() : "AWAITING_PAYMENT";
        return new EnrichedOrder(order.getOrderId(), order.getCustomerId(),
            order.getAmount(), ref, status);
    },
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofMinutes(5), Duration.ofMinutes(1)), // 1 min grace for late arrivals
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

// Outer join: emit partial records from either side even without a match
KStream<String, EnrichedOrder> enrichedOuter = orders.outerJoin(
    payments,
    (order, payment) -> {
        if (order == null) {
            // Payment arrived but no matching order — orphan payment
            return new EnrichedOrder(payment.getOrderId(), null, 0.0,
                payment.getPaymentRef(), payment.getStatus());
        }
        String ref = payment != null ? payment.getPaymentRef() : "PENDING";
        String status = payment != null ? payment.getStatus() : "AWAITING_PAYMENT";
        return new EnrichedOrder(order.getOrderId(), order.getCustomerId(),
            order.getAmount(), ref, status);
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Kafka Streams stores unmatched records from both sides in windowed state stores until a match arrives or the window closes. Large windows with high-throughput topics will consume significant local disk (RocksDB). Always set a grace period with ofTimeDifferenceAndGrace() rather than ofTimeDifferenceWithNoGrace() when out-of-order events are possible.

A KStream-KTable join enriches every incoming stream record with the current value of a table row that shares the same key. There is no time window — the lookup is instantaneous at the moment the stream record is processed. The KTable acts as a slowly-changing dimension (e.g. customer profile, product catalogue).

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

// --- Topics ---
// "orders"    key=orderId,    value=Order { orderId, customerId, amount }
// "customers" key=customerId, value=Customer { customerId, name, tier }
// Goal: enrich each order with the customer name and tier
//
// Problem: orders are keyed by orderId; customers by customerId.
// We must re-key orders by customerId before joining.

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Order> orders =
    builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));

// KTable — latest customer record per customerId
KTable<String, Customer> customers =
    builder.table("customers",
        Consumed.with(Serdes.String(), customerSerde),
        Materialized.as("customers-store"));

// Step 1: re-key the order stream by customerId (triggers a repartition topic)
KStream<String, Order> ordersByCustomer = orders
    .selectKey((orderId, order) -> order.getCustomerId());

// Step 2: inner join — only emit when the customer record exists
KStream<String, EnrichedOrder> enriched = ordersByCustomer.join(
    customers,
    (order, customer) -> new EnrichedOrder(
        order.getOrderId(), customer.getName(),
        customer.getTier(), order.getAmount())
);

// Step 3: left join variant — emit even when the customer is not yet in the table
KStream<String, EnrichedOrder> enrichedLeft = ordersByCustomer.leftJoin(
    customers,
    (order, customer) -> {
        String name = customer != null ? customer.getName() : "UNKNOWN";
        String tier = customer != null ? customer.getTier() : "STANDARD";
        return new EnrichedOrder(order.getOrderId(), name, tier, order.getAmount());
    }
);

enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

When you call selectKey(), Kafka Streams marks the stream as "key changed" and automatically repartitions it through an internal topic before the join; the internal topic's name is derived from the application ID. No manual through() call is needed, but the repartition topic's partition count must match the KTable source topic — call repartition(Repartitioned.numberOfPartitions(...)) explicitly if you need to control it.

A GlobalKTable is replicated in full to every application instance. Unlike a regular KTable (which is sharded by partition), every instance has a complete copy of all keys. This eliminates the co-partitioning requirement and enables foreign-key lookups directly in the join call via a KeyValueMapper.

When to prefer GlobalKTable vs KTable:

| Criterion | KTable | GlobalKTable |
|---|---|---|
| Table size | Any size — sharded across instances | Small-to-medium — entire table fits on every instance |
| Co-partitioning | Required — stream and table must share key and partition count | Not required — any key relationship supported |
| Foreign-key lookup | Requires re-keying via selectKey() | Native via KeyValueMapper |
| Memory & disk per instance | Only the partition(s) assigned to the instance | Full table on every instance |
| Startup time | Fast — restore only assigned partitions | Slower — restore all partitions on startup |
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

// Scenario: orders keyed by orderId; products table keyed by productId.
// Each order contains a productId field — foreign-key enrichment.

StreamsBuilder builder = new StreamsBuilder();

// GlobalKTable — fully replicated; no co-partitioning needed
GlobalKTable<String, Product> products =
    builder.globalTable("products",
        Consumed.with(Serdes.String(), productSerde),
        Materialized.as("products-global-store"));

KStream<String, Order> orders =
    builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));

// KeyValueMapper: (streamKey, streamValue) -> lookupKey in the GlobalKTable
KeyValueMapper<String, Order, String> productKeyExtractor =
    (orderId, order) -> order.getProductId();

// ValueJoiner: combine stream value + table value
ValueJoiner<Order, Product, EnrichedOrder> joiner =
    (order, product) -> new EnrichedOrder(
        order.getOrderId(), order.getCustomerId(), order.getAmount(),
        product.getName(), product.getCategory());

// Inner join — only emit when the product exists in the GlobalKTable
KStream<String, EnrichedOrder> enriched =
    orders.join(products, productKeyExtractor, joiner);

// Left join — emit even when the product is missing (product will be null)
KStream<String, EnrichedOrder> enrichedLeft = orders.leftJoin(
    products,
    productKeyExtractor,
    (order, product) -> {
        String name = product != null ? product.getName() : "UNKNOWN_PRODUCT";
        String category = product != null ? product.getCategory() : "UNCATEGORIZED";
        return new EnrichedOrder(order.getOrderId(), order.getCustomerId(),
            order.getAmount(), name, category);
    }
);

enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

A GlobalKTable reads from all partitions of the source topic regardless of the instance's partition assignment. If your lookup table has millions of rows, the memory and disk cost multiplies by the number of application instances. In that case, prefer a regular KTable join with selectKey() + repartition, or use the KTable foreign-key join API (Kafka Streams 2.4+).

Two KTables can be joined when they share the same key. The result is a new KTable that is updated whenever either input table changes. This is an equijoin — no window and no foreign-key extraction.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import java.time.Duration;

// Scenario: two tables keyed by orderId.
// "order-status"   table: { orderId -> status }
// "order-shipping" table: { orderId -> trackingNumber }
// Goal: a combined table with status + trackingNumber per orderId.

StreamsBuilder builder = new StreamsBuilder();

KTable<String, OrderStatus> statusTable =
    builder.table("order-status",
        Consumed.with(Serdes.String(), orderStatusSerde),
        Materialized.as("order-status-store"));

KTable<String, Shipping> shippingTable =
    builder.table("order-shipping",
        Consumed.with(Serdes.String(), shippingSerde),
        Materialized.as("order-shipping-store"));

// Inner join — row present only when BOTH tables have a value for the key
KTable<String, OrderSummary> summary = statusTable.join(
    shippingTable,
    (status, shipping) -> new OrderSummary(
        status.getOrderId(), status.getStatus(),
        shipping.getTrackingNumber())
);

// Left join — order summary emitted even before the shipping record arrives
KTable<String, OrderSummary> summaryLeft = statusTable.leftJoin(
    shippingTable,
    (status, shipping) -> {
        String tracking = shipping != null ? shipping.getTrackingNumber() : "NOT_SHIPPED";
        return new OrderSummary(status.getOrderId(), status.getStatus(), tracking);
    }
);

// Suppress intermediate updates — emit only the final value for each key
// per processing tick (useful before writing to a compacted output topic)
summaryLeft
    .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1),
        BufferConfig.maxRecords(10_000).emitEarlyWhenFull()))
    .toStream()
    .to("order-summaries", Produced.with(Serdes.String(), orderSummarySerde));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Every time either input KTable receives an update for a key, the join re-evaluates that key and emits a new record to the output KTable. If both tables are high-churn, use suppress() to buffer updates and emit only the latest value within a time limit, reducing downstream write amplification.

Introduced in Kafka Streams 2.4, the KTable-KTable foreign-key join allows joining two KTables where the left table holds a foreign key that references the primary key of the right table — without requiring co-partitioning or a GlobalKTable. Kafka Streams handles the internal repartitioning automatically.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.function.Function;

// Scenario:
// "orders"    KTable keyed by orderId,    value = Order { orderId, customerId, amount }
// "customers" KTable keyed by customerId, value = Customer { customerId, name, tier }
//
// orders.customerId is a foreign key into customers.customerId.
// Topics can have different partition counts — the FK join handles this internally.

StreamsBuilder builder = new StreamsBuilder();

KTable<String, Order> ordersTable =
    builder.table("orders",
        Consumed.with(Serdes.String(), orderSerde),
        Materialized.as("orders-store"));

KTable<String, Customer> customersTable =
    builder.table("customers",
        Consumed.with(Serdes.String(), customerSerde),
        Materialized.as("customers-store"));

// Foreign-key extractor: given an Order value, return the customerId (right-table key)
Function<Order, String> fkExtractor = order -> order.getCustomerId();

// Inner FK join — result row exists only when the customer record also exists
KTable<String, EnrichedOrder> enriched = ordersTable.join(
    customersTable,
    fkExtractor,
    (order, customer) -> new EnrichedOrder(
        order.getOrderId(), customer.getName(),
        customer.getTier(), order.getAmount())
);

// Left FK join — result row exists for every order; customer fields fall back to
// defaults when the customer is missing
KTable<String, EnrichedOrder> enrichedLeft = ordersTable.leftJoin(
    customersTable,
    fkExtractor,
    (order, customer) -> {
        String name = customer != null ? customer.getName() : "UNKNOWN";
        String tier = customer != null ? customer.getTier() : "STANDARD";
        return new EnrichedOrder(order.getOrderId(), name, tier, order.getAmount());
    }
);

enriched
    .toStream()
    .to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

The FK join creates two internal repartition topics: one to route left-table updates to the correct right-table partition, and one to propagate right-table updates back to matching left rows. This adds some latency and broker storage but eliminates the need for a GlobalKTable for large lookup tables. Available since kafka-streams 2.4.0.

Co-partitioning is satisfied when all three conditions hold for both join inputs:

- Both input topics have the same number of partitions.
- Records on both sides use the same key, serialised by the same key Serde, so equal keys produce identical bytes.
- Producers to both topics use the same partitioning strategy (the default partitioner, or the same custom one on both sides), so identical key bytes land on the same partition number.

What breaks co-partitioning:

- Re-keying a stream with selectKey(), map(), or flatMap() — the new keys are no longer aligned with the other side's partitions until a repartition happens.
- Input topics created with different partition counts.
- A custom partitioner configured on the producer of one topic but not the other.

Fixing co-partitioning with repartition():

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Order> orders =
    builder.stream("orders-6part",        // 6 partitions
        Consumed.with(Serdes.String(), orderSerde));

KTable<String, Customer> customers =
    builder.table("customers-12part",     // 12 partitions
        Consumed.with(Serdes.String(), customerSerde));

// Re-key orders by customerId first
KStream<String, Order> ordersByCustomer = orders
    .selectKey((orderId, order) -> order.getCustomerId());

// Explicit repartition to match the customers topic partition count (12)
KStream<String, Order> repartitioned = ordersByCustomer
    .repartition(Repartitioned
        .<String, Order>as("orders-by-customer-repartitioned")
        .withNumberOfPartitions(12)
        .withKeySerde(Serdes.String())
        .withValueSerde(orderSerde));

// Now the partition counts match — the join is valid
KStream<String, EnrichedOrder> enriched = repartitioned.join(
    customers,
    (order, customer) -> new EnrichedOrder(
        order.getOrderId(), customer.getName(),
        customer.getTier(), order.getAmount())
);
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));

// Alternative: use through() to write/read via an intermediate topic
// (older approach — deprecated since 2.6 in favour of repartition())
KStream<String, Order> throughRepartitioned = ordersByCustomer
    .through("orders-by-customer-repartitioned",
        Produced.with(Serdes.String(), orderSerde));

Prefer repartition() over through() for new code — it is explicit about the number of partitions, and the internal topic is created and managed entirely by the Streams runtime. through() (deprecated since Kafka Streams 2.6) writes to a user-managed topic that you must create with the right partition count yourself.

The following issues cause silent data loss or incorrect join results and are frequently encountered in production Kafka Streams applications.

1. Joining on the wrong side (KTable lookup returns latest value only)

A KTable join is a point-in-time lookup. When an order stream record arrives, the join reflects the current state of the KTable at that instant. If the KTable record was not yet received (lag on the table topic) the join returns null or the previous value. This is particularly problematic during application startup when the KTable is still being populated from the beginning of the changelog topic.
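One mitigation worth knowing is the max.task.idle.ms setting, which tells Streams to pause a task briefly when one of its inputs (here, the table side) has no buffered data yet, giving the KTable a chance to catch up before stream records are processed. A minimal sketch, using the literal config key (the constant StreamsConfig.MAX_TASK_IDLE_MS_CONFIG resolves to this string; the 2-second value is an illustrative choice, not a recommendation):

```java
import java.util.Properties;

public class IdleConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Wait up to 2 s for lagging inputs before processing a task's records.
        props.put("max.task.idle.ms", "2000");
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

This trades a little end-to-end latency for fewer null-enrichment results during startup and table-topic lag.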

2. Window too small causing missed KStream-KStream joins

// BAD: 1-second window too tight for network jitter between orders and payments
KStream<String, EnrichedOrder> tooTight = orders.join(
    payments,
    (order, payment) -> enrich(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(1)) // will miss many joins
);

// GOOD: 5-minute window with a grace period for late arrivals
KStream<String, EnrichedOrder> reasonable = orders.join(
    payments,
    (order, payment) -> enrich(order, payment),
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofMinutes(5),  // join window — match records within 5 min of each other
        Duration.ofMinutes(1))  // grace — accept late records up to 1 min after window close
);

3. Not accounting for out-of-order events

Kafka guarantees ordering within a partition, not across partitions or topics. Records from different partitions can arrive at the processor in any wall-clock order. Event-time timestamps (set by the producer) may differ significantly from processing time.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Example extractor: read event time from the record value
public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof Order) {
            return ((Order) record.value()).getEventTimestamp();
        }
        return partitionTime; // fallback to partition time if type is unexpected
    }
}

// Register the custom TimestampExtractor so joins use event time, not ingestion time
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    OrderTimestampExtractor.class);

4. Re-keying without repartitioning before a join

Calling selectKey() or map() on a stream changes the key but does not immediately repartition the data. The DSL marks the stream as "key changed" and inserts an automatic repartition step before any downstream join or aggregation, so same-keyed records end up on the same partition. Wrong or missing join results arise when that repartitioning is bypassed — for example, when dropping into the Processor API after a re-key, or assuming a key change is partition-preserving. Calling repartition() explicitly after a re-key makes the intent clear and lets you control the partition count.

// Print the topology to verify repartition nodes are present
Topology topology = builder.build();
System.out.println(topology.describe());
// Look for:
//   --> KSTREAM-SINK-...   (repartition)
//   <-- KSTREAM-SOURCE-... (repartition)

5. Forgetting that KTable-KTable joins emit on every side update

A KTable-KTable join re-emits the result row every time either side changes. In a high-update scenario this produces many intermediate records downstream. Use suppress() to emit only the final value within a time interval, or write to a compacted Kafka topic so downstream consumers see only the latest per key.
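The effect of suppress()-style buffering can be sketched in plain Java with no Kafka dependency (the class and method names are illustrative only): several rapid updates to the same key collapse into a single emission carrying only the latest value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SuppressDemo {

    // Buffer updates within one "tick", then emit a single record per key.
    static List<String> emitLatestPerKey(List<String[]> updates) {
        Map<String, String> buffer = new LinkedHashMap<>();
        for (String[] u : updates) buffer.put(u[0], u[1]); // later update wins
        List<String> emitted = new ArrayList<>();
        buffer.forEach((k, v) -> emitted.add(k + "=" + v));
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> updates = List.of(
            new String[]{"order-1", "PLACED"},
            new String[]{"order-1", "PAID"},
            new String[]{"order-2", "PLACED"},
            new String[]{"order-1", "SHIPPED"});
        // Four input updates collapse to two emitted records.
        System.out.println(emitLatestPerKey(updates));
    }
}
```

The real suppress() operator adds the time/size bounds (untilTimeLimit, BufferConfig) that decide when the buffer flushes; the collapsing behaviour is the same.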