Kafka のエラー問題

エラーが発生した場合にメッセージが自動的に再エンコードされる RabbitMQ や SQS とは異なり、 Kafka には異なるセマンティクスがあります。コンシューマはオフセットを明示的にコミットします。消費者がコミットしない場合 再起動すると、同じメッセージが再度読み取られます。これにより、次のような実際のリスクが生じます。 不正な形式のメッセージ (毒薬) 無限ループでコンシューマ グループ全体をブロックする可能性があります。 同じパーティション内の後続のメッセージが処理されないようにします。

Kafka で処理する主なエラー シナリオは次の 3 つです。

  • 一時的なエラー: ダウンストリーム サービスが一時的に利用不可、ネットワーク タイムアウト - 再試行は合理的です
  • 永続的なエラー: 不正な形式のメッセージ、回復不可能なビジネス ルール違反 - 再試行は無駄です
  • 逆シリアル化エラー: ペイロード スキームが互換性のない方法で変更されました — ポイズン ピル シナリオ

配信セマンティクスとエラー管理

  • 最大 1 回: 処理前にコミットし、クラッシュ時にメッセージが失われます。運用環境では決して使用しないでください。
  • 少なくとも 1 回: 処理が成功した後にコミット、再試行時に重複の可能性 - 標準
  • 必ず 1 回: べき等コンシューマ + Kafka トランザクションが必要 — クリティカルなユースケースの場合

パターン 1: デッドレターキュー (DLQ)

La デッドレターキュー これはメッセージが送信される別の Kafka トピックです 最大回数試行しても処理に失敗します。 コンシューマをブロックしたりメッセージを失ったりする代わりに、メッセージを隔離トピックに移動します。 手動分析や将来の再処理に使用します。

標準の命名規則は次のとおりです。 {topic-originale}.DLT o {topic-originale}-dlq。 DLQ 内のメッセージには、元のメッセージとエラーに関するメタデータが含まれている必要があります。 (スタックトレース、試行回数、失敗タイムスタンプ) Kafka ヘッダー経由。

// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;

import java.time.Duration;
import java.util.*;

public class KafkaDLQHandler {

    private static final String SOURCE_TOPIC = "ordini-effettuati";
    private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
    private static final int MAX_RETRY_ATTEMPTS = 3;

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;
    private final Map<String, Integer> retryCount = new HashMap<>();

    public KafkaDLQHandler(String bootstrapServers) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", "servizio-inventario-dlq");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", false);
        consumerProps.put("auto.offset.reset", "earliest");
        this.consumer = new KafkaConsumer<>(consumerProps);

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("acks", "all");
        this.producer = new KafkaProducer<>(producerProps);
    }

    public void processWithDLQ() {
        consumer.subscribe(List.of(SOURCE_TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();

                try {
                    // Tenta l'elaborazione del messaggio
                    elaboraOrdine(record.value());

                    // Successo: rimuovi dal contatore retry e fai commit
                    retryCount.remove(messageKey);
                    consumer.commitSync();

                } catch (RecoverableException e) {
                    // Errore transitorio: incrementa contatore
                    int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
                    retryCount.put(messageKey, attempts);

                    if (attempts >= MAX_RETRY_ATTEMPTS) {
                        // Troppi tentativi: manda in DLQ
                        sendToDLQ(record, e, attempts);
                        retryCount.remove(messageKey);
                        consumer.commitSync();
                    } else {
                        // Ritenta: non fare commit, il messaggio verra riletto
                        System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
                            " fallito per offset " + record.offset() + ": " + e.getMessage());
                        sleep(calculateBackoff(attempts));
                    }

                } catch (PermanentException e) {
                    // Errore permanente: va direttamente in DLQ senza retry
                    sendToDLQ(record, e, 1);
                    consumer.commitSync();
                }
            }
        }
    }

    private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
                           Exception error, int attempts) {
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            DLQ_TOPIC,
            originalRecord.key(),
            originalRecord.value()
        );

        // Arricchisci con headers per il debugging
        Headers headers = dlqRecord.headers();
        headers.add("dlq-original-topic", originalRecord.topic().getBytes());
        headers.add("dlq-original-partition",
            String.valueOf(originalRecord.partition()).getBytes());
        headers.add("dlq-original-offset",
            String.valueOf(originalRecord.offset()).getBytes());
        headers.add("dlq-error-message", error.getMessage().getBytes());
        headers.add("dlq-error-class", error.getClass().getName().getBytes());
        headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
        headers.add("dlq-failed-at",
            String.valueOf(System.currentTimeMillis()).getBytes());

        // Copia anche gli header originali
        originalRecord.headers().forEach(h ->
            headers.add("original-" + h.key(), h.value()));

        producer.send(dlqRecord, (metadata, ex) -> {
            if (ex != null) {
                System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
            } else {
                System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
                    DLQ_TOPIC, metadata.offset(), error.getMessage());
            }
        });
    }

    private long calculateBackoff(int attempt) {
        // Exponential backoff: 1s, 2s, 4s, 8s, ...
        return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
    }

    private void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

パターン 2: 指数バックオフを使用した再試行

Il 指数バックオフを使用して再試行します これは、一時的なエラーを処理するための標準パターンです。 試行が失敗するたびに、次の試行までの待機時間が増加し、過負荷が回避されます。 下流のサービスはすでに困難に陥っています。追加 ジッター (ランダムノイズ) バックオフ時に次の問題を回避します 雷鳴の群れ: すべてのコンシューマーが同時に再起動します 正確な秒まで。

// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;

public class RetryWithBackoff {

    private static final Random random = new Random();

    /**
     * Esegue l'operazione con retry esponenziale + jitter.
     *
     * @param operation  La lambda da eseguire
     * @param maxRetries Numero massimo di tentativi
     * @param baseDelayMs Delay base in millisecondi (default: 1000ms)
     * @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
     */
    public static <T> T execute(Supplier<T> operation,
                                 int maxRetries,
                                 long baseDelayMs,
                                 long maxDelayMs) throws Exception {
        int attempt = 0;
        Exception lastException = null;

        while (attempt < maxRetries) {
            try {
                return operation.get();
            } catch (RetryableException e) {
                lastException = e;
                attempt++;

                if (attempt >= maxRetries) {
                    throw new MaxRetriesExceededException(
                        "Superato il numero massimo di tentativi: " + maxRetries, e);
                }

                // Calcola delay con full jitter
                long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
                System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
                    attempt, maxRetries, delay);

                Thread.sleep(delay);
            }
        }

        throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
    }

    /**
     * Full jitter: delay casuale tra 0 e il backoff esponenziale.
     * Evita il thundering herd distribuendo i retry nel tempo.
     */
    private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
        long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
        // Full jitter: random tra 0 e exponentialDelay
        return (long) (random.nextDouble() * exponentialDelay);
    }

    // Eccezioni custom per distinguere errori recuperabili da permanenti
    public static class RetryableException extends RuntimeException {
        public RetryableException(String message, Throwable cause) { super(message, cause); }
    }

    public static class MaxRetriesExceededException extends Exception {
        public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
    }
}

// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
    () -> {
        chiamataServizioEsterno(record.value());
        return null;
    },
    maxRetries: 3,
    baseDelayMs: 1000,
    maxDelayMs: 30000
);

パターン 3: リトライトピック (ノンブロッキングリトライ)

消費者における睡眠アプローチの問題点は、 パーティション全体をロックする: 再試行を待っている間、同じパーティションからの他のメッセージは処理されません。 消費者の遅れが増大する原因となります。

パターン トピックの再試行 (o ノンブロッキング再試行) この問題は次のように解決されます。 パーティションをロックする代わりに、失敗したメッセージは別の再試行トピックに移動されます。 遅延が設定されています。プライマリ コンシューマは引き続き新しいメッセージを処理します。 Spring Kafka 2.7 以降は、このパターンをネイティブに実装します。

// Struttura dei topic con retry non-bloccante
// Topic principale:   ordini-effettuati
// Topic retry 1:      ordini-effettuati-retry-1000    (delay 1s)
// Topic retry 2:      ordini-effettuati-retry-2000    (delay 2s)
// Topic retry 3:      ordini-effettuati-retry-5000    (delay 5s)
// Topic DLQ:          ordini-effettuati.DLT

// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;

@Component
public class OrdineConsumerNonBlocking {

    @RetryableTopic(
        attempts = "4",              // 1 tentativo originale + 3 retry
        backoff = @Backoff(
            delay = 1000,
            multiplier = 2.0,
            maxDelay = 10000
        ),
        dltTopicSuffix = ".DLT",
        retryTopicSuffix = "-retry",
        // Non riprova per errori non recuperabili
        exclude = {
            DeserializationException.class,
            PermanentBusinessException.class
        }
    )
    @KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
    public void consumeOrdine(ConsumerRecord<String, String> record) {
        // Spring Kafka gestisce automaticamente i retry e la DLQ
        elaboraOrdine(record.value());
    }

    // Listener per la DLQ: analisi e alerting
    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> record,
                          @Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
        System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
            record.key(), errorMessage);
        // Invia alert, log su monitoring, notifica operatori...
        alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
    }
}

パターン 4: 毒薬の検出

Un 毒薬 それは常に消費者のクラッシュを引き起こすメッセージです。 再試行回数に関係なく。典型的なケースは、逆シリアル化エラーです。 メッセージ値が予期された形式ではありません (破損した JSON、互換性のない Avro スキーマ)。

ポイズン ピルのリスクは無限ループです。消費者は失敗し、コミットせず、再起動し、 同じメッセージを読み取ると、再び失敗します。これにより、パーティションが完全にロックされます。 主な防御策は、 エラー処理デシリアライザー これは例外をスローしません ただし、破損したペイロードを管理可能なエラー オブジェクトにラップします。

// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Ordine> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");

        // ErrorHandlingDeserializer wrappa il deserializer originale
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ErrorHandlingDeserializer.class);

        // Il deserializer "reale" (delegato)
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
            StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
            JsonDeserializer.class);

        // Target type per la deserializzazione JSON
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
        ConsumerFactory<String, Ordine> consumerFactory,
        KafkaTemplate<String, Ordine> kafkaTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
            new FixedBackOff(1000L, 3L)  // 3 tentativi, 1s di attesa
        ));

        return factory;
    }
}

DLQ からの再処理

DLQ は永続的なゴミ箱ではありません。DLQ はメッセージが送信される隔離スペースです。 問題が解決された後に再処理する必要があります。次の 2 つのアプローチがあります。

  • 手動再処理: オペレーターは DLQ 内のメッセージを検査し、原因を特定します。 問題を修正し (修正プログラムを展開し、ダウンストリーム サービスを復元します)、メッセージを再送信します。 元のトピックでは。
  • 自動再処理: 別のコンシューマが定期的に DLQ を読み取り、試行します。 スケジュール ポリシー (例: 1 時間ごと、特定のアラートの後) を使用してメッセージを再処理します。
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {

    private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
    private static final String SOURCE_TOPIC = "ordini-effettuati";

    /**
     * Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
     * Utile dopo aver deployato un fix per un errore specifico.
     */
    public void reprocessByErrorType(String targetErrorClass) {
        Properties consumerProps = buildConsumerProps("dlq-reprocessor");
        Properties producerProps = buildProducerProps();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(List.of(DLQ_TOPIC));

            int reprocessed = 0, skipped = 0;
            ConsumerRecords<String, String> records;

            do {
                records = consumer.poll(Duration.ofSeconds(5));

                for (ConsumerRecord<String, String> dlqRecord : records) {
                    String errorClass = getHeader(dlqRecord, "dlq-error-class");
                    String originalTopic = getHeader(dlqRecord, "dlq-original-topic");

                    if (targetErrorClass.equals(errorClass)) {
                        // Rimanda al topic originale
                        ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
                            originalTopic != null ? originalTopic : SOURCE_TOPIC,
                            dlqRecord.key(),
                            dlqRecord.value()
                        );
                        // Aggiungi header per tracciabilita del reprocessing
                        reprocessRecord.headers().add("reprocessed-from-dlq",
                            String.valueOf(System.currentTimeMillis()).getBytes());

                        producer.send(reprocessRecord);
                        reprocessed++;
                    } else {
                        skipped++;
                    }
                }

                consumer.commitSync();

            } while (!records.isEmpty());

            System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
                reprocessed, skipped);
        }
    }

    private String getHeader(ConsumerRecord<String, String> record, String key) {
        var header = record.headers().lastHeader(key);
        return header != null ? new String(header.value()) : null;
    }
}

注意: DLQ 内のメッセージの順序

DLQ から元のトピックにメッセージを送り返すと、元の相対的な順序が失われます。 すでに正常に処理された他のメッセージへ。順序が重要なユースケースの場合 (例: 順次状態更新)、再処理ではこれを考慮する必要があります。 処理するには、コンシューマに冪等性ロジックを適用する必要がある場合があります。 新しいメッセージの後に到着する「古い」メッセージ。

Kafka でのエラー処理のベスト プラクティス

  • 一時的なエラーと永続的なエラーを常に区別する: カスタム例外またはエラー タイプ列挙型を使用します。 決して修復されないエラー (スキーマ違反など) に対する無駄な再試行を避けるため。
  • DLQ は運用環境では必須です: DLQ を持たない消費者は、次のことを行うことができます。 メッセージが表示されずに失われるか、ループに陥る可能性があります。オプションではありません。
  • DLQのサイズを監視する: DLQ 内にメッセージが蓄積されている場合は、問題の兆候です。 アラートを追加 kafka_consumer_group_partition_lag{topic="*.DLT"}.
  • DLQ メッセージをメタデータで強化する: 失敗のタイムスタンプ、スタックトレース、試行回数、 元のトピックとオフセット。このデータがないとデバッグは非常に困難です。
  • メインコンシューマループではスリープを使用しないでください: パーティションをロックし、ラグを引き起こします。 トピックの再試行パターン (非ブロッキング) を使用するか、コミットと DLQ に同意します。
  • DLQ で長期保存を設定する: DLQ 内のメッセージを検査する必要があります。 retention.ms を少なくとも 30 日に設定します (再処理が冪等である場合は圧縮も可能)。

シリーズの次のステップ

  • 第 11 条 – 本番環境における Kafka:操作ガイドのシリーズを終了します クラスターのサイジング、最適な保持およびレプリケーション係数の構成を完了します。 地理的な災害復旧には MirrorMaker 2 が使用されます。

他シリーズとの連携

  • イベント駆動型アーキテクチャ – 非同期システムのデッドレターキュー: 同じ DLQ パターンが SQS、SNS、およびその他のメッセージング システムに適用されます。第 708 条 EDA シリーズでは、タイムアウトと maxReceiveCount の可視性を備えた AWS コンテキストでの DLQ について説明します。
  • Kafka の 1 回限りのセマンティクス (第 4 条): 発生した重複を排除します。 再試行から、Kafka のトランザクション API により、エンドツーエンドで 1 回だけの保証が可能になります。