세 가지 배송 보장

설정을 시작하기 전에 Kafka가 지원하는 세 가지 전달 모드를 이해하는 것이 중요합니다. 실제적인 의미는 다음과 같습니다.

  • 최대 1회: 메시지는 재시도 없이 한 번만 전송됩니다. 브로커가 이를 받지 못하는 경우, 그것은 영구적으로 손실됩니다. 중복이 없으며 데이터 손실이 발생할 수 있습니다. 중요하지 않은 측정항목에는 허용됩니다. 디버그 로그, 일부 이벤트의 손실이 허용되는 클릭 스트림 이벤트.
  • 적어도 한 번: 오류가 있으면 생산자가 다시 시도합니다. 메시지는 적어도 한 번 도착합니다. 그러나 브로커가 이를 수신했지만 시간 초과 전에 확인을 보내지 않은 경우 여러 번(중복) 도착할 수 있습니다. 중복을 처리하려면 소비자가 멱등원이어야 합니다. 프로덕션에서 가장 일반적인 시나리오입니다.
  • 정확히 한 번: 메시지는 손실이나 중복 없이 정확히 한 번 처리됩니다. 멱등성 생산자(정확히 한 번 생산자 측의 경우) 또는 Kafka 트랜잭션(정확히 한 번만 발생하는 경우)이 필요합니다. 생산자와 소비자 사이의 엔드 투 엔드). 상당한 성능 오버헤드.

황금률

분산 시스템에서는 오버헤드 없이 정확히 1회를 보장하는 것이 불가능합니다. 시스템의 90% 미국 생산의 카프카 적어도 한 번은 멱등성 소비자 사용(소비자 측 중복 제거 데이터베이스 또는 캐시를 통해). Kafka 트랜잭션을 통한 정확히 한 번은 금융 파이프라인, 청구에 사용됩니다. 시스템이나 어느 곳에서나 이벤트가 중복되면 실제 피해가 발생합니다.

acks 매개변수: 대기할 확인 횟수

매개변수 acks 생산자는 먼저 수신을 승인해야 하는 복제본 수를 정의합니다. 생산자는 요청이 완료된 것으로 간주합니다.

acks=0(실행 후 잊어버리기)

생산자는 레코드를 보내고 브로커의 응답을 기다리지 않습니다. 최대 처리량, 최소 대기 시간, 그러나 보장은 없습니다. 브로커가 다운되거나 먼저 브로커가 다운된 브로커에게 기록이 기록되는 경우 답장을 보내면 기록이 손실됩니다. 의미론은 다음과 같습니다. 최대 한 번.

// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record

acks=1(리더만)

생산자는 파티션의 선두 브로커로부터만 확인을 기다립니다. 팔로어가 아직 없을 수도 있습니다. 확인이 도착하면 기록을 복제했습니다. 리더가 확인 메시지를 보낸 후 즉시 하락하는 경우 그러나 추종자들이 응답하기 전에 기록이 손실됩니다. 의미론 적어도 한 번은 매우 짧은 오류 창에서 데이터 손실 위험이 있습니다.

// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
//   -> possibile duplicato se broker ha ricevuto ma non risposto

acks=all(또는 acks=-1): 모든 ISR

프로듀서는 레코드가 모든 레코드에 기록될 때까지 기다립니다. ISR(동기화 복제본) 파티션의. 와 함께 min.insync.replicas=2 e replication.factor=3, 이는 적어도 답글 2개(리더 + 팔로어 1명)가 확인해야 합니다. 그래야만 생산자가 ack를 받습니다. 의미론 적어도 한 번은 최소한 데이터 손실 없이 min.insync.replicas 브로커가 활발합니다.

// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all");  // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000);  // attende fino a 60s se il broker è congestionato

// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)

// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova

재시도의 중복 문제

이 시나리오를 고려해보세요. acks=all e retries=3:

  1. 생산자는 R1 레코드를 리드 브로커에게 보냅니다.
  2. 브로커는 R1을 디스크에 쓰고 ack를 생산자에게 보냅니다.
  3. ack가 손실되었습니다(네트워크가 생산자에게 도달하기 전에 시간 초과됨).
  4. Ack를 받지 못한 생산자가 입장합니다. request.timeout.ms, 글쓰기가 실패했다고 생각합니다
  5. 생산자는 다시 시도하고 R1을 다시 보냅니다.
  6. 브로커는 R1을 두 번째로 수신하고 이를 별도의 레코드로 기록합니다.
  7. 이제 주제에 중복된 R1이 포함되어 있습니다.

이는 at-least-once의 올바른 동작입니다. 각 데이터는 최소한 한 번 도착하지만 중복된 데이터도 도착할 수 있습니다. 생산자 수준에서 중복을 제거하려면 다음을 사용합니다.멱등성 생산자.

멱등성 생산자

Kafka 0.11(2017)에 도입된 멱등성 생산자는 생산자 재시도로 인한 중복을 제거합니다. 메커니즘은 두 가지 개념을 기반으로 합니다.

  • 생산자 ID(PID): 멱등성 생산자가 연결되면 브로커는 고유한 PID를 할당합니다. PID는 생산자의 수명 동안 지속됩니다. 생산자가 다시 시작되면 새 PID를 얻습니다.
  • 시퀀스 번호: 전송된 각 레코드에는 단조롭게 증가하는 시퀀스 번호(0, 1, 2, ...)가 포함됩니다. 브로커는 각 PID+파티션에 대해 수신된 마지막 시퀀스 번호를 추적합니다. 음반이 도착하면 시퀀스 번호가 이미 표시된 경우 자동으로 삭제됩니다(브로커 측에서 중복 제거).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Abilitare idempotenza
props.put("enable.idempotence", true);

// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException

// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)

멱등성 생산자의 한계

멱등성 생산자는 모든 레코드가 나타나는지 확인합니다. 정확히 한 번 카프카 로그에서 프로듀서의 현재 세션에 대해. 생산자가 다시 시작되면(새 PID), 진행 중인 기록 다시 쓸 수 있습니다. 그리고 무엇보다도 소비자 처리에 대해서는 아무 것도 보장하지 않습니다. 소비자가 레코드를 처리한 후 오프셋을 커밋하기 전에 충돌이 발생하는 경우 다음 시작 시 동일한 레코드를 다시 처리합니다. 정확히 한 번만 엔드투엔드를 수행하려면 트랜잭션 API가 필요합니다.

max.in.flight.requests.per.connection 및 정렬

매개변수 max.in.flight.requests.per.connection (MIFR)은 생산 요청 수를 제어합니다. 그들은 동시에 단일 브로커로 날아갈 수 있습니다. 정렬에 결정적인 영향을 미칩니다. 재시도 시 메시지 수:

  • MIFR=1: 다른 요청을 보내기 전에 각 요청을 확인해야 합니다. 정렬 보장, 그러나 처리량이 감소합니다(파이프라인 없음).
  • 멱등성이 없는 MIFR > 1: 파이프라인이 활성화되어 처리량이 높지만 배치 N이 실패하는 경우 배치 N-1이 이미 실행 중입니다. N을 재시도한 후 레코드는 N-1, N 순서로 나타납니다. N-1보다 앞서야 했습니다. 정렬이 더 이상 보장되지 않습니다.
  • MIFR ≤ 5(멱등성 있음): 와 함께 enable.idempotence=true, 카프카는 보장한다 시퀀스 번호 덕분에 최대 5개의 요청이 진행 중인 경우에도 정렬이 가능합니다. 기본값입니다 멱등성이 활성화되고 보증을 유지하기 위해 최대 지원되는 경우.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1);  // nessun pipelining
// Througput limitato ma ordine garantito

// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry

// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer

생산을 위한 완전한 생산자 구성

// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Crea un producer ottimizzato per throughput elevato
     * con garanzie at-least-once e idempotenza abilitata.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        // Connessione
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Durabilità e idempotenza
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all e retries=MAX impostati automaticamente

        // Batching: aspetta fino a 10ms per accumulare records
        // Riduce numero di richieste al broker, aumenta throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale

        // Compressione: snappy è bilanciata tra CPU e ratio
        // lz4 per massima velocità; zstd per massimo ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Timeout: se il buffer è pieno, blocca fino a max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        // Request timeout: quanto aspettare risposta dal broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Delivery timeout: timeout totale inclusi tutti i retry
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    /**
     * Crea un producer per eventi critici (financial, billing)
     * con latenza minima e massime garanzie.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Massima durabilità
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Nessun batching: invia immediatamente ogni record
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);

        // Nessuna compressione: elimina latenza CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        return new KafkaProducer<>(props);
    }
}

고급 소비자: 커밋 및 재처리

자동 커밋: 간단하지만 위험함

와 함께 enable.auto.commit=true (기본값) Kafka는 자동으로 오프셋을 커밋합니다. auto.commit.interval.ms 밀리초(기본값: 5000ms = 5초) 문제: 커밋은 레코드가 실제로 처리되는 시점에 관계없이 백그라운드에서 발생합니다.

자동 커밋에 문제가 있는 시나리오:

  1. Poll은 오프셋 1-100을 포함하는 100개의 레코드를 반환합니다.
  2. 소비자가 기록 처리를 시작합니다.
  3. 5초 후에 타이머가 시작됩니다. 오프셋 100이 자동으로 커밋됩니다.
  4. 1-60 레코드만 처리한 후 소비자가 충돌합니다.
  5. 재부팅 시 소비자는 오프셋 100에서 시작합니다. 레코드 61-100을 건너뛰었습니다(데이터 손실).

동기식 수동 커밋

// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");

// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);

// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000);  // 5 minuti

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }

        // commitSync() bloca fino a conferma dal broker
        // In caso di errore, lancia CommitFailedException
        // Garantisce at-least-once: il commit avviene dopo l'elaborazione
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
    // Gestisci il rebalance e riprendi
    log.error("Commit fallito, probabile rebalance", e);
}

파티션당 커밋(세분성)

// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Raggruppa i record per partizione
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Elabora il record
        elaboraOrdine(record);

        // Traccia l'offset per questa partizione
        // IMPORTANTE: commita offset+1 (il prossimo record da leggere)
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }

    // Commit di tutti gli offset accumulati
    consumer.commitSync(offsets);
}

재조정: 작동 방식 및 관리 방법

Un 재조정 소비자 그룹이 변경될 때 발생합니다. 소비자가 들어오거나 나가거나 충돌합니다. 리밸런싱 중에는 그룹의 모든 소비자 그들은 읽기를 멈추고(세상을 멈추라) 그룹 코디네이터가 파티션을 다시 할당합니다. 최신 버전의 Kafka(2.4+)에서는 사용할 수 있습니다. 는 협동조합 재균형 (점증적 협력 재조정) 영향을 줄입니다. 담당자를 변경하는 파티션만 취소되고 재할당됩니다.

// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Chiamato prima che le partizioni vengano riassegnate
            // Commita gli offset delle partizioni che stiamo per perdere
            log.info("Partizioni revocate: {}", partitions);

            // Commit degli offset non ancora committati per le partizioni revocate
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Chiamato dopo che le nuove partizioni sono state assegnate
            log.info("Partizioni assegnate: {}", partitions);
            // Possiamo inizializzare state locale per le nuove partizioni
        }
    }
);

고급 오프셋 관리 기능을 갖춘 Python의 소비자

from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Consumer Kafka con commit manuale e gestione rebalance."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}

        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalance: meno interruzioni
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }

        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                 on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commita gli offset prima di perdere le partizioni."""
        log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Loop principale di elaborazione."""
        batch_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Errore Kafka: {msg.error()}")
                    continue

                # Elabora il messaggio
                try:
                    handler_fn(msg)

                    # Traccia l'offset (offset+1 = prossimo da leggere)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1

                except Exception as e:
                    log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
                    # Qui potresti implementare una DLQ (Dead Letter Queue)
                    # o skippare il record problematico

                # Commit ogni N record
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0

        finally:
            # Commit finale prima di chiudere
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commita tutti gli offset pendenti."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committati {len(offsets)} offset")

성능 조정: 주요 매개변수

생산자 측

매개변수 기본 효과
linger.ms 0 더 큰 배치가 누적될 때까지 Nms를 기다립니다.
batch.size 16384(16KB) 파티션당 최대 배치 크기
buffer.memory 33554432(32MB) 모든 배치에 대한 총 인메모리 버퍼
compression.type 그렇지 않다 없음/gzip/snappy/lz4/zstd

소비자 측

매개변수 기본 효과
fetch.min.bytes 1 가져오기에서 반환할 최소 데이터 크기
fetch.max.wait.ms 500 fetch.min.bytes에 도달하지 못한 경우 최대 대기 시간
max.poll.records 500 단일 폴링에 대한 최대 레코드()
max.partition.fetch.bytes 1048576(1MB) 가져오기당 파티션당 최대 데이터

EmbeddedKafka로 테스트하기

Kafka를 사용하는 테스트 코드에는 사용 가능한 클러스터가 필요합니다. 단위 및 통합 테스트의 경우 스프링 카프카(Spring Kafka) 제공 @EmbeddedKafka 테스트 중에 메모리 내 브로커를 시작합니다.

// Dipendenza Maven
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>

import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Crea un consumer di test per verificare che il messaggio sia arrivato
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Invia un ordine
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verifica che il messaggio sia stato ricevuto entro 3 secondi
        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");

        consumer.close();
    }
}

Testcontainers로 테스트하려면(더 현실적으로 실제 Docker 브로커를 사용하세요):

// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test con broker Kafka 4.0 reale in Docker
        // Garantisce che il comportamento corrisponda alla produzione
    }
}

시리즈의 다음 단계

고급 생산자와 소비자가 포함되면 가장 복잡한 과제에 직면할 수 있습니다.

  • 4조 - 정확히 1회 의미론: 보안을 위한 Kafka 트랜잭션 트랜잭션 코디네이터 및 처리량에 영향을 미치는 정확한 엔드투엔드 처리입니다. 금융 파이프라인에 필수적입니다.
  • 제5조 - 등록 제도: 스키마 레지스트리와 함께 Avro 및 Protobuf를 사용하는 방법 서로 다른 팀의 생산자와 소비자 간의 스키마 비호환성을 방지합니다.
  • 기사 6 – 카프카 스트림: Streams DSL을 사용하여 Java에서 임베디드 스트림 처리, 이 문서에서 살펴본 것과 동일한 생산자 및 소비자 API를 내부적으로 사용합니다.

다른 시리즈와의 연계

  • 아키텍처(마이크로서비스에서 모듈식 모놀리스까지): Kafka 생산자와 소비자 분산 아키텍처에서 이벤트 중심 패턴을 구체적으로 구현하는 것입니다.
  • 고급 자바: 스레드 안전성을 갖춘 Kafka의 Java API에 대해 자세히 알아보세요. EmbeddedKafka를 사용한 수명주기 관리 및 테스트.