Contents
- Join Types Overview
- KStream-KStream Join
- KStream-KTable Join
- GlobalKTable
- KTable-KTable Join
- Foreign-Key Joins (KTable-KTable FK)
- Co-partitioning Rules & Repartitioning
- Common Pitfalls
## Join Types Overview

Kafka Streams supports five categories of joins. The table below summarises the co-partitioning requirement,
whether a time window is involved, and the cardinality of the output.
| Join Type | Left | Right | Co-partitioning Required | Window | Output Cardinality |
|---|---|---|---|---|---|
| KStream-KStream | KStream | KStream | Yes — same key, same partition count, same Serdes | Required (JoinWindows) | KStream — one output per matching pair within the window |
| KStream-KTable | KStream | KTable | Yes — same key, same partition count | None — point-in-time lookup | KStream — one output per stream record (or null on leftJoin miss) |
| KStream-GlobalKTable | KStream | GlobalKTable | No — fully replicated | None — point-in-time lookup | KStream — one output per stream record; supports foreign-key extractor |
| KTable-KTable (equi) | KTable | KTable | Yes — same key, same partition count | None | KTable — updated whenever either side changes |
| KTable-KTable (FK) | KTable | KTable | No — internal repartitioning handled automatically | None | KTable — updated on left changes; right changes propagate too |
All joins that require co-partitioning fail fast if the partition counts of the two input
topics differ: Kafka Streams verifies co-partitioning during partition assignment (at startup
or rebalance, not at topology build time) and throws a TopologyException. Fix this by calling
repartition() on one side or by creating the topics with matching partition counts.
In the data-flow model, streams carry every event independently, while
tables carry only the latest value per key (upsert semantics).
A join between a stream and a table enriches each stream record with the current table state at lookup time.
- join() — inner join: output only when both sides match.
- leftJoin() — left outer: left side always produces output; right side may be null.
- outerJoin() — full outer: output when either side has a record (supported for KStream-KStream and KTable-KTable joins, but not KStream-KTable).
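To make the three flavours concrete, here is a plain-Java sketch (deliberately not the Kafka Streams API) that applies inner, left, and outer join logic to two in-memory maps; keys play the role of record keys and string concatenation stands in for a ValueJoiner:

```java
import java.util.*;

class JoinSemantics {
    // Inner join: output only for keys present in BOTH maps.
    static Map<String, String> inner(Map<String, String> left, Map<String, String> right) {
        Map<String, String> out = new HashMap<>();
        for (var e : left.entrySet()) {
            String r = right.get(e.getKey());
            if (r != null) out.put(e.getKey(), e.getValue() + "+" + r);
        }
        return out;
    }

    // Left join: every left key produces output; missing right side joins as null.
    static Map<String, String> leftJoin(Map<String, String> left, Map<String, String> right) {
        Map<String, String> out = new HashMap<>();
        for (var e : left.entrySet()) {
            out.put(e.getKey(), e.getValue() + "+" + right.get(e.getKey()));
        }
        return out;
    }

    // Full outer join: every key from either side produces output.
    static Map<String, String> outerJoin(Map<String, String> left, Map<String, String> right) {
        Map<String, String> out = new HashMap<>();
        Set<String> keys = new HashSet<>(left.keySet());
        keys.addAll(right.keySet());
        for (String k : keys) {
            out.put(k, left.get(k) + "+" + right.get(k));
        }
        return out;
    }
}
```

The real joins additionally involve serdes, state stores, and (for stream-stream) time windows, but the per-key matching rules are exactly these.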
## KStream-KStream Join

A KStream-KStream join correlates two event streams by key within a time window.
Both topics must be co-partitioned. Every record on either side is matched against
all records on the other side that fall within the JoinWindows interval.
The result is a new KStream.
- join() — inner: both sides must have a record within the window.
- leftJoin() — left outer: every record from the left side produces output; if no right-side record exists within the window the right value is null.
- outerJoin() — full outer: any record on either side eventually produces output, padding the missing side with null.
The window is symmetric by default: a record on the left matches records on the right that arrived
up to windowSize milliseconds before or after. Use JoinWindows.ofTimeDifferenceWithNoGrace()
for strict windows or ofTimeDifferenceAndGrace() to tolerate late arrivals.
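The symmetric window boundary reduces to a one-line predicate. This illustrative helper (not part of the Kafka Streams API) mirrors the check JoinWindows applies to a pair of event timestamps:

```java
class WindowCheck {
    // Symmetric join window: two records match when their event-time
    // difference is at most windowSizeMs, in either direction.
    static boolean withinWindow(long leftTs, long rightTs, long windowSizeMs) {
        return Math.abs(leftTs - rightTs) <= windowSizeMs;
    }
}
```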
The example below enriches orders with matching payments
that arrive within 5 minutes of the order.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
// --- Domain objects (assume JSON serialisation via custom Serde) ---
// Order { orderId, customerId, amount }
// Payment { orderId, paymentRef, status }
// EnrichedOrder { orderId, customerId, amount, paymentRef, status }
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-payments-join");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Both topics must have the same number of partitions.
// Key = orderId on both sides — co-partitioning is satisfied.
KStream<String, Order> orders =
builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
KStream<String, Payment> payments =
builder.stream("payments", Consumed.with(Serdes.String(), paymentSerde));
// Inner join: emit only when both order AND payment exist within 5-min window
KStream<String, EnrichedOrder> enriched = orders.join(
payments,
(order, payment) -> new EnrichedOrder(
order.getOrderId(),
order.getCustomerId(),
order.getAmount(),
payment.getPaymentRef(),
payment.getStatus()
),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));
// Left join variant: emit order even when payment has not arrived yet
KStream<String, EnrichedOrder> enrichedLeft = orders.leftJoin(
payments,
(order, payment) -> {
String ref = payment != null ? payment.getPaymentRef() : "PENDING";
String status = payment != null ? payment.getStatus() : "AWAITING_PAYMENT";
return new EnrichedOrder(order.getOrderId(), order.getCustomerId(),
order.getAmount(), ref, status);
},
JoinWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5), Duration.ofMinutes(1)), // 1 min grace for late arrivals
StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
// Outer join: emit partial records from either side even without a match
KStream<String, EnrichedOrder> enrichedOuter = orders.outerJoin(
payments,
(order, payment) -> {
if (order == null) {
// Payment arrived but no matching order — orphan payment
return new EnrichedOrder(payment.getOrderId(), null, 0.0,
payment.getPaymentRef(), payment.getStatus());
}
String ref = payment != null ? payment.getPaymentRef() : "PENDING";
String status = payment != null ? payment.getStatus() : "AWAITING_PAYMENT";
return new EnrichedOrder(order.getOrderId(), order.getCustomerId(),
order.getAmount(), ref, status);
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Kafka Streams stores unmatched records from both sides in windowed state stores until a match arrives or
the window closes. Large windows with high-throughput topics will consume significant local disk (RocksDB).
Always set a grace period with ofTimeDifferenceAndGrace() rather than
ofTimeDifferenceWithNoGrace() when out-of-order events are possible.
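A rough sizing sketch helps when choosing window sizes. The helper below is illustrative only (it ignores RocksDB overhead, compression and changelog retention): it estimates per-side join state as throughput times the window-plus-grace span times average record size. For example, 1,000 records/sec with a 5-minute window, 1-minute grace and 500-byte records keeps on the order of 180 MB per side.

```java
class JoinStateSizing {
    // Rough upper bound on windowed join state held per side:
    // records/sec * (window + grace) seconds * average record bytes.
    // Treat as a ballpark, not a capacity plan.
    static long stateBytes(long recordsPerSec, long windowSec, long graceSec, long avgRecordBytes) {
        return recordsPerSec * (windowSec + graceSec) * avgRecordBytes;
    }
}
```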
## KStream-KTable Join

A KStream-KTable join enriches every incoming stream record with the current value
of a table row that shares the same key. There is no time window — the lookup is instantaneous
at the moment the stream record is processed. The KTable acts as a slowly-changing dimension
(e.g. customer profile, product catalogue).
- Both the KStream input topic and the KTable source topic must be co-partitioned (same key type, same partition count).
- If the record key on the stream side does not match the KTable key (foreign-key enrichment), you must repartition first using selectKey() + repartition().
- Use leftJoin() when a missing table row is valid (nullable enrichment).
- Table updates do not trigger re-emission of stream records — only new stream records trigger the join.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
// --- Topics ---
// "orders" key=orderId, value=Order { orderId, customerId, amount }
// "customers" key=customerId, value=Customer { customerId, name, tier }
// Goal: enrich each order with the customer name and tier
//
// Problem: orders are keyed by orderId; customers by customerId.
// We must re-key orders by customerId before joining.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders =
builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
// KTable — latest customer record per customerId
KTable<String, Customer> customers =
builder.table("customers", Consumed.with(Serdes.String(), customerSerde),
Materialized.as("customers-store"));
// Step 1: re-key the order stream by customerId (triggers repartition topic)
KStream<String, Order> ordersByCustomer = orders
.selectKey((orderId, order) -> order.getCustomerId());
// Step 2: inner join — only emit when customer record exists
KStream<String, EnrichedOrder> enriched = ordersByCustomer.join(
customers,
(order, customer) -> new EnrichedOrder(
order.getOrderId(),
customer.getName(),
customer.getTier(),
order.getAmount()
)
);
// Step 3: left join variant — emit even when customer is not yet in the table
KStream<String, EnrichedOrder> enrichedLeft = ordersByCustomer.leftJoin(
customers,
(order, customer) -> {
String name = customer != null ? customer.getName() : "UNKNOWN";
String tier = customer != null ? customer.getTier() : "STANDARD";
return new EnrichedOrder(order.getOrderId(), name, tier, order.getAmount());
}
);
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
When you call selectKey(), Kafka Streams marks the stream as "key changed" and will
automatically repartition it through an internal topic before the join. The internal topic name is
derived from the application ID. No manual through() call is needed, but the partition count
of the repartition topic must match the KTable source topic — set this via
Repartitioned.numberOfPartitions() if needed.
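For reference, internal repartition topic names follow the pattern <application.id>-<operator-name>-repartition, where the operator name is auto-generated (e.g. KSTREAM-KEY-SELECT-0000000002) unless you name the operation via Repartitioned.as(). A tiny illustrative helper for predicting the name:

```java
class RepartitionTopicName {
    // Kafka Streams derives internal repartition topic names as
    // "<application.id>-<operator-name>-repartition".
    static String internalTopic(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-repartition";
    }
}
```

Naming the operation explicitly keeps these topics stable across topology changes, which matters for upgrades.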
## GlobalKTable

A GlobalKTable is replicated in full to every application instance.
Unlike a regular KTable (which is sharded by partition), every instance has a complete copy of all keys.
This eliminates the co-partitioning requirement and enables foreign-key lookups
directly in the join call via a KeyValueMapper.
- Created with builder.globalTable().
- No co-partitioning requirement — the stream key and table key can be completely different.
- The KeyValueMapper extracts the lookup key from the stream record (foreign-key extraction).
- A ValueJoiner combines the stream value and table value into the output value.
- Only join() and leftJoin() are supported (no outerJoin()).
When to prefer GlobalKTable vs KTable:
| Criterion | KTable | GlobalKTable |
|---|---|---|
| Table size | Any size — sharded across instances | Small-to-medium — entire table fits on every instance |
| Co-partitioning | Required — stream and table must share key and partition count | Not required — any key relationship supported |
| Foreign-key lookup | Requires re-keying via selectKey() | Native via KeyValueMapper |
| Memory & disk per instance | Only the partition(s) assigned to the instance | Full table on every instance |
| Startup time | Fast — restore only assigned partitions | Slower — restore all partitions on startup |
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
// Scenario: orders keyed by orderId; products table keyed by productId.
// Each order contains a productId field — foreign-key enrichment.
StreamsBuilder builder = new StreamsBuilder();
// GlobalKTable — fully replicated; no co-partitioning needed
GlobalKTable<String, Product> products =
builder.globalTable("products",
Consumed.with(Serdes.String(), productSerde),
Materialized.as("products-global-store"));
KStream<String, Order> orders =
builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
// KeyValueMapper: (streamKey, streamValue) -> lookupKey in the GlobalKTable
KeyValueMapper<String, Order, String> productKeyExtractor =
(orderId, order) -> order.getProductId();
// ValueJoiner: combine stream value + table value
ValueJoiner<Order, Product, EnrichedOrder> joiner =
(order, product) -> new EnrichedOrder(
order.getOrderId(),
order.getCustomerId(),
order.getAmount(),
product.getName(),
product.getCategory()
);
// Inner join — only emit when product exists in the GlobalKTable
KStream<String, EnrichedOrder> enriched = orders.join(products, productKeyExtractor, joiner);
// Left join — emit even when product is missing (product will be null)
KStream<String, EnrichedOrder> enrichedLeft = orders.leftJoin(
products,
productKeyExtractor,
(order, product) -> {
String name = product != null ? product.getName() : "UNKNOWN_PRODUCT";
String category = product != null ? product.getCategory() : "UNCATEGORIZED";
return new EnrichedOrder(order.getOrderId(), order.getCustomerId(),
order.getAmount(), name, category);
}
);
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
A GlobalKTable reads from all partitions of the source topic regardless of the instance's
partition assignment. If your lookup table has millions of rows, the memory and disk cost multiplies
by the number of application instances. In that case, prefer a regular KTable join with
selectKey() + repartition(), or use the KTable foreign-key join API (Kafka Streams 2.4+).
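The multiplication is easy to underestimate. This back-of-envelope helper (illustrative only) contrasts the cluster-wide footprint of a fully replicated GlobalKTable with a sharded KTable: 10 million 1,000-byte rows on 10 instances costs roughly 100 GB replicated versus 10 GB total sharded.

```java
class GlobalTableFootprint {
    // GlobalKTable: every instance holds the full table, so the cluster-wide
    // cost scales with the instance count.
    static long globalBytes(long rows, long bytesPerRow, int instances) {
        return rows * bytesPerRow * instances;
    }

    // Sharded KTable: the table is split across instances, so the total
    // stays constant no matter how many instances you run.
    static long shardedBytes(long rows, long bytesPerRow) {
        return rows * bytesPerRow;
    }
}
```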
## KTable-KTable Join

Two KTables can be joined when they share the same key. The result is a new KTable
that is updated whenever either input table changes. This is an equijoin — no
window and no foreign-key extraction.
- Both KTables must be co-partitioned (same key type, same partition count, compatible Serdes).
- join() — inner: result row exists only when both sides have a non-null value for the key.
- leftJoin() — left outer: result row exists whenever the left side has a value; right may be null.
- outerJoin() — full outer: result row exists whenever either side has a value.
- A null value on either side acts as a delete tombstone for that key.
- Consider applying suppress() before writing downstream to avoid intermediate update churn.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import java.time.Duration;
// Scenario: two tables keyed by orderId.
// "order-status" table: { orderId -> status }
// "order-shipping" table: { orderId -> trackingNumber }
// Goal: a combined table with status + trackingNumber per orderId.
StreamsBuilder builder = new StreamsBuilder();
KTable<String, OrderStatus> statusTable =
builder.table("order-status",
Consumed.with(Serdes.String(), orderStatusSerde),
Materialized.as("order-status-store"));
KTable<String, Shipping> shippingTable =
builder.table("order-shipping",
Consumed.with(Serdes.String(), shippingSerde),
Materialized.as("order-shipping-store"));
// Inner join — row present only when BOTH tables have a value for the key
KTable<String, OrderSummary> summary = statusTable.join(
shippingTable,
(status, shipping) -> new OrderSummary(
status.getOrderId(),
status.getStatus(),
shipping.getTrackingNumber()
)
);
// Left join — order summary emitted even before shipping record arrives
KTable<String, OrderSummary> summaryLeft = statusTable.leftJoin(
shippingTable,
(status, shipping) -> {
String tracking = shipping != null ? shipping.getTrackingNumber() : "NOT_SHIPPED";
return new OrderSummary(status.getOrderId(), status.getStatus(), tracking);
}
);
// Suppress intermediate updates: forward at most one value per key per
// one-second interval (useful before writing to a compacted output topic)
summaryLeft
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1),
BufferConfig.maxRecords(10_000).emitEarlyWhenFull()))
.toStream()
.to("order-summaries", Produced.with(Serdes.String(), orderSummarySerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Every time either input KTable receives an update for a key, the join re-evaluates that key and emits
a new record to the output KTable. If both tables are high-churn, use suppress() to
buffer updates and emit only the latest value within a time limit, reducing downstream write amplification.
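The effect of suppression can be sketched without Kafka at all. In this toy model (plain Java, not the Streams API), the unsuppressed path forwards every update, while a buffered flush keeps only the latest value per key:

```java
import java.util.*;

class SuppressSketch {
    // Without suppression: every upstream update is forwarded downstream.
    static List<String> unsuppressed(List<Map.Entry<String, String>> updates) {
        List<String> out = new ArrayList<>();
        for (var u : updates) out.add(u.getKey() + "=" + u.getValue());
        return out;
    }

    // With a time/size-bounded buffer: later updates overwrite earlier ones,
    // so a flush emits only the latest value per key.
    static List<String> suppressedFlush(List<Map.Entry<String, String>> updates) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (var u : updates) latest.put(u.getKey(), u.getValue());
        List<String> out = new ArrayList<>();
        latest.forEach((k, v) -> out.add(k + "=" + v));
        return out;
    }
}
```

Four updates across two keys collapse to two emitted records after a flush, which is exactly the write-amplification saving suppress() buys.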
## Foreign-Key Joins (KTable-KTable FK)

Introduced in Kafka Streams 2.4, the KTable-KTable foreign-key join allows
joining two KTables where the left table holds a foreign key that references the
primary key of the right table — without requiring co-partitioning or a GlobalKTable.
Kafka Streams handles the internal repartitioning automatically.
- Method signature: leftTable.join(rightTable, foreignKeyExtractor, valueJoiner)
- The foreignKeyExtractor is a Function<LeftValue, RightKey> — extracts the right-table lookup key from the left value.
- When the left table record changes, the join re-evaluates. When the right table record changes, affected left rows re-evaluate automatically.
- No manual repartitioning, selectKey(), or GlobalKTable is needed.
- Supports join() (inner) and leftJoin() (left outer).
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.function.Function;
// Scenario:
// "orders" KTable keyed by orderId, value = Order { orderId, customerId, amount }
// "customers" KTable keyed by customerId, value = Customer { customerId, name, tier }
//
// orders.customerId is a foreign key into customers.customerId.
// Topics can have different partition counts — FK join handles this internally.
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Order> ordersTable =
builder.table("orders",
Consumed.with(Serdes.String(), orderSerde),
Materialized.as("orders-store"));
KTable<String, Customer> customersTable =
builder.table("customers",
Consumed.with(Serdes.String(), customerSerde),
Materialized.as("customers-store"));
// Foreign-key extractor: given an Order value, return the customerId (right-table key)
Function<Order, String> fkExtractor = order -> order.getCustomerId();
// Inner FK join — result row exists only when the customer record also exists
KTable<String, EnrichedOrder> enriched = ordersTable.join(
customersTable,
fkExtractor,
(order, customer) -> new EnrichedOrder(
order.getOrderId(),
customer.getName(),
customer.getTier(),
order.getAmount()
)
);
// Left FK join — result row exists for every order; customer fields null when missing
KTable<String, EnrichedOrder> enrichedLeft = ordersTable.leftJoin(
customersTable,
fkExtractor,
(order, customer) -> {
String name = customer != null ? customer.getName() : "UNKNOWN";
String tier = customer != null ? customer.getTier() : "STANDARD";
return new EnrichedOrder(order.getOrderId(), name, tier, order.getAmount());
}
);
enriched
.toStream()
.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
The FK join creates two internal repartition topics: one to route left-table updates to the
correct right-table partition, and one to propagate right-table updates back to matching left rows.
This adds some latency and broker storage but eliminates the need for a GlobalKTable for large lookup tables.
Available since kafka-streams 2.4.0.
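The right-to-left propagation is the part that distinguishes the FK join from a simple lookup. This toy re-evaluation model (plain Java, not the Streams implementation) shows why a single right-table update can fan out to many left rows:

```java
import java.util.*;

class FkJoinSketch {
    // Toy model of FK join re-evaluation: each left row carries a foreign key
    // into the right table. When a right-table row changes, every left row
    // pointing at it must be recomputed.
    static Map<String, String> reevaluate(Map<String, String> leftFk,  // leftKey -> foreign key
                                          Map<String, String> right) { // rightKey -> value
        Map<String, String> result = new HashMap<>();
        for (var e : leftFk.entrySet()) {
            String rv = right.get(e.getValue());
            if (rv != null) result.put(e.getKey(), rv); // inner-join semantics
        }
        return result;
    }
}
```

In the real implementation this fan-out is what the second internal repartition topic carries: right-side updates routed back to every affected left partition.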
## Co-partitioning Rules & Repartitioning

Co-partitioning is satisfied when all three conditions hold for both join inputs:
- Same key type and the same key Serde (so the same key serialises to the same bytes).
- Same number of partitions on the source topics.
- Records with the same key land in the same partition (default Kafka partitioner is consistent for this).
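Why the partition-count condition matters can be seen with a stand-in partitioner. Real Kafka hashes the serialised key with murmur2, but the modulo step is identical; with different partition counts the same key generally lands on different partitions on the two sides:

```java
class PartitionDemo {
    // Stand-in for the default partitioner: non-negative hash modulo the
    // partition count. (Real Kafka uses murmur2 over the key bytes; the
    // modulo step is the same.) Same key + same partition count gives the
    // same partition; different partition counts break the alignment.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```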
What breaks co-partitioning:
- Topics were created with different partition counts.
- A selectKey() or map() that changes the key without a subsequent repartition.
- Using a custom partitioner on one topic but not the other.
- A producer wrote records with explicit partition assignments, bypassing the partitioner.
Fixing co-partitioning with repartition():
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders =
builder.stream("orders-6part", // 6 partitions
Consumed.with(Serdes.String(), orderSerde));
KTable<String, Customer> customers =
builder.table("customers-12part", // 12 partitions
Consumed.with(Serdes.String(), customerSerde));
// Re-key orders by customerId first
KStream<String, Order> ordersByCustomer = orders
.selectKey((orderId, order) -> order.getCustomerId());
// Explicit repartition to match the customers topic partition count (12)
KStream<String, Order> repartitioned = ordersByCustomer
.repartition(Repartitioned
.<String, Order>as("orders-by-customer-repartitioned")
.withNumberOfPartitions(12)
.withKeySerde(Serdes.String())
.withValueSerde(orderSerde));
// Now the partition counts match — join is valid
KStream<String, EnrichedOrder> enriched = repartitioned.join(
customers,
(order, customer) -> new EnrichedOrder(
order.getOrderId(),
customer.getName(),
customer.getTier(),
order.getAmount()
)
);
enriched.to("enriched-orders");
// Alternative: use through() to write/read via an intermediate, user-managed topic
// (older approach, deprecated since Kafka 2.6 in favour of repartition())
KStream<String, Order> throughRepartitioned = ordersByCustomer
    .through("orders-by-customer-repartitioned",
        Produced.with(Serdes.String(), orderSerde));
Prefer repartition() over through() for new code. repartition() is explicit about
the number of partitions, and its topic is an internal one created and managed entirely
by the Streams runtime. through() (deprecated since Kafka 2.6) writes to a user-managed
topic that you must create yourself with the right partition count.
- Avoid unnecessary repartitioning — each repartition() introduces a full round-trip through Kafka (produce + consume), adding latency and broker load.
- Design your topic partition counts upfront to match join partners and avoid the need for repartitioning at runtime.
- Use the KTable FK join or GlobalKTable when co-partitioning is impractical to achieve.
## Common Pitfalls

The following issues cause silent data loss or incorrect join results and are frequently encountered
in production Kafka Streams applications.
1. Joining on the wrong side (KTable lookup returns latest value only)
A KTable join is a point-in-time lookup. When an order stream record arrives, the join
reflects the current state of the KTable at that instant. If the KTable record
was not yet received (lag on the table topic) the join returns null or the previous value.
This is particularly problematic during application startup when the KTable is still being
populated from the beginning of the changelog topic.
- Use leftJoin() and handle the null case explicitly rather than relying on an inner join that silently drops records.
- For correctness at startup, wait for the KTable to be fully populated before sending stream records, or use KafkaStreams.state() to wait for RUNNING state after restore completes.
2. Window too small causing missed KStream-KStream joins
// BAD: 1-second window too tight for network jitter between orders and payments
KStream<String, EnrichedOrder> tooTight = orders.join(
payments,
(order, payment) -> enrich(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(1)) // will miss many joins
);
// GOOD: 5-minute window with a grace period for late arrivals
KStream<String, EnrichedOrder> reasonable = orders.join(
payments,
(order, payment) -> enrich(order, payment),
JoinWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5), // join window — match records within 5 min of each other
Duration.ofMinutes(1)) // grace — accept late records up to 1 min after window close
);
3. Not accounting for out-of-order events
Kafka guarantees ordering within a partition, not across partitions or topics.
Records from different partitions can arrive at the processor in any wall-clock order.
Event-time timestamps (set by the producer) may differ significantly from processing time.
- Always use event-time timestamps (TimestampExtractor) rather than relying on processing time for windowed joins.
- Configure an appropriate grace period to absorb out-of-order arrivals.
- Use WallclockTimestampExtractor only for prototyping — it breaks deterministic replay.
// Configure a custom TimestampExtractor that reads event time from the record value
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
OrderTimestampExtractor.class);
// Example extractor implementation
public class OrderTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
if (record.value() instanceof Order) {
return ((Order) record.value()).getEventTimestamp();
}
return partitionTime; // fallback to partition time if type is unexpected
}
}
4. Re-keying without repartitioning before a join
Calling selectKey() or map() on a stream changes the key in-flight but does not by itself
move records to their new partitions. Kafka Streams marks the stream as "key changed" and
automatically inserts a repartition step before a join, so the DSL handles the common case.
The auto-created repartition topic, however, uses generated names and a partition count you
did not choose; calling repartition() explicitly documents the step and lets you control both.
- After any selectKey() or key-changing map(), always call repartition() before joining with a KTable or another KStream.
- Inspect the topology description (topology.describe()) and look for REPARTITION nodes to verify Kafka Streams has inserted the repartition step.
// Print the topology to verify repartition nodes are present
Topology topology = builder.build();
System.out.println(topology.describe());
// Look for: --> KSTREAM-SINK-... (repartition)
// <-- KSTREAM-SOURCE-... (repartition)
5. Forgetting that KTable-KTable joins emit on every side update
A KTable-KTable join re-emits the result row every time either side changes.
In a high-update scenario this produces many intermediate records downstream.
Use suppress() to emit only the final value within a time interval, or
write to a compacted Kafka topic so downstream consumers see only the latest per key.