カフカが伝統的なコーダと異なる理由

イベントのストリームをリアルタイムで処理する必要がある分散システムを設計する場合、最初に誘惑されるのは、従来のメッセージ キューを使用することです。 RabbitMQ や ActiveMQ など。これらのソリューションは単純なシナリオではうまく機能しますが、重要な構造上の制限があります。 メッセージが消費されると、メッセージは削除されます。それを再読したり、それを処理する独立した消費者が増える可能性はありません。 別の方法で、またはイベントの履歴全体を再生します。

アパッチ カフカ は、2011 年に LinkedIn で根本的に異なる哲学、つまりメッセージ (と呼ばれる) を持って誕生しました。 記録) で書かれています。 ログ追加のみ そして、設定可能な期間 (デフォルト: 7 日間) そのままになります。異なる消費者が同じレコードを読み取ることができる場合がある それぞれが異なり、それぞれがその位置を追跡します。オフセット。このパターンは、Kafka を単なるキュー以上のものにします。 になります 真実の情報源 システムのイベント履歴全体について。

2026 年のカフカ: 主要な数字

  • 以上に使用 80% フォーチュン 500 企業のストリーミング ユースケースでのトップ
  • Kafka 4.0 (2025 年 3 月) では ZooKeeper が完全に削除され、以下に移行しました。 クラフト
  • 理論上のスループット: 100 万以上のメッセージ/秒 ブローカー向け(コモディティハードウェア)
  • Confluent Cloud: マネージド Kafka を AWS、GCP、Azure でレイテンシ < 10ms で利用可能 p99
  • エコシステム: Kafka Connect、Kafka Streams、Apache Flink 統合を介した 200 以上のコネクタ

基本モデル: ブローカー、トピック、パーティション

ブローカー: クラスター ノード

Un ブローカ それは単なる Kafka サーバーです。 Kafka クラスターは 1 つ以上のブローカーで構成され、各ブローカーは broker.id 個性的。運用環境では、通常、フォールト トレランスを保証するために 3、6、または 9 個のブローカーが使用されます。ブローカーが書き込みを処理します レコードの読み取り、ディスクへのログの維持、ノード間でのレプリケーションなどです。

Kafka 4.0 と新しい KRaft の方法では、1 つ以上のブローカーが次の役割も引き受けます。 コントローラ、クラスターのメタデータの管理 (誰がどのパーティションのリーダーであるか、どのブローカーがアクティブであるかなど) 内部 Raft コンセンサス ログ経由。もう必要はありません 別の ZooKeeper アンサンブルの。

トピック: レコードの論理カテゴリ

Un トピック プロデューサーがレコードを公開し、コンシューマーがレコードを読み取るときの論理名です。これはテーマ別のチャンネルと考えることができます。 ordini-effettuati, pagamenti-confermati, eventi-utente。各トピックには独自の保持設定があり、 パーティションの数、レプリケーション係数、圧縮ポリシー。

トピックスは、 分割された: 各トピックは N 個の物理パーティションに分割され、ブローカー間で分散されます。こちらの配信です これにより、Kafka は書き込みと読み取りの両方に対して水平方向にスケーラブルになります。

パーティション: 並列処理と順序付けの統合

Una パーティション これは、整理整頓された不変の追加専用のログです。パーティションに書き込まれる各レコードは、 オフセット 単調増加 (0、1、2、...)。ソートは保証されています パーティションの内側異なるパーティション間ではありません。

パーティション間のレコードの分散は次のように決定されます。 パーティションキー: プロデューサーがキーを指定した場合、レコードは 常に同じパーティション (パーティションの数を法とするキーのハッシュ) に移動し、そのキーのソートが保証されます。鍵が紛失した場合は、 Kafka はスティッキー ラウンドロビン戦略 (ローテーション前に同じパーティション上でバッチ レコード) を使用します。

# Creare un topic con 6 partizioni e replication factor 3
# (Kafka 4.0 con KRaft, niente --zookeeper flag)
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2

# Descrivere il topic per verificare la distribuzione
kafka-topics.sh --describe \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati

の出力 --describe パーティションごとに、リーダー ブローカー、レプリカ、同期レプリカ (ISR — 同期レプリカ)。 ISR は、リーダーのすべてのレコードを複製したレプリカです。リーダーがダウンした場合、ISR は 1 つだけです。 新しいリーダーとして選出され、データ損失が発生しないようにします。

プロデューサー: Kafka でレコードを作成する

Il プロデューサー これは、トピックに関するレコードを公開するコンポーネントです。プロデューサーの構成によって配信の保証が決まります。 最も重要なプロパティは次のとおりです。

  • bootstrap.servers: 初期接続用のブローカーのリスト (クライアントは他のブローカーを自動的に検出します)
  • key.serializer e value.serializer: キーと値をシリアル化する方法 (StringSerializer、AvroSerializer など)
  • acks: 書き込みが成功したとみなすまでに何回応答確認を待つか (0, 1, all)
  • retries: 一時的なエラーが発生した場合の試行回数
  • linger.ms: バッチを送信する前に待機するミリ秒 (待ち時間を犠牲にしてスループットが向上します)
  • batch.size: 最大バッチ サイズ (バイト単位) (デフォルト: 16KB)
// Producer Java con configurazione production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class OrdineProducer {

    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // Garanzie di consegna: all = acks da tutti le ISR
        props.put("acks", "all");
        // Retry automatico con backoff
        props.put("retries", 3);
        props.put("retry.backoff.ms", 100);
        // Batching per throughput
        props.put("linger.ms", 5);
        props.put("batch.size", 32768);   // 32KB
        // Compressione: riduce I/O di rete del 60-80%
        props.put("compression.type", "snappy");
        // Idempotenza: evita duplicati in caso di retry
        props.put("enable.idempotence", true);

        return new KafkaProducer<>(props);
    }

    public static void inviaOrdine(KafkaProducer<String, String> producer,
                                    String ordineId, String payload) {
        // La chiave (ordineId) determina la partizione target
        ProducerRecord<String, String> record =
            new ProducerRecord<>("ordini-effettuati", ordineId, payload);

        // Invio asincrono con callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Errore invio: " + exception.getMessage());
            } else {
                System.out.printf("Record inviato: topic=%s, partizione=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}

注意: ACK とスループット

保証を増やすにはコストがかかります。 acks=all e min.insync.replicas=2、プロデューサーは少なくとも 2 つの応答を期待しています 続行する前に記録を書いてください。これによりレイテンシーが増加します (通常は 1 ~ 5 ミリ秒余分にかかります) が、データ損失がないことも保証されます。 確認後すぐにブローカーがドロップした場合。ある程度の損失を許容する分析システムの場合、 acks=1 o acks=0 はるかに高いスループットを提供します。

消費者: カフカの記録を読む

ポーリング ループ

Kafka コンシューマはテンプレートを使用します 引く: プッシュ メッセージは受信しませんが、呼び出しを通じてブローカーから積極的にリクエストします。 poll()。この設計により、消費者は容量を超えた大量のメッセージに圧倒されなくなります。 加工の。

// Consumer Java base con gestione degli offset manuale
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class OrdineConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("group.id", "servizio-inventario");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        // Comportamento alla prima lettura (nessun offset salvato per il gruppo)
        // "earliest" = dall'inizio; "latest" = solo nuovi messaggi
        props.put("auto.offset.reset", "earliest");

        // Disabilitiamo il commit automatico per controllo preciso
        props.put("enable.auto.commit", false);

        // Timeout max per il join al consumer group (default: 45s)
        props.put("session.timeout.ms", 30000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("ordini-effettuati"));

            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset: %d | Partizione: %d | Chiave: %s%n",
                        record.offset(), record.partition(), record.key());

                    // Elabora il record...
                    elaboraOrdine(record.value());
                }

                // Commit manuale DOPO l'elaborazione
                // Garantisce at-least-once semantics
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }

    private static void elaboraOrdine(String payload) {
        // logica di business...
    }
}

Consumer Group: スケーリングのメカニズム

Il 消費者団体 これは、消費を並行してスケーリングするための基本的なメカニズムです。すべての消費者が共有する 同じ group.id これらは同じグループの一部であり、トピック パーティションを分割します。ルールは簡単です: 各パーティションは、一度にグループごとに 1 つのコンシューマにのみ割り当てることができます.

これは、グループ内の並列コンシューマーの最大数がパーティションの数と等しいことを意味します。 6 つのパーティションがある場合、 同じグループ内で 6 つのコンシューマーを開始すると、それぞれが 1 つのパーティションを取得します。 7 番目のコンシューマーを開始すると、休止状態のままになります。 スタンバイ状態 (迅速なフェイルオーバーに役立ちます)。

消費者グループ: スケーリングシナリオ

  • 1 コンシューマ、6 パーティション → コンシューマーがすべてを処理し、並列処理は行われません
  • 3 コンシューマー、6 パーティション → 各コンシューマーは 2 つのパーティションを並行して管理します
  • 6 コンシューマー、6 パーティション → 最大並列処理、コンシューマごとに 1 パーティション
  • 9 コンシューマ、6 パーティション → 6 つはアクティブ、3 つはフェイルオーバー用にスタンバイ
  • 2 つの異なるグループ、同じトピック → 各グループはすべてのメッセージを個別に受信します

オフセット: 位置追跡メカニズム

L'オフセット パーティション内のレコードの場所を一意に識別する整数です。 ブローカーは、書き込まれる各レコードにオフセットを順番に割り当てます。最初のレコードのオフセットは 0、2 番目のレコードのオフセットは 1 というようになります。

消費者グループは独自のデータを保存します コミットされたオフセット — つまり、最後に正常に処理されたレコードのオフセット — という特別な内部 Kafka トピック内 __consumer_offsets。これは再起動の場合の開始点です またはコンシューマのフェイルオーバー。

これらのオフセットの違いを理解することは、エラー処理にとって重要です。

  • ログ終了オフセット (LEO): ログに書き込まれる次のレコードのオフセット (先頭位置)
  • ハイウォーターマーク (HW): すべての ISR にわたって複製された最後のレコードのオフセット (コンシューマーにはレコード ≤ HW のみが表示されます)
  • 電流オフセット: コンシューマが次のpoll()呼び出しで読み取る次のレコードのオフセット
  • コミットされたオフセット: トピックに保存されたオフセット __consumer_offsets (クラッシュ後に再起動する場所)
  • 消費者ラグ: LEO と Committed Offset の差。コンシューマーがまだ処理する必要があるレコードの数を示します。
# Controllare il consumer lag di un gruppo
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe \
  --group servizio-inventario

# Output tipico:
# GROUP              TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# servizio-inventario ordini-effettuati    0          1250            1280            30
# servizio-inventario ordini-effettuati    1          890             890             0
# servizio-inventario ordini-effettuati    2          2100            2105            5

# Resettare gli offset al principio (per reprocessing)
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --group servizio-inventario \
  --topic ordini-effettuati \
  --reset-offsets --to-earliest \
  --execute

レプリケーション: 耐久性と耐障害性

各パーティションには、 リーダー そしてゼロ以上 フォロワー (レプリカ)。生産者と消費者のコミュニケーション いつもリーダーと一緒。フォロワーは、リーダーからのデータを非同期ですが通常は高速な方法で複製します。

リーダーになるために「十分にアップデートされた」フォロワーの集合が、ISR (同期レプリカ)。 フォロワーが一定以上遅れた場合、フォロワーは ISR から削除されます。 replica.lag.time.max.ms ミリ秒 (デフォルト: 30 秒)。 リーダーが失墜すると、Kafka コントローラーは ISR の中で最もオフセットが大きいフォロワーを新しいリーダーとして選出します。

の組み合わせ replication.factor e min.insync.replicas 耐久性と可用性の間のトレードオフを定義します。

# Configurazione consigliata per produzione
# replication.factor=3 significa: 1 leader + 2 follower

# topic-level overrides
kafka-topics.sh --alter \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --config min.insync.replicas=2

# Con questa configurazione:
# - acks=all: producer aspetta conferma da leader + 1 follower minimo
# - Se 2 broker su 3 sono down: il cluster rifiuta scritture (ma no data loss)
# - Se solo 1 broker è down: il cluster continua normalmente

# broker-level defaults in server.properties
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3

保持ポリシー: データが保持される期間

Kafka のレコードは、トピックごとに構成できるポリシーに従って削除されます。 2 つの主なモードがあります。

  • 時間ベースの保持 (retention.ms): レコードはタイムスタンプから一定期間後に削除されます。 デフォルト: 604800000ms = 7 日。監査ログなどの重要なトピックの場合、はるかに高い値(年)が設定されます。
  • サイズベースの保持 (retention.bytes): パーティションごとのログは特定のサイズを超えません。 サイズが制限を超えると、古いセグメントが削除されます。
  • ログの圧縮 (cleanup.policy=compact): 時間/サイズによって削除する代わりに、Kafka は保持します。 各キーの最後のレコードのみ。状態トピック (CDC 経由でレプリケートされたデータベース テーブルなど) に最適です。
# Configurare retention per diversi use case

# Topic eventi real-time: retention breve, alta velocità
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic click-stream \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=3600000 \   # 1 ora
  --config retention.bytes=1073741824  # 1GB per partizione

# Topic log audit: retention lunga per compliance
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic audit-log \
  --partitions 3 \
  --replication-factor 3 \
  --config retention.ms=31536000000 \  # 1 anno
  --config compression.type=gzip

# Topic di stato con log compaction (es. profili utente)
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5

コンシューマ Python: 実用的な例

Kafka エコシステムは多くの言語をサポートしています。以下はライブラリを使用した Python の例です。 confluent-kafka (librdkafka に基づく公式の Confluent バインディングは、kafka-python よりもはるかにパフォーマンスが優れています):

# pip install confluent-kafka

from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import sys

TOPIC = "ordini-effettuati"
GROUP_ID = "servizio-analytics-py"

config = {
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "session.timeout.ms": 30000,
    "max.poll.interval.ms": 300000,  # 5 minuti per elaborazioni lente
}

consumer = Consumer(config)
running = True

def graceful_shutdown(signum, frame):
    global running
    running = False

signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)

try:
    consumer.subscribe([TOPIC])
    print(f"Consumer avviato, gruppo: {GROUP_ID}")

    while running:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Raggiunta la fine della partizione, aspetta nuovi messaggi
                print(f"Raggiunto EOF: {msg.topic()} [{msg.partition()}] offset {msg.offset()}")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            ordine = json.loads(msg.value().decode("utf-8"))
            print(f"Ricevuto ordine {ordine['id']} da partizione {msg.partition()}")

            # Elabora l'ordine...
            elabora_ordine(ordine)

            # Commit manuale dopo elaborazione riuscita
            consumer.commit(asynchronous=False)

finally:
    consumer.close()
    print("Consumer chiuso correttamente")

def elabora_ordine(ordine):
    # Logica di business...
    pass

推奨されるアーキテクチャ: パーティションの数は?

Kafka を使い始める人にとって最も一般的な質問の 1 つは、「トピックに対して作成するパーティションの数はいくつですか?」というものです。 答えはいくつかの要因によって異なります。

  • コンシューマ グループの最大並列処理数: パーティションの数は、並列コンシューマーの最大数です。 ピーク時に予想される消費者の数を見積もります。
  • 目標スループット: 各パーティションは通常、10 ~ 50 MB/秒の書き込みを処理できます (ディスクによって異なります)。 合計スループットをこの数値で割ると、必要なパーティションの最小数が得られます。
  • 仕分け: 特定のキーの順序を保証する必要がある場合 (例: 同じ顧客のすべてのイベント)、 そのクライアントは常に同じパーティション上に存在します。パーティションが多いほど、さまざまなキーの負荷分散が向上します。
  • メモリのオーバーヘッド: 各パーティションにはブローカーにメモリが必要です (約 1 ~ 2 MB のオーバーヘッド)。合計 100K のパーティションでは、 負担がかかり始めています。

パーティションの実際的なルール

近似式: max(throughput_MB_s / 10, consumer_max_paralleli)。ほとんどのアプリケーションでは、 6、12、または 24 のパーティションが妥当な値です。 Kafka では後でパーティションを拡張できますが、 彼らを貶めないように:少し余裕を持った計画を立ててください。

ログの圧縮: 状態トピックの使用例

La ログ圧縮 セマンティクスを完全に変更する Kafka の高度な機能です 保持: Kafka は、時間やサイズに基づいてレコードを削除するのではなく、各キーの最後のレコード。 同じキーを持つ古いレコードはすべて、圧縮プロセス中に削除されます。

これにより、コンパクトなトピックが、 現在の状態 エンティティの数: ユーザープロファイル、現在の価格、システム構成、在庫。トピックに接続する消費者 初めて圧縮されると、存在するすべてのレコードを読み取ることで完全な状態を再構築できます。 (キーごとに 1 つ)、イベントの履歴全体を読み取る必要はありません。

価値のあるレコード null (「廃棄レコード」) はトピックからキーを削除する方法です 圧縮: 圧縮後、キー自体もログから消えます。

# Creare un topic con log compaction
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config segment.ms=86400000 \
  --config delete.retention.ms=86400000

# cleanup.policy=compact: abilita compaction
# min.cleanable.dirty.ratio=0.5: compatta quando >50% del log e' "dirty"
# segment.ms=86400000: crea un nuovo segmento ogni 24h
# delete.retention.ms: quanto tenere i tombstone record prima di eliminarli

# Inviare un aggiornamento profilo (chiave = userId)
kafka-console-producer.sh \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --property parse.key=true \
  --property key.separator=:
# Digita: user123:{"nome":"Mario","email":"mario@example.com","eta":30}
# Digita: user456:{"nome":"Anna","email":"anna@example.com","eta":25}
# Digita: user123:{"nome":"Mario","email":"mario.rossi@example.com","eta":31}
# Dopo compaction, nel topic rimane solo l'ultima riga per user123

Kafka の内部トピック: __consumer_offsets と __transaction_state

Kafka は内部的に特別なトピックを使用して状態を管理します。それらを知ることは、それらがどのように機能するかを理解するのに役立ちます システムのトラブルシューティングを行うには、次の手順を実行します。

  • __consumer_offsets: 各コンシューマ グループのコミットされたオフセットを格納します。 デフォルトでは 50 個のパーティションがあります (offsets.topic.num.partitions)。コンシューマグループが割り当てられます group.id のハッシュを介してパーティションにコピーします。このトピックにレプリケーションの問題がある場合は、コンシューマー グループ オフセットのコミットに失敗します。
  • __transaction_state: 進行中のトランザクションのステータスを管理します。 Kafka トランザクション API によって使用され、1 回限りのセマンティクスを保証します。 デフォルトでは 50 個のパーティションがあります。
  • @metadata (KRaft のみ): クォーラム コントローラーのメタデータ ログ。 すべてのクラスターのメタデータ (トピック、パーティション、ブローカー、ACL、構成) が含まれます。 コントローラの内部からのみアクセス可能です。
# Ispezionare il topic __consumer_offsets (advanced troubleshooting)
# ATTENZIONE: operazione read-only, non modificare mai questi topic

kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning \
  --max-messages 20

# Output esempio:
# [servizio-inventario,ordini-effettuati,0]::OffsetAndMetadata(offset=1250, ...)
# [servizio-inventario,ordini-effettuati,1]::OffsetAndMetadata(offset=890, ...)

# Elencare tutti i consumer group attivi
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --list

# Dettaglio di un gruppo specifico
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --group servizio-inventario \
  --describe \
  --state  # include stato del gruppo (Stable, Rebalancing, Empty, Dead)

メッセージヘッダーとタイムスタンプ

Kafka の各レコードには正確な構造があります。

  • Key (オプション): バイト単位でシリアル化されたターゲット パーティションを決定します。
  • 価値: バイト単位でシリアル化されたメッセージ ペイロード
  • タイムスタンプ: プロデューサー側の作成時間 (CreateTime) またはブローカー側の取り込み (LogAppendTime)、構成可能
  • ヘッダー: メタデータのキーと値のペア (相関 ID、イベント タイプ、スキーマ バージョンなど)
  • パーティション + オフセット: 執筆時点でブローカーによって割り当てられました
// Aggiungere headers a un ProducerRecord Java
ProducerRecord<String, String> record = new ProducerRecord<>(
    "ordini-effettuati",
    ordineId,
    payload
);

// Headers per tracciabilità e versioning
record.headers()
    .add("correlation-id", UUID.randomUUID().toString().getBytes())
    .add("schema-version", "2".getBytes())
    .add("source-service", "checkout-service".getBytes())
    .add("event-type", "OrdineCreato".getBytes());

producer.send(record);

Docker Compose: ローカル開発のクイック スタート

複雑な構成を扱わずにローカルで Kafka の実験を開始するには、次のようにします。 最も簡単な方法は、公式の Apache Kafka 4.0 イメージで Docker Compose を使用することです。

# docker-compose.yml minimale per sviluppo locale (single-node KRaft)
version: "3.9"
services:
  kafka:
    image: apache/kafka:4.0.0
    container_name: kafka-local
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://kafka-local:9092,CONTROLLER://kafka-local:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-local:9093"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      CLUSTER_ID: "local-dev-cluster-id-001"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

# Avvio:
# docker-compose up -d
#
# Verifica:
# docker exec kafka-local kafka-topics.sh --bootstrap-server localhost:9092 --list

概要: 基本的な概念

Kafka の内部構造を調べた後、覚えておくべき重要な概念を以下にまとめます。

  • ブローカ: クラスタ ノード、ディスク ログとレプリケーションを管理します
  • トピックス: パーティションに分割されたレコードの論理カテゴリ
  • パーティション: ソートされた追加専用ログ、並列処理の単位。注文は内部でのみ保証されます
  • オフセット: パーティション内の各レコードのプログレッシブ位置
  • 消費者グループ: スケーリング機構。各パーティションはグループごとに 1 つのコンシューマのみに割り当てられます
  • ISR: 更新されたレプリカのセット。障害が発生した場合、そこから新しいリーダーが選出されます。
  • 消費者ラグ: クリティカルヘルスインジケーター、LEO とコミットされたオフセットの違い
  • 保持: レコードは構成可能なままであり、消費後に削除されません。

シリーズの次のステップ

しっかりとした基礎ができたので、シリーズの次の記事では、より高度な側面について詳しく説明します。

  • 記事 2 – Kafka 4.0 の KRaft: ZooKeeper なしで新しいコントローラーがどのように動作するか、 Kafka 3.x からの移行プロセスと運用環境での運用上の利点について説明します。
  • 第 3 条 – 先進的な生産者と消費者: の詳細な構成 acks, retries, max.in.flight.requests 冪等プロデューサーは、プロデューサー レベルで 1 回限りの保証を行います。
  • 第 4 条 – 1 回限りのセマンティクス: 複数のトピックに対するアトミックな書き込みのための Kafka トランザクション、 トランザクション コーディネーターとスループットへの影響。

他シリーズとの連携

  • 可観測性と OpenTelemetry: OpenTelemetry を使用して Kafka アプリケーションをインストルメント化する方法 プロデューサーとコンシューマー間のイベントの伝播を追跡します。
  • プラットフォームエンジニアリング: 内部開発者プラットフォームの基本コンポーネントとしての Kafka チーム間のイベント駆動型コミュニケーション用。
  • PostgreSQL AI: PostgreSQL を同期するための Debezium を使用した CDC (Change Data Capture) パターン このシリーズの第 7 条のトピックをリアルタイムで Kafka に伝えます。