3つの配送保証

セットアップに入る前に、Kafka がサポートする 3 つの配信モードを理解することが重要です。 そしてその実際的な意味:

  • 最大 1 回: メッセージは 1 回だけ送信され、再試行は行われません。ブローカーがそれを受け取らない場合、 それは永久に失われます。重複はゼロ、データ損失の可能性あり。重要でないメトリクスには許容されますが、 デバッグ ログ、一部のイベントの損失が許容できるクリック ストリーム イベント。
  • 少なくとも 1 回: エラーがある場合、プロデューサーは再試行します。メッセージは少なくとも 1 回は届きますが、 ただし、ブローカーが受信してもタイムアウト前に確認を送信しなかった場合は、複数回(重複して)受信する可能性があります。 コンシューマは重複を処理できるように冪等である必要があります。運用環境で最も一般的なシナリオ。
  • 必ず 1 回: メッセージは、損失や重複なしに、正確に 1 回処理されます。 べき等プロデューサー (1 回限りのプロデューサー側の場合) または Kafka トランザクション (1 回限りの場合) が必要です プロデューサーとコンシューマー間のエンドツーエンド)。重大なパフォーマンスのオーバーヘッド。

黄金律

分散システムでは、オーバーヘッドなしで正確に 1 回を保証することは不可能です。システムの 90% 米国での制作におけるカフカ 少なくとも1回 冪等のコンシューマを使用した場合 (コンシューマ側の重複排除) データベースまたはキャッシュ経由)。 Kafka 経由の 1 回限りのトランザクションは、財務パイプライン、請求に使用されます。 システムやその他の場所でイベントの重複が発生すると、実害が発生します。

acks パラメータ: 待機する確認の数

パラメータ acks プロデューサーの数は、最初に受信を確認する必要があるレプリカの数を定義します プロデューサーがリクエストが完了したとみなしていること:

acks=0 (ファイアアンドフォーゲット)

プロデューサーはレコードを送信し、ブローカーからの応答を待ちません。最大のスループット、最小のレイテンシ、 ただし、保証はありません: ブローカーがダウンした場合、またはレコードがブローカーに書き込まれ、ブローカーが最初にダウンした場合 返信すると、記録は失われます。セマンティクスは次のとおりです せいぜい1回.

// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record

acks=1 (リーダーのみ)

プロデューサは、パーティションの主要なブローカーからの確認のみを待ちます。フォロワーはまだいないかもしれません 確認が到着したらレコードを複製しました。確認送信直後にリーダーが倒れた場合 しかし、フォロワーが返信する前に記録は失われます。セマンティクス 少なくとも1回 非常に短い障害ウィンドウでデータが失われるリスクがあります。

// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
//   -> possibile duplicato se broker ha ricevuto ma non risposto

acks=all (または acks=-1): すべての ISR

プロデューサーは、すべてのレコードにレコードが書き込まれるのを待ちます。 ISR (同期レプリカ) パーティションの。 と min.insync.replicas=2 e replication.factor=3、これは少なくとも 2 人の返信 (リーダー + 1 人のフォロワー) が確認する必要があります。そうして初めてプロデューサーは ack を受け取ります。 セマンティクス 少なくとも1回 少なくともデータが失われることはありません min.insync.replicas ブローカーがアクティブです。

// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all");  // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000);  // attende fino a 60s se il broker è congestionato

// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)

// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova

再試行による重複の問題

このシナリオを考えてみましょう acks=all e retries=3:

  1. プロデューサーは R1 レコードをリード ブローカーに送信します
  2. ブローカーは R1 をディスクに書き込み、プロデューサーに ack を送信します。
  3. ACK が失われる (プロデューサーに到達する前にネットワークがタイムアウトする)
  4. プロデューサーは ack を受信せずに入力します。 request.timeout.ms、書き込みが失敗したと思います
  5. プロデューサーは再試行して R1 を再度送信します
  6. ブローカーは R1 を 2 度目に受信し、それを別のレコードとして書き込みます
  7. トピックには重複した R1 が含まれています

これは at-least-once の正しい動作です。各データは少なくとも 1 回到着しますが、重複して到着する可能性があります。 プロデューサ レベルで重複を削除するには、べき等プロデューサー.

べき等プロデューサー

Kafka 0.11 (2017) で導入された冪等プロデューサーは、プロデューサーの再試行によって生じる重複を排除します。 このメカニズムは、次の 2 つの概念に基づいています。

  • プロデューサー ID (PID): べき等プロデューサーが接続すると、ブローカーは一意の PID を割り当てます。 PID はプロデューサーの存続期間中存続します。プロデューサーが再起動すると、新しい PID を取得します。
  • シーケンス番号: 送信される各レコードには、単調増加するシーケンス番号 (0、1、2、...) が含まれます。 ブローカーは、各 PID + パーティションについて受信した最後のシーケンス番号を追跡します。レコードが届いたら シーケンス番号がすでに認識されている場合は、警告なしに破棄されます (ブローカー側での重複排除)。
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Abilitare idempotenza
props.put("enable.idempotence", true);

// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException

// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)

冪等プロデューサーの限界

べき等プロデューサーは、すべてのレコードが確実に表示されるようにします。 ちょうど一度だけ Kafka ログ内 プロデューサーの現在のセッション用。プロデューサーが再起動すると (新しい PID)、実行中のレコードが 書き換えられる可能性があります。そして何よりも、消費者による処理については何も保証されません。 コンシューマーがレコードを処理し、オフセットをコミットする前にクラッシュした場合、次回の起動時に 同じレコードを再処理します。 1 回限りのエンドツーエンドの場合は、トランザクション API が必要です。

max.in.flight.requests.per.connection と並べ替え

パラメータ max.in.flight.requests.per.connection (MIFR) 実稼働リクエストの数を制御します 彼らは同時に単一のブローカーに飛ぶことができます。仕分けに重大な影響を与える 再試行の場合のメッセージの数:

  • MIFR=1: 各リクエストは、別のリクエストを送信する前に確認する必要があります。確実な仕分け、 ただし、スループットは低下します (パイプラインなし)。
  • MIFR > 1 (冪等性なし): パイプラインがアクティブで、スループットが高くなりますが、バッチ N が失敗した場合 バッチ N-1 はすでに実行中です。N の再試行後、レコードは N-1、N の順に表示されます。 N-1 に先行する必要がありました。並べ替えは保証されなくなりました。
  • MIFR ≤ 5 (冪等): と enable.idempotence=true、カフカは保証します シーケンス番号のおかげで、処理中のリクエストが最大 5 つであってもソートできます。それはデフォルト値です 冪等性が有効であり、保証を維持するために最大サポートされている場合。
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1);  // nessun pipelining
// Througput limitato ma ordine garantito

// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry

// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer

実稼働用の完全なプロデューサー構成

// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Crea un producer ottimizzato per throughput elevato
     * con garanzie at-least-once e idempotenza abilitata.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        // Connessione
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Durabilità e idempotenza
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all e retries=MAX impostati automaticamente

        // Batching: aspetta fino a 10ms per accumulare records
        // Riduce numero di richieste al broker, aumenta throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale

        // Compressione: snappy è bilanciata tra CPU e ratio
        // lz4 per massima velocità; zstd per massimo ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Timeout: se il buffer è pieno, blocca fino a max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        // Request timeout: quanto aspettare risposta dal broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Delivery timeout: timeout totale inclusi tutti i retry
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    /**
     * Crea un producer per eventi critici (financial, billing)
     * con latenza minima e massime garanzie.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Massima durabilità
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Nessun batching: invia immediatamente ogni record
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);

        // Nessuna compressione: elimina latenza CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        return new KafkaProducer<>(props);
    }
}

先進的なコンシューマ: コミットと再処理

自動コミット: シンプルだが危険

Con enable.auto.commit=true (デフォルト)、Kafka は次の間隔でオフセットを自動的にコミットします。 auto.commit.interval.ms ミリ秒 (デフォルト: 5000ms = 5 秒)。問題: レコードが実際にいつ処理されるかに関係なく、コミットはバックグラウンドで行われます。

自動コミットの問題のあるシナリオ:

  1. ポーリングはオフセット 1 ~ 100 の 100 レコードを返します
  2. 消費者がレコードの処理を開始します
  3. 5 秒後にタイマーが開始します。オフセット 100 が自動的にコミットされます。
  4. レコード 1 ~ 60 のみを処理した後にコンシューマがクラッシュする
  5. 再起動すると、コンシューマーはオフセット 100 から開始します。レコード 61 ~ 100 がスキップされました (データ損失)

同期手動コミット

// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");

// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);

// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000);  // 5 minuti

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }

        // commitSync() bloca fino a conferma dal broker
        // In caso di errore, lancia CommitFailedException
        // Garantisce at-least-once: il commit avviene dopo l'elaborazione
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
    // Gestisci il rebalance e riprendi
    log.error("Commit fallito, probabile rebalance", e);
}

パーティションごとのコミット (細かい粒度)

// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Raggruppa i record per partizione
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Elabora il record
        elaboraOrdine(record);

        // Traccia l'offset per questa partizione
        // IMPORTANTE: commita offset+1 (il prossimo record da leggere)
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }

    // Commit di tutti gli offset accumulati
    consumer.commitSync(offsets);
}

リバランス: その仕組みと管理方法

Un リバランス コンシューマ グループが変更されたとき、つまりコンシューマが開始、終了、またはクラッシュしたときに発生します。 リバランス中に、 グループ内のすべての消費者 彼らは本を読むのをやめ(世界を止めて)、 グループコーディネーターがパーティションを再割り当てします。 Kafka の最近のバージョン (2.4 以降) では利用可能です の 協同組合のリバランス (増分協調リバランス)これにより影響が軽減されます。 割り当て先を変更するパーティションのみが取り消され、再割り当てされます。

// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Chiamato prima che le partizioni vengano riassegnate
            // Commita gli offset delle partizioni che stiamo per perdere
            log.info("Partizioni revocate: {}", partitions);

            // Commit degli offset non ancora committati per le partizioni revocate
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Chiamato dopo che le nuove partizioni sono state assegnate
            log.info("Partizioni assegnate: {}", partitions);
            // Possiamo inizializzare state locale per le nuove partizioni
        }
    }
);

高度なオフセット管理を備えた Python のコンシューマ

from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Consumer Kafka con commit manuale e gestione rebalance."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}

        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalance: meno interruzioni
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }

        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                 on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commita gli offset prima di perdere le partizioni."""
        log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Loop principale di elaborazione."""
        batch_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Errore Kafka: {msg.error()}")
                    continue

                # Elabora il messaggio
                try:
                    handler_fn(msg)

                    # Traccia l'offset (offset+1 = prossimo da leggere)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1

                except Exception as e:
                    log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
                    # Qui potresti implementare una DLQ (Dead Letter Queue)
                    # o skippare il record problematico

                # Commit ogni N record
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0

        finally:
            # Commit finale prima di chiudere
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commita tutti gli offset pendenti."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committati {len(offsets)} offset")

パフォーマンスチューニング: 主要パラメータ

プロデューサー側

パラメータ デフォルト 効果
linger.ms 0 より大きなバッチを蓄積するために N ミリ秒待機します
batch.size 16384 (16KB) パーティションあたりの最大バッチサイズ
buffer.memory 33554432 (32MB) すべてのバッチの合計メモリ内バッファー
compression.type そうではない なし/gzip/snappy/lz4/zstd

消費者側

パラメータ デフォルト 効果
fetch.min.bytes 1 フェッチから返される最小データ サイズ
fetch.max.wait.ms 500 fetch.min.bytes に達しない場合の最大待機時間
max.poll.records 500 単一のpoll()の最大レコード数
max.partition.fetch.bytes 1048576 (1MB) フェッチごとのパーティションあたりの最大データ

EmbeddedKafka を使用したテスト

Kafka を使用するコードをテストするには、使用可能なクラスターが必要です。単体テストと結合テストの場合、 春のカフカがお届けします @EmbeddedKafka これにより、テスト中にインメモリ ブローカーが起動されます。

// Dipendenza Maven
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>

import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Crea un consumer di test per verificare che il messaggio sia arrivato
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Invia un ordine
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verifica che il messaggio sia stato ricevuto entro 3 secondi
        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");

        consumer.close();
    }
}

Testcontainers を使用したテストの場合 (より現実的には、実際の Docker ブローカーを使用します):

// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test con broker Kafka 4.0 reale in Docker
        // Garantisce che il comportamento corrisponda alla produzione
    }
}

シリーズの次のステップ

高度なプロデューサーとコンシューマーが含まれるため、最も複雑な課題に直面することができます。

  • 第 4 条 – 1 回限りのセマンティクス: 安全な Kafka トランザクション トランザクション コーディネーターとスループットに影響を与える、まさにエンドツーエンドの処理。 金融パイプラインには不可欠です。
  • 第 5 条 – レジストリスキーム: スキーマ レジストリで Avro と Protobuf を使用する方法 異なるチームのプロデューサーとコンシューマー間のスキーマの非互換性を回避するため。
  • 第 6 条 – Kafka ストリーム: Streams DSL を使用した Java での埋め込みストリーム処理、 この記事で説明したのと同じプロデューサー API とコンシューマー API を内部で使用します。

他シリーズとの連携

  • アーキテクチャ (マイクロサービスからモジュラーモノリスまで): Kafka のプロデューサーとコンシューマー 分散アーキテクチャにおけるイベント駆動パターンの具体的な実装として。
  • 高度な Java: スレッドセーフを備えた Kafka の Java API を詳しく説明します。 EmbeddedKafka を使用したライフサイクル管理とテスト。