Kafka의 세 가지 전달 보장

정확히 한 번만 다루기 전에 세 가지 배송 보장을 이해하는 것이 중요합니다. 분산 메시징 시스템이 제공할 수 있는 것입니다. 네트워크 장애가 발생한 경우 또는 브로커 충돌이 발생하면 동작이 근본적으로 다릅니다.

최대 1회: 메시지는 최대 1회 전송됩니다. 만약에 오류가 있는 경우 재시도하지 않으면 메시지가 손실됩니다. 측정항목에만 적합 일부 손실이 허용되는 비판이나 대용량 로그는 없습니다.

적어도 한 번: 메시지는 적어도 한 번은 전달되지만 재시도 시 중복될 수 있습니다. 그리고 Kafka의 기본 보증과 e 소비자가 멱등성인 경우 대부분의 경우 충분합니다.

정확히 한 번: 메시지가 정확히 한 번만 전달됩니다. 분산 시스템에서 구현하기가 가장 어렵고 비용이 많이 듭니다. 대기 시간 및 처리량 측면에서.

무엇을 배울 것인가

  • 멱등성 생산자가 생산자 수준에서 중복을 제거하는 방법
  • Transactional API: 원자성 트랜잭션에서 여러 쓰기를 래핑하는 방법
  • Kafka 브로커에서 트랜잭션 코디네이터의 역할
  • 정확히 한 번 엔드 투 엔드: Kafka Streams를 사용한 읽기-프로세스-쓰기
  • 처리량에 미치는 영향과 EOS가 실제로 필요한 시기
  • EOS가 활성화된 프로덕션을 위한 Java 설정 완료

1단계: 멱등성 생산자

정확히 온스의 첫 번째 레이어와 멱등성 생산자, Kafka 0.11에서 도입되었습니다. 해결되는 문제: 생산자가 메시지를 받았지만 브로커로부터 ack를 받지 못했습니다(시간 초과, 네트워크 오류). 메시지가 수신되었는지 여부. 따라서 재시도는 중복을 생성할 수 있습니다.

멱등성 생산자는 각 생산자에게 생산자 ID(PID) 독특하고 하나 시퀀스 번호 각 메시지마다. 브로커는 수신된 마지막 시퀀스 번호를 추적합니다. 각 PID에서 자동으로 재시도 중복을 제거합니다.

// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all");          // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE);   // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)

// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    "order-123",
    "{\"orderId\": \"123\", \"amount\": 99.99}"
);

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Inviato: partition=%d, offset=%d%n",
        metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // Con idempotenza, i retry sono sicuri: nessun duplicato
    System.err.println("Errore invio: " + e.getMessage());
} finally {
    producer.close();
}

2단계: 트랜잭션 API

멱등성 생산자는 단일 생산자의 쓰기에 대해 중복이 없음을 보장합니다. But what if I have to write about 두 가지 주제를 동시에 원자적으로? 예를 들어, 다음을 수행해야 하는 읽기-프로세스-쓰기가 있습니다.

  1. 다음에서 읽기 topic-input
  2. 결과를 쓰세요 topic-output
  3. 오프셋을 커밋합니다. __consumer_offsets

포인트 2와 포인트 3 사이에 충돌이 발생하면 메시지가 출력 ma에 기록되었습니다. 오프셋은 커밋되지 않습니다. 다시 시작하면 동일한 메시지를 다시 처리하고 씁니다. 중복. 그만큼 트랜잭션 API 모든 사람을 감싸서 이 문제를 해결합니다. 원자 트랜잭션의 세 단계입니다.

// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");

// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();

// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (records.isEmpty()) continue;

    try {
        // INIZIA la transazione
        producer.beginTransaction();

        for (ConsumerRecord<String, String> record : records) {
            // Processa il messaggio
            String processed = processOrder(record.value());

            // SCRIVI sull'output topic (dentro la transazione)
            producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
        }

        // COMMITTA gli offset del consumer DENTRO la transazione
        // Questo garantisce che il commit dell'offset e la scrittura sull'output
        // siano atomici: o entrambi succedono, o nessuno
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());

        // COMMIT della transazione: ora sia gli offset che i messaggi output
        // sono visibili ai consumer con isolation.level=read_committed
        producer.commitTransaction();

    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Errori non recuperabili: termina il producer
        producer.close();
        throw e;
    } catch (KafkaException e) {
        // Errore recuperabile: abortisci la transazione e riprova
        producer.abortTransaction();
        // Il consumer rilegge i messaggi non committati alla prossima iterazione
    }
}

트랜잭션 코디네이터: 작동 방식

Il 거래 코디네이터 Kafka 브로커의 구성 요소 트랜잭션 수명주기를 관리합니다. 모든 거래가 기록됩니다 특별한 내부 주제에 대해: __transaction_state (파티션으로 확장성, 기본적으로 50개의 파티션).

# Fasi di una transazione Kafka:

# 1. initTransactions() chiamato dal producer
#    -> Transaction Coordinator assegna epoch al transactional.id
#    -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
#       - Termina la vecchia transazione (producer fencing)
#       - Garantisce che una sola istanza sia attiva

# 2. beginTransaction() - solo lato producer (no network call)

# 3. send() per ogni messaggio
#    -> Producer invia al partition leader con il PID e epoch
#    -> Partition leader scrive in un "transaction buffer" (non ancora visibile)

# 4. sendOffsetsToTransaction()
#    -> Transaction Coordinator registra quali consumer group offset
#       fanno parte di questa transazione

# 5. commitTransaction()
#    -> Producer invia PREPARE_COMMIT al Transaction Coordinator
#    -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
#    -> TC notifica tutti i partition leader coinvolti
#    -> Partition leader scrivono COMMIT marker sul log
#    -> I messaggi diventano visibili ai consumer read_committed
#    -> TC scrive COMPLETE_COMMIT su __transaction_state

# 6. abortTransaction()
#    -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
#    -> Consumer read_committed non vedono mai i messaggi abortiti

# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
  --list
# TransactionalId     Producer Epoch  State        LastModifiedTime
# order-processor-0   5               Complete     2026-03-20 10:30:00

생산자 펜싱: 좀비 인스턴스로부터 보호

# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
#   (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!

# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio

# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"

Kafka Streams를 사용한 EOS 엔드투엔드

카프카 스트림 (시리즈 06 참조) EOS 지원 기본적으로 단일 구성으로: processing.guarantee=exactly_once_v2. The library automatically manages the transactional producer, the consumer read_committed 동일한 트랜잭션에서 오프셋을 커밋합니다.

// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);

// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200);  // default 100ms

StreamsBuilder builder = new StreamsBuilder();

// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");

inputStream
    .filter((key, value) -> value != null && !value.isEmpty())
    .mapValues(value -> enrichOrder(value))
    .peek((key, value) -> log.info("Processed order: {}", key))
    .to("orders-enriched");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);

// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // close() con timeout attende il completamento delle transazioni in corso
    streams.close(Duration.ofSeconds(10));
}));

streams.start();

처리량 및 지연 시간에 대한 영향

정확히 한 번은 무료가 아닙니다. 일반적인 벤치마크에서 정량화된 비용은 다음과 같습니다.

# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):

# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s

# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)

# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch

# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione

# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);          # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer

EOS를 사용해야 할 때와 사용하지 않을 때

체크리스트: EOS와 필수?

  • EOS 사용 대상: 결제, 금융 송금, 재고 업데이트, 중요한 상태 이벤트(주문 생성/취소) 중복으로 인해 실제 비즈니스 문제가 발생하는 경우
  • EOS는 필요하지 않습니다. 대상: 분석 로그, 측정항목 모니터링, 일부 중복이나 손실이 허용되는 이벤트 클릭/보기. 사용 멱등성 소비자를 사용하여 한 번 이상.
  • EOS의 대안: 소비자를 멱등성으로 만듭니다(다음인지 확인하세요). 이미 데이터베이스의 고유 ID를 통해 메시지를 처리했습니다) + 적어도 한 번. Kafka 수준에서는 EOS보다 더 간단하고 효율적인 경우가 많습니다.

참고: EOS는 외부 부작용을 보장하지 않습니다.

Kafka에서 정확히 한 번은 Kafka 클러스터 내에서만 의미 체계를 보장합니다. 소비자가 외부 데이터베이스(PostgreSQL, Redis 등)에 쓰는 경우 Kafka EOS는 이러한 작업에 대해 어떠한 것도 보장하지 않습니다. 하이브리드 시스템용 (Kafka + 외부 데이터베이스) 다음과 같은 추가 패턴이 필요합니다. 거래 발신함 또는 Saga.

결론

Kafka의 정확히 한 번 의미론은 가장 정교한 구현 중 하나입니다. 분산 시스템의 역사에서 멱등성 생산자는 중복을 제거합니다. 네트워크 수준에서 Transactional API는 여러 주제 간의 원자성을 보장합니다. Kafka Streams는 이 모든 것을 단일 구성으로 캡슐화합니다. 비용 지연 시간과 처리량이 실제적이지만 올바른 최적화를 통해 관리 가능합니다.

전체 시리즈: Apache Kafka

  • 제01조 — Apache Kafka 기본 사항: 주제, 파티션 및 소비자 그룹
  • 제02조 — Kafka 4.0의 KRaft: 안녕 ZooKeeper
  • 제03조 — 고급 Kafka 생산자 및 소비자: Ack, 멱등성 및 재시도
  • 제04조(본) — Kafka의 정확히 한 번 의미론: 트랜잭션 및 조정
  • 제05조 — 스키마 레지스트리: Avro, Protobuf 및 Schema Evolution
  • 제06조 — Kafka Streams: KTable 및 Windowing
  • 제07조 — Kafka Connect: Debezium CDC 및 DB 통합
  • 제08조 — Kafka + Apache Flink: 실시간 파이프라인 분석