Flink や Spark ではなく Kafka ストリームを使用する理由

Kafka を使用したストリーム処理について話すとき、当然の疑問は、なぜそれを使用するのかということです。 Apache Flink または Spark ストリーミングの代わりに Kafka ストリームを使用しますか?答えは以下によって異なります コンテキスト。 Kafka Streams には次のような独特の利点があります。 Javaライブラリ (別個のフレームワークではなく) アプリケーション内で実行されるため、必要はありません ジョブ マネージャーや個別の展開を必要としない、専用クラスターの構成。

Java/Kotlin アプリケーションにストリーム処理を追加したいチーム向け 運用を複雑にすることなく、Kafka Streams を選択するのは自然な選択です。パイプライン別 マルチストリームの一時結合による複雑な分析、データの複雑な集計 分散型ワークロード、または独立したスケーリングが必要なワークロードの場合、Flink が最良の選択です (シリーズの記事 08 を参照)。

何を学ぶか

  • Kafka ストリーム アーキテクチャ: トポロジ、ストリーム スレッド、タスク
  • KStream と KTable: 概念的および実際的な違い
  • DSL ストリーム: フィルター、マップ、フラットマップ、グループバイ、集約
  • 状態ストア: RocksDB 組み込み、メモリ内、変更ログ
  • ウィンドウ操作: タンブリング、ホッピング、セッション ウィンドウ
  • KStream-KTable 結合: 参照データによるストリームの強化
  • Kafka ストリームのエラー処理とデッドレターキュー

アーキテクチャ: トポロジ、タスク、およびパーティション

Kafka Streams アプリケーションは、 トポロジー: DAG 処理演算子の (有向非巡回グラフ)。各ノードはオペレーションです (フィルター、マップ、集計)、各エッジはレコードのストリームです。カフカストリーム このトポロジをコンパイルします タスク、パーティションごとに 1 つ ソーストピックの。

// Dipendenze Maven
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-streams</artifactId>
//   <version>3.7.0</version>
// </dependency>

// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

KStream と KTable: 基本的な二元論

Kafka ストリームの最も重要な概念は、KStream と KTable の違いです。 この違いを理解することは、正しいトポロジを設計するための基本です。

StreamsBuilder builder = new StreamsBuilder();

// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- evento diverso

// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED

// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");

DSL ストリーム: 基本的な操作

// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();

// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");

// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
    .filter((key, value) -> value != null && value.contains("order_id"));

// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
    .mapValues(value -> parseOrder(value));

// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
    .flatMapValues(order -> order.getItems());

// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId());

// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
    .peek((userId, order) ->
        log.debug("Processing order {} for user {}", order.getOrderId(), userId));

// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
    .split(Named.as("branch-"))
    .branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
    .branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
    .defaultBranch(Branched.as("low-value"));

KStream<String, Order> highValueOrders = branches.get("branch-high-value");

// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
    .mapValues(Order::toJson)
    .to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));

状態ストアを使用した集約

ステートフルな集約は Kafka ストリームの中心です。各集合体は次のことを維持します。 1つ ステートストア 値を永続化するローカル (デフォルトでは RocksDB) 中間体。ステートストアには、 変更ログのトピック 特派員 耐障害性のための Kafka: インスタンスがクラッシュした場合、状態を再構成できます。

// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()  // Raggruppa per chiave corrente (user_id)
    .count(Materialized.as("orders-count-store"));
    // Il risultato e un KTable: user_id -> count

// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey(Grouped.with(Serdes.String(),
        // Serve Serde custom per Order
        new OrderSerde()))
    .aggregate(
        () -> 0.0,                          // Initializer: valore iniziale
        (userId, order, total) ->           // Aggregator: combina accumulatore e nuovo record
            total + order.getAmount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );

// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
    .toStream()
    .mapValues(total -> String.format("{\"total\": %.2f}", total))
    .to("user-spend-summary");

ウィンドウ操作: 時間的集計

ウィンドウ操作を使用すると、時間間隔に基づいてレコードを集計できます。 Kafka Streams は、それぞれ異なるセマンティクスを持つ 4 つのウィンドウ タイプをサポートします。

// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("orders-per-5min-store"));

ordersPerWindow.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),  // user_id
        String.format(
            "{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
            windowedKey.key(),
            windowedKey.window().start(),
            count
        )
    ))
    .to("orders-per-window");

// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(10),   // Dimensione finestra
            Duration.ofMinutes(1)     // Grace period per late arrivals
        ).advanceBy(Duration.ofMinutes(5))  // Avanza di 5 min (hopping)
    )
    .count();

// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(
        Duration.ofMinutes(30),
        Duration.ofMinutes(5)
    ))
    .count(Materialized.as("session-store"));

// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();

KStream-KTable 結合: ストリームの強化

Kafka ストリームの最も一般的な使用例の 1 つは、イベント ストリームを強化することです。 静的またはゆっくりと変化する参照データを使用して、注文を充実させます。 製品の詳細、ログイン イベントへのユーザー名の追加など。

// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();

// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), new OrderSerde())
);

// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
    "products-catalog",
    Consumed.with(Serdes.String(), new ProductSerde())
);

// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
    .selectKey((orderId, order) ->
        order.getItems().get(0).getProductId()  // Reparticiona per product_id
    );

// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
    products,
    (order, product) -> {
        if (product == null) {
            // Prodotto non trovato: usa dati parziali
            return EnrichedOrder.fromOrder(order);
        }
        return EnrichedOrder.fromOrderAndProduct(order, product);
    }
);

enrichedOrders
    .selectKey((productId, enriched) -> enriched.getOrderId())
    .mapValues(EnrichedOrder::toJson)
    .to("orders-enriched");

Kafka ストリームでのエラー処理

// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione

// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    CustomProductionExceptionHandler.class.getName());

// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
    private final KafkaProducer<byte[], byte[]> dlqProducer;

    public DLQProductionExceptionHandler() {
        Properties dlqProps = new Properties();
        dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
        dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        this.dlqProducer = new KafkaProducer<>(dlqProps);
    }

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        log.error("Errore produzione per record key={}", new String(record.key()), exception);

        // Invia il record problematico alla DLQ con metadata dell'errore
        Headers headers = new RecordHeaders();
        headers.add("error-message", exception.getMessage().getBytes());
        headers.add("error-type", exception.getClass().getSimpleName().getBytes());
        headers.add("source-topic", record.topic().getBytes());
        headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
            record.topic() + ".dlq",  // {topic-name}.dlq
            null,
            record.key(),
            record.value(),
            headers
        );

        dlqProducer.send(dlqRecord);

        // CONTINUE: skippa il record e va avanti (non blocca lo stream)
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

状態ストア: インタラクティブなクエリによるアクセス

// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "orders-count-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);

// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
    KeyValue<String, Long> kv = allCounts.next();
    System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close();  // Importante: chiudi sempre l'iterator

// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
    keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()

結論と Kafka ストリームを使用する場合

Kafka Streams は、ストリーム処理を追加するシナリオに最適です。 専用クラスターのない既存の Java/Kotlin アプリケーション: 集約 リアルタイム、ストリームと参照テーブル間の結合、変換と強化 データの。ライブラリはフォールト トレランス、パーティショニング、スケーリングを管理します 透明な方法で水平にします。

クロスストリーム一時結合を必要とするより複雑な分析パイプラインの場合、 CEP (Complex Event Processing)、またはデータレイク (Iceberg、Delta) との統合、 記事 08 では、Kafka + Apache Flink が 2026 年の事実上の標準になる様子を示しています。

完全なシリーズ: Apache Kafka

  • 第01条 — Apache Kafka の基礎
  • 第02条 — Kafka 4.0 の KRaft
  • 第03条 — 先進的なプロデューサーとコンシューマー
  • 第04条 — Kafka の 1 回限りのセマンティクス
  • 第05条 — スキーマ レジストリ: Avro および Protobuf
  • 第06条(本) — Kafka ストリーム: Java に埋め込まれたストリーム処理
  • 第07条 — Kafka Connect: Debezium CDC と DB の統合
  • 第08条 — Kafka + Apache Flink: リアルタイムのパイプライン分析