Contents
- Delivery Guarantees Recap
- Idempotent Producer
- Transactional Producer API
- Read-Process-Write Pattern
- Consumer Isolation Levels
- Zombie Fencing
- Kafka Streams EOS
- Spring Kafka Transactions
- Trade-offs & When to Use EOS
| Guarantee | Producer Config | Consumer Handling | Risk |
| --- | --- | --- | --- |
| At-most-once | acks=0, no retry | Auto-commit before processing | Message loss on failure |
| At-least-once | acks=all, retries enabled | Commit after processing | Duplicates on retry |
| Exactly-once | Idempotent + transactional | isolation.level=read_committed | Higher latency, lower throughput |
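The "commit after processing" pattern in the at-least-once row can be sketched as follows. Topic and group names are illustrative, and `process` stands in for whatever business logic handles each record:

```java
// At-least-once: commit offsets only AFTER processing succeeds.
// If the process crashes between processing and commit, the batch
// is re-delivered on restart — hence "duplicates on retry".
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-demo");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual commit
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders.raw"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // side effects happen first...
        }
        consumer.commitSync(); // ...then the offset commit
    }
}
```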
An idempotent producer prevents duplicate messages caused by producer retries. The broker assigns each producer instance a Producer ID (PID) and tracks a sequence number per partition. If a duplicate message arrives (same PID + sequence), the broker deduplicates silently.
```java
// Idempotent producer configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Enable idempotence — requires acks=all, retries > 0, and
// max.in.flight.requests.per.connection <= 5 (the defaults satisfy all three)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// These are the defaults under enable.idempotence=true, but explicit for clarity:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```
Idempotence only prevents duplicates within a single producer session. If the producer process restarts, it gets a new PID. To prevent duplicates across restarts you need transactions with a stable transactional.id.
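The broker-side dedup decision described above can be sketched as pure logic. This is a deliberate simplification: the real broker also tracks producer epochs and retains a window of the last five batches per partition, while this sketch keeps only one expected sequence number per (PID, partition):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of broker-side idempotence: the broker tracks the next
// expected sequence number per (PID, partition) and silently drops retries
// of batches it has already appended.
class SequenceTracker {
    private final Map<String, Integer> nextSeq = new HashMap<>(); // key = pid + ":" + partition

    /** Returns true if the batch is new (append it), false if it is a duplicate. */
    boolean shouldAppend(long pid, int partition, int seq) {
        String key = pid + ":" + partition;
        int expected = nextSeq.getOrDefault(key, 0);
        if (seq < expected) {
            return false; // duplicate — this batch was already appended
        }
        if (seq > expected) {
            // gap — an earlier batch was lost; the real broker rejects this
            throw new IllegalStateException("out of order: expected " + expected + ", got " + seq);
        }
        nextSeq.put(key, expected + 1); // in order — accept and advance
        return true;
    }
}
```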
A transactional producer writes to multiple partitions atomically — either all writes succeed and become visible, or none do. The transactional.id must be unique and stable per producer instance (survives restarts).
```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Stable identity — survives producer restarts; enables zombie fencing
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-1");
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30_000); // 30s

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // register with transaction coordinator

try {
    producer.beginTransaction();
    // Write to multiple topics atomically
    producer.send(new ProducerRecord<>("payment.processed",
            payment.getId(), toJson(payment)));
    producer.send(new ProducerRecord<>("account.debited",
            payment.getAccountId(), toJson(debitEvent)));
    producer.send(new ProducerRecord<>("audit.log",
            payment.getId(), "PAYMENT_PROCESSED"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // Fatal — e.g., another producer with the same transactional.id took over (zombie fencing)
    // Do NOT abort — just close and restart
    producer.close();
} catch (KafkaException e) {
    // Transient error — abort and retry
    producer.abortTransaction();
    throw e;
}
```
The most important use case for Kafka transactions: consume from one topic, process, produce to another topic, and commit the consumer offset — all atomically. If the process crashes mid-flight, the transaction is aborted and the consumer offset is not advanced, so reprocessing resumes from the correct point.
```java
public class ExactlyOnceProcessor {

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;

    public void run() {
        consumer.subscribe(List.of("orders.raw"));
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            try {
                // 1. Process each record and produce output
                for (ConsumerRecord<String, String> record : records) {
                    Order order = parseOrder(record.value());
                    EnrichedOrder enriched = enrichOrder(order);
                    producer.send(new ProducerRecord<>(
                            "orders.enriched", record.key(), toJson(enriched)));
                }

                // 2. Commit consumer offsets inside the transaction
                // This is the key to atomicity — offsets and output are one unit
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<String, String>> partRecords = records.records(tp);
                    long lastOffset = partRecords.get(partRecords.size() - 1).offset();
                    offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
                }

                // sendOffsetsToTransaction — the offset commit is part of the transaction.
                // Pass consumer.groupMetadata() (not a bare group id) so the broker can
                // also fence offset commits from zombie consumer instances.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                log.error("Transaction aborted — rewinding to last committed offsets", e);
                // poll() has already advanced past this batch; rewind so the
                // aborted records are re-delivered on the next poll
                for (TopicPartition tp : records.partitions()) {
                    OffsetAndMetadata committed = consumer.committed(Set.of(tp)).get(tp);
                    consumer.seek(tp, committed != null ? committed.offset() : 0);
                }
            }
        }
    }
}
```
When using sendOffsetsToTransaction(), configure the consumer with enable.auto.commit=false and isolation.level=read_committed. Never call consumer.commitSync() — offset commits happen exclusively through the producer transaction.
Consumers need to opt in to transactional semantics by setting isolation.level. Without it, they may read uncommitted (aborted) messages.
```java
// Consumer that only reads committed transactional messages
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-service");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// read_committed — skips aborted transaction messages and waits for open transactions
// read_uncommitted (default) — reads all messages including aborted ones
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
```
| Isolation Level | Reads Aborted? | Waits for Open Txn? | Use When |
| --- | --- | --- | --- |
| read_uncommitted | Yes | No | No transactional producers upstream, or duplicates acceptable |
| read_committed | No | Yes — up to LSO | Downstream must see only committed data |
LSO (Last Stable Offset) — with read_committed, consumers can only read up to the LSO, the offset of the first message in the oldest still-open transaction. Long-running transactions therefore stall read_committed consumers.
A "zombie" producer is an old instance that is still running after being replaced (e.g., after a restart or failover). Without fencing, the zombie could commit a transaction that conflicts with the new producer. Kafka uses the epoch mechanism to fence zombies.
- Each transactional.id has an associated epoch stored in the transaction coordinator.
- When a new producer calls initTransactions() with the same transactional.id, the epoch is incremented.
- Any message from the old producer (with the old epoch) is rejected with ProducerFencedException.
- The zombie's pending transaction is aborted by the coordinator.
```java
// On ProducerFencedException — do NOT call abortTransaction(), just close and exit
try {
    producer.send(...);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    log.error("This producer instance was fenced — a newer instance took over", e);
    producer.close(); // Close immediately — do not abort, do not retry
    System.exit(1);   // Let the orchestrator restart this pod
}
```
Kafka Streams provides exactly-once processing with a single config flag. Internally it uses transactions to atomically commit output records and consumer offsets per processing batch.
```java
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration streamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    // EOS v2 — recommended for Kafka 2.6+
    // Uses one transactional producer per stream thread (more efficient than v1's per-task model)
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
    // Commit interval — how often transactions are committed (default 100ms under EOS)
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    return new KafkaStreamsConfiguration(props);
}
```
| EOS Version | Config Value | Kafka Version | Notes |
| --- | --- | --- | --- |
| EOS v1 (alpha) | exactly_once | Kafka 0.11+ | One producer per task — deprecated |
| EOS v2 (beta) | exactly_once_beta | Kafka 2.5 | One producer per stream thread — more efficient |
| EOS v2 (stable) | exactly_once_v2 | Kafka 2.6+ | Recommended — use this |
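With exactly_once_v2 set in the config above, the topology code itself needs no transaction calls — Streams wraps each task's consume-transform-produce cycle in a Kafka transaction automatically. A minimal sketch (topic names and the transformation are illustrative):

```java
// Minimal EOS topology — no beginTransaction/commitTransaction anywhere;
// the processing.guarantee config handles the transactional plumbing.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("orders.raw", Consumed.with(Serdes.String(), Serdes.String()))
       .mapValues(value -> value.toUpperCase())   // illustrative transformation
       .to("orders.enriched", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig().asProperties());
streams.start();
```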
Spring Kafka wraps the transactional producer lifecycle with KafkaTransactionManager and @Transactional.
```yaml
# application.yml
spring:
  kafka:
    producer:
      transaction-id-prefix: "tx-payment-"  # enables transactions; suffix is appended per producer
      acks: all
      properties:
        enable.idempotence: true
```
```java
// Spring auto-configures KafkaTransactionManager when transaction-id-prefix is set
@Service
public class PaymentService {

    private final KafkaTemplate<String, String> kafka;

    // @Transactional wraps all kafka.send() calls in a single Kafka transaction
    @Transactional("kafkaTransactionManager")
    public void processPayment(Payment payment) {
        kafka.send("payment.processed", payment.getId(), toJson(payment));
        kafka.send("account.debited", payment.getAccountId(), toJson(debit(payment)));
        // If an exception is thrown here, both sends are aborted atomically
    }
}
```
```java
// Combining a Kafka transaction with a DB transaction — requires coordinating the
// two transaction managers (the formerly recommended ChainedKafkaTransactionManager
// is now deprecated) or handling rollback manually; Kafka and DB transactions
// cannot be made truly atomic
@Transactional // DB transaction
public void processPaymentWithDb(Payment payment) {
    paymentRepo.save(payment); // DB write
    kafkaTemplate.executeInTransaction(producer -> {
        producer.send("payment.processed", payment.getId(), toJson(payment));
        return null;
    });
    // Note: if the Kafka commit succeeds but the DB commit fails, they are out of sync
    // Use the Outbox Pattern instead for true atomicity across DB + Kafka
}
```
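The Outbox Pattern mentioned above can be sketched as follows. `OutboxEvent`, `outboxRepo`, and the relay are hypothetical names for illustration; a real setup typically uses Debezium (change data capture) or a polling publisher as the relay:

```java
// Outbox Pattern: the event is written to an outbox table in the SAME
// DB transaction as the business write, so the two cannot diverge.
@Transactional // single DB transaction — payment and outbox row commit together
public void processPaymentWithOutbox(Payment payment) {
    paymentRepo.save(payment);
    outboxRepo.save(new OutboxEvent("payment.processed", payment.getId(), toJson(payment)));
    // No Kafka call here. A separate relay (e.g., Debezium or a scheduled poller)
    // reads the outbox table, produces each row to Kafka, and marks it as sent —
    // giving at-least-once delivery, so downstream consumers should deduplicate.
}
```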
- Use EOS when duplicate processing causes real harm — financial transactions, inventory deductions, billing events.
- Avoid EOS for analytics pipelines where duplicates can be deduplicated downstream cheaply (e.g., COUNT DISTINCT in a data warehouse).
- Latency cost: EOS adds ~2× end-to-end latency due to the two-phase commit protocol between producer, transaction coordinator, and partition leaders.
- Throughput cost: 20–30% throughput reduction compared to at-least-once with the same hardware.
- Alternative: At-least-once + idempotent consumers is often simpler and nearly as safe. Use enable.idempotence=true on the producer and handle deduplication in the consumer using a processed-event table or natural keys.
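The idempotent-consumer alternative can be sketched with an in-memory set standing in for the processed-event table (a real service would use a database table or cache, usually with a retention window keyed on the event's natural key):

```java
import java.util.HashSet;
import java.util.Set;

// Idempotent consumer: at-least-once delivery + dedup on a stable event id.
// Duplicates from producer retries or consumer rebalances become no-ops.
class IdempotentHandler {
    private final Set<String> processed = new HashSet<>(); // stand-in for a processed-event table

    /** Returns true if the event was handled, false if it was a duplicate. */
    boolean handle(String eventId, Runnable businessLogic) {
        if (!processed.add(eventId)) {
            return false; // already seen — skip without side effects
        }
        businessLogic.run();
        return true;
    }
}
```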