高度な Kafka プロデューサーとコンシューマー: ACK、冪等性、および再試行
Kafka プロデューサーの動作は、次の 3 つのパラメーターに大きく依存します。 acks, retries e
max.in.flight.requests.per.connection。設定が間違っているとデータ損失が発生する可能性があります
または未検出の重複に対して。このガイドでは、実際の構成を使用したすべてのシナリオをカバーし、冪等プロデューサーについて説明します
Kafka 0.11 で導入されたものと、コンシューマがオフセットと再処理を処理する方法について説明します。
3つの配送保証
セットアップに入る前に、Kafka がサポートする 3 つの配信モードを理解することが重要です。 そしてその実際的な意味:
- 最大 1 回: メッセージは 1 回だけ送信され、再試行は行われません。ブローカーがそれを受け取らない場合、 それは永久に失われます。重複はゼロ、データ損失の可能性あり。重要でないメトリクスには許容されますが、 デバッグ ログ、一部のイベントの損失が許容できるクリック ストリーム イベント。
- 少なくとも 1 回: エラーがある場合、プロデューサーは再試行します。メッセージは少なくとも 1 回は届きますが、 ただし、ブローカーが受信してもタイムアウト前に確認を送信しなかった場合は、複数回(重複して)受信する可能性があります。 コンシューマは重複を処理できるように冪等である必要があります。運用環境で最も一般的なシナリオ。
- 必ず 1 回: メッセージは、損失や重複なしに、正確に 1 回処理されます。 べき等プロデューサー (1 回限りのプロデューサー側の場合) または Kafka トランザクション (1 回限りの場合) が必要です プロデューサーとコンシューマー間のエンドツーエンド)。重大なパフォーマンスのオーバーヘッド。
黄金律
分散システムでは、オーバーヘッドなしで正確に 1 回を保証することは不可能です。システムの 90% 米国での制作におけるカフカ 少なくとも1回 冪等のコンシューマを使用した場合 (コンシューマ側の重複排除) データベースまたはキャッシュ経由)。 Kafka 経由の 1 回限りのトランザクションは、財務パイプライン、請求に使用されます。 システムやその他の場所でイベントの重複が発生すると、実害が発生します。
acks パラメータ: 待機する確認の数
パラメータ acks プロデューサーの数は、最初に受信を確認する必要があるレプリカの数を定義します
プロデューサーがリクエストが完了したとみなしていること:
acks=0 (ファイアアンドフォーゲット)
プロデューサーはレコードを送信し、ブローカーからの応答を待ちません。最大のスループット、最小のレイテンシ、 ただし、保証はありません: ブローカーがダウンした場合、またはレコードがブローカーに書き込まれ、ブローカーが最初にダウンした場合 返信すると、記録は失われます。セマンティクスは次のとおりです せいぜい1回.
// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record
acks=1 (リーダーのみ)
プロデューサは、パーティションの主要なブローカーからの確認のみを待ちます。フォロワーはまだいないかもしれません 確認が到着したらレコードを複製しました。確認送信直後にリーダーが倒れた場合 しかし、フォロワーが返信する前に記録は失われます。セマンティクス 少なくとも1回 非常に短い障害ウィンドウでデータが失われるリスクがあります。
// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
// -> possibile duplicato se broker ha ricevuto ma non risposto
acks=all (または acks=-1): すべての ISR
プロデューサーは、すべてのレコードにレコードが書き込まれるのを待ちます。 ISR (同期レプリカ) パーティションの。
と min.insync.replicas=2 e replication.factor=3、これは少なくとも
2 人の返信 (リーダー + 1 人のフォロワー) が確認する必要があります。そうして初めてプロデューサーは ack を受け取ります。
セマンティクス 少なくとも1回 少なくともデータが失われることはありません min.insync.replicas
ブローカーがアクティブです。
// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all"); // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000); // attende fino a 60s se il broker è congestionato
// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)
// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova
再試行による重複の問題
このシナリオを考えてみましょう acks=all e retries=3:
- プロデューサーは R1 レコードをリード ブローカーに送信します
- ブローカーは R1 をディスクに書き込み、プロデューサーに ack を送信します。
- ACK が失われる (プロデューサーに到達する前にネットワークがタイムアウトする)
- プロデューサーは ack を受信せずに入力します。
request.timeout.ms、書き込みが失敗したと思います - プロデューサーは再試行して R1 を再度送信します
- ブローカーは R1 を 2 度目に受信し、それを別のレコードとして書き込みます
- トピックには重複した R1 が含まれています
これは at-least-once の正しい動作です。各データは少なくとも 1 回到着しますが、重複して到着する可能性があります。 プロデューサ レベルで重複を削除するには、べき等プロデューサー.
べき等プロデューサー
Kafka 0.11 (2017) で導入された冪等プロデューサーは、プロデューサーの再試行によって生じる重複を排除します。 このメカニズムは、次の 2 つの概念に基づいています。
- プロデューサー ID (PID): べき等プロデューサーが接続すると、ブローカーは一意の PID を割り当てます。 PID はプロデューサーの存続期間中存続します。プロデューサーが再起動すると、新しい PID を取得します。
- シーケンス番号: 送信される各レコードには、単調増加するシーケンス番号 (0、1、2、...) が含まれます。 ブローカーは、各 PID + パーティションについて受信した最後のシーケンス番号を追跡します。レコードが届いたら シーケンス番号がすでに認識されている場合は、警告なしに破棄されます (ブローカー側での重複排除)。
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Abilitare idempotenza
props.put("enable.idempotence", true);
// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException
// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)
冪等プロデューサーの限界
べき等プロデューサーは、すべてのレコードが確実に表示されるようにします。 ちょうど一度だけ Kafka ログ内 プロデューサーの現在のセッション用。プロデューサーが再起動すると (新しい PID)、実行中のレコードが 書き換えられる可能性があります。そして何よりも、消費者による処理については何も保証されません。 コンシューマーがレコードを処理し、オフセットをコミットする前にクラッシュした場合、次回の起動時に 同じレコードを再処理します。 1 回限りのエンドツーエンドの場合は、トランザクション API が必要です。
max.in.flight.requests.per.connection と並べ替え
パラメータ max.in.flight.requests.per.connection (MIFR) 実稼働リクエストの数を制御します
彼らは同時に単一のブローカーに飛ぶことができます。仕分けに重大な影響を与える
再試行の場合のメッセージの数:
- MIFR=1: 各リクエストは、別のリクエストを送信する前に確認する必要があります。確実な仕分け、 ただし、スループットは低下します (パイプラインなし)。
- MIFR > 1 (冪等性なし): パイプラインがアクティブで、スループットが高くなりますが、バッチ N が失敗した場合 バッチ N-1 はすでに実行中です。N の再試行後、レコードは N-1、N の順に表示されます。 N-1 に先行する必要がありました。並べ替えは保証されなくなりました。
-
MIFR ≤ 5 (冪等): と
enable.idempotence=true、カフカは保証します シーケンス番号のおかげで、処理中のリクエストが最大 5 つであってもソートできます。それはデフォルト値です 冪等性が有効であり、保証を維持するために最大サポートされている場合。
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1); // nessun pipelining
// Througput limitato ma ordine garantito
// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry
// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer
実稼働用の完全なプロデューサー構成
// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerFactory {
/**
* Crea un producer ottimizzato per throughput elevato
* con garanzie at-least-once e idempotenza abilitata.
*/
public static KafkaProducer<String, byte[]> createHighThroughputProducer(
String bootstrapServers) {
Properties props = new Properties();
// Connessione
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
// Durabilità e idempotenza
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// acks=all e retries=MAX impostati automaticamente
// Batching: aspetta fino a 10ms per accumulare records
// Riduce numero di richieste al broker, aumenta throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale
// Compressione: snappy è bilanciata tra CPU e ratio
// lz4 per massima velocità; zstd per massimo ratio
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Timeout: se il buffer è pieno, blocca fino a max.block.ms
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
// Request timeout: quanto aspettare risposta dal broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// Delivery timeout: timeout totale inclusi tutti i retry
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
return new KafkaProducer<>(props);
}
/**
* Crea un producer per eventi critici (financial, billing)
* con latenza minima e massime garanzie.
*/
public static KafkaProducer<String, byte[]> createLowLatencyProducer(
String bootstrapServers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
// Massima durabilità
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Nessun batching: invia immediatamente ogni record
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
// Nessuna compressione: elimina latenza CPU
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
return new KafkaProducer<>(props);
}
}
先進的なコンシューマ: コミットと再処理
自動コミット: シンプルだが危険
Con enable.auto.commit=true (デフォルト)、Kafka は次の間隔でオフセットを自動的にコミットします。
auto.commit.interval.ms ミリ秒 (デフォルト: 5000ms = 5 秒)。問題:
レコードが実際にいつ処理されるかに関係なく、コミットはバックグラウンドで行われます。
自動コミットの問題のあるシナリオ:
- ポーリングはオフセット 1 ~ 100 の 100 レコードを返します
- 消費者がレコードの処理を開始します
- 5 秒後にタイマーが開始します。オフセット 100 が自動的にコミットされます。
- レコード 1 ~ 60 のみを処理した後にコンシューマがクラッシュする
- 再起動すると、コンシューマーはオフセット 100 から開始します。レコード 61 ~ 100 がスキップされました (データ損失)
同期手動コミット
// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");
// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);
// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000); // 5 minuti
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
elaboraOrdine(record);
}
// commitSync() bloca fino a conferma dal broker
// In caso di errore, lancia CommitFailedException
// Garantisce at-least-once: il commit avviene dopo l'elaborazione
consumer.commitSync();
}
} catch (CommitFailedException e) {
// Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
// Gestisci il rebalance e riprendi
log.error("Commit fallito, probabile rebalance", e);
}
パーティションごとのコミット (細かい粒度)
// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// Raggruppa i record per partizione
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
// Elabora il record
elaboraOrdine(record);
// Traccia l'offset per questa partizione
// IMPORTANTE: commita offset+1 (il prossimo record da leggere)
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// Commit di tutti gli offset accumulati
consumer.commitSync(offsets);
}
リバランス: その仕組みと管理方法
Un リバランス コンシューマ グループが変更されたとき、つまりコンシューマが開始、終了、またはクラッシュしたときに発生します。 リバランス中に、 グループ内のすべての消費者 彼らは本を読むのをやめ(世界を止めて)、 グループコーディネーターがパーティションを再割り当てします。 Kafka の最近のバージョン (2.4 以降) では利用可能です の 協同組合のリバランス (増分協調リバランス)これにより影響が軽減されます。 割り当て先を変更するパーティションのみが取り消され、再割り当てされます。
// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
Collections.singletonList("ordini-effettuati"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Chiamato prima che le partizioni vengano riassegnate
// Commita gli offset delle partizioni che stiamo per perdere
log.info("Partizioni revocate: {}", partitions);
// Commit degli offset non ancora committati per le partizioni revocate
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (TopicPartition tp : partitions) {
if (currentOffsets.containsKey(tp)) {
toCommit.put(tp, currentOffsets.get(tp));
}
}
if (!toCommit.isEmpty()) {
consumer.commitSync(toCommit);
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Chiamato dopo che le nuove partizioni sono state assegnate
log.info("Partizioni assegnate: {}", partitions);
// Possiamo inizializzare state locale per le nuove partizioni
}
}
);
高度なオフセット管理を備えた Python のコンシューマ
from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging
log = logging.getLogger(__name__)
class AdvancedKafkaConsumer:
"""Consumer Kafka con commit manuale e gestione rebalance."""
def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
self.topic = topic
self.pending_offsets: Dict[tuple, int] = {}
config = {
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
# Cooperative rebalance: meno interruzioni
"partition.assignment.strategy": "cooperative-sticky",
"session.timeout.ms": 30000,
"max.poll.interval.ms": 300000,
}
self.consumer = Consumer(config)
self.consumer.subscribe([topic], on_assign=self._on_assign,
on_revoke=self._on_revoke)
def _on_assign(self, consumer, partitions):
log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")
def _on_revoke(self, consumer, partitions):
"""Commita gli offset prima di perdere le partizioni."""
log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
to_commit = []
for p in partitions:
key = (p.topic, p.partition)
if key in self.pending_offsets:
to_commit.append(
TopicPartition(p.topic, p.partition,
self.pending_offsets[key])
)
if to_commit:
consumer.commit(offsets=to_commit, asynchronous=False)
def process_messages(self, handler_fn, batch_size: int = 100):
"""Loop principale di elaborazione."""
batch_count = 0
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
log.error(f"Errore Kafka: {msg.error()}")
continue
# Elabora il messaggio
try:
handler_fn(msg)
# Traccia l'offset (offset+1 = prossimo da leggere)
self.pending_offsets[(msg.topic(), msg.partition())] = \
msg.offset() + 1
batch_count += 1
except Exception as e:
log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
# Qui potresti implementare una DLQ (Dead Letter Queue)
# o skippare il record problematico
# Commit ogni N record
if batch_count >= batch_size:
self._commit_pending()
batch_count = 0
finally:
# Commit finale prima di chiudere
self._commit_pending()
self.consumer.close()
def _commit_pending(self):
"""Commita tutti gli offset pendenti."""
if not self.pending_offsets:
return
offsets = [
TopicPartition(topic, partition, offset)
for (topic, partition), offset in self.pending_offsets.items()
]
self.consumer.commit(offsets=offsets, asynchronous=False)
self.pending_offsets.clear()
log.debug(f"Committati {len(offsets)} offset")
パフォーマンスチューニング: 主要パラメータ
プロデューサー側
| パラメータ | デフォルト | 効果 |
|---|---|---|
linger.ms |
0 | より大きなバッチを蓄積するために N ミリ秒待機します |
batch.size |
16384 (16KB) | パーティションあたりの最大バッチサイズ |
buffer.memory |
33554432 (32MB) | すべてのバッチの合計メモリ内バッファー |
compression.type |
そうではない | なし/gzip/snappy/lz4/zstd |
消費者側
| パラメータ | デフォルト | 効果 |
|---|---|---|
fetch.min.bytes |
1 | フェッチから返される最小データ サイズ |
fetch.max.wait.ms |
500 | fetch.min.bytes に達しない場合の最大待機時間 |
max.poll.records |
500 | 単一のpoll()の最大レコード数 |
max.partition.fetch.bytes |
1048576 (1MB) | フェッチごとのパーティションあたりの最大データ |
EmbeddedKafka を使用したテスト
Kafka を使用するコードをテストするには、使用可能なクラスターが必要です。単体テストと結合テストの場合、
春のカフカがお届けします @EmbeddedKafka これにより、テスト中にインメモリ ブローカーが起動されます。
// Dipendenza Maven
// <dependency>
// <groupId>org.springframework.kafka</groupId>
// <artifactId>spring-kafka-test</artifactId>
// <scope>test</scope>
// </dependency>
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;
@SpringBootTest
@EmbeddedKafka(
partitions = 3,
topics = {"ordini-test"},
brokerProperties = {
"auto.create.topics.enable=false",
"default.replication.factor=1"
}
)
class OrdineProducerTest {
@Autowired
private OrdineProducer producer;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void shouldSendOrderSuccessfully() throws Exception {
// Crea un consumer di test per verificare che il messaggio sia arrivato
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"test-group", "true", embeddedKafka
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");
// Invia un ordine
producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");
// Verifica che il messaggio sia stato ricevuto entro 3 secondi
ConsumerRecords<String, String> records =
KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));
assertThat(records.count()).isEqualTo(1);
ConsumerRecord<String, String> record = records.iterator().next();
assertThat(record.key()).isEqualTo("order-001");
assertThat(record.value()).contains("99.99");
consumer.close();
}
}
Testcontainers を使用したテストの場合 (より現実的には、実際の Docker ブローカーを使用します):
// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@Testcontainers
class OrdineProducerIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("apache/kafka:4.0.0")
);
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Test
void testConBrokerReale() {
// Test con broker Kafka 4.0 reale in Docker
// Garantisce che il comportamento corrisponda alla produzione
}
}
シリーズの次のステップ
高度なプロデューサーとコンシューマーが含まれるため、最も複雑な課題に直面することができます。
- 第 4 条 – 1 回限りのセマンティクス: 安全な Kafka トランザクション トランザクション コーディネーターとスループットに影響を与える、まさにエンドツーエンドの処理。 金融パイプラインには不可欠です。
- 第 5 条 – レジストリスキーム: スキーマ レジストリで Avro と Protobuf を使用する方法 異なるチームのプロデューサーとコンシューマー間のスキーマの非互換性を回避するため。
- 第 6 条 – Kafka ストリーム: Streams DSL を使用した Java での埋め込みストリーム処理、 この記事で説明したのと同じプロデューサー API とコンシューマー API を内部で使用します。
他シリーズとの連携
- アーキテクチャ (マイクロサービスからモジュラーモノリスまで): Kafka のプロデューサーとコンシューマー 分散アーキテクチャにおけるイベント駆動パターンの具体的な実装として。
- 高度な Java: スレッドセーフを備えた Kafka の Java API を詳しく説明します。 EmbeddedKafka を使用したライフサイクル管理とテスト。







