Kafka の 3 つの配信保証

1 回限りの取り組みに取り組む前に、3 つの配信保証を理解することが重要です 分散メッセージング システムが提供できるものです。ネットワーク障害が発生した場合 またはブローカーがクラッシュした場合、動作は根本的に異なります。

最大 1 回: メッセージは最大 1 回送信されます。もし エラーが発生した場合、再試行しないとメッセージは失われます。メトリクスのみに適しています 多少の損失は許容できる批判や大量のログは禁止します。

少なくとも 1 回: メッセージは少なくとも 1 回は配信されますが、 再試行すると重複する可能性があります。そして、Kafka のデフォルト保証と e コンシューマがべき等であれば、ほとんどの場合これで十分です。

必ず 1 回: メッセージは 1 回だけ配信されます。 これは分散システムでの実装が最も困難な保証であり、コストがかかります。 レイテンシーとスループットの点で。

何を学ぶか

  • 冪等プロデューサがプロデューサ レベルで重複を排除する方法
  • トランザクション API: アトミック トランザクションで複数の書き込みをラップする方法
  • Kafka ブローカーにおけるトランザクション コーディネーターの役割
  • 1 回限りのエンドツーエンド: Kafka ストリームによる読み取り、プロセス、書き込み
  • スループットへの影響と、EOS が本当に必要な場合
  • EOS を有効にして実稼働用に Java セットアップを完了する

ステップ 1: べき等プロデューサー

正確にオンスの最初の層と べき等プロデューサー、 Kafka 0.11 で導入されました。それが解決する問題: プロデューサーが メッセージを送信しても、ブローカーから ACK を受信しない場合 (タイムアウト、ネットワーク障害)、ブローカーは、 メッセージが受信されたかどうか。したがって、再試行すると重複が作成される可能性があります。

べき等プロデューサーは、各プロデューサーに プロデューサー ID (PID) ユニークで一つの シーケンス番号 メッセージごとに。ブローカーは最後に受信したシーケンス番号を追跡します。 各 PID から再試行を自動的に重複排除します。

// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all");          // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE);   // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)

// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    "order-123",
    "{\"orderId\": \"123\", \"amount\": 99.99}"
);

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Inviato: partition=%d, offset=%d%n",
        metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // Con idempotenza, i retry sono sicuri: nessun duplicato
    System.err.println("Errore invio: " + e.getMessage());
} finally {
    producer.close();
}

ステップ 2: トランザクション API

べき等プロデューサーは、単一プロデューサーの書き込みが重複しないことを保証します。 しかし、もし私がそれについて書かなければならないとしたらどうしますか 同時に 2 つのトピック 原子的に?たとえば、読み取り、プロセス、書き込みでは次のことを行う必要があります。

  1. から読む topic-input
  2. に結果を書き込みます topic-output
  3. オフセットをコミットする __consumer_offsets

ポイント 2 とポイント 3 の間でクラッシュした場合、メッセージは出力 ma に書き込まれます。 オフセットはコミットされていません。再起動時に、同じメッセージを再処理して書き込みます。 重複です。の トランザクション API みんなを包み込むことで解決する アトミック トランザクションには 3 つのステップがあります。

// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");

// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();

// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (records.isEmpty()) continue;

    try {
        // INIZIA la transazione
        producer.beginTransaction();

        for (ConsumerRecord<String, String> record : records) {
            // Processa il messaggio
            String processed = processOrder(record.value());

            // SCRIVI sull'output topic (dentro la transazione)
            producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
        }

        // COMMITTA gli offset del consumer DENTRO la transazione
        // Questo garantisce che il commit dell'offset e la scrittura sull'output
        // siano atomici: o entrambi succedono, o nessuno
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());

        // COMMIT della transazione: ora sia gli offset che i messaggi output
        // sono visibili ai consumer con isolation.level=read_committed
        producer.commitTransaction();

    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Errori non recuperabili: termina il producer
        producer.close();
        throw e;
    } catch (KafkaException e) {
        // Errore recuperabile: abortisci la transazione e riprova
        producer.abortTransaction();
        // Il consumer rilegge i messaggi non committati alla prossima iterazione
    }
}

トランザクション コーディネーター: 仕組み

Il トランザクションコーディネーター Kafka ブローカーのコンポーネント トランザクションのライフサイクルを管理します。すべての取引が記録される 特別な内部トピックについて: __transaction_state (分割された スケーラビリティ、デフォルトで 50 パーティション)。

# Fasi di una transazione Kafka:

# 1. initTransactions() chiamato dal producer
#    -> Transaction Coordinator assegna epoch al transactional.id
#    -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
#       - Termina la vecchia transazione (producer fencing)
#       - Garantisce che una sola istanza sia attiva

# 2. beginTransaction() - solo lato producer (no network call)

# 3. send() per ogni messaggio
#    -> Producer invia al partition leader con il PID e epoch
#    -> Partition leader scrive in un "transaction buffer" (non ancora visibile)

# 4. sendOffsetsToTransaction()
#    -> Transaction Coordinator registra quali consumer group offset
#       fanno parte di questa transazione

# 5. commitTransaction()
#    -> Producer invia PREPARE_COMMIT al Transaction Coordinator
#    -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
#    -> TC notifica tutti i partition leader coinvolti
#    -> Partition leader scrivono COMMIT marker sul log
#    -> I messaggi diventano visibili ai consumer read_committed
#    -> TC scrive COMPLETE_COMMIT su __transaction_state

# 6. abortTransaction()
#    -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
#    -> Consumer read_committed non vedono mai i messaggi abortiti

# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
  --list
# TransactionalId     Producer Epoch  State        LastModifiedTime
# order-processor-0   5               Complete     2026-03-20 10:30:00

プロデューサーフェンス: ゾンビインスタンスからの保護

# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
#   (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!

# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio

# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"

Kafka ストリームを使用した EOS エンドツーエンド

カフカストリーム (連載記事06参照)EOS対応 単一の構成でネイティブに: processing.guarantee=exactly_once_v2。 ライブラリはトランザクションのプロデューサーとコンシューマーを自動的に管理します。 read_committed 同じトランザクションでオフセットをコミットします。

// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);

// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200);  // default 100ms

StreamsBuilder builder = new StreamsBuilder();

// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");

inputStream
    .filter((key, value) -> value != null && !value.isEmpty())
    .mapValues(value -> enrichOrder(value))
    .peek((key, value) -> log.info("Processed order: {}", key))
    .to("orders-enriched");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);

// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // close() con timeout attende il completamento delle transazioni in corso
    streams.close(Duration.ofSeconds(10));
}));

streams.start();

スループットとレイテンシへの影響

1 回限りは無料ではありません。一般的なベンチマークで定量化されたコストは次のとおりです。

# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):

# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s

# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)

# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch

# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione

# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);          # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer

EOS を使用する場合と使用しない場合

チェックリスト: EOS と必要?

  • EOSを使用する 用途: 支払い、金融送金、 在庫の更新、重要なステータス イベント (注文の作成/キャンセル) 重複が実際のビジネス上の問題を引き起こす場合。
  • EOSは必要ない 対象: 分析ログ、モニタリングメトリクス、 多少の重複や損失が許容される場合のクリック/ビュー イベント。使用する 冪等なコンシューマに対して少なくとも 1 回。
  • EOSの代替: コンシューマを冪等にします (次のことを確認します) データベース内の一意の ID を使用してメッセージをすでに処理しています) + 少なくとも 1 回。 多くの場合、Kafka レベルでは EOS よりもシンプルで効率的です。

注意: EOS は外部副作用をカバーしません。

Kafka 内で 1 回だけでは、Kafka クラスター内でのみセマンティクスが保証されます。 コンシューマが外部データベース (PostgreSQL、Redis など) に書き込む場合、 Kafka EOS は、これらの操作については何も保証しません。ハイブリッドシステム用 (Kafka + 外部データベース)、次のような追加のパターンが必要です トランザクション送信ボックスまたはサーガ。

結論

Kafka の 1 回限りのセマンティクスは、最も洗練された実装の 1 つです。 分散システムの歴史の中で。冪等プロデューサは重複を排除します ネットワーク レベルでは、トランザクション API は複数のトピック間のアトミック性を保証します。 Kafka Streams は、これらすべてを 1 つの構成にカプセル化します。費用 レイテンシーとスループットは現実のものですが、適切な最適化を行えば管理可能です。

完全なシリーズ: Apache Kafka

  • 第01条 — Apache Kafka の基礎: トピック、パーティション、コンシューマー グループ
  • 第02条 — Kafka 4.0 の KRaft: さようなら ZooKeeper
  • 第03条 — 高度な Kafka プロデューサーとコンシューマー: Ack、冪等性、および再試行
  • 第04条(本) — Kafka の 1 回限りのセマンティクス: トランザクションと調整
  • 第05条 — スキーマ レジストリ: Avro、Protobuf、および Schema Evolution
  • 第06条 — Kafka ストリーム: KTable とウィンドウ処理
  • 第07条 — Kafka Connect: Debezium CDC と DB の統合
  • 第08条 — Kafka + Apache Flink: リアルタイムのパイプライン分析