Kafka ストリーム: Java、KTable、ウィンドウ処理に埋め込まれたストリーム処理
Kafka Streams は、アプリケーション内でストリームを直接処理するための Java ライブラリです。 専用クラスターなし。 RocksDB で Streams DSL、KTables、State Store を検出する 時間集計のためのウィンドウ操作。
Flink や Spark ではなく Kafka ストリームを使用する理由
Kafka を使用したストリーム処理について話すとき、当然の疑問は、なぜそれを使用するのかということです。 Apache Flink または Spark ストリーミングの代わりに Kafka ストリームを使用しますか?答えは以下によって異なります コンテキスト。 Kafka Streams には次のような独特の利点があります。 Javaライブラリ (別個のフレームワークではなく) アプリケーション内で実行されるため、必要はありません ジョブ マネージャーや個別の展開を必要としない、専用クラスターの構成。
Java/Kotlin アプリケーションにストリーム処理を追加したいチーム向け 運用を複雑にすることなく、Kafka Streams を選択するのは自然な選択です。パイプライン別 マルチストリームの一時結合による複雑な分析、データの複雑な集計 分散型ワークロード、または独立したスケーリングが必要なワークロードの場合、Flink が最良の選択です (シリーズの記事 08 を参照)。
何を学ぶか
- Kafka ストリーム アーキテクチャ: トポロジ、ストリーム スレッド、タスク
- KStream と KTable: 概念的および実際的な違い
- DSL ストリーム: フィルター、マップ、フラットマップ、グループバイ、集約
- 状態ストア: RocksDB 組み込み、メモリ内、変更ログ
- ウィンドウ操作: タンブリング、ホッピング、セッション ウィンドウ
- KStream-KTable 結合: 参照データによるストリームの強化
- Kafka ストリームのエラー処理とデッドレターキュー
アーキテクチャ: トポロジ、タスク、およびパーティション
Kafka Streams アプリケーションは、 トポロジー: DAG 処理演算子の (有向非巡回グラフ)。各ノードはオペレーションです (フィルター、マップ、集計)、各エッジはレコードのストリームです。カフカストリーム このトポロジをコンパイルします タスク、パーティションごとに 1 つ ソーストピックの。
// Dipendenze Maven
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-streams</artifactId>
// <version>3.7.0</version>
// </dependency>
// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
KStream と KTable: 基本的な二元論
Kafka ストリームの最も重要な概念は、KStream と KTable の違いです。 この違いを理解することは、正しいトポロジを設計するための基本です。
StreamsBuilder builder = new StreamsBuilder();
// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- evento diverso
// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED
// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");
DSL ストリーム: 基本的な操作
// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();
// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");
// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
.filter((key, value) -> value != null && value.contains("order_id"));
// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
.mapValues(value -> parseOrder(value));
// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
.flatMapValues(order -> order.getItems());
// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId());
// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
.peek((userId, order) ->
log.debug("Processing order {} for user {}", order.getOrderId(), userId));
// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
.split(Named.as("branch-"))
.branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
.branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
.defaultBranch(Branched.as("low-value"));
KStream<String, Order> highValueOrders = branches.get("branch-high-value");
// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
.mapValues(Order::toJson)
.to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));
状態ストアを使用した集約
ステートフルな集約は Kafka ストリームの中心です。各集合体は次のことを維持します。 1つ ステートストア 値を永続化するローカル (デフォルトでは RocksDB) 中間体。ステートストアには、 変更ログのトピック 特派員 耐障害性のための Kafka: インスタンスがクラッシュした場合、状態を再構成できます。
// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey() // Raggruppa per chiave corrente (user_id)
.count(Materialized.as("orders-count-store"));
// Il risultato e un KTable: user_id -> count
// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey(Grouped.with(Serdes.String(),
// Serve Serde custom per Order
new OrderSerde()))
.aggregate(
() -> 0.0, // Initializer: valore iniziale
(userId, order, total) -> // Aggregator: combina accumulatore e nuovo record
total + order.getAmount(),
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);
// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
.toStream()
.mapValues(total -> String.format("{\"total\": %.2f}", total))
.to("user-spend-summary");
ウィンドウ操作: 時間的集計
ウィンドウ操作を使用すると、時間間隔に基づいてレコードを集計できます。 Kafka Streams は、それぞれ異なるセマンティクスを持つ 4 つのウィンドウ タイプをサポートします。
// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("orders-per-5min-store"));
ordersPerWindow.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(), // user_id
String.format(
"{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
windowedKey.key(),
windowedKey.window().start(),
count
)
))
.to("orders-per-window");
// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // Dimensione finestra
Duration.ofMinutes(1) // Grace period per late arrivals
).advanceBy(Duration.ofMinutes(5)) // Avanza di 5 min (hopping)
)
.count();
// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofMinutes(30),
Duration.ofMinutes(5)
))
.count(Materialized.as("session-store"));
// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
.count();
KStream-KTable 結合: ストリームの強化
Kafka ストリームの最も一般的な使用例の 1 つは、イベント ストリームを強化することです。 静的またはゆっくりと変化する参照データを使用して、注文を充実させます。 製品の詳細、ログイン イベントへのユーザー名の追加など。
// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();
// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
"orders",
Consumed.with(Serdes.String(), new OrderSerde())
);
// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
"products-catalog",
Consumed.with(Serdes.String(), new ProductSerde())
);
// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
.selectKey((orderId, order) ->
order.getItems().get(0).getProductId() // Reparticiona per product_id
);
// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
products,
(order, product) -> {
if (product == null) {
// Prodotto non trovato: usa dati parziali
return EnrichedOrder.fromOrder(order);
}
return EnrichedOrder.fromOrderAndProduct(order, product);
}
);
enrichedOrders
.selectKey((productId, enriched) -> enriched.getOrderId())
.mapValues(EnrichedOrder::toJson)
.to("orders-enriched");
Kafka ストリームでのエラー処理
// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione
// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
private final KafkaProducer<byte[], byte[]> dlqProducer;
public DLQProductionExceptionHandler() {
Properties dlqProps = new Properties();
dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
this.dlqProducer = new KafkaProducer<>(dlqProps);
}
@Override
public ProductionExceptionHandlerResponse handle(
ProducerRecord<byte[], byte[]> record,
Exception exception) {
log.error("Errore produzione per record key={}", new String(record.key()), exception);
// Invia il record problematico alla DLQ con metadata dell'errore
Headers headers = new RecordHeaders();
headers.add("error-message", exception.getMessage().getBytes());
headers.add("error-type", exception.getClass().getSimpleName().getBytes());
headers.add("source-topic", record.topic().getBytes());
headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
record.topic() + ".dlq", // {topic-name}.dlq
null,
record.key(),
record.value(),
headers
);
dlqProducer.send(dlqRecord);
// CONTINUE: skippa il record e va avanti (non blocca lo stream)
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
状態ストア: インタラクティブなクエリによるアクセス
// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
StoreQueryParameters.fromNameAndType(
"orders-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);
// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
KeyValue<String, Long> kv = allCounts.next();
System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close(); // Importante: chiudi sempre l'iterator
// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()
結論と Kafka ストリームを使用する場合
Kafka Streams は、ストリーム処理を追加するシナリオに最適です。 専用クラスターのない既存の Java/Kotlin アプリケーション: 集約 リアルタイム、ストリームと参照テーブル間の結合、変換と強化 データの。ライブラリはフォールト トレランス、パーティショニング、スケーリングを管理します 透明な方法で水平にします。
クロスストリーム一時結合を必要とするより複雑な分析パイプラインの場合、 CEP (Complex Event Processing)、またはデータレイク (Iceberg、Delta) との統合、 記事 08 では、Kafka + Apache Flink が 2026 年の事実上の標準になる様子を示しています。







