Contents
- What Is a Dead Letter Topic
- DeadLetterPublishingRecoverer
- Basic DLT Configuration
- Custom DLT Topic Naming
- DLT Headers & Metadata
- Processing DLT Messages
- @RetryableTopic DLT
- Best Practices
A Dead Letter Topic (DLT) is a separate Kafka topic that receives messages which could not be successfully processed by a consumer after all retry attempts have been exhausted. Rather than silently dropping the failed record or letting it block the partition indefinitely, the consumer forwards it to the DLT where it is stored for later inspection or reprocessing.
DLTs matter because message loss in event-driven systems can lead to data inconsistency, missed business events, or silent corruption. Without a DLT, you have limited options when a message fails: you can retry indefinitely (blocking the consumer), skip the message (losing data), or crash the application. A DLT gives you a fourth option — park the message safely and continue processing the rest of the partition.
When a message lands in a DLT, Spring Kafka enriches it with headers that capture the original topic, partition, offset, the exception class and message, and a timestamp. This metadata is essential for diagnosing why the message failed and deciding whether it can be retried after a fix is deployed.
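Outside a Spring listener (where `@Header` does the conversion for you), these enrichment headers can be read directly off the consumed record. The sketch below hardcodes the header names that recent Spring Kafka versions use (normally referenced via the `KafkaHeaders.DLT_*` constants, so verify against your version) and shows how to decode them. Note that the original-partition and original-offset headers are written as big-endian binary numbers, not UTF-8 strings.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DltHeaderNames {

    // Header names written by DeadLetterPublishingRecoverer
    // (values mirror the KafkaHeaders.DLT_* constants in recent Spring Kafka versions)
    public static final String ORIGINAL_TOPIC = "kafka_dlt-original-topic";
    public static final String ORIGINAL_PARTITION = "kafka_dlt-original-partition";
    public static final String ORIGINAL_OFFSET = "kafka_dlt-original-offset";
    public static final String EXCEPTION_FQCN = "kafka_dlt-exception-fqcn";
    public static final String EXCEPTION_MESSAGE = "kafka_dlt-exception-message";

    // Topic name and exception headers are UTF-8 strings
    public static String asString(byte[] headerValue) {
        return new String(headerValue, StandardCharsets.UTF_8);
    }

    // The original-offset header is an 8-byte big-endian long
    public static long asLong(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getLong();
    }

    // The original-partition header is a 4-byte big-endian int
    public static int asInt(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getInt();
    }
}
```

In a listener, prefer `@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long offset` and let Spring do this decoding.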
// Conceptual flow of a message through retry and DLT
//
// 1. Consumer receives record from "orders" topic
// 2. Listener throws an exception
// 3. DefaultErrorHandler retries N times with backoff
// 4. All retries exhausted — record is forwarded to "orders.DLT"
// 5. DLT consumer picks up the record for inspection or alerting
// The DLT topic name defaults to: <original-topic>.DLT
// For example: orders -> orders.DLT
The DLT topic must exist before the recoverer tries to publish to it. Either create it manually, set auto.create.topics.enable=true on the broker, or use a NewTopic bean in your Spring configuration.
DeadLetterPublishingRecoverer is the core class that Spring Kafka provides for publishing failed records to a DLT. It implements the ConsumerRecordRecoverer functional interface, which DefaultErrorHandler calls after all retry attempts are exhausted.
When invoked, it takes the failed ConsumerRecord and the Exception, constructs a new ProducerRecord with the original key, value, and enriched headers, and publishes it to the DLT using a KafkaTemplate. By default, the target topic name is the original topic name with a .DLT suffix appended.
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
// Basic construction — uses the default .DLT suffix naming
KafkaTemplate<Object, Object> kafkaTemplate = ...; // injected
DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);
// When a record from topic "payments" fails, it publishes to "payments.DLT"
// The record retains the original key and value
// Headers are added with failure metadata (exception, offset, etc.)
The recoverer works with any KafkaTemplate whose key/value serializers are compatible with the original record. If your consumers produce failed records with different value types, you can supply a map of templates keyed by value class; the recoverer picks the first entry whose class matches the failed record's value.
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import java.util.LinkedHashMap;
import java.util.Map;

// Using multiple KafkaTemplates for different serialization formats
KafkaTemplate<String, String> stringTemplate = ...;
KafkaTemplate<String, byte[]> bytesTemplate = ...;

// Template selection is based on the failed record's value class;
// use a LinkedHashMap because entries are checked in insertion order
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(byte[].class, bytesTemplate);
templates.put(String.class, stringTemplate);

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
If the KafkaTemplate serializers cannot handle the record's key/value types, the DLT publish itself will fail; depending on the Spring Kafka version and configuration, the failure is logged and the record may be redelivered or dropped. Always ensure serializer compatibility between the consumer deserializer and the DLT producer serializer.
The standard way to wire a DLT is to create a DeadLetterPublishingRecoverer, pass it to a DefaultErrorHandler along with a backoff policy, and register the error handler on the ConcurrentKafkaListenerContainerFactory.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaDltConfig {

    // Ensure the DLT topic exists
    @Bean
    public NewTopic ordersDlt() {
        return TopicBuilder.name("orders.DLT")
                .partitions(3)
                .replicas(2)
                .build();
    }

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<Object, Object> kafkaTemplate) {
        // Publish failed records to <topic>.DLT
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate);
        // Retry 3 times with 2-second intervals, then send to DLT
        return new DefaultErrorHandler(recoverer, new FixedBackOff(2000L, 3L));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
With this configuration in place, any @KafkaListener that uses the default container factory will automatically benefit from retry and DLT routing. The listener itself does not need any DLT-related code.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {

    @KafkaListener(topics = "orders", groupId = "order-group")
    public void consume(String order) {
        // If this throws, DefaultErrorHandler retries 3 times
        // then DeadLetterPublishingRecoverer sends to orders.DLT
        processOrder(order);
    }

    private void processOrder(String order) {
        if (order == null || order.isBlank()) {
            throw new IllegalArgumentException("Order payload is empty");
        }
        // business logic
    }
}
If you declare DefaultErrorHandler as a bean, Spring Boot auto-configuration will wire it into the default container factory automatically. You only need the explicit ConcurrentKafkaListenerContainerFactory bean when you need additional factory customizations.
By default, DeadLetterPublishingRecoverer appends .DLT to the original topic name. You can override this by providing a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> that determines the destination topic and partition for each failed record.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
KafkaTemplate<Object, Object> kafkaTemplate = ...;
// Route all failures to a single centralized DLT
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
        kafkaTemplate,
        (record, ex) -> new TopicPartition("central-dlt", -1) // -1 = let the partitioner decide
);
A more advanced pattern routes different exception types to different DLT topics. This lets you separate transient failures (which might succeed on retry from a different DLT consumer) from permanent failures (which need manual intervention).
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class RoutedDltConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<Object, Object> kafkaTemplate) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (record, ex) -> {
                    // Route deserialization and JSON errors to a parse-errors DLT
                    if (ex.getCause() instanceof JsonProcessingException) {
                        return new TopicPartition("parse-errors.DLT", -1);
                    }
                    // Route validation errors to a validation DLT
                    if (ex.getCause() instanceof IllegalArgumentException) {
                        return new TopicPartition("validation-errors.DLT", -1);
                    }
                    // Everything else goes to the default <topic>.DLT
                    return new TopicPartition(record.topic() + ".DLT", -1);
                });
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }
}
You can also preserve the original partition assignment so that DLT records land in the same partition number as the source record. This is useful when you want to maintain ordering within the DLT.
// Preserve original partition in the DLT
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
        kafkaTemplate,
        (record, ex) -> new TopicPartition(
                record.topic() + ".DLT",
                record.partition() // same partition number as the source
        )
);
When preserving the original partition number, make sure the DLT topic has at least as many partitions as the source topic. If it has fewer, the publish can fail, or (in recent Spring Kafka versions, which verify that the resolved partition exists) the recoverer falls back to letting the producer choose a partition, which defeats the intended partition affinity.
A DLT is only useful if someone or something processes the messages in it. The simplest approach is a dedicated @KafkaListener that reads from the DLT topic, logs the failure details, and persists the record to a database or sends an alert.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class DltProcessor {

    private final FailedMessageRepository failedMessageRepo;
    private final AlertService alertService;

    public DltProcessor(FailedMessageRepository failedMessageRepo,
                        AlertService alertService) {
        this.failedMessageRepo = failedMessageRepo;
        this.alertService = alertService;
    }

    @KafkaListener(topics = "orders.DLT", groupId = "dlt-processor-group")
    public void processDlt(
            String payload,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) String exceptionClass,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage) {
        // 1. Persist to a database for later analysis
        FailedMessage failed = new FailedMessage();
        failed.setOriginalTopic(originalTopic);
        failed.setOriginalOffset(originalOffset);
        failed.setPayload(payload);
        failed.setExceptionClass(exceptionClass);
        failed.setExceptionMessage(exceptionMessage);
        failedMessageRepo.save(failed);
        // 2. Send an alert if the exception is unexpected
        if (!exceptionClass.contains("IllegalArgumentException")) {
            alertService.sendAlert(
                    "DLT message from " + originalTopic + " offset " + originalOffset
                            + ": " + exceptionClass + " — " + exceptionMessage);
        }
    }
}
For retry-from-DLT strategies, you can build a scheduled job or an admin endpoint that reads DLT records and republishes them to the original topic after a fix has been deployed.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Service
public class DltReplayService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DltReplayService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Replay messages from the DLT back to the original topic.
     * Call this from a REST endpoint or a scheduled task after deploying a fix.
     */
    public int replayDlt(String dltTopic, String originalTopic, int maxRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Fresh group id so each replay starts from the earliest DLT record
        props.put("group.id", "dlt-replay-" + System.currentTimeMillis());
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", String.valueOf(maxRecords));
        int replayed = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(dltTopic));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                kafkaTemplate.send(originalTopic, record.key(), record.value());
                replayed++;
            }
            // send() is asynchronous — flush so all replayed records are actually sent
            kafkaTemplate.flush();
        }
        return replayed;
    }
}
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DltReplayController {

    private final DltReplayService dltReplayService;

    public DltReplayController(DltReplayService dltReplayService) {
        this.dltReplayService = dltReplayService;
    }

    @PostMapping("/admin/dlt/replay")
    public String replay(
            @RequestParam(defaultValue = "orders.DLT") String dltTopic,
            @RequestParam(defaultValue = "orders") String originalTopic,
            @RequestParam(defaultValue = "100") int maxRecords) {
        int count = dltReplayService.replayDlt(dltTopic, originalTopic, maxRecords);
        return "Replayed " + count + " records from " + dltTopic + " to " + originalTopic;
    }
}
Be cautious when replaying DLT messages. If the root cause has not been fixed, replayed records will fail again and re-enter the DLT, creating an infinite loop. Always verify the fix before replaying, and consider adding a replay-count header to detect and break cycles.
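One way to break replay cycles is to carry a counter with each replayed record. The helper below is a sketch of that bookkeeping over a plain header map; the header name dlt-replay-count is an arbitrary choice, and in real code you would read and write it through Kafka's Headers API on the ConsumerRecord and ProducerRecord.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class ReplayGuard {

    // Hypothetical header name tracking how often a record has been replayed
    public static final String REPLAY_COUNT_HEADER = "dlt-replay-count";

    // Read the current replay count; a never-replayed record has no header
    public static int replayCount(Map<String, byte[]> headers) {
        byte[] value = headers.get(REPLAY_COUNT_HEADER);
        return value == null ? 0 : Integer.parseInt(new String(value, StandardCharsets.UTF_8));
    }

    // Only replay while under the limit; beyond it, escalate to manual handling
    public static boolean shouldReplay(Map<String, byte[]> headers, int maxReplays) {
        return replayCount(headers) < maxReplays;
    }

    // Before republishing, store the incremented count back into the headers
    public static void incrementReplayCount(Map<String, byte[]> headers) {
        int next = replayCount(headers) + 1;
        headers.put(REPLAY_COUNT_HEADER, String.valueOf(next).getBytes(StandardCharsets.UTF_8));
    }
}
```

In the replay loop above, a record failing `shouldReplay` would be skipped (and ideally persisted for review) instead of being republished.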
@RetryableTopic provides a declarative, annotation-driven approach that automatically creates retry topics and a DLT. When a listener method annotated with @RetryableTopic throws an exception, the failed message is published to progressively delayed retry topics. After all retry attempts are exhausted, the message lands in a DLT named <topic>-dlt by default (note the hyphen suffix, unlike the .DLT dot suffix that DeadLetterPublishingRecoverer uses on its own).
The @DltHandler annotation marks a method in the same class that processes messages arriving at the DLT. This keeps the retry, DLT, and main consumer logic co-located.
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class PaymentConsumer {

    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 2000, multiplier = 2.0, maxDelay = 16000),
            autoCreateTopics = "true",
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE,
            dltStrategy = DltStrategy.FAIL_ON_ERROR
    )
    @KafkaListener(topics = "payments", groupId = "payment-group")
    public void consume(String payment) {
        System.out.println("Processing payment: " + payment);
        // Simulate a failure
        if (payment.contains("INVALID")) {
            throw new RuntimeException("Payment validation failed");
        }
    }

    @DltHandler
    public void handleDlt(
            String payment,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) String exceptionClass) {
        System.err.printf("Payment DLT [%s]: %s (exception: %s)%n",
                topic, payment, exceptionClass);
        // Persist, alert, or enqueue for manual review
    }
}
The dltStrategy attribute controls what happens when the @DltHandler method itself throws an exception:
- DltStrategy.FAIL_ON_ERROR — the DLT consumer stops and the failed DLT record is not committed. This is the safest option because it prevents DLT message loss. The record will be redelivered when the consumer restarts.
- DltStrategy.ALWAYS_RETRY_ON_ERROR — the DLT consumer keeps retrying the DLT record with the same backoff policy. Use this when the DLT handler has transient dependencies (like a database) that may recover.
- DltStrategy.NO_DLT — disables DLT creation entirely. Failed records are simply logged and skipped after retry exhaustion. Use this only when message loss is acceptable.
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class NotificationConsumer {

    // ALWAYS_RETRY_ON_ERROR: if the DLT handler fails (e.g., DB is down),
    // keep retrying instead of stopping the consumer
    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 5000),
            dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR
    )
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consume(String notification) {
        // process notification
    }

    @DltHandler
    public void handleDlt(String notification) {
        // This method will be retried if it throws
        saveToDatabase(notification); // may throw if DB is temporarily down
    }

    private void saveToDatabase(String notification) {
        // JPA or JDBC call
    }
}
You can also exclude certain exception types from retry so they go directly to the DLT without waiting through the retry chain.
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class InventoryConsumer {

    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 1000, multiplier = 2.0),
            exclude = { IllegalArgumentException.class, NullPointerException.class }
    )
    @KafkaListener(topics = "inventory-updates", groupId = "inventory-group")
    public void consume(String update) {
        if (update == null) {
            throw new NullPointerException("Null payload"); // goes straight to DLT
        }
        processUpdate(update); // RuntimeException will be retried
    }

    @DltHandler
    public void handleDlt(String update) {
        System.err.println("Inventory DLT: " + update);
    }

    private void processUpdate(String update) {
        // business logic that may throw transient exceptions
    }
}
With @RetryableTopic, topics are created with the naming pattern <topic>-retry-0, <topic>-retry-1, ..., <topic>-dlt (or with delay-value suffixes when using SUFFIX_WITH_DELAY_VALUE). These are different from the .DLT suffix used by DeadLetterPublishingRecoverer directly.
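To make the index-based convention concrete, here is a small helper that reproduces the topic names for a given listener. It assumes the default -retry and -dlt suffixes with index suffixing; the authoritative names come from Spring Kafka's retry topic configuration, so treat this as illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicNames {

    // attempts includes the first delivery on the main topic,
    // so attempts = 4 means 3 retry topics plus the DLT
    public static List<String> topicsFor(String mainTopic, int attempts) {
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < attempts - 1; i++) {
            topics.add(mainTopic + "-retry-" + i);
        }
        topics.add(mainTopic + "-dlt");
        return topics;
    }
}
```

For the PaymentConsumer above (attempts = "4" on topic "payments"), index suffixing would yield payments-retry-0, payments-retry-1, payments-retry-2, and payments-dlt.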
Getting the most out of Dead Letter Topics requires attention to monitoring, serialization, exception classification, and consumer isolation. The following practices will help you build a robust DLT strategy.
Monitor DLT Consumer Lag
Treat DLT consumer lag as a critical metric. A growing lag on a DLT means failed messages are accumulating faster than they are being processed. Set up alerts on DLT consumer group lag using tools like Kafka's built-in kafka-consumer-groups.sh, Burrow, or your monitoring platform.
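As a sketch of what such an alert check computes: lag per partition is the log-end offset minus the committed offset, summed over the DLT's partitions. In practice the two offset maps would come from AdminClient.listConsumerGroupOffsets and AdminClient.listOffsets (or from parsing kafka-consumer-groups.sh output); here they are plain maps keyed by partition number to keep the example self-contained.

```java
import java.util.Map;

public class DltLagCheck {

    // Total lag = sum over partitions of (end offset - committed offset)
    public static long totalLag(Map<Integer, Long> committedOffsets,
                                Map<Integer, Long> endOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0, entry.getValue() - committed);
        }
        return lag;
    }

    // Alert when failed messages pile up faster than the DLT consumer drains them
    public static boolean shouldAlert(Map<Integer, Long> committedOffsets,
                                      Map<Integer, Long> endOffsets,
                                      long threshold) {
        return totalLag(committedOffsets, endOffsets) > threshold;
    }
}
```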
# application.properties — expose Kafka consumer metrics (including records-lag) via Actuator
management.endpoints.web.exposure.include=health,metrics,prometheus
management.metrics.tags.application=my-kafka-app
Classify Non-Retryable Exceptions
Always register exceptions that will never succeed on retry as non-retryable. This avoids wasting time and resources on doomed retries and gets failed records to the DLT immediately.
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;

DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);

// These exceptions will skip all retries and go straight to the DLT
errorHandler.addNotRetryableExceptions(
        DeserializationException.class, // malformed message bytes
        SerializationException.class,   // serializer mismatch
        IllegalArgumentException.class, // invalid input
        NullPointerException.class,     // missing required data
        ClassCastException.class        // type mismatch
);
Match DLT Serialization to Source
The KafkaTemplate used by DeadLetterPublishingRecoverer must use serializers compatible with the source consumer's deserializers. If your consumer uses JsonDeserializer, the DLT template should use JsonSerializer. For maximum flexibility, use ByteArraySerializer, which forwards raw bytes without re-serialization; this also matches the case where the failure was a deserialization error and the recoverer sends the original raw payload.
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DltProducerConfig {

    @Bean
    public KafkaTemplate<byte[], byte[]> dltKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        ProducerFactory<byte[], byte[]> factory =
                new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(factory);
    }
}
Isolate DLT Consumer Groups
Always use a separate consumer group for DLT listeners. This prevents DLT processing from interfering with main topic consumption and allows you to scale, pause, or restart DLT consumers independently.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class IsolatedDltConsumer {

    // Main consumer — uses its own group
    @KafkaListener(topics = "orders", groupId = "orders-main-group")
    public void consumeOrders(String order) {
        processOrder(order);
    }

    // DLT consumer — dedicated group, can be scaled independently
    @KafkaListener(topics = "orders.DLT", groupId = "orders-dlt-group")
    public void consumeDlt(String failedOrder) {
        handleFailedOrder(failedOrder);
    }

    private void processOrder(String order) { /* ... */ }

    private void handleFailedOrder(String order) { /* ... */ }
}
Set Retention and Cleanup Policies
DLT topics can grow indefinitely if not managed. Configure appropriate retention policies so that old DLT records are eventually cleaned up. For compliance or debugging purposes, you may want a longer retention on DLT topics than on regular topics.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class DltTopicConfig {

    @Bean
    public NewTopic ordersDlt() {
        return TopicBuilder.name("orders.DLT")
                .partitions(3)
                .replicas(2)
                .config("retention.ms", String.valueOf(7 * 24 * 60 * 60 * 1000L)) // 7 days
                .config("cleanup.policy", "delete")
                .build();
    }
}
Do not set DLT retention too low. If the DLT consumer is down or lagging and retention expires, unprocessed DLT records will be deleted by Kafka, resulting in silent message loss. A common pattern is to set DLT retention to at least 2x the main topic retention.