Dead Letter Queue and Error Handling in Kafka
In a distributed system, messages fail. Corrupt payloads that fail deserialization, business logic throwing exceptions, unreachable downstream services — in all these cases the consumer must decide what to do. This guide covers the fundamental patterns for error handling in Kafka: the Dead Letter Queue, retry with exponential backoff, poison pill detection, and reprocessing strategies.
The Error Problem in Kafka
Unlike RabbitMQ or SQS, where a failed message is automatically re-enqueued, Kafka has different semantics: the consumer commits offsets explicitly. If the consumer does not commit and restarts, it will read the same messages again. This creates a real risk: a malformed message (a poison pill) can block the entire consumer group in an infinite loop, preventing subsequent messages in the same partition from being processed.
The three main error scenarios to handle in Kafka are:
- Transient errors: downstream service temporarily unavailable, network timeout — retrying makes sense
- Permanent errors: malformed message, non-recoverable business rule violation — retrying is useless
- Deserialization errors: the payload schema has changed in an incompatible way — the poison pill scenario
Delivery Semantics and Error Management
- At-most-once: commit before processing, messages lost on crash — never use in production
- At-least-once: commit after successful processing, possible duplicates on retry — standard
- Exactly-once: Requires idempotent consumer + Kafka transactions — for critical use cases
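The trade-off between at-most-once and at-least-once can be seen without a broker. Below is a minimal, illustrative simulation (class and method names are hypothetical, not a Kafka API) of what happens when a consumer crashes between processing and committing:

```java
// DeliverySemanticsDemo.java - broker-free sketch of delivery semantics.
// "crashAt" simulates a consumer crash while handling that message index.
import java.util.ArrayList;
import java.util.List;

public class DeliverySemanticsDemo {

    /** At-most-once: commit the offset BEFORE processing.
     *  A crash mid-processing means the message is lost. */
    public static List<String> atMostOnce(List<String> messages, int crashAt) {
        List<String> processed = new ArrayList<>();
        int committedOffset = 0;
        for (int i = 0; i < messages.size(); i++) {
            committedOffset = i + 1;          // commit first
            if (i == crashAt) break;          // simulated crash: message i never processed
            processed.add(messages.get(i));
        }
        // restart: resume from the committed offset -> the crashed message is skipped
        for (int i = committedOffset; i < messages.size(); i++) {
            processed.add(messages.get(i));
        }
        return processed;
    }

    /** At-least-once: commit the offset AFTER processing.
     *  A crash between processing and commit causes a duplicate on restart. */
    public static List<String> atLeastOnce(List<String> messages, int crashAt) {
        List<String> processed = new ArrayList<>();
        int committedOffset = 0;
        for (int i = 0; i < messages.size(); i++) {
            processed.add(messages.get(i));   // process first
            if (i == crashAt) break;          // crash before the commit
            committedOffset = i + 1;
        }
        for (int i = committedOffset; i < messages.size(); i++) {
            processed.add(messages.get(i));
        }
        return processed;
    }
}
```

With at-most-once the crashed message is lost; with at-least-once it is processed twice — which is why at-least-once consumers should be idempotent.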
Pattern 1: Dead Letter Queue (DLQ)
A Dead Letter Queue is a separate Kafka topic that receives messages which fail processing after a maximum number of attempts. Instead of blocking the consumer or losing the message, we move it to a quarantine topic for manual analysis or later reprocessing.
The standard naming convention is {original-topic}.DLT or {original-topic}-dlq.
The message in the DLQ should include the original payload plus metadata about the error
(stack trace, number of attempts, failure timestamp) carried in Kafka headers.
// KafkaDLQHandler.java - Basic Dead Letter Queue pattern
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;
import java.time.Duration;
import java.util.*;
public class KafkaDLQHandler {
private static final String SOURCE_TOPIC = "ordini-effettuati";
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final int MAX_RETRY_ATTEMPTS = 3;
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final Map<String, Integer> retryCount = new HashMap<>();
public KafkaDLQHandler(String bootstrapServers) {
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "servizio-inventario-dlq");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", "false");
// One record per poll keeps per-record commit and retry handling simple
consumerProps.put("max.poll.records", "1");
consumerProps.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(consumerProps);
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
this.producer = new KafkaProducer<>(producerProps);
}
public void processWithDLQ() {
consumer.subscribe(List.of(SOURCE_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();
try {
// Attempt to process the message
elaboraOrdine(record.value());
// Success: clear the retry counter and commit the offset
retryCount.remove(messageKey);
consumer.commitSync();
} catch (RecoverableException e) {
// Transient error: increment the retry counter
int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
retryCount.put(messageKey, attempts);
if (attempts >= MAX_RETRY_ATTEMPTS) {
// Too many attempts: send to the DLQ
sendToDLQ(record, e, attempts);
producer.flush(); // wait for the DLQ write before committing the offset
retryCount.remove(messageKey);
consumer.commitSync();
} else {
// Retry: do not commit; seek back so the next poll() re-reads this record
System.err.println("Attempt " + attempts + "/" + MAX_RETRY_ATTEMPTS +
" failed for offset " + record.offset() + ": " + e.getMessage());
consumer.seek(new org.apache.kafka.common.TopicPartition(
record.topic(), record.partition()), record.offset());
sleep(calculateBackoff(attempts));
}
} catch (PermanentException e) {
// Permanent error: goes straight to the DLQ, no retry
sendToDLQ(record, e, 1);
producer.flush();
consumer.commitSync();
}
}
}
}
private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
Exception error, int attempts) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
originalRecord.key(),
originalRecord.value()
);
// Enrich with headers for debugging
Headers headers = dlqRecord.headers();
headers.add("dlq-original-topic", originalRecord.topic().getBytes());
headers.add("dlq-original-partition",
String.valueOf(originalRecord.partition()).getBytes());
headers.add("dlq-original-offset",
String.valueOf(originalRecord.offset()).getBytes());
headers.add("dlq-error-message", String.valueOf(error.getMessage()).getBytes()); // guard against a null message
headers.add("dlq-error-class", error.getClass().getName().getBytes());
headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
headers.add("dlq-failed-at",
String.valueOf(System.currentTimeMillis()).getBytes());
// Also copy the original headers
originalRecord.headers().forEach(h ->
headers.add("original-" + h.key(), h.value()));
producer.send(dlqRecord, (metadata, ex) -> {
if (ex != null) {
System.err.println("CRITICAL: unable to write to the DLQ: " + ex.getMessage());
} else {
System.out.printf("Message sent to DLQ: topic=%s, offset=%d, error=%s%n",
DLQ_TOPIC, metadata.offset(), error.getMessage());
}
});
}
private long calculateBackoff(int attempt) {
// Exponential backoff: 1s, 2s, 4s, 8s, ...
return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
Pattern 2: Retry with Exponential Backoff
Retry with exponential backoff is the standard pattern for handling transient errors: each failed attempt increases the wait before the next attempt, avoiding extra load on a downstream service that is already struggling. Adding jitter (random noise) to the backoff avoids the thundering herd problem: all consumers retrying at exactly the same instant.
// RetryWithBackoff.java - Retry strategy with exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;
public class RetryWithBackoff {
private static final Random random = new Random();
/**
* Runs the operation with exponential retry + jitter.
*
* @param operation The lambda to execute
* @param maxRetries Maximum number of attempts
* @param baseDelayMs Base delay in milliseconds (e.g. 1000)
* @param maxDelayMs Maximum delay in milliseconds (e.g. 30000)
*/
public static <T> T execute(Supplier<T> operation,
int maxRetries,
long baseDelayMs,
long maxDelayMs) throws Exception {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (RetryableException e) {
lastException = e;
attempt++;
if (attempt >= maxRetries) {
throw new MaxRetriesExceededException(
"Exceeded the maximum number of attempts: " + maxRetries, e);
}
// Compute the delay with full jitter
long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
System.err.printf("Attempt %d/%d failed. Next retry in %dms%n",
attempt, maxRetries, delay);
Thread.sleep(delay);
}
}
throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
}
/**
* Full jitter: random delay between 0 and the exponential backoff.
* Avoids the thundering herd by spreading retries over time.
*/
private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
// Full jitter: random between 0 and exponentialDelay
return (long) (random.nextDouble() * exponentialDelay);
}
// Custom exceptions to distinguish recoverable from permanent errors
public static class RetryableException extends RuntimeException {
public RetryableException(String message, Throwable cause) { super(message, cause); }
}
public static class MaxRetriesExceededException extends Exception {
public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
}
}
// Usage in the Kafka consumer (Java has no named arguments: pass positionally)
try {
RetryWithBackoff.execute(
() -> {
chiamataServizioEsterno(record.value());
return null;
},
3,      // maxRetries
1000L,  // baseDelayMs
30000L  // maxDelayMs
);
} catch (Exception e) {
// retries exhausted: send the message to the DLQ
}
Pattern 3: Retry Topic (Non-Blocking Retry)
The problem with the sleep-in-consumer approach is that it blocks the entire partition: while we wait to retry, no other messages from that partition are processed, and consumer lag grows.
The Retry Topic (or Non-Blocking Retry) pattern solves this: instead of blocking the partition, the failed message is moved to a separate retry topic with a configured delay, and the main consumer keeps processing new messages. Spring Kafka 2.7+ implements this pattern natively.
// Topic structure with non-blocking retry
// Main topic:    ordini-effettuati
// Retry topic 1: ordini-effettuati-retry-1000 (1s delay)
// Retry topic 2: ordini-effettuati-retry-2000 (2s delay)
// Retry topic 3: ordini-effettuati-retry-4000 (4s delay)
// DLQ topic:     ordini-effettuati.DLT
// With Spring Kafka @RetryableTopic
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
@Component
public class OrdineConsumerNonBlocking {
@RetryableTopic(
attempts = "4", // 1 original attempt + 3 retries
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
),
dltTopicSuffix = ".DLT",
retryTopicSuffix = "-retry",
// Do not retry non-recoverable errors
exclude = {
DeserializationException.class,
PermanentBusinessException.class
}
)
@KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
public void consumeOrdine(ConsumerRecord<String, String> record) {
// Spring Kafka handles retries and the DLQ automatically
elaboraOrdine(record.value());
}
// DLQ listener: analysis and alerting
@DltHandler
public void handleDlt(ConsumerRecord<String, String> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage) {
System.err.printf("Message in DLQ: key=%s, error=%s%n",
record.key(), errorMessage);
// Send alerts, log to monitoring, notify operators...
alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
}
}
Pattern 4: Poison Pill Detection
A poison pill is a message that makes the consumer fail every time, regardless of the number of retries. The classic case is a deserialization error: the message value is not in the expected format (corrupt JSON, incompatible Avro schema).
The risk with a poison pill is the infinite loop: the consumer fails, does not commit, restarts, reads the same message, and fails again. This blocks the partition completely. The main defense is an ErrorHandlingDeserializer, which does not throw an exception but wraps the corrupt payload in a manageable error object.
// Configuration with ErrorHandlingDeserializer (Spring Kafka)
// This deserializer catches the exception instead of propagating it,
// allowing the consumer to send the message to the DLQ
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Ordine> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");
// ErrorHandlingDeserializer wraps the original deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
// The "real" (delegate) deserializer
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
JsonDeserializer.class);
// Target type for JSON deserialization
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
ConsumerFactory<String, Ordine> consumerFactory,
KafkaTemplate<String, Ordine> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// DeadLetterPublishingRecoverer: automatically sends to the DLQ on DeserializationException
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
// partition -1: let the producer pick the DLQ partition
(record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
new FixedBackOff(1000L, 3L) // 3 retries, 1s apart
));
return factory;
}
}
Reprocessing from DLQ
A DLQ is not a permanent trash can: it is a quarantine space from which messages must be reprocessed once the underlying problem is resolved. There are two approaches:
- Manual reprocessing: an operator inspects the messages in the DLQ, identifies the cause, fixes the problem (deploys a patch, restores the downstream service), then resends the messages to the original topic.
- Automatic reprocessing: a separate consumer periodically reads the DLQ and tries to reprocess the messages, following a scheduling policy (e.g. every hour, or after a specific alert).
// DLQReprocessor.java - DLQ consumer for selective reprocessing
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import java.time.Duration;
import java.util.*;
public class DLQReprocessor {
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final String SOURCE_TOPIC = "ordini-effettuati";
/**
* Resends messages from the DLQ to the original topic, filtered by error type.
* Useful after deploying a fix for a specific error.
*/
public void reprocessByErrorType(String targetErrorClass) {
// buildConsumerProps/buildProducerProps: same settings as KafkaDLQHandler (omitted for brevity)
Properties consumerProps = buildConsumerProps("dlq-reprocessor");
Properties producerProps = buildProducerProps();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
consumer.subscribe(List.of(DLQ_TOPIC));
int reprocessed = 0, skipped = 0;
ConsumerRecords<String, String> records;
do {
records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> dlqRecord : records) {
String errorClass = getHeader(dlqRecord, "dlq-error-class");
String originalTopic = getHeader(dlqRecord, "dlq-original-topic");
if (targetErrorClass.equals(errorClass)) {
// Resend to the original topic
ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
originalTopic != null ? originalTopic : SOURCE_TOPIC,
dlqRecord.key(),
dlqRecord.value()
);
// Add a header to make the reprocessing traceable
reprocessRecord.headers().add("reprocessed-from-dlq",
String.valueOf(System.currentTimeMillis()).getBytes());
producer.send(reprocessRecord);
reprocessed++;
} else {
skipped++;
}
}
producer.flush(); // ensure resends completed before committing DLQ offsets
consumer.commitSync();
} while (!records.isEmpty());
System.out.printf("Reprocessing complete: %d reprocessed, %d skipped%n",
reprocessed, skipped);
}
}
private String getHeader(ConsumerRecord<String, String> record, String key) {
var header = record.headers().lastHeader(key);
return header != null ? new String(header.value()) : null;
}
}
Attention: Order of Messages in the DLQ
When you resend messages from the DLQ to the original topic, their original order relative to messages that were already processed successfully is lost. For use cases where ordering is critical (e.g. sequential state updates), reprocessing must take this into account: the consumer may need idempotency logic to handle "old" messages that arrive after newer ones.
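One way to implement that idempotency logic — sketched here with hypothetical names, under the assumption that each message carries a monotonically increasing version or event timestamp — is a last-write-wins guard:

```java
// VersionedStateStore.java - illustrative idempotency guard for reprocessed messages.
// An update is applied only if its version is newer than the one already stored,
// so "old" messages replayed from the DLQ cannot overwrite fresher state.
import java.util.HashMap;
import java.util.Map;

public class VersionedStateStore {
    private final Map<String, Long> appliedVersion = new HashMap<>();
    private final Map<String, String> state = new HashMap<>();

    /** Returns true if the update was applied, false if it was stale and skipped. */
    public synchronized boolean applyIfNewer(String key, long version, String value) {
        Long current = appliedVersion.get(key);
        if (current != null && current >= version) {
            return false; // stale update from a DLQ replay: ignore it
        }
        appliedVersion.put(key, version);
        state.put(key, value);
        return true;
    }

    public synchronized String get(String key) {
        return state.get(key);
    }
}
```

In a real consumer the version map would live in a durable store (a database or a compacted topic), not in memory.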
Best Practices for Error Handling in Kafka
- Always distinguish transient errors from permanent ones: use custom exceptions or an error-type enum to avoid pointless retries on errors that will never succeed (e.g. a schema violation).
- A DLQ is mandatory in production: a consumer without a DLQ can silently lose messages or get stuck in a loop. It is not optional.
- Monitor the size of the DLQ: an accumulation of messages in the DLQ is a sign of a problem. Add an alert on kafka_consumer_group_partition_lag{topic="*.DLT"}.
- Enrich DLQ messages with metadata: failure timestamp, stack trace, number of attempts, original topic and offset. Without this data, debugging is very difficult.
- Don't sleep in the main consumer loop: it blocks the partition and causes lag. Use the non-blocking Retry Topic pattern, or commit and send to the DLQ.
- Set a long retention on the DLQ: messages in the DLQ need time to be inspected. Set retention.ms to at least 30 days (or even use compaction if reprocessing is idempotent).
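As a rough sketch of that retention setting (helper names are illustrative; in practice the resulting map would be passed to AdminClient.createTopics or applied with the kafka-configs tool):

```java
// DlqTopicConfig.java - builds the DLQ retention configs discussed above.
import java.util.Map;

public class DlqTopicConfig {
    /** retention.ms value for the given number of days. */
    public static long retentionMs(int days) {
        return days * 24L * 60L * 60L * 1000L;
    }

    /** Topic config map for a DLQ: time-based retention only, no size cap. */
    public static Map<String, String> dlqConfigs(int retentionDays) {
        return Map.of(
            "retention.ms", String.valueOf(retentionMs(retentionDays)),
            "retention.bytes", "-1" // unbounded size: rely on time-based retention
        );
    }
}
```

For 30 days this yields retention.ms = 2592000000.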
Next Steps in the Series
- Article 11 – Kafka in Production: closes the series with the complete operational guide: cluster sizing, optimal retention and replication factor configuration, and MirrorMaker 2 for geographic disaster recovery.
Links to Other Series
- Event-Driven Architecture – Dead Letter Queue in Asynchronous Systems: the same DLQ pattern applies to SQS, SNS, and other messaging systems. Article 708 of the EDA series covers DLQs in the AWS context, with visibility timeout and maxReceiveCount.
- Kafka Exactly-Once Semantics (Article 4): to eliminate the duplicates caused by retries, Kafka's transactional API enables end-to-end exactly-once guarantees.