Kafka의 오류 문제

오류 발생 시 메시지가 자동으로 다시 인코딩되는 RabbitMQ 또는 SQS와 달리, Kafka에는 다른 의미 체계가 있습니다. 즉, 소비자가 오프셋을 명시적으로 커밋합니다. 소비자가 약속하지 않는 경우 다시 시작하면 동일한 메시지를 다시 읽습니다. 이로 인해 실제 위험이 발생합니다. 잘못된 메시지(독약) 전체 소비자 그룹을 무한 루프로 차단할 수 있으며, 동일한 파티션의 후속 메시지가 처리되지 않도록 방지합니다.

Kafka에서 처리해야 하는 세 가지 주요 오류 시나리오는 다음과 같습니다.

  • 일시적인 오류: 다운스트림 서비스를 일시적으로 사용할 수 없음, 네트워크 시간 초과 - 다시 시도하는 것이 좋습니다.
  • 영구 오류: 잘못된 메시지, 복구 불가능한 비즈니스 규칙 위반 - 재시도는 쓸모가 없습니다.
  • 역직렬화 오류: 페이로드 체계가 호환되지 않는 방식으로 변경되었습니다. — 독극물 시나리오

배달 의미론 및 오류 관리

  • 최대 1회: 처리 전 커밋, 충돌 시 메시지 손실 — 프로덕션에서는 사용하지 마세요.
  • 적어도 한 번: 성공적인 처리 후 커밋, 재시도 시 중복 가능성 — 표준
  • 정확히 한 번: 멱등성 소비자 + Kafka 트랜잭션 필요 — 중요한 사용 사례의 경우

패턴 1: DLQ(배달 못한 편지 대기열)

La 배달 못한 편지 대기열 메시지가 전송되는 별도의 Kafka 주제입니다. 최대 시도 횟수 이후에 처리가 실패합니다. 소비자를 차단하거나 메시지를 잃는 대신 격리 주제로 이동합니다. 수동 분석 또는 향후 재처리를 위해.

표준 명명 규칙은 다음과 같습니다. {topic-originale}.DLT o {topic-originale}-dlq. DLQ의 메시지에는 원본 메시지와 오류에 대한 메타데이터가 포함되어야 합니다. (스택 추적, 시도 횟수, 실패 타임스탬프) Kafka 헤더를 통해.

// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;

import java.time.Duration;
import java.util.*;

public class KafkaDLQHandler {

    private static final String SOURCE_TOPIC = "ordini-effettuati";
    private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
    private static final int MAX_RETRY_ATTEMPTS = 3;

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;
    private final Map<String, Integer> retryCount = new HashMap<>();

    public KafkaDLQHandler(String bootstrapServers) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", "servizio-inventario-dlq");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", false);
        consumerProps.put("auto.offset.reset", "earliest");
        this.consumer = new KafkaConsumer<>(consumerProps);

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("acks", "all");
        this.producer = new KafkaProducer<>(producerProps);
    }

    public void processWithDLQ() {
        consumer.subscribe(List.of(SOURCE_TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();

                try {
                    // Tenta l'elaborazione del messaggio
                    elaboraOrdine(record.value());

                    // Successo: rimuovi dal contatore retry e fai commit
                    retryCount.remove(messageKey);
                    consumer.commitSync();

                } catch (RecoverableException e) {
                    // Errore transitorio: incrementa contatore
                    int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
                    retryCount.put(messageKey, attempts);

                    if (attempts >= MAX_RETRY_ATTEMPTS) {
                        // Troppi tentativi: manda in DLQ
                        sendToDLQ(record, e, attempts);
                        retryCount.remove(messageKey);
                        consumer.commitSync();
                    } else {
                        // Ritenta: non fare commit, il messaggio verra riletto
                        System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
                            " fallito per offset " + record.offset() + ": " + e.getMessage());
                        sleep(calculateBackoff(attempts));
                    }

                } catch (PermanentException e) {
                    // Errore permanente: va direttamente in DLQ senza retry
                    sendToDLQ(record, e, 1);
                    consumer.commitSync();
                }
            }
        }
    }

    private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
                           Exception error, int attempts) {
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            DLQ_TOPIC,
            originalRecord.key(),
            originalRecord.value()
        );

        // Arricchisci con headers per il debugging
        Headers headers = dlqRecord.headers();
        headers.add("dlq-original-topic", originalRecord.topic().getBytes());
        headers.add("dlq-original-partition",
            String.valueOf(originalRecord.partition()).getBytes());
        headers.add("dlq-original-offset",
            String.valueOf(originalRecord.offset()).getBytes());
        headers.add("dlq-error-message", error.getMessage().getBytes());
        headers.add("dlq-error-class", error.getClass().getName().getBytes());
        headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
        headers.add("dlq-failed-at",
            String.valueOf(System.currentTimeMillis()).getBytes());

        // Copia anche gli header originali
        originalRecord.headers().forEach(h ->
            headers.add("original-" + h.key(), h.value()));

        producer.send(dlqRecord, (metadata, ex) -> {
            if (ex != null) {
                System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
            } else {
                System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
                    DLQ_TOPIC, metadata.offset(), error.getMessage());
            }
        });
    }

    private long calculateBackoff(int attempt) {
        // Exponential backoff: 1s, 2s, 4s, 8s, ...
        return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
    }

    private void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

패턴 2: 지수 백오프로 재시도

Il 지수 백오프로 재시도 일시적인 오류를 처리하기 위한 표준 패턴은 다음과 같습니다. 시도가 실패할 때마다 다음 시도까지의 대기 시간이 늘어나 과부하가 방지됩니다. 다운스트림 서비스는 이미 어려움을 겪고 있습니다. 추가하다 지터 (무작위 소음) 백오프 시 다음 문제를 방지합니다. 우레 같은 무리: 모든 소비자가 동시에 다시 시작 정확한 초까지.

// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;

public class RetryWithBackoff {

    private static final Random random = new Random();

    /**
     * Esegue l'operazione con retry esponenziale + jitter.
     *
     * @param operation  La lambda da eseguire
     * @param maxRetries Numero massimo di tentativi
     * @param baseDelayMs Delay base in millisecondi (default: 1000ms)
     * @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
     */
    public static <T> T execute(Supplier<T> operation,
                                 int maxRetries,
                                 long baseDelayMs,
                                 long maxDelayMs) throws Exception {
        int attempt = 0;
        Exception lastException = null;

        while (attempt < maxRetries) {
            try {
                return operation.get();
            } catch (RetryableException e) {
                lastException = e;
                attempt++;

                if (attempt >= maxRetries) {
                    throw new MaxRetriesExceededException(
                        "Superato il numero massimo di tentativi: " + maxRetries, e);
                }

                // Calcola delay con full jitter
                long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
                System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
                    attempt, maxRetries, delay);

                Thread.sleep(delay);
            }
        }

        throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
    }

    /**
     * Full jitter: delay casuale tra 0 e il backoff esponenziale.
     * Evita il thundering herd distribuendo i retry nel tempo.
     */
    private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
        long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
        // Full jitter: random tra 0 e exponentialDelay
        return (long) (random.nextDouble() * exponentialDelay);
    }

    // Eccezioni custom per distinguere errori recuperabili da permanenti
    public static class RetryableException extends RuntimeException {
        public RetryableException(String message, Throwable cause) { super(message, cause); }
    }

    public static class MaxRetriesExceededException extends Exception {
        public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
    }
}

// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
    () -> {
        chiamataServizioEsterno(record.value());
        return null;
    },
    maxRetries: 3,
    baseDelayMs: 1000,
    maxDelayMs: 30000
);

패턴 3: 재시도 주제(비차단 재시도)

소비자의 수면 접근 방식의 문제점은 다음과 같습니다. 전체 파티션을 잠그세요: 재시도를 기다리는 동안 동일한 파티션의 다른 메시지는 처리되지 않습니다. 소비자 지연이 증가합니다.

패턴 주제 재시도 (o 비차단 재시도) 이 문제를 해결합니다. 파티션을 잠그는 대신 실패한 메시지가 별도의 재시도 항목으로 이동됩니다. 지연이 구성되어 있습니다. 기본 소비자는 계속해서 새 메시지를 처리합니다. Spring Kafka 2.7+는 이 패턴을 기본적으로 구현합니다.

// Struttura dei topic con retry non-bloccante
// Topic principale:   ordini-effettuati
// Topic retry 1:      ordini-effettuati-retry-1000    (delay 1s)
// Topic retry 2:      ordini-effettuati-retry-2000    (delay 2s)
// Topic retry 3:      ordini-effettuati-retry-5000    (delay 5s)
// Topic DLQ:          ordini-effettuati.DLT

// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;

@Component
public class OrdineConsumerNonBlocking {

    @RetryableTopic(
        attempts = "4",              // 1 tentativo originale + 3 retry
        backoff = @Backoff(
            delay = 1000,
            multiplier = 2.0,
            maxDelay = 10000
        ),
        dltTopicSuffix = ".DLT",
        retryTopicSuffix = "-retry",
        // Non riprova per errori non recuperabili
        exclude = {
            DeserializationException.class,
            PermanentBusinessException.class
        }
    )
    @KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
    public void consumeOrdine(ConsumerRecord<String, String> record) {
        // Spring Kafka gestisce automaticamente i retry e la DLQ
        elaboraOrdine(record.value());
    }

    // Listener per la DLQ: analisi e alerting
    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> record,
                          @Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
        System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
            record.key(), errorMessage);
        // Invia alert, log su monitoring, notifica operatori...
        alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
    }
}

패턴 4: 독약 감지

Un 독약 항상 소비자 충돌을 일으키는 메시지이고, 재시도 횟수에 관계없이. 전형적인 경우는 역직렬화 오류입니다. 메시지 값이 예상된 형식이 아닙니다(손상된 JSON, 호환되지 않는 Avro 스키마).

독약의 위험은 무한 루프입니다. 소비자가 실패하고, 커밋하지 않고, 다시 시작하고, 같은 메시지를 읽고 또 실패합니다. 이렇게 하면 파티션이 완전히 잠깁니다. 주요 방어 수단은 다음을 사용하는 것입니다. 오류 처리역직렬 변환기 예외가 발생하지 않습니다. 하지만 관리 가능한 오류 개체에 손상된 페이로드를 래핑합니다.

// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Ordine> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");

        // ErrorHandlingDeserializer wrappa il deserializer originale
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ErrorHandlingDeserializer.class);

        // Il deserializer "reale" (delegato)
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
            StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
            JsonDeserializer.class);

        // Target type per la deserializzazione JSON
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
        ConsumerFactory<String, Ordine> consumerFactory,
        KafkaTemplate<String, Ordine> kafkaTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
            new FixedBackOff(1000L, 3L)  // 3 tentativi, 1s di attesa
        ));

        return factory;
    }
}

DLQ에서 재처리

DLQ는 영구 휴지통이 아닙니다. 메시지가 보관되는 격리 공간입니다. 문제가 해결된 후 다시 처리해야 합니다. 두 가지 접근 방식이 있습니다.

  • 수동 재처리: 운영자는 DLQ의 메시지를 검사하고 원인을 파악하며, 문제를 해결하고(수정 사항 배포, 다운스트림 서비스 복원) 메시지를 다시 보냅니다. 원래 주제에서.
  • 자동 재처리: 별도의 소비자가 주기적으로 DLQ를 읽고 시도합니다. 일정 정책(예: 매시간, 특정 경고 후)을 사용하여 메시지를 재처리합니다.
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {

    private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
    private static final String SOURCE_TOPIC = "ordini-effettuati";

    /**
     * Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
     * Utile dopo aver deployato un fix per un errore specifico.
     */
    public void reprocessByErrorType(String targetErrorClass) {
        Properties consumerProps = buildConsumerProps("dlq-reprocessor");
        Properties producerProps = buildProducerProps();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(List.of(DLQ_TOPIC));

            int reprocessed = 0, skipped = 0;
            ConsumerRecords<String, String> records;

            do {
                records = consumer.poll(Duration.ofSeconds(5));

                for (ConsumerRecord<String, String> dlqRecord : records) {
                    String errorClass = getHeader(dlqRecord, "dlq-error-class");
                    String originalTopic = getHeader(dlqRecord, "dlq-original-topic");

                    if (targetErrorClass.equals(errorClass)) {
                        // Rimanda al topic originale
                        ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
                            originalTopic != null ? originalTopic : SOURCE_TOPIC,
                            dlqRecord.key(),
                            dlqRecord.value()
                        );
                        // Aggiungi header per tracciabilita del reprocessing
                        reprocessRecord.headers().add("reprocessed-from-dlq",
                            String.valueOf(System.currentTimeMillis()).getBytes());

                        producer.send(reprocessRecord);
                        reprocessed++;
                    } else {
                        skipped++;
                    }
                }

                consumer.commitSync();

            } while (!records.isEmpty());

            System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
                reprocessed, skipped);
        }
    }

    private String getHeader(ConsumerRecord<String, String> record, String key) {
        var header = record.headers().lastHeader(key);
        return header != null ? new String(header.value()) : null;
    }
}

주의: DLQ의 메시지 순서

DLQ에서 원래 주제로 메시지를 다시 보내면 원래 상대 순서가 손실됩니다. 이미 성공적으로 처리된 다른 메시지에. 순서가 중요한 사용 사례의 경우 (예: 순차적 상태 업데이트) 재처리에서는 다음 사항을 고려해야 합니다. 처리하려면 소비자에 멱등성 논리를 적용해야 할 수도 있습니다. 새로운 메시지 다음에 도착하는 "오래된" 메시지.

Kafka의 오류 처리 모범 사례

  • 일시적인 오류와 영구적인 오류를 항상 구별하세요.: 사용자 정의 예외 또는 오류 유형 열거형을 사용하십시오. 결코 치유되지 않는 오류(예: 스키마 위반)에 대한 불필요한 재시도를 방지합니다.
  • DLQ는 프로덕션에서 필수입니다.: DLQ가 없는 소비자라면 누구나 가능 메시지가 자동으로 손실되거나 루프에 갇히게 됩니다. 선택사항이 아닙니다.
  • DLQ 크기 모니터링: DLQ에 메시지가 누적되면 문제가 있다는 신호입니다. 알림 추가 kafka_consumer_group_partition_lag{topic="*.DLT"}.
  • 메타데이터로 DLQ 메시지 강화: 실패 타임스탬프, 스택 추적, 시도 횟수, 원래 주제와 오프셋. 이 데이터가 없으면 디버깅이 매우 어렵습니다.
  • 주 소비자 루프에서 절전 모드를 사용하지 마세요.: 파티션을 잠그고 지연이 발생합니다. Retry Topic 패턴(비차단)을 사용하거나 커밋 및 DLQ에 동의하세요.
  • DLQ에 장기 보존 설정: DLQ의 메시지를 검사해야 합니다. Retention.ms를 최소 30일로 설정합니다(또는 재처리가 멱등적인 경우 압축도 가능).

시리즈의 다음 단계

  • 제 11조 - 생산 중인 카프카: 운영 가이드로 시리즈 마무리 클러스터 크기 조정, 최적의 보존 및 복제 요소 구성이 완료되었습니다. 지리적 재해 복구를 위한 MirrorMaker 2.

다른 시리즈와의 연계

  • 이벤트 기반 아키텍처 - 비동기식 시스템의 배달 못한 편지 대기열: 동일한 DLQ 패턴이 SQS, SNS 및 기타 메시징 시스템에 적용됩니다. 제708조 EDA 시리즈 중 시간 초과 및 maxReceiveCount 가시성과 함께 AWS 컨텍스트의 DLQ를 다룹니다.
  • Kafka 정확히 한 번 의미론(4조): 발생한 중복을 제거합니다. 재시도에서 Kafka의 트랜잭션 API는 정확히 한 번의 엔드투엔드 보장을 허용합니다.