A Dead Letter Topic (DLT) is a separate Kafka topic that receives messages which could not be successfully processed by a consumer after all retry attempts have been exhausted. Rather than silently dropping the failed record or letting it block the partition indefinitely, the consumer forwards it to the DLT where it is stored for later inspection or reprocessing.

DLTs matter because message loss in event-driven systems can lead to data inconsistency, missed business events, or silent corruption. Without a DLT, you have limited options when a message fails: you can retry indefinitely (blocking the consumer), skip the message (losing data), or crash the application. A DLT gives you a fourth option — park the message safely and continue processing the rest of the partition.

When a message lands in a DLT, Spring Kafka enriches it with headers that capture the original topic, partition, offset, the exception class and message, and a timestamp. This metadata is essential for diagnosing why the message failed and deciding whether it can be retried after a fix is deployed.

```java
// Conceptual flow of a message through retry and DLT
//
// 1. Consumer receives record from "orders" topic
// 2. Listener throws an exception
// 3. DefaultErrorHandler retries N times with backoff
// 4. All retries exhausted; record is forwarded to "orders.DLT"
// 5. DLT consumer picks up the record for inspection or alerting
//
// The DLT topic name defaults to: <original-topic>.DLT
// For example: orders -> orders.DLT
```

The DLT topic must exist before the recoverer tries to publish to it. Either create it manually, set auto.create.topics.enable=true on the broker, or use a NewTopic bean in your Spring configuration.

DeadLetterPublishingRecoverer is the core class that Spring Kafka provides for publishing failed records to a DLT. It implements the ConsumerRecordRecoverer functional interface, which DefaultErrorHandler calls after all retry attempts are exhausted.

When invoked, it takes the failed ConsumerRecord and the Exception, constructs a new ProducerRecord with the original key, value, and enriched headers, and publishes it to the DLT using a KafkaTemplate. By default, the target topic name is the original topic name with a .DLT suffix appended.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

// Basic construction: uses the default .DLT suffix naming
KafkaTemplate<Object, Object> kafkaTemplate = ...; // injected

DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);

// When a record from topic "payments" fails, it publishes to "payments.DLT".
// The record retains the original key and value.
// Headers are added with failure metadata (exception, offset, etc.).
```

The recoverer works with any KafkaTemplate whose key/value serializers are compatible with the original record. If your consumers use different value types across topics, you can supply multiple templates in a map keyed by the record's value class; the recoverer then selects the template registered for each failed record's value type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

// Using multiple KafkaTemplates for different serialization formats
KafkaTemplate<String, String> stringTemplate = ...;
KafkaTemplate<String, byte[]> bytesTemplate = ...;

// The map is keyed by the class of the failed record's value; the recoverer
// picks the template registered for that value type
Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates =
        new LinkedHashMap<>();
templates.put(byte[].class, bytesTemplate);
templates.put(String.class, stringTemplate);

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
```

If the KafkaTemplate serializers do not match the record's key/value types, the DLT publish itself will fail and the record will be lost. Always ensure serializer compatibility between the consumer deserializer and the DLT producer serializer.

The standard way to wire a DLT is to create a DeadLetterPublishingRecoverer, pass it to a DefaultErrorHandler along with a backoff policy, and register the error handler on the ConcurrentKafkaListenerContainerFactory.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaDltConfig {

    // Ensure the DLT topic exists
    @Bean
    public NewTopic ordersDlt() {
        return TopicBuilder.name("orders.DLT")
                .partitions(3)
                .replicas(2)
                .build();
    }

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
        // Publish failed records to <topic>.DLT
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate);
        // Retry 3 times with 2-second intervals, then send to DLT
        return new DefaultErrorHandler(recoverer, new FixedBackOff(2000L, 3L));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```

With this configuration in place, any @KafkaListener that uses the default container factory will automatically benefit from retry and DLT routing. The listener itself does not need any DLT-related code.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {

    @KafkaListener(topics = "orders", groupId = "order-group")
    public void consume(String order) {
        // If this throws, DefaultErrorHandler retries 3 times,
        // then DeadLetterPublishingRecoverer sends to orders.DLT
        processOrder(order);
    }

    private void processOrder(String order) {
        if (order == null || order.isBlank()) {
            throw new IllegalArgumentException("Order payload is empty");
        }
        // business logic
    }
}
```

If you declare DefaultErrorHandler as a bean, Spring Boot auto-configuration will wire it into the default container factory automatically. You only need the explicit ConcurrentKafkaListenerContainerFactory bean when you need additional factory customizations.

By default, DeadLetterPublishingRecoverer appends .DLT to the original topic name. You can override this by providing a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> that determines the destination topic and partition for each failed record.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

KafkaTemplate<Object, Object> kafkaTemplate = ...;

// Route all failures to a single centralized DLT
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
        kafkaTemplate,
        (record, ex) -> new TopicPartition("central-dlt", -1) // -1 = let the partitioner decide
);
```

A more advanced pattern routes different exception types to different DLT topics. This lets you separate transient failures (which might succeed on retry from a different DLT consumer) from permanent failures (which need manual intervention).

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class RoutedDltConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (record, ex) -> {
                    // Route deserialization and JSON errors to a parse-errors DLT
                    if (ex.getCause() instanceof JsonProcessingException) {
                        return new TopicPartition("parse-errors.DLT", -1);
                    }
                    // Route validation errors to a validation DLT
                    if (ex.getCause() instanceof IllegalArgumentException) {
                        return new TopicPartition("validation-errors.DLT", -1);
                    }
                    // Everything else goes to the default <topic>.DLT
                    return new TopicPartition(record.topic() + ".DLT", -1);
                }
        );
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }
}
```

You can also preserve the original partition assignment so that DLT records land in the same partition number as the source record. This is useful when you want to maintain ordering within the DLT.

```java
// Preserve original partition in the DLT
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
        kafkaTemplate,
        (record, ex) -> new TopicPartition(
                record.topic() + ".DLT",
                record.partition() // same partition number as the source
        )
);
```

When preserving the original partition number, make sure the DLT topic has at least as many partitions as the source topic. If the DLT has fewer partitions, the producer cannot publish to the nonexistent partition, the DLT publish fails, and the record is effectively lost.
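If you cannot guarantee matching partition counts, a defensive resolver can fall back to an unset partition whenever the source partition does not exist in the DLT. The helper below is a hypothetical sketch of that decision in plain Java (no Spring types); in a real resolver you would feed its result into the TopicPartition:

```java
public class DltPartitionChoice {

    // Keep the source partition when the DLT has it; otherwise return -1,
    // which (used as a TopicPartition partition) tells the recoverer to leave
    // the partition unset so Kafka's partitioner picks one.
    static int choosePartition(int sourcePartition, int dltPartitionCount) {
        return sourcePartition < dltPartitionCount ? sourcePartition : -1;
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(2, 3)); // 2: partition exists, preserved
        System.out.println(choosePartition(5, 3)); // -1: DLT too small, fall back
    }
}
```

This keeps ordering for records whose partition survives the mapping, and degrades gracefully for the rest instead of failing the publish.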

When DeadLetterPublishingRecoverer publishes a record to the DLT, it adds several headers that capture the context of the failure. These headers are invaluable for debugging and building automated reprocessing pipelines.

The standard headers added by Spring Kafka include the original topic (kafka_dlt-original-topic), partition (kafka_dlt-original-partition), offset (kafka_dlt-original-offset), and timestamp (kafka_dlt-original-timestamp), plus the exception class name (kafka_dlt-exception-fqcn), the exception message (kafka_dlt-exception-message), and the stack trace (kafka_dlt-exception-stacktrace). Each is exposed as a KafkaHeaders constant, so a listener can bind the values directly with @Header:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class DltMetadataConsumer {

    @KafkaListener(topics = "orders.DLT", groupId = "dlt-inspector")
    public void inspectDlt(
            String payload,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_PARTITION) int originalPartition,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            @Header(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP) long originalTimestamp,
            @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) String exceptionClass,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage) {
        System.out.printf(
                "DLT record from %s[%d]@%d (ts=%d)%n  Exception: %s — %s%n  Payload: %s%n",
                originalTopic, originalPartition, originalOffset, originalTimestamp,
                exceptionClass, exceptionMessage, payload
        );
    }
}
```

You can also access all headers from the raw ConsumerRecord if you prefer to iterate over them programmatically rather than binding each one with @Header.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
public class DltHeaderIterator {

    @KafkaListener(topics = "orders.DLT", groupId = "dlt-header-reader")
    public void readAllHeaders(ConsumerRecord<String, String> record) {
        System.out.println("DLT record key: " + record.key());
        System.out.println("DLT record value: " + record.value());
        for (Header header : record.headers()) {
            String headerValue = header.value() != null
                    ? new String(header.value(), StandardCharsets.UTF_8)
                    : "null";
            System.out.printf("  Header: %s = %s%n", header.key(), headerValue);
        }
    }
}
```

The KafkaHeaders DLT constants (like KafkaHeaders.DLT_ORIGINAL_TOPIC) are not available in every older Spring Kafka release. If your version predates them, use the raw string header names such as "kafka_dlt-original-topic".

A DLT is only useful if someone or something processes the messages in it. The simplest approach is a dedicated @KafkaListener that reads from the DLT topic, logs the failure details, and persists the record to a database or sends an alert.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class DltProcessor {

    private final FailedMessageRepository failedMessageRepo;
    private final AlertService alertService;

    public DltProcessor(FailedMessageRepository failedMessageRepo, AlertService alertService) {
        this.failedMessageRepo = failedMessageRepo;
        this.alertService = alertService;
    }

    @KafkaListener(topics = "orders.DLT", groupId = "dlt-processor-group")
    public void processDlt(
            String payload,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) String exceptionClass,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage) {

        // 1. Persist to a database for later analysis
        FailedMessage failed = new FailedMessage();
        failed.setOriginalTopic(originalTopic);
        failed.setOriginalOffset(originalOffset);
        failed.setPayload(payload);
        failed.setExceptionClass(exceptionClass);
        failed.setExceptionMessage(exceptionMessage);
        failedMessageRepo.save(failed);

        // 2. Send an alert if the exception is unexpected
        if (!exceptionClass.contains("IllegalArgumentException")) {
            alertService.sendAlert(
                    "DLT message from " + originalTopic + " offset " + originalOffset
                            + ": " + exceptionClass + " — " + exceptionMessage
            );
        }
    }
}
```

For retry-from-DLT strategies, you can build a scheduled job or an admin endpoint that reads DLT records and republishes them to the original topic after a fix has been deployed.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Service
public class DltReplayService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DltReplayService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Replay all messages from the DLT back to the original topic.
     * Call this from a REST endpoint or a scheduled task after deploying a fix.
     */
    public int replayDlt(String dltTopic, String originalTopic, int maxRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Fresh group id each run, so the replay always reads from the earliest offset
        props.put("group.id", "dlt-replay-" + System.currentTimeMillis());
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", String.valueOf(maxRecords));

        int replayed = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(dltTopic));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                kafkaTemplate.send(originalTopic, record.key(), record.value());
                replayed++;
            }
        }
        return replayed;
    }
}
```

A small admin controller can then expose the replay as an endpoint:

```java
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DltReplayController {

    private final DltReplayService dltReplayService;

    public DltReplayController(DltReplayService dltReplayService) {
        this.dltReplayService = dltReplayService;
    }

    @PostMapping("/admin/dlt/replay")
    public String replay(
            @RequestParam(defaultValue = "orders.DLT") String dltTopic,
            @RequestParam(defaultValue = "orders") String originalTopic,
            @RequestParam(defaultValue = "100") int maxRecords) {
        int count = dltReplayService.replayDlt(dltTopic, originalTopic, maxRecords);
        return "Replayed " + count + " records from " + dltTopic + " to " + originalTopic;
    }
}
```

Be cautious when replaying DLT messages. If the root cause has not been fixed, replayed records will fail again and re-enter the DLT, creating an infinite loop. Always verify the fix before replaying, and consider adding a replay-count header to detect and break cycles.
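One way to implement the replay-count idea from the warning above is to carry the count in a header on each republished record. The sketch below is plain Java with a made-up value format (a decimal string); the header name itself, say x-replay-count, would be your own convention, not a Spring one:

```java
public class ReplayGuard {

    static final int MAX_REPLAYS = 3;

    // Parse the current replay count from a header value; null means the
    // header is absent, i.e. the record has never been replayed.
    static int currentCount(String headerValue) {
        if (headerValue == null) {
            return 0;
        }
        try {
            return Integer.parseInt(headerValue);
        } catch (NumberFormatException e) {
            return MAX_REPLAYS; // unparseable header: treat as exhausted
        }
    }

    // Decide whether this record may be replayed again
    static boolean mayReplay(String headerValue) {
        return currentCount(headerValue) < MAX_REPLAYS;
    }

    // Value to stamp into the header of the republished record
    static String nextCount(String headerValue) {
        return String.valueOf(currentCount(headerValue) + 1);
    }

    public static void main(String[] args) {
        System.out.println(mayReplay(null)); // true: never replayed
        System.out.println(nextCount(null)); // "1"
        System.out.println(mayReplay("3"));  // false: limit reached
    }
}
```

DltReplayService could check mayReplay before republishing and write nextCount onto the outgoing record, so a record that keeps failing is eventually parked for good instead of looping.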

@RetryableTopic provides a declarative, annotation-driven approach that automatically creates retry topics and a DLT. When a listener method annotated with @RetryableTopic throws an exception, the failed message is published to progressively delayed retry topics. After all retry attempts are exhausted, the message lands in a DLT named <topic>-dlt. Note that @RetryableTopic's default suffix uses a hyphen (-dlt), unlike the .DLT suffix that DeadLetterPublishingRecoverer appends on its own.

The @DltHandler annotation marks a method in the same class that processes messages arriving at the DLT. This keeps the retry, DLT, and main consumer logic co-located.

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class PaymentConsumer {

    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 2000, multiplier = 2.0, maxDelay = 16000),
            autoCreateTopics = "true",
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE,
            dltStrategy = DltStrategy.FAIL_ON_ERROR
    )
    @KafkaListener(topics = "payments", groupId = "payment-group")
    public void consume(String payment) {
        System.out.println("Processing payment: " + payment);
        // Simulate a failure
        if (payment.contains("INVALID")) {
            throw new RuntimeException("Payment validation failed");
        }
    }

    @DltHandler
    public void handleDlt(
            String payment,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) String exceptionClass) {
        System.err.printf("Payment DLT [%s]: %s (exception: %s)%n",
                topic, payment, exceptionClass);
        // Persist, alert, or enqueue for manual review
    }
}
```
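With attempts = "4", the configuration above performs one initial delivery plus three retries, and SUFFIX_WITH_DELAY_VALUE names each retry topic after its delay, so under these settings the retry topics should come out along the lines of payments-retry-2000, payments-retry-4000, and payments-retry-8000. A dependency-free sketch of how the capped exponential delays unfold:

```java
import java.util.ArrayList;
import java.util.List;

public class RetryDelays {

    // Delay before each retry under exponential backoff with an upper bound,
    // mirroring @Backoff(delay = 2000, multiplier = 2.0, maxDelay = 16000)
    static List<Long> delays(long initialMs, double multiplier, long maxDelayMs, int retries) {
        List<Long> result = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < retries; i++) {
            result.add((long) Math.min(current, maxDelayMs));
            current *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // attempts = "4" -> 3 retries after the initial delivery
        System.out.println(delays(2000L, 2.0, 16000L, 3)); // [2000, 4000, 8000]
    }
}
```

With more attempts, the sequence would keep doubling until it hits the 16-second cap and then stay there.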

The dltStrategy attribute controls what happens when the @DltHandler method itself throws an exception. DltStrategy offers FAIL_ON_ERROR (treat a DLT handler failure as final for that record), ALWAYS_RETRY_ON_ERROR (keep redelivering the record to the DLT handler until it succeeds), and NO_DLT (do not create a DLT at all):

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class NotificationConsumer {

    // ALWAYS_RETRY_ON_ERROR: if the DLT handler fails (e.g., DB is down),
    // keep retrying instead of stopping the consumer
    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 5000),
            dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR
    )
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consume(String notification) {
        // process notification
    }

    @DltHandler
    public void handleDlt(String notification) {
        // This method will be retried if it throws
        saveToDatabase(notification); // may throw if DB is temporarily down
    }

    private void saveToDatabase(String notification) {
        // JPA or JDBC call
    }
}
```

You can also exclude certain exception types from retry so they go directly to the DLT without waiting through the retry chain.

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class InventoryConsumer {

    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 1000, multiplier = 2.0),
            exclude = { IllegalArgumentException.class, NullPointerException.class }
    )
    @KafkaListener(topics = "inventory-updates", groupId = "inventory-group")
    public void consume(String update) {
        if (update == null) {
            throw new NullPointerException("Null payload"); // goes straight to DLT
        }
        processUpdate(update); // RuntimeException will be retried
    }

    @DltHandler
    public void handleDlt(String update) {
        System.err.println("Inventory DLT: " + update);
    }

    private void processUpdate(String update) {
        // business logic that may throw transient exceptions
    }
}
```

With @RetryableTopic, topics are created with the naming pattern <topic>-retry-0, <topic>-retry-1, ..., <topic>-dlt (or with delay-value suffixes when using SUFFIX_WITH_DELAY_VALUE). These are different from the .DLT suffix used by DeadLetterPublishingRecoverer directly.

Getting the most out of Dead Letter Topics requires attention to monitoring, serialization, exception classification, and consumer isolation. The following practices will help you build a robust DLT strategy.

Monitor DLT Consumer Lag

Treat DLT consumer lag as a critical metric. A growing lag on a DLT means failed messages are accumulating faster than they are being processed. Set up alerts on DLT consumer group lag using tools like Kafka's built-in kafka-consumer-groups.sh, Burrow, or your monitoring platform.
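Lag itself is simple per-partition arithmetic: the log end offset minus the group's committed offset, summed across partitions. Whatever monitoring tool you use is computing something like the following sketch (plain Java; the offset numbers are illustrative, not from a live cluster):

```java
import java.util.List;

public class ConsumerLag {

    // Lag for one partition: records appended but not yet committed by the group
    static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    // Total lag for a consumer group across all partitions of a topic
    static long totalLag(List<long[]> endAndCommitted) {
        long total = 0L;
        for (long[] pair : endAndCommitted) {
            total += partitionLag(pair[0], pair[1]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Three partitions of orders.DLT: {logEndOffset, committedOffset}
        List<long[]> offsets = List.of(
                new long[] {150, 120},
                new long[] {90, 90},
                new long[] {40, 10}
        );
        System.out.println(totalLag(offsets)); // 60
    }
}
```

Alert when this total trends upward for a DLT group: it means failures are arriving faster than your DLT processing can drain them.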

```properties
# application.properties -- expose metrics endpoints for consumer lag monitoring
spring.kafka.consumer.properties[spring.json.trusted.packages]=*
management.endpoints.web.exposure.include=health,metrics,prometheus
management.metrics.tags.application=my-kafka-app
```
Classify Non-Retryable Exceptions

Always register exceptions that will never succeed on retry as non-retryable. This avoids wasting time and resources on doomed retries and gets failed records to the DLT immediately.

```java
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;

DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);

// These exceptions will skip all retries and go straight to the DLT
errorHandler.addNotRetryableExceptions(
        DeserializationException.class, // malformed message bytes
        SerializationException.class,   // serializer mismatch
        IllegalArgumentException.class, // invalid input
        NullPointerException.class,     // missing required data
        ClassCastException.class        // type mismatch
);
```
Match DLT Serialization to Source

The KafkaTemplate used by DeadLetterPublishingRecoverer must use serializers compatible with the source consumer's deserializers. If your consumer uses JsonDeserializer, the DLT template should use JsonSerializer. For maximum flexibility, use ByteArraySerializer which preserves the raw bytes without re-serialization.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class DltProducerConfig {

    @Bean
    public KafkaTemplate<byte[], byte[]> dltKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        ProducerFactory<byte[], byte[]> factory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(factory);
    }
}
```
Isolate DLT Consumer Groups

Always use a separate consumer group for DLT listeners. This prevents DLT processing from interfering with main topic consumption and allows you to scale, pause, or restart DLT consumers independently.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class IsolatedDltConsumer {

    // Main consumer -- uses its own group
    @KafkaListener(topics = "orders", groupId = "orders-main-group")
    public void consumeOrders(String order) {
        processOrder(order);
    }

    // DLT consumer -- dedicated group, can be scaled independently
    @KafkaListener(topics = "orders.DLT", groupId = "orders-dlt-group")
    public void consumeDlt(String failedOrder) {
        handleFailedOrder(failedOrder);
    }

    private void processOrder(String order) { /* ... */ }

    private void handleFailedOrder(String order) { /* ... */ }
}
```
Set Retention and Cleanup Policies

DLT topics can grow indefinitely if not managed. Configure appropriate retention policies so that old DLT records are eventually cleaned up. For compliance or debugging purposes, you may want a longer retention on DLT topics than on regular topics.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class DltTopicConfig {

    @Bean
    public NewTopic ordersDlt() {
        return TopicBuilder.name("orders.DLT")
                .partitions(3)
                .replicas(2)
                .config("retention.ms", String.valueOf(7 * 24 * 60 * 60 * 1000L)) // 7 days
                .config("cleanup.policy", "delete")
                .build();
    }
}
```

Do not set DLT retention too low. If the DLT consumer is down or lagging and retention expires, unprocessed DLT records will be deleted by Kafka, resulting in silent message loss. A common pattern is to set DLT retention to at least 2x the main topic retention.