クラスターのサイジング: ブローカーの数とサイズ

Kafka クラスターのサイジングは、次の 2 つの基本的な質問から始まります。 スループットとは何か 目標の書き込み速度 (MB/秒) と必要な保持期間 (データを利用可能な状態にしておく必要がある期間) はどれくらいですか? これら 2 つの質問に対する答えによって、必要なブローカーの数、ディスク サイズ、およびネットワークが決まります。

ブローカー番号の計算式

各 Kafka ブローカーは、持続的に約 100-200MB/秒 スループットの コモディティ ハードウェア (NVMe SSD ディスク、10 Gbps ネットワーク) への書き込み。この値は大きく依存します ワークロード プロファイル (メッセージ サイズ、パーティション数、ACK 設定) から。

# Formula per il sizing del cluster
#
# Throughput totale in scrittura: T_write = messaggi_al_secondo * dimensione_media_messaggio
# Throughput totale con replica:  T_total = T_write * replication_factor
# Numero di broker:               N_broker = ceil(T_total / throughput_per_broker)
#
# Esempio pratico:
# - 100.000 messaggi/s * 1 KB/messaggio = 100 MB/s throughput in scrittura
# - Replication factor 3:                 100 * 3 = 300 MB/s throughput totale
# - Throughput per broker: 150 MB/s       300 / 150 = 2 broker
# - Aggiungi 30% di margine:              ceil(2 * 1.3) = 3 broker
#
# Per la memoria RAM:
# - Kafka usa la page cache del SO per le letture recenti (hot data)
# - Regola empirica: 64-128 GB RAM per broker (o almeno 50% del dataset "caldo")
# - JVM heap: 6-8 GB (non di piu: il resto serve alla page cache)
#
# Per il disco:
# Disco_per_broker = (T_write * retention_giorni * 86400 * replication_factor) / N_broker

# Esempio: 100 MB/s, 7 giorni di retention, replication factor 3, 3 broker
# = (100 * 7 * 86400 * 3) / 3 = 181.440.000 MB ~= 173 TB totale / 3 broker = ~58 TB per broker
# Arrotonda a 80 TB con margine

本番環境の Kafka ブローカーに推奨されるハードウェア

  • CPU: 16 ~ 32 コア (Kafka は CPU 依存ではありませんが、圧縮で使用されます)
  • ラム: 64 ~ 128 GB (ページ キャッシュはパフォーマンスに不可欠です)
  • ディスコ: NVMe SSD RAID 10 以上の個別ディスク (並列 I/O 用に個別にマウント)
  • ネット: 最小 10 Gbps、高スループット クラスターの場合は 25 Gbps
  • OS: ext4 または XFS を備えた Linux、追加シーケンシャル書き込み用に最適化されたファイルシステム
  • JVM: Java 21 (LTS)、G1GC、6 ~ 8 GB ヒープ、-XX:MaxGCPauseMillis=20

最適なブローカー構成:server.properties

デフォルトの Kafka 構成は、実稼働環境にとっては最適とは言えないことがよくあります。ここにプロパティがあります カスタマイズする上で最も重要なのは server.properties:

# server.properties - configurazione production-ready per Kafka 4.0

# ===== Rete e I/O =====
# Thread per le richieste di rete
num.network.threads=8
# Thread per le operazioni di I/O su disco
num.io.threads=16
# Buffer per richieste/risposte di rete
socket.send.buffer.bytes=1048576      # 1 MB
socket.receive.buffer.bytes=1048576   # 1 MB
socket.request.max.bytes=104857600    # 100 MB

# ===== Log e Disco =====
# Separare i log su piu dischi per I/O parallelo
log.dirs=/disk1/kafka-data,/disk2/kafka-data,/disk3/kafka-data

# Numero di thread per il log recovery all'avvio
num.recovery.threads.per.data.dir=4

# Flush su disco: NON abbassare questi valori, usa acks=all invece
log.flush.interval.messages=10000000
log.flush.interval.ms=1000

# ===== Retention Default =====
# Retention temporale (7 giorni)
log.retention.hours=168
# Retention per dimensione (-1 = illimitata per default)
log.retention.bytes=-1
# Dimensione massima del segmento log (1 GB)
log.segment.bytes=1073741824
# Intervallo di controllo per la retention
log.retention.check.interval.ms=300000

# ===== Replica =====
# Numero minimo di ISR per accettare scritture (con acks=all)
min.insync.replicas=2
# Default replication factor per topic auto-creati
default.replication.factor=3
# Replication factor per topic interni
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# ===== Performance Replica =====
# Dimensione buffer per la replica
replica.fetch.max.bytes=10485760      # 10 MB
replica.socket.receive.buffer.bytes=10485760
# Timeout per il leader election
leader.imbalance.check.interval.seconds=300

# ===== Consumer Groups =====
# Timeout sessione consumer
group.initial.rebalance.delay.ms=3000
# ===== Auto Create Topics =====
# DISABILITARE in produzione per evitare topic creati per errore
auto.create.topics.enable=false

パーティションの数: 作成するパーティションの数

パーティション数の選択は、Kafka において最も重要であり、元に戻すのが最も難しいものの 1 つです。 彼らはできる 増加 パーティション( kafka-topics.sh --alter) しかし 減らないでください。パーティションを増やすと、並べ替えと再バランスに影響します。

# Regola pratica per il numero di partizioni:
# max(throughput_MB_s / 10, consumer_max_paralleli, producer_max_paralleli)
#
# Esempio:
# - Throughput target: 50 MB/s  --> almeno 5 partizioni per throughput
# - Consumer parallelismo max: 12 istanze --> almeno 12 partizioni
# - Scelta: 12 partizioni
#
# Linee guida pratiche:
# - Topic ad alto volume, molti consumer: 12, 24, 48 partizioni
# - Topic a basso volume, pochi consumer: 3, 6 partizioni
# - Topic interni (audit, DLQ): 3 partizioni sono spesso sufficienti
# - Non creare mai piu di 4000-6000 partizioni per broker (overhead memoria)

# Creare un topic con sizing ottimale
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic pagamenti-confermati \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000 \
  --config compression.type=snappy

# Verificare la distribuzione delle partizioni tra i broker
kafka-topics.sh --describe \
  --bootstrap-server kafka1:9092 \
  --topic pagamenti-confermati

# Se la distribuzione e sbilanciata (troppi leader sullo stesso broker):
kafka-leader-election.sh \
  --bootstrap-server kafka1:9092 \
  --election-type preferred \
  --all-topic-partitions

保持ポリシー: 時間 vs サイズ vs 圧縮

保持ポリシーは、Kafka でレコードを利用可能な期間を決定します。 間違った選択をすると、ディスク不足 (保存期間が長すぎる) または 遅いコンシューマではリプレイは不可能です (保持期間が短すぎます)。

# Configurazioni di retention per diversi use case

# Use case 1: Event streaming real-time (clickstream, metriche)
# Retention breve, alta velocita, i dati vengono aggregati immediatamente
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic click-events \
  --partitions 24 \
  --replication-factor 3 \
  --config retention.ms=3600000 \        # 1 ora
  --config retention.bytes=5368709120 \  # 5 GB per partizione
  --config compression.type=lz4          # compressione veloce

# Use case 2: Integrazione tra servizi (domain events)
# Retention media, consumer devono poter recuperare eventi recenti
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \  # 7 giorni (default)
  --config compression.type=snappy

# Use case 3: Audit log, compliance
# Retention lunga, dati critici per regolamentazione
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic audit-trail \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=94608000000 \  # 3 anni
  --config compression.type=gzip       # compressione massima per dati storici

# Use case 4: Change Data Capture (CDC), topic di stato
# Log compaction: mantiene solo ultimo valore per chiave
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic clienti-profilo \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config delete.retention.ms=86400000  # tombstone retention 24h

ローリング再起動: ダウンタイムなしでクラスターをアップグレードする

Kafka クラスターの更新 (ソフトウェア バージョン、ブローカー構成、JVM) が実行されます と ローリングリスタート: ISR を待ちながら、一度に 1 つのブローカーを再起動します。 次のブローカーに進む前に完全に再構築してください。

# Procedura di rolling restart sicura

# 1. Verifica che il cluster sia sano prima di iniziare
kafka-topics.sh --describe \
  --bootstrap-server kafka1:9092 \
  --under-replicated-partitions
# Output atteso: nessuna partizione elencata

# 2. Per ogni broker (uno alla volta):
# a) Marca il broker come "not preferred" per evitare leader election inutili
kafka-leader-election.sh --bootstrap-server kafka1:9092 \
  --election-type unclean --all-topic-partitions

# b) Riavvia il broker
systemctl restart kafka

# c) ASPETTA che il broker si riunga al cluster e l'ISR si ricostruisca
# Monitora: tutte le under-replicated partitions devono tornare a 0
watch -n 5 "kafka-topics.sh --bootstrap-server kafka1:9092 --describe --under-replicated-partitions"

# d) Solo quando l'ISR e completo, passa al broker successivo

# 3. Dopo il rolling restart, ribilancia i leader
kafka-leader-election.sh \
  --bootstrap-server kafka1:9092 \
  --election-type preferred \
  --all-topic-partitions

# Verifica finale
kafka-broker-api-versions.sh --bootstrap-server kafka1:9092

MirrorMaker 2: Geo レプリケーションと災害復旧

ミラーメーカー 2 (MM2) は、異なるクラスター間でデータをレプリケートするための Kafka コンポーネントです。 Kafka Connect 上に構築されており、双方向レプリケーション、オフセット同期をサポートしています。 そして自動フェイルオーバー。これは次の目的で使用されます。

  • 災害復旧 (DR): プライマリ クラスターから別のデータセンターのスタンバイにレプリケートします
  • 地理的分布: データは複数のリージョンにレプリケートされ、ローカル コンシューマへの遅延が低くなります。
  • 移行: ダウンタイムなしのクラスター間の制御された移行
# mm2.properties - Configurazione MirrorMaker 2
# Replica da cluster "primary" a "secondary" (datacenter DR)

# I due cluster
clusters=primary,secondary

# Connessione ai cluster
primary.bootstrap.servers=kafka-primary-1:9092,kafka-primary-2:9092,kafka-primary-3:9092
secondary.bootstrap.servers=kafka-secondary-1:9092,kafka-secondary-2:9092,kafka-secondary-3:9092

# Abilita la replica primary -> secondary
primary->secondary.enabled=true
# Non abilitare secondary->primary per evitare loop (solo se non e' bidirezionale)
secondary->primary.enabled=false

# Topic da replicare (regex): tutti i topic produzione escludendo interni
primary->secondary.topics=^(?!(__|\.)).*$

# Sincronizza anche le configurazioni dei topic
primary->secondary.sync.topic.configs.enabled=true
primary->secondary.sync.topic.acls.enabled=true

# Sincronizza gli offset dei consumer group
primary->secondary.sync.group.offsets.enabled=true
primary->secondary.sync.group.offsets.interval.seconds=60

# Prefisso per i topic replicati (su secondary avrai "primary.ordini-effettuati")
replication.factor=3

# Performance
tasks.max=4
producer.override.acks=all
producer.override.compression.type=snappy

# Avvio di MirrorMaker 2:
# connect-mirror-maker.sh mm2.properties

MirrorMaker 2 によるフェイルオーバー: コンシューマ オフセット変換

MM2 の最も強力な機能の 1 つは、 オフセット変換: セカンダリ クラスタのオフセットはプライマリ クラスタのオフセットとは異なります (コピーには レプリケーションラグがある場合はすべてのレコード)。 MM2 はトピック内のマッピング テーブルを維持します。 mm2-offset-syncs.primary.internal オフセットを変換します。

# RemoteClusterUtils: tradurre gli offset per il failover
# Da usare durante un failover emergency per far ripartire i consumer
# dal punto corretto sul cluster secondary

from confluent_kafka.admin import AdminClient
import json

# Script di failover: trova l'offset tradotto per ogni consumer group
def translate_offsets_for_failover(primary_group_id, secondary_bootstrap):
    """
    Usa i checkpoint di MM2 per trovare l'offset equivalente
    su secondary per un consumer group originalmente su primary.
    """
    admin = AdminClient({'bootstrap.servers': secondary_bootstrap})

    # MM2 scrive i checkpoint nel topic dedicato
    # Topic: mm2-checkpoints.primary.internal
    # Leggi i checkpoint per il gruppo specifico

    # In alternativa, usa la MM2 RemoteClusterUtils Java API:
    # Map<TopicPartition, OffsetAndMetadata> translated =
    #   RemoteClusterUtils.translateOffsets(
    #     adminClient, "primary", groupId, Duration.ofMinutes(1));

    print(f"Failover completato: consumer group '{primary_group_id}' "
          f"ora punta al cluster secondary con offset tradotti")

# Dopo il failover, cambia il bootstrap.servers dei consumer
# da primary a secondary e ripartono dal punto corretto

本番用のトピックの構成: チェックリスト

# Script di creazione topic production-ready con tutti i parametri

create_production_topic() {
    TOPIC=$1
    PARTITIONS=$2
    RETENTION_DAYS=$3

    RETENTION_MS=$((RETENTION_DAYS * 24 * 3600 * 1000))

    kafka-topics.sh --create \
        --bootstrap-server kafka1:9092 \
        --topic $TOPIC \
        --partitions $PARTITIONS \
        --replication-factor 3 \
        --config min.insync.replicas=2 \
        --config retention.ms=$RETENTION_MS \
        --config compression.type=snappy \
        --config max.message.bytes=10485760 \  # 10 MB max message
        --if-not-exists

    echo "Topic $TOPIC creato: $PARTITIONS partizioni, RF=3, MIR=2, retention=${RETENTION_DAYS}d"
}

# Crea i topic principali
create_production_topic ordini-effettuati 12 7
create_production_topic pagamenti-confermati 12 30
create_production_topic audit-trail 3 1095        # 3 anni
create_production_topic ordini-effettuati.DLT 3 30  # DLQ

# Verifica di tutti i topic
kafka-topics.sh --list --bootstrap-server kafka1:9092

# Verifica configurazione di un topic specifico
kafka-configs.sh --bootstrap-server kafka1:9092 \
    --entity-type topics \
    --entity-name ordini-effettuati \
    --describe

Strimzi Operator を使用した Kubernetes 上の Kafka

Kubernetes でのデプロイメントの場合、 ストリムジ 公式の参照演算子です (CNCF サンドボックスの一部)。 Kafka クラスターの作成、更新、構成を管理します。 Kubernetes カスタム リソース定義 (CRD) 経由。

# kafka-cluster.yaml - Strimzi KafkaNodePool + Kafka CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  namespace: kafka
  labels:
    strimzi.io/cluster: production-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: persistent-claim
    size: 2Ti       # 2 TB per broker
    class: fast-ssd
  resources:
    requests:
      memory: 32Gi
      cpu: "4"
    limits:
      memory: 64Gi
      cpu: "8"
  jvmOptions:
    -Xms: 6144m
    -Xmx: 6144m
    gcLoggingEnabled: false
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: production-cluster
  namespace: kafka
spec:
  kafka:
    version: 4.0.0
    metadataVersion: "4.0"
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      auto.create.topics.enable: "false"
      default.replication.factor: 3
      min.insync.replicas: 2
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.retention.hours: 168
      compression.type: snappy
  zookeeper:
    replicas: 0  # KRaft: niente ZooKeeper in Kafka 4.0

パフォーマンス チューニング: すべてを変える構成

本番環境における一般的なアンチパターン

  • 速度の場合は acks=0: 耐久性ゼロの保証、ブローカーに障害が発生した場合にメッセージが表示されずに失われる
  • レプリケーション.ファクター=1: 冗長性がありません。ブローカーを失うとデータが破壊されます。
  • auto.create.topics.enable=true: タイプミスによりトピックが作成されたため、確認できません
  • JVM ヒープ > 8GB: GC の長い一時停止が発生し、ブローカーのパフォーマンスが低下します。
  • パーティションが少なすぎます: 並列処理のボトルネックはダウンタイムなしに解決することは不可能です
# Configurazioni producer per massimo throughput (batch processing)
props.put("batch.size", 65536);          # 64 KB per batch
props.put("linger.ms", 20);              # Aspetta 20ms per riempire il batch
props.put("compression.type", "snappy"); # Snappy: buon bilanciamento velocita/ratio
props.put("buffer.memory", 67108864);    # 64 MB buffer totale

# Configurazioni consumer per massimo throughput
props.put("fetch.min.bytes", 65536);     # 64 KB fetch minimo
props.put("fetch.max.wait.ms", 500);     # Aspetta max 500ms se non ci sono dati
props.put("max.partition.fetch.bytes", 10485760);  # 10 MB per fetch per partizione
props.put("max.poll.records", 500);      # 500 record per poll()

# Per sistemi a bassa latenza (trading, real-time alerts):
props.put("linger.ms", 0);              # Invia immediatamente
props.put("batch.size", 1);             # Nessun batching
props.put("acks", "1");                 # Solo leader ack (senza aspettare ISR)

概要: 運用中の Kafka のチェックリスト

  • 最低 3 つのブローカーを備えた KRaft クラスター (高可用性の場合は 5 つ以上)
  • replication.factor=3, min.insync.replicas=2 すべての重要なトピックについて
  • auto.create.topics.enable=false: トピックを明示的に管理する
  • JVM ヒープ 6 ~ 8 GB、G1GC 構成、残りの RAM はページ キャッシュ用
  • OS とは別の専用ディスク (NVMe SSD) にログオンします
  • モニタリング: JMX Exporter + Prometheus + Grafana (第 9 条を参照)
  • アラート対象: オフライン パーティション、レプリケーションが不十分なパーティション、コンシューマ ラグ
  • DLQ は消費者グループごとに構成されます (記事 10 を参照)
  • MirrorMaker 2 によるジオレプリケーションとセカンダリ データセンターでの DR
  • ローリング再起動手順は文書化され、定期的にテストされます
  • 成長傾向に基づいて四半期ごとに見直されるキャパシティ計画

シリーズ終了: 次のステップ

Apache Kafka シリーズをすべて完了しました。関連トピックについて詳しく知る方法は次のとおりです。

  • イベント駆動型アーキテクチャ (シリーズ 39): EDA パターンのバックボーンとして Kafka を適用します。 複雑なマイクロサービス システムの Saga、CQRS、および Outbox。
  • PostgreSQL AI と Debezium CDC: Kafka をパイプラインとして使用して変更を伝播します。 データベースから下流のサービスまでリアルタイムで送信します。

他シリーズとの連携

  • イベント駆動型アーキテクチャ – Saga パターンと CQRS: メッセージブローカーとしての Kafka Saga パターンと CQRS プロジェクションをマイクロサービス システムに実装します。
  • PrometheusとGrafanaによる監視(第9条): 所有しているクラスター メトリック このガイドで設定されているものは、JMX Exporter を介して公開され、Grafana に表示されます。