Kafka가 전통적인 Coda와 다른 이유

실시간으로 이벤트 스트림을 처리해야 하는 분산 시스템을 설계할 때 첫 번째 유혹은 클래식 메시지 대기열을 사용하는 것입니다. RabbitMQ나 ActiveMQ 같은 거죠. 이러한 솔루션은 간단한 시나리오에 적합하지만 중요한 구조적 제한이 있습니다. 메시지를 소비하면 메시지가 삭제됩니다. 그것을 다시 읽을 가능성도 없고, 그것을 처리하는 더 독립적인 소비자가 있을 가능성도 없습니다. 다르게 하거나 사건의 전체 기록을 재생합니다.

아파치 카프카 2011년 LinkedIn에서 근본적으로 다른 철학을 가지고 태어났습니다. 기록)는 다음과 같이 작성됩니다. 로그 추가 전용 구성 가능한 기간(기본값: 7일) 동안 유지됩니다. 서로 다른 소비자가 때때로 동일한 레코드를 읽을 수 있음 각각은 서로 다른 위치를 통해 위치를 추적합니다.오프셋. 이 패턴은 Kafka를 대기열 이상의 기능으로 전환합니다. 된다 진실의 근원 시스템의 전체 이벤트 기록에 대해.

2026년의 카프카: 주요 숫자

  • 이상 사용됨 80% 스트리밍 사용 사례에 대한 Fortune 500대 기업 중
  • Kafka 4.0(2025년 3월)은 ZooKeeper를 영구적으로 제거하고 크라프트
  • 이론적인 처리량: 초당 1백만 개 이상의 메시지 브로커용(상용 하드웨어)
  • Confluent Cloud: AWS, GCP, Azure에서 지연 시간이 10ms 미만인 관리형 Kafka p99
  • 생태계: Kafka Connect, Kafka Streams, Apache Flink 통합을 통한 200개 이상의 커넥터

기본 모델: 브로커, 주제 및 파티션

브로커: 클러스터 노드

Un 브로커 그것은 단순히 Kafka 서버입니다. Kafka 클러스터는 하나 이상의 브로커로 구성되며 각각은 broker.id 고유한. 프로덕션에서는 내결함성을 보장하기 위해 일반적으로 3, 6 또는 9개의 브로커가 사용됩니다. 브로커가 글쓰기를 처리합니다. 레코드 읽기, 디스크에 로그 유지, 노드 간 복제 등이 있습니다.

Kafka 4.0과 새로운 KRaft 방식을 사용하면 하나 이상의 브로커가 다음 역할을 맡습니다. 제어 장치, 클러스터 메타데이터 관리 (어떤 파티션의 리더는 누구인지, 어떤 브로커가 활성화되어 있는지 등) 내부 Raft 합의 로그를 통해. 더 이상 필요하지 않습니다. 별도의 ZooKeeper 앙상블입니다.

주제: 기록의 논리적 범주

Un 주제 생산자가 레코드를 게시하고 소비자가 이를 읽는 데 사용되는 논리적 이름입니다. 주제별 채널이라고 생각하면 됩니다. ordini-effettuati, pagamenti-confermati, eventi-utente. 각 주제에는 보존을 위한 자체 구성이 있습니다. 파티션 수, 복제 요소 및 압축 정책.

주제는 다음과 같습니다 분할된: 각 주제는 N개의 물리적 파티션으로 나누어져 브로커에 분산됩니다. 이 배포판입니다 이는 Kafka를 쓰기와 읽기 모두에 대해 수평으로 확장 가능하게 만듭니다.

분할: 병렬성과 순서의 통일성

에이 분할 이는 깔끔하고 불변의 추가 전용 로그입니다. 파티션에 기록된 각 레코드는 오프셋 단조롭게 증가합니다(0, 1, 2, ...). 정렬이 보장됩니다 파티션 내부, 서로 다른 파티션 사이가 아닙니다.

파티션 간의 레코드 분포는 다음과 같이 결정됩니다. 파티션 키: 생산자가 키를 지정하면 레코드는 항상 동일한 파티션(키 모듈러스 파티션 수의 해시)으로 이동하여 해당 키에 대한 정렬을 보장합니다. 열쇠가 누락된 경우, Kafka는 고정 라운드 로빈 전략(회전 전 동일한 파티션에 일괄 기록)을 사용합니다.

# Creare un topic con 6 partizioni e replication factor 3
# (Kafka 4.0 con KRaft, niente --zookeeper flag)
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2

# Descrivere il topic per verificare la distribuzione
kafka-topics.sh --describe \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati

의 출력 --describe 각 파티션에 대해 리더 브로커, 복제본 및 동기화 복제본(ISR — 동기화 복제본). ISR은 리더의 모든 레코드를 복제한 복제본입니다. 리더가 떨어지면 하나의 ISR만 복제될 수 있습니다. 새로운 리더로 선출되어 데이터 손실이 발생하지 않습니다.

프로듀서: Kafka로 기록 쓰기

Il 생산자 주제에 대한 기록을 게시하는 구성 요소입니다. 생산자 구성에 따라 배송 보장이 결정됩니다. 가장 중요한 속성은 다음과 같습니다.

  • bootstrap.servers: 초기 연결을 위한 브로커 목록(클라이언트가 자동으로 다른 브로커를 검색함)
  • key.serializer e value.serializer: 키와 값을 직렬화하는 방법(StringSerializer, AvroSerializer 등)
  • acks: 쓰기 성공을 고려하기 전에 기다려야 할 응답 확인 횟수(0, 1, all)
  • retries: 일시적인 오류 발생 시 시도 횟수
  • linger.ms: 배치를 보내기 전에 대기하는 시간(밀리초)(대기 시간을 희생하여 처리량 증가)
  • batch.size: 최대 배치 크기(바이트)(기본값: 16KB)
// Producer Java con configurazione production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class OrdineProducer {

    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // Garanzie di consegna: all = acks da tutti le ISR
        props.put("acks", "all");
        // Retry automatico con backoff
        props.put("retries", 3);
        props.put("retry.backoff.ms", 100);
        // Batching per throughput
        props.put("linger.ms", 5);
        props.put("batch.size", 32768);   // 32KB
        // Compressione: riduce I/O di rete del 60-80%
        props.put("compression.type", "snappy");
        // Idempotenza: evita duplicati in caso di retry
        props.put("enable.idempotence", true);

        return new KafkaProducer<>(props);
    }

    public static void inviaOrdine(KafkaProducer<String, String> producer,
                                    String ordineId, String payload) {
        // La chiave (ordineId) determina la partizione target
        ProducerRecord<String, String> record =
            new ProducerRecord<>("ordini-effettuati", ordineId, payload);

        // Invio asincrono con callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Errore invio: " + exception.getMessage());
            } else {
                System.out.printf("Record inviato: topic=%s, partizione=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}

주의: Ack 및 처리량

보증을 늘리면 비용이 발생합니다. acks=all e min.insync.replicas=2, 생산자는 최소한 2개의 응답을 기대합니다. 계속하기 전에 기록을 작성했습니다. 이렇게 하면 대기 시간(일반적으로 1~5ms 추가)이 추가되지만 데이터 손실도 방지됩니다. 브로커가 확인 후 즉시 삭제되는 경우. 일부 손실을 허용하는 분석 시스템의 경우 acks=1 o acks=0 훨씬 더 높은 처리량을 제공합니다.

소비자: Kafka의 기록 읽기

폴링 루프

Kafka 소비자는 템플릿을 사용합니다. 당기다: 푸시 메시지를 수신하지 않고 호출을 통해 브로커에게 적극적으로 요청합니다. poll(). 이 디자인은 소비자가 자신의 용량을 초과하는 메시지의 양으로 인해 압도당하지 않도록 보장합니다. 처리의.

// Consumer Java base con gestione degli offset manuale
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class OrdineConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("group.id", "servizio-inventario");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        // Comportamento alla prima lettura (nessun offset salvato per il gruppo)
        // "earliest" = dall'inizio; "latest" = solo nuovi messaggi
        props.put("auto.offset.reset", "earliest");

        // Disabilitiamo il commit automatico per controllo preciso
        props.put("enable.auto.commit", false);

        // Timeout max per il join al consumer group (default: 45s)
        props.put("session.timeout.ms", 30000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("ordini-effettuati"));

            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset: %d | Partizione: %d | Chiave: %s%n",
                        record.offset(), record.partition(), record.key());

                    // Elabora il record...
                    elaboraOrdine(record.value());
                }

                // Commit manuale DOPO l'elaborazione
                // Garantisce at-least-once semantics
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }

    private static void elaboraOrdine(String payload) {
        // logica di business...
    }
}

소비자 그룹: 확장 메커니즘

Il 소비자 그룹 이는 소비를 병렬로 확장하기 위한 기본 메커니즘입니다. 모든 소비자가 공유 같은 group.id 그들은 동일한 그룹의 일부이며 주제 파티션을 나눕니다. 규칙은 간단합니다. 각 파티션은 그룹당 한 번에 하나의 소비자에게만 할당될 수 있습니다..

이는 그룹의 최대 병렬 소비자 수가 파티션 수와 동일함을 의미합니다. 6개의 파티션이 있는 경우 동일한 그룹에서 6명의 소비자를 시작하면 각각 정확히 1개의 파티션을 얻습니다. 7번째 소비자를 시작하면 휴면 상태로 유지됩니다. 대기 중(빠른 장애 조치에 유용함)

소비자 그룹: 확장 시나리오

  • 소비자 1명, 파티션 6개 → 소비자가 모든 것을 처리하며 병렬성이 없습니다.
  • 소비자 3개, 파티션 6개 → 각 소비자는 2개의 파티션을 병렬로 관리합니다.
  • 소비자 6개, 파티션 6개 → 최대 병렬성, 소비자당 파티션 1개
  • 소비자 9개, 파티션 6개 → 장애 조치를 위해 활성 6개, 대기 3개
  • 서로 다른 두 그룹, 동일한 주제 → 각 그룹은 모든 메시지를 독립적으로 수신합니다.

오프셋: 위치 추적 메커니즘

L'오프셋 파티션 내의 레코드 위치를 고유하게 식별하는 정수입니다. 브로커는 기록된 각 레코드에 순차적으로 오프셋을 할당합니다. 첫 번째 레코드의 오프셋은 0이고 두 번째 레코드의 오프셋은 1입니다.

소비자 그룹은 자체적으로 저장합니다. 커밋된 오프셋 — 즉, 마지막으로 성공적으로 처리된 레코드의 오프셋 — 라는 특별한 내부 Kafka 주제에서 __consumer_offsets. 다시 시작하는 경우의 시작점입니다. 또는 소비자 장애 조치.

오류 처리에는 이러한 오프셋 간의 차이를 이해하는 것이 중요합니다.

  • 로그 끝 오프셋(LEO): 로그에 기록될 다음 레코드의 오프셋(헤드 위치)
  • 하이 워터마크(HW): 모든 ISR에 걸쳐 복제된 마지막 레코드의 오프셋(소비자는 레코드 ≤ HW만 볼 수 있음)
  • 현재 오프셋: 소비자가 다음 poll() 호출에서 읽을 다음 레코드의 오프셋
  • 커밋된 오프셋: 토픽에 저장된 오프셋 __consumer_offsets (충돌 후 다시 시작)
  • 소비자 지연: LEO와 커밋된 오프셋의 차이는 소비자가 여전히 처리해야 하는 레코드 수를 나타냅니다.
# Controllare il consumer lag di un gruppo
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe \
  --group servizio-inventario

# Output tipico:
# GROUP              TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# servizio-inventario ordini-effettuati    0          1250            1280            30
# servizio-inventario ordini-effettuati    1          890             890             0
# servizio-inventario ordini-effettuati    2          2100            2105            5

# Resettare gli offset al principio (per reprocessing)
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --group servizio-inventario \
  --topic ordini-effettuati \
  --reset-offsets --to-earliest \
  --execute

복제: 내구성 및 내결함성

각 파티션에는 지도자 그리고 0개 이상 추종자 (복제본). 생산자와 소비자가 소통한다 언제나 리더와 함께. 팔로어는 비동기식이지만 일반적으로 빠른 방식으로 리더의 데이터를 복제합니다.

리더가 될 만큼 "충분히 업데이트된" 추종자 집단이 조직을 형성합니다.ISR(동기화 복제본). 팔로어가 다음 이상 뒤처지면 ISR에서 제거됩니다. replica.lag.time.max.ms 밀리초(기본값: 30초) 리더가 떨어지면 Kafka 컨트롤러는 ISR 중에서 오프셋이 가장 높은 팔로어를 새 리더로 선출합니다.

의 조합 replication.factor e min.insync.replicas 내구성과 가용성 간의 균형을 정의합니다.

# Configurazione consigliata per produzione
# replication.factor=3 significa: 1 leader + 2 follower

# topic-level overrides
kafka-topics.sh --alter \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --config min.insync.replicas=2

# Con questa configurazione:
# - acks=all: producer aspetta conferma da leader + 1 follower minimo
# - Se 2 broker su 3 sono down: il cluster rifiuta scritture (ma no data loss)
# - Se solo 1 broker è down: il cluster continua normalmente

# broker-level defaults in server.properties
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3

보존 정책: 데이터가 유지되는 기간

Kafka의 기록은 주제별로 구성할 수 있는 정책에 따라 제거됩니다. 두 가지 주요 모드가 있습니다:

  • 시간 기반 보존 (retention.ms): 타임스탬프로부터 일정 기간이 지나면 레코드가 삭제됩니다. 기본값: 604800000ms = 7일. 감사 로그와 같은 중요한 주제의 경우 훨씬 더 높은 값(년)이 설정됩니다.
  • 크기 기반 보존 (retention.bytes): 파티션당 로그가 특정 크기를 초과하지 않습니다. 크기가 한도를 초과하면 오래된 세그먼트가 삭제됩니다.
  • 로그 압축 (cleanup.policy=compact): 시간/크기별로 삭제하는 대신 Kafka는 각 키의 마지막 레코드만. 상태 주제(예: CDC를 통해 복제된 데이터베이스 테이블)에 적합합니다.
# Configurare retention per diversi use case

# Topic eventi real-time: retention breve, alta velocità
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic click-stream \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=3600000 \   # 1 ora
  --config retention.bytes=1073741824  # 1GB per partizione

# Topic log audit: retention lunga per compliance
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic audit-log \
  --partitions 3 \
  --replication-factor 3 \
  --config retention.ms=31536000000 \  # 1 anno
  --config compression.type=gzip

# Topic di stato con log compaction (es. profili utente)
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5

소비자 Python: 실제 사례

Kafka 생태계는 다양한 언어를 지원합니다. 다음은 라이브러리를 사용하는 Python 예제입니다. confluent-kafka (librdkafka를 기반으로 한 공식 Confluent 바인딩, kafka-python보다 성능이 훨씬 뛰어남):

# pip install confluent-kafka

from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import sys

TOPIC = "ordini-effettuati"
GROUP_ID = "servizio-analytics-py"

config = {
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "session.timeout.ms": 30000,
    "max.poll.interval.ms": 300000,  # 5 minuti per elaborazioni lente
}

consumer = Consumer(config)
running = True

def graceful_shutdown(signum, frame):
    global running
    running = False

signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)

try:
    consumer.subscribe([TOPIC])
    print(f"Consumer avviato, gruppo: {GROUP_ID}")

    while running:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Raggiunta la fine della partizione, aspetta nuovi messaggi
                print(f"Raggiunto EOF: {msg.topic()} [{msg.partition()}] offset {msg.offset()}")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            ordine = json.loads(msg.value().decode("utf-8"))
            print(f"Ricevuto ordine {ordine['id']} da partizione {msg.partition()}")

            # Elabora l'ordine...
            elabora_ordine(ordine)

            # Commit manuale dopo elaborazione riuscita
            consumer.commit(asynchronous=False)

finally:
    consumer.close()
    print("Consumer chiuso correttamente")

def elabora_ordine(ordine):
    # Logica di business...
    pass

권장 아키텍처: 파티션은 몇 개입니까?

Kafka를 시작하는 사람들이 가장 자주 묻는 질문 중 하나는 주제에 대해 얼마나 많은 파티션을 생성해야 합니까?입니다. 대답은 여러 가지 요인에 따라 달라집니다.

  • 소비자 그룹의 최대 병렬성: 파티션 수는 병렬 소비자의 최대 수입니다. 피크에 있을 것으로 예상되는 소비자 수를 추정합니다.
  • 목표 처리량: 각 파티션은 일반적으로 10-50MB/s 쓰기를 처리할 수 있습니다(디스크에 따라 다름). 필요한 최소 파티션 수를 얻으려면 총 처리량을 이 수치로 나눕니다.
  • 정렬: 특정 키에 대한 순서를 보장해야 하는 경우(예: 동일한 고객의 모든 이벤트) 해당 클라이언트는 항상 동일한 파티션에 있게 됩니다. 더 많은 파티션 = 다양한 키에 대한 더 나은 로드 분산.
  • 메모리 오버헤드: 각 파티션에는 브로커에 메모리가 필요합니다(~1-2MB 오버헤드). 총 100K 파티션으로, 큰 타격을 받기 시작했습니다.

파티션에 대한 실제 규칙

대략적인 공식: max(throughput_MB_s / 10, consumer_max_paralleli). 대부분의 애플리케이션의 경우, 6, 12 또는 24개의 파티션이 합리적인 값입니다. Kafka를 사용하면 나중에 파티션을 늘릴 수 있지만 그들을 감소시키지 않기 위해: 약간의 여유를 가지고 계획하세요.

로그 압축: 상태 주제에 대한 사용 사례

La 로그 압축 의미를 완전히 바꾸는 Kafka의 고급 기능입니다. 보존: 시간이나 크기별로 레코드를 삭제하는 대신 Kafka는 레코드만 보존합니다.각 키의 마지막 기록. 동일한 키를 가진 모든 이전 레코드는 압축 프로세스 중에 삭제됩니다.

이는 압축된 주제를 표현하는 데 이상적입니다. 현재 상태 엔터티 수: 사용자 프로필, 현재 가격, 시스템 구성, 재고. 주제에 연결하는 소비자 처음으로 압축되어 존재하는 모든 레코드를 읽어 완전한 상태를 재구성할 수 있습니다. (키당 하나), 이벤트의 전체 기록을 읽지 않고도 가능합니다.

가치가 있는 기록 null ("삭제 기록")은 주제에서 키를 삭제하는 방법입니다. 압축됨: 압축 후에 키 자체도 로그에서 사라집니다.

# Creare un topic con log compaction
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config segment.ms=86400000 \
  --config delete.retention.ms=86400000

# cleanup.policy=compact: abilita compaction
# min.cleanable.dirty.ratio=0.5: compatta quando >50% del log e' "dirty"
# segment.ms=86400000: crea un nuovo segmento ogni 24h
# delete.retention.ms: quanto tenere i tombstone record prima di eliminarli

# Inviare un aggiornamento profilo (chiave = userId)
kafka-console-producer.sh \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --property parse.key=true \
  --property key.separator=:
# Digita: user123:{"nome":"Mario","email":"mario@example.com","eta":30}
# Digita: user456:{"nome":"Anna","email":"anna@example.com","eta":25}
# Digita: user123:{"nome":"Mario","email":"mario.rossi@example.com","eta":31}
# Dopo compaction, nel topic rimane solo l'ultima riga per user123

Kafka 내부 주제: __consumer_offsets 및 __transaction_state

Kafka는 내부적으로 특수 주제를 사용하여 상태를 관리합니다. 이를 아는 것은 작동 방식을 이해하는 데 도움이 됩니다. 시스템의 문제를 해결하려면:

  • __consumer_offsets: 각 소비자 그룹의 커밋된 오프셋을 저장합니다. 기본적으로 50개의 파티션이 있습니다(offsets.topic.num.partitions). 소비자 그룹이 할당됩니다. group.id의 해시를 통해 파티션에. 이 주제에 복제 문제가 있는 경우 소비자 그룹 오프셋 커밋에 실패합니다.
  • __transaction_state: 진행 중인 거래의 상태를 관리합니다. 정확히 한 번 의미 체계를 보장하기 위해 Kafka 트랜잭션 API에서 사용됩니다. 기본적으로 50개의 파티션이 있습니다.
  • @metadata (KRaft에만 해당): 쿼럼 컨트롤러 메타데이터 로그입니다. 모든 클러스터 메타데이터(주제, 파티션, 브로커, ACL, 구성)를 포함합니다. 컨트롤러 내부에서만 접근 가능합니다.
# Ispezionare il topic __consumer_offsets (advanced troubleshooting)
# ATTENZIONE: operazione read-only, non modificare mai questi topic

kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning \
  --max-messages 20

# Output esempio:
# [servizio-inventario,ordini-effettuati,0]::OffsetAndMetadata(offset=1250, ...)
# [servizio-inventario,ordini-effettuati,1]::OffsetAndMetadata(offset=890, ...)

# Elencare tutti i consumer group attivi
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --list

# Dettaglio di un gruppo specifico
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --group servizio-inventario \
  --describe \
  --state  # include stato del gruppo (Stable, Rebalancing, Empty, Dead)

메시지 헤더 및 타임스탬프

각 Kafka 레코드는 정확한 구조를 가지고 있습니다.

  • 열쇠 (선택 사항): 바이트 단위로 직렬화된 대상 파티션을 결정합니다.
  • Value: 바이트 단위로 직렬화된 메시지 페이로드
  • 타임스탬프: 생산자 측 생성 시간(CreateTime) 또는 브로커측 수집(LogAppendTime), 구성 가능
  • 헤더: 메타데이터의 키-값 쌍(상관 ID, 이벤트 유형, 스키마 버전 등)
  • 파티션 + 오프셋: 작성 당시 브로커에 의해 할당됨
// Aggiungere headers a un ProducerRecord Java
ProducerRecord<String, String> record = new ProducerRecord<>(
    "ordini-effettuati",
    ordineId,
    payload
);

// Headers per tracciabilità e versioning
record.headers()
    .add("correlation-id", UUID.randomUUID().toString().getBytes())
    .add("schema-version", "2".getBytes())
    .add("source-service", "checkout-service".getBytes())
    .add("event-type", "OrdineCreato".getBytes());

producer.send(record);

Docker Compose: 로컬 개발을 위한 빠른 시작

복잡한 구성을 다루지 않고 로컬에서 Kafka 실험을 시작하려면, 가장 빠른 방법은 공식 Apache Kafka 4.0 이미지와 함께 Docker Compose를 사용하는 것입니다.

# docker-compose.yml minimale per sviluppo locale (single-node KRaft)
version: "3.9"
services:
  kafka:
    image: apache/kafka:4.0.0
    container_name: kafka-local
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://kafka-local:9092,CONTROLLER://kafka-local:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-local:9093"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      CLUSTER_ID: "local-dev-cluster-id-001"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

# Avvio:
# docker-compose up -d
#
# Verifica:
# docker exec kafka-local kafka-topics.sh --bootstrap-server localhost:9092 --list

요약: 기본 개념

Kafka의 내부 구조를 살펴본 후 기억해야 할 주요 개념을 요약하면 다음과 같습니다.

  • 브로커: 클러스터 노드, 디스크 로그 및 복제 관리
  • 주제: 파티션으로 구분된 레코드의 논리적 범주
  • 분할: 정렬된 추가 전용 로그, 병렬 처리 단위; 내부에서만 주문 보장
  • 오프셋: 파티션 내 각 레코드의 점진적인 위치
  • 소비자 그룹: 스케일링 메커니즘; 그룹당 하나의 소비자에게만 할당된 각 파티션
  • ISR: 업데이트된 복제본 집합으로, 오류가 발생할 경우 새 리더가 선택됩니다.
  • 소비자 지연: 심각한 상태 표시기, LEO와 커밋된 오프셋 간의 차이
  • 보유: 기록은 구성 가능한 상태로 유지되며 사용 후에도 삭제되지 않습니다.

시리즈의 다음 단계

이제 탄탄한 기초를 갖추었으므로 시리즈의 다음 기사에서는 보다 고급 측면을 살펴보겠습니다.

  • 기사 2 - Kafka 4.0의 KRaft: ZooKeeper 없이 새 컨트롤러가 어떻게 작동하는지, Kafka 3.x의 마이그레이션 프로세스와 프로덕션에서의 운영상의 이점.
  • 제3조 - 고급 생산자 및 소비자: 세부 구성 acks, retries, max.in.flight.requests 생산자 수준에서 정확히 한 번만 보장하는 멱등적 생산자입니다.
  • 4조 - 정확히 1회 의미론: 여러 주제에 대한 원자적 쓰기를 위한 Kafka 트랜잭션, 트랜잭션 코디네이터 및 처리량에 미치는 영향

다른 시리즈와의 연계

  • 관찰 가능성 및 OpenTelemetry: OpenTelemetry로 Kafka 애플리케이션을 계측하는 방법 생산자와 소비자 간의 이벤트 전파를 추적합니다.
  • 플랫폼 엔지니어링: 내부 개발자 플랫폼의 기본 구성 요소인 Kafka 팀 간의 이벤트 기반 커뮤니케이션을 위해.
  • 포스트그레SQL AI: PostgreSQL을 동기화하기 위한 Debezium을 사용한 CDC(Change Data Capture) 패턴 이 시리즈의 7번째 주제인 Kafka에 실시간으로 전송됩니다.