| Guarantee | Producer Config | Consumer Handling | Risk |
|---|---|---|---|
| At-most-once | `acks=0`, no retry | Auto-commit before processing | Message loss on failure |
| At-least-once | `acks=all`, retries enabled | Commit after processing | Duplicates on retry |
| Exactly-once | Idempotent + transactional | `isolation.level=read_committed` | Higher latency, lower throughput |
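The loss-vs-duplication trade-off in the table comes down to when the offset is committed relative to processing. A minimal in-memory sketch (plain Java, no Kafka; all names are illustrative) makes this concrete: a consumer "crashes" mid-record and resumes from its committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: commit timing determines the delivery guarantee.
public class CommitTimingDemo {

    // Processes a log, simulating one crash at index crashAt and a resume from
    // the last committed offset. Returns the messages actually processed.
    static List<String> run(List<String> log, int crashAt, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        long committed = 0;
        boolean crashed = false;
        long offset = 0;
        while (offset < log.size()) {
            if (commitBeforeProcessing) {
                committed = offset + 1; // at-most-once: commit first...
                if (!crashed && offset == crashAt) { crashed = true; offset = committed; continue; } // crash before processing
                processed.add(log.get((int) offset));
            } else {
                processed.add(log.get((int) offset)); // at-least-once: process first...
                if (!crashed && offset == crashAt) { crashed = true; offset = committed; continue; } // crash before commit
                committed = offset + 1;
            }
            offset++;
        }
        return processed;
    }
}
```

With a crash at offset 1, commit-before-processing silently drops message "B" (loss), while commit-after-processing replays it (duplicate) — exactly the Risk column above.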

An idempotent producer prevents duplicate messages caused by producer retries. The broker assigns each producer instance a Producer ID (PID) and tracks a sequence number per partition. If a duplicate message arrives (same PID + sequence), the broker deduplicates silently.

```java
// Idempotent producer configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// Enable idempotence — requires acks=all, retries > 0, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// These are set automatically by enable.idempotence=true, but explicit for clarity:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```

Idempotence only prevents duplicates within a single producer session. If the producer process restarts, it gets a new PID. To prevent duplicates across restarts you need transactions with a stable transactional.id.
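The broker-side dedup rule can be sketched in a few lines of plain Java (illustrative only — a real broker also rejects sequence gaps with OutOfOrderSequenceException and retains a window of recent batches, not just the last sequence number):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of broker-side idempotence: the broker remembers the last
// sequence number seen per (producerId, partition) and drops duplicate retries.
public class SequenceDedupDemo {

    private final Map<String, Long> lastSeq = new HashMap<>(); // key: pid + ":" + partition
    private final List<String> log = new ArrayList<>();

    // Returns true if the record was appended, false if it was deduplicated.
    public boolean append(long pid, int partition, long sequence, String value) {
        String key = pid + ":" + partition;
        long last = lastSeq.getOrDefault(key, -1L);
        if (sequence <= last) return false; // duplicate retry — silently dropped
        lastSeq.put(key, sequence);
        log.add(value);
        return true;
    }

    public List<String> log() { return log; }
}
```

Note that a restarted producer shows up with a new PID, so its writes are not deduplicated against the old session — which is exactly why cross-restart deduplication needs transactions.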

A transactional producer writes to multiple partitions atomically — either all writes succeed and become visible, or none do. The transactional.id must be unique and stable per producer instance (survives restarts).

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Stable identity — survives producer restarts; enables zombie fencing
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-1");
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30_000); // 30s

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // register with the transaction coordinator

try {
    producer.beginTransaction();

    // Write to multiple topics atomically
    producer.send(new ProducerRecord<>("payment.processed", payment.getId(), toJson(payment)));
    producer.send(new ProducerRecord<>("account.debited", payment.getAccountId(), toJson(debitEvent)));
    producer.send(new ProducerRecord<>("audit.log", payment.getId(), "PAYMENT_PROCESSED"));

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // Fatal — another producer with the same transactional.id took over (zombie fencing)
    // Do NOT abort — just close and restart
    producer.close();
} catch (KafkaException e) {
    // Transient error — abort and retry
    producer.abortTransaction();
    throw e;
}
```

The most important use case for Kafka transactions: consume from one topic, process, produce to another topic, and commit the consumer offset — all atomically. If the process crashes mid-flight, the transaction is aborted and the consumer offset is not advanced, so reprocessing resumes from the correct point.

```java
public class ExactlyOnceProcessor {

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;

    public void run() {
        consumer.subscribe(List.of("orders.raw"));
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            try {
                // 1. Process each record and produce output
                for (ConsumerRecord<String, String> record : records) {
                    Order order = parseOrder(record.value());
                    EnrichedOrder enriched = enrichOrder(order);
                    producer.send(new ProducerRecord<>(
                        "orders.enriched", record.key(), toJson(enriched)));
                }

                // 2. Commit consumer offsets inside the transaction
                // This is the key to atomicity — offsets and output are one unit
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<String, String>> partRecords = records.records(tp);
                    long lastOffset = partRecords.get(partRecords.size() - 1).offset();
                    offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
                }

                // sendOffsetsToTransaction — the offset commit becomes part of the transaction.
                // Pass consumer.groupMetadata() rather than constructing ConsumerGroupMetadata
                // by hand, so the broker can also fence zombie consumers by group generation.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                log.error("Transaction aborted — will reprocess", e);
            }
        }
    }
}
```

When using sendOffsetsToTransaction(), configure the consumer with enable.auto.commit=false and isolation.level=read_committed. Never call consumer.commitSync() — offset commits happen exclusively through the producer transaction.

Consumers need to opt in to transactional semantics by setting isolation.level. Without it, they may read uncommitted (aborted) messages.

```java
// Consumer that only reads committed transactional messages
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-service");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// read_committed — skips aborted transactional messages and waits for open transactions
// read_uncommitted (default) — reads all messages, including those from aborted transactions
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
```
| Isolation Level | Reads Aborted? | Waits for Open Txn? | Use When |
|---|---|---|---|
| `read_uncommitted` | Yes | No | No transactional producers upstream, or duplicates acceptable |
| `read_committed` | No | Yes — up to the LSO | Downstream must see only committed data |

LSO (Last Stable Offset) — with read_committed, consumers can only read up to the LSO, the offset of the first message of the oldest still-open transaction. A long-running open transaction therefore stalls every read_committed consumer on that partition until it commits, aborts, or times out.
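The LSO rule itself is simple enough to sketch in plain Java (illustrative only — real brokers derive this from transaction markers and producer state, not per-record flags; here 'O' marks a message in an open transaction and 'C' a committed one):

```java
import java.util.List;

// Illustrative sketch: a read_committed consumer can read up to (not including)
// the LSO, which is the offset of the first message of the oldest open transaction.
public class LsoDemo {

    static long lastStableOffset(List<Character> log) {
        for (int offset = 0; offset < log.size(); offset++) {
            if (log.get(offset) == 'O') return offset; // oldest open txn blocks everything after it
        }
        return log.size(); // no open transactions — the whole log is readable
    }
}
```

Note how a single open transaction early in the log blocks later committed messages too — this is why transaction.timeout.ms matters: it bounds how long a crashed producer can hold the LSO back.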

A "zombie" producer is an old instance that is still running after being replaced (e.g., after a restart or failover). Without fencing, the zombie could commit a transaction that conflicts with the new producer. Kafka fences zombies with a producer epoch: each initTransactions() call for a given transactional.id bumps the epoch at the transaction coordinator, and requests carrying an older epoch are rejected with ProducerFencedException.

```java
// On ProducerFencedException — do NOT call abortTransaction(), just close and exit
try {
    producer.send(...);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    log.error("This producer instance was fenced — a newer instance took over", e);
    producer.close();  // Close immediately — do not abort, do not retry
    System.exit(1);    // Let the orchestrator restart this pod
}
```
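The coordinator-side half of the epoch mechanism can be sketched in plain Java (illustrative only; names and types are simplified stand-ins for the real transaction coordinator):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of zombie fencing: each initTransactions() for a given
// transactional.id bumps the epoch, and only the current epoch may commit.
public class EpochFencingDemo {

    private final Map<String, Short> epochs = new HashMap<>();

    // Stand-in for initTransactions(): bump and return the epoch for this id.
    public short register(String transactionalId) {
        short next = (short) (epochs.getOrDefault(transactionalId, (short) -1) + 1);
        epochs.put(transactionalId, next);
        return next;
    }

    // Stand-in for commitTransaction(): requests from older epochs are rejected.
    public boolean tryCommit(String transactionalId, short epoch) {
        Short current = epochs.get(transactionalId);
        return current != null && current == epoch;
    }
}
```

This is why the transactional.id must be stable: the replacement instance registers under the same id, which is precisely what invalidates the zombie's epoch.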

Kafka Streams provides exactly-once processing with a single config flag. Internally it uses transactions to atomically commit output records and consumer offsets per processing batch.

```java
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration streamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

    // EOS v2 — recommended for Kafka 2.6+
    // Uses one transactional producer per stream thread (more efficient than v1's per-task model)
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

    // Commit interval — how often to flush transactions (default 100ms under EOS)
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

    return new KafkaStreamsConfiguration(props);
}
```
| EOS Version | Config Value | Kafka Version | Notes |
|---|---|---|---|
| EOS v1 (alpha) | `exactly_once` | 0.11+ | One producer per task — deprecated |
| EOS v2 (beta) | `exactly_once_beta` | 2.5+ | One producer per stream thread — more efficient |
| EOS v2 (stable) | `exactly_once_v2` | 2.6+ | Recommended — use this |

Spring Kafka wraps the transactional producer lifecycle with KafkaTransactionManager and @Transactional.

```yaml
# application.yml
spring:
  kafka:
    producer:
      transaction-id-prefix: "tx-payment-"  # enables transactions; a suffix is appended per producer
      acks: all
      properties:
        enable.idempotence: true
```

Spring auto-configures a KafkaTransactionManager when transaction-id-prefix is set:

```java
@Service
public class PaymentService {

    private final KafkaTemplate<String, String> kafka;

    // @Transactional wraps all kafka.send() calls in a single Kafka transaction
    @Transactional("kafkaTransactionManager")
    public void processPayment(Payment payment) {
        kafka.send("payment.processed", payment.getId(), toJson(payment));
        kafka.send("account.debited", payment.getAccountId(), toJson(debit(payment)));
        // If an exception is thrown here, both sends are aborted atomically
    }
}

// Combining a Kafka transaction with a DB transaction: the two commits cannot be
// made truly atomic. ChainedTransactionManager (now deprecated in Spring) only
// orders the commits; otherwise handle rollback manually.
@Transactional // DB transaction
public void processPaymentWithDb(Payment payment) {
    paymentRepo.save(payment); // DB write
    kafkaTemplate.executeInTransaction(ops -> {
        ops.send("payment.processed", payment.getId(), toJson(payment));
        return null;
    });
    // If the Kafka commit succeeds but the DB commit fails, the two are out of sync.
    // Use the Outbox Pattern instead for true atomicity across DB + Kafka.
}
```
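The Outbox Pattern mentioned above sidesteps the dual-commit problem: the business entity and the outgoing event are written in the same DB transaction, and a separate relay publishes pending outbox rows to Kafka afterwards. A minimal in-memory sketch (plain Java; the lists stand in for DB tables, and all names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the Outbox Pattern: entity + event commit together in
// one DB transaction; a relay (a poller, or CDC such as Debezium) publishes later.
public class OutboxDemo {

    record OutboxRow(String topic, String key, String payload, boolean published) {}

    private final List<String> payments = new ArrayList<>();  // stand-in for the payments table
    private final List<OutboxRow> outbox = new ArrayList<>(); // stand-in for the outbox table

    // One "DB transaction": the entity and its outbox row are written together.
    public void processPayment(String paymentId, String eventJson) {
        payments.add(paymentId);
        outbox.add(new OutboxRow("payment.processed", paymentId, eventJson, false));
    }

    // Relay step: publish pending rows to Kafka, then mark them published.
    // Publishing is at-least-once; an idempotent producer or consumer-side
    // dedup handles the occasional duplicate.
    public List<String> relay() {
        List<String> published = new ArrayList<>();
        for (int i = 0; i < outbox.size(); i++) {
            OutboxRow row = outbox.get(i);
            if (!row.published()) {
                published.add(row.topic() + ":" + row.key()); // stand-in for producer.send(...)
                outbox.set(i, new OutboxRow(row.topic(), row.key(), row.payload(), true));
            }
        }
        return published;
    }
}
```

The design trade: atomicity between DB and Kafka is traded for at-least-once publishing, which the idempotence and transaction machinery earlier in this section is well suited to absorb.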