Kafka でのデッドレター キューとエラー処理
分散システムでは、メッセージは失敗します。破損した逆シリアル化、ビジネス ロジックが例外をスローする、 ダウンストリーム サービスに到達できない - これらすべてのケースにおいて、消費者は何をすべきかを決定する必要があります。 このガイドでは次の内容について説明します。 Kafka でのエラー処理の基本パターン: デッド レター キュー、バックオフを伴う指数関数的な再試行、ポイズン ピルの検出および再処理戦略。
Kafka のエラー問題
エラーが発生した場合にメッセージが自動的に再エンコードされる RabbitMQ や SQS とは異なり、 Kafka には異なるセマンティクスがあります。コンシューマはオフセットを明示的にコミットします。消費者がコミットしない場合 再起動すると、同じメッセージが再度読み取られます。これにより、次のような実際のリスクが生じます。 不正な形式のメッセージ (毒薬) 無限ループでコンシューマ グループ全体をブロックする可能性があります。 同じパーティション内の後続のメッセージが処理されないようにします。
Kafka で処理する主なエラー シナリオは次の 3 つです。
- 一時的なエラー: ダウンストリーム サービスが一時的に利用不可、ネットワーク タイムアウト - 再試行は合理的です
- 永続的なエラー: 不正な形式のメッセージ、回復不可能なビジネス ルール違反 - 再試行は無駄です
- 逆シリアル化エラー: ペイロード スキームが互換性のない方法で変更されました — ポイズン ピル シナリオ
配信セマンティクスとエラー管理
- 最大 1 回: 処理前にコミットし、クラッシュ時にメッセージが失われます。運用環境では決して使用しないでください。
- 少なくとも 1 回: 処理が成功した後にコミット、再試行時に重複の可能性 - 標準
- 必ず 1 回: べき等コンシューマ + Kafka トランザクションが必要 — クリティカルなユースケースの場合
パターン 1: デッドレターキュー (DLQ)
La デッドレターキュー これはメッセージが送信される別の Kafka トピックです 最大回数試行しても処理に失敗します。 コンシューマをブロックしたりメッセージを失ったりする代わりに、メッセージを隔離トピックに移動します。 手動分析や将来の再処理に使用します。
標準の命名規則は次のとおりです。 {topic-originale}.DLT o {topic-originale}-dlq。
DLQ 内のメッセージには、元のメッセージとエラーに関するメタデータが含まれている必要があります。
(スタックトレース、試行回数、失敗タイムスタンプ) Kafka ヘッダー経由。
// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;
import java.time.Duration;
import java.util.*;
public class KafkaDLQHandler {
private static final String SOURCE_TOPIC = "ordini-effettuati";
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final int MAX_RETRY_ATTEMPTS = 3;
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final Map<String, Integer> retryCount = new HashMap<>();
public KafkaDLQHandler(String bootstrapServers) {
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "servizio-inventario-dlq");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", false);
consumerProps.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(consumerProps);
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
this.producer = new KafkaProducer<>(producerProps);
}
public void processWithDLQ() {
consumer.subscribe(List.of(SOURCE_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();
try {
// Tenta l'elaborazione del messaggio
elaboraOrdine(record.value());
// Successo: rimuovi dal contatore retry e fai commit
retryCount.remove(messageKey);
consumer.commitSync();
} catch (RecoverableException e) {
// Errore transitorio: incrementa contatore
int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
retryCount.put(messageKey, attempts);
if (attempts >= MAX_RETRY_ATTEMPTS) {
// Troppi tentativi: manda in DLQ
sendToDLQ(record, e, attempts);
retryCount.remove(messageKey);
consumer.commitSync();
} else {
// Ritenta: non fare commit, il messaggio verra riletto
System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
" fallito per offset " + record.offset() + ": " + e.getMessage());
sleep(calculateBackoff(attempts));
}
} catch (PermanentException e) {
// Errore permanente: va direttamente in DLQ senza retry
sendToDLQ(record, e, 1);
consumer.commitSync();
}
}
}
}
private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
Exception error, int attempts) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
originalRecord.key(),
originalRecord.value()
);
// Arricchisci con headers per il debugging
Headers headers = dlqRecord.headers();
headers.add("dlq-original-topic", originalRecord.topic().getBytes());
headers.add("dlq-original-partition",
String.valueOf(originalRecord.partition()).getBytes());
headers.add("dlq-original-offset",
String.valueOf(originalRecord.offset()).getBytes());
headers.add("dlq-error-message", error.getMessage().getBytes());
headers.add("dlq-error-class", error.getClass().getName().getBytes());
headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
headers.add("dlq-failed-at",
String.valueOf(System.currentTimeMillis()).getBytes());
// Copia anche gli header originali
originalRecord.headers().forEach(h ->
headers.add("original-" + h.key(), h.value()));
producer.send(dlqRecord, (metadata, ex) -> {
if (ex != null) {
System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
} else {
System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
DLQ_TOPIC, metadata.offset(), error.getMessage());
}
});
}
private long calculateBackoff(int attempt) {
// Exponential backoff: 1s, 2s, 4s, 8s, ...
return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
パターン 2: 指数バックオフを使用した再試行
Il 指数バックオフを使用して再試行します これは、一時的なエラーを処理するための標準パターンです。 試行が失敗するたびに、次の試行までの待機時間が増加し、過負荷が回避されます。 下流のサービスはすでに困難に陥っています。追加 ジッター (ランダムノイズ) バックオフ時に次の問題を回避します 雷鳴の群れ: すべてのコンシューマーが同時に再起動します 正確な秒まで。
// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;
public class RetryWithBackoff {
private static final Random random = new Random();
/**
* Esegue l'operazione con retry esponenziale + jitter.
*
* @param operation La lambda da eseguire
* @param maxRetries Numero massimo di tentativi
* @param baseDelayMs Delay base in millisecondi (default: 1000ms)
* @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
*/
public static <T> T execute(Supplier<T> operation,
int maxRetries,
long baseDelayMs,
long maxDelayMs) throws Exception {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (RetryableException e) {
lastException = e;
attempt++;
if (attempt >= maxRetries) {
throw new MaxRetriesExceededException(
"Superato il numero massimo di tentativi: " + maxRetries, e);
}
// Calcola delay con full jitter
long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
attempt, maxRetries, delay);
Thread.sleep(delay);
}
}
throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
}
/**
* Full jitter: delay casuale tra 0 e il backoff esponenziale.
* Evita il thundering herd distribuendo i retry nel tempo.
*/
private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
// Full jitter: random tra 0 e exponentialDelay
return (long) (random.nextDouble() * exponentialDelay);
}
// Eccezioni custom per distinguere errori recuperabili da permanenti
public static class RetryableException extends RuntimeException {
public RetryableException(String message, Throwable cause) { super(message, cause); }
}
public static class MaxRetriesExceededException extends Exception {
public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
}
}
// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
() -> {
chiamataServizioEsterno(record.value());
return null;
},
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 30000
);
パターン 3: リトライトピック (ノンブロッキングリトライ)
消費者における睡眠アプローチの問題点は、 パーティション全体をロックする: 再試行を待っている間、同じパーティションからの他のメッセージは処理されません。 消費者の遅れが増大する原因となります。
パターン トピックの再試行 (o ノンブロッキング再試行) この問題は次のように解決されます。 パーティションをロックする代わりに、失敗したメッセージは別の再試行トピックに移動されます。 遅延が設定されています。プライマリ コンシューマは引き続き新しいメッセージを処理します。 Spring Kafka 2.7 以降は、このパターンをネイティブに実装します。
// Struttura dei topic con retry non-bloccante
// Topic principale: ordini-effettuati
// Topic retry 1: ordini-effettuati-retry-1000 (delay 1s)
// Topic retry 2: ordini-effettuati-retry-2000 (delay 2s)
// Topic retry 3: ordini-effettuati-retry-5000 (delay 5s)
// Topic DLQ: ordini-effettuati.DLT
// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;
@Component
public class OrdineConsumerNonBlocking {
@RetryableTopic(
attempts = "4", // 1 tentativo originale + 3 retry
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
),
dltTopicSuffix = ".DLT",
retryTopicSuffix = "-retry",
// Non riprova per errori non recuperabili
exclude = {
DeserializationException.class,
PermanentBusinessException.class
}
)
@KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
public void consumeOrdine(ConsumerRecord<String, String> record) {
// Spring Kafka gestisce automaticamente i retry e la DLQ
elaboraOrdine(record.value());
}
// Listener per la DLQ: analisi e alerting
@DltHandler
public void handleDlt(ConsumerRecord<String, String> record,
@Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
record.key(), errorMessage);
// Invia alert, log su monitoring, notifica operatori...
alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
}
}
パターン 4: 毒薬の検出
Un 毒薬 それは常に消費者のクラッシュを引き起こすメッセージです。 再試行回数に関係なく。典型的なケースは、逆シリアル化エラーです。 メッセージ値が予期された形式ではありません (破損した JSON、互換性のない Avro スキーマ)。
ポイズン ピルのリスクは無限ループです。消費者は失敗し、コミットせず、再起動し、 同じメッセージを読み取ると、再び失敗します。これにより、パーティションが完全にロックされます。 主な防御策は、 エラー処理デシリアライザー これは例外をスローしません ただし、破損したペイロードを管理可能なエラー オブジェクトにラップします。
// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Ordine> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");
// ErrorHandlingDeserializer wrappa il deserializer originale
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
// Il deserializer "reale" (delegato)
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
JsonDeserializer.class);
// Target type per la deserializzazione JSON
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
ConsumerFactory<String, Ordine> consumerFactory,
KafkaTemplate<String, Ordine> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
new FixedBackOff(1000L, 3L) // 3 tentativi, 1s di attesa
));
return factory;
}
}
DLQ からの再処理
DLQ は永続的なゴミ箱ではありません。DLQ はメッセージが送信される隔離スペースです。 問題が解決された後に再処理する必要があります。次の 2 つのアプローチがあります。
- 手動再処理: オペレーターは DLQ 内のメッセージを検査し、原因を特定します。 問題を修正し (修正プログラムを展開し、ダウンストリーム サービスを復元します)、メッセージを再送信します。 元のトピックでは。
- 自動再処理: 別のコンシューマが定期的に DLQ を読み取り、試行します。 スケジュール ポリシー (例: 1 時間ごと、特定のアラートの後) を使用してメッセージを再処理します。
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final String SOURCE_TOPIC = "ordini-effettuati";
/**
* Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
* Utile dopo aver deployato un fix per un errore specifico.
*/
public void reprocessByErrorType(String targetErrorClass) {
Properties consumerProps = buildConsumerProps("dlq-reprocessor");
Properties producerProps = buildProducerProps();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
consumer.subscribe(List.of(DLQ_TOPIC));
int reprocessed = 0, skipped = 0;
ConsumerRecords<String, String> records;
do {
records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> dlqRecord : records) {
String errorClass = getHeader(dlqRecord, "dlq-error-class");
String originalTopic = getHeader(dlqRecord, "dlq-original-topic");
if (targetErrorClass.equals(errorClass)) {
// Rimanda al topic originale
ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
originalTopic != null ? originalTopic : SOURCE_TOPIC,
dlqRecord.key(),
dlqRecord.value()
);
// Aggiungi header per tracciabilita del reprocessing
reprocessRecord.headers().add("reprocessed-from-dlq",
String.valueOf(System.currentTimeMillis()).getBytes());
producer.send(reprocessRecord);
reprocessed++;
} else {
skipped++;
}
}
consumer.commitSync();
} while (!records.isEmpty());
System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
reprocessed, skipped);
}
}
private String getHeader(ConsumerRecord<String, String> record, String key) {
var header = record.headers().lastHeader(key);
return header != null ? new String(header.value()) : null;
}
}
注意: DLQ 内のメッセージの順序
DLQ から元のトピックにメッセージを送り返すと、元の相対的な順序が失われます。 すでに正常に処理された他のメッセージへ。順序が重要なユースケースの場合 (例: 順次状態更新)、再処理ではこれを考慮する必要があります。 処理するには、コンシューマに冪等性ロジックを適用する必要がある場合があります。 新しいメッセージの後に到着する「古い」メッセージ。
Kafka でのエラー処理のベスト プラクティス
- 一時的なエラーと永続的なエラーを常に区別する: カスタム例外またはエラー タイプ列挙型を使用します。 決して修復されないエラー (スキーマ違反など) に対する無駄な再試行を避けるため。
- DLQ は運用環境では必須です: DLQ を持たない消費者は、次のことを行うことができます。 メッセージが表示されずに失われるか、ループに陥る可能性があります。オプションではありません。
-
DLQのサイズを監視する: DLQ 内にメッセージが蓄積されている場合は、問題の兆候です。
アラートを追加
kafka_consumer_group_partition_lag{topic="*.DLT"}. - DLQ メッセージをメタデータで強化する: 失敗のタイムスタンプ、スタックトレース、試行回数、 元のトピックとオフセット。このデータがないとデバッグは非常に困難です。
- メインコンシューマループではスリープを使用しないでください: パーティションをロックし、ラグを引き起こします。 トピックの再試行パターン (非ブロッキング) を使用するか、コミットと DLQ に同意します。
- DLQ で長期保存を設定する: DLQ 内のメッセージを検査する必要があります。 retention.ms を少なくとも 30 日に設定します (再処理が冪等である場合は圧縮も可能)。
シリーズの次のステップ
- 第 11 条 – 本番環境における Kafka:操作ガイドのシリーズを終了します クラスターのサイジング、最適な保持およびレプリケーション係数の構成を完了します。 地理的な災害復旧には MirrorMaker 2 が使用されます。
他シリーズとの連携
- イベント駆動型アーキテクチャ – 非同期システムのデッドレターキュー: 同じ DLQ パターンが SQS、SNS、およびその他のメッセージング システムに適用されます。第 708 条 EDA シリーズでは、タイムアウトと maxReceiveCount の可視性を備えた AWS コンテキストでの DLQ について説明します。
- Kafka の 1 回限りのセマンティクス (第 4 条): 発生した重複を排除します。 再試行から、Kafka のトランザクション API により、エンドツーエンドで 1 回だけの保証が可能になります。







