Why Kafka Streams instead of Flink or Spark

When talking about stream processing with Kafka, the natural question is: why use Kafka Streams instead of Apache Flink or Spark Streaming? The answer depends on context. Kafka Streams has a unique advantage: it is a Java library (not a separate framework) that runs inside your application, with no dedicated cluster, no job manager, and no separate deployment.

For teams that want to add stream processing to a Java/Kotlin application without adding operational complexity, Kafka Streams is the natural choice. For complex analytics pipelines with multi-stream temporal joins, heavy aggregations over distributed data, or workloads that need to scale independently of the application, Flink is the better choice (see Article 08 of the series).

What You Will Learn

  • Kafka Streams architecture: topology, stream threads, tasks
  • KStream vs KTable: the conceptual and practical difference
  • DSL Streams: filter, map, flatMap, groupBy, aggregate
  • State Store: RocksDB embedded, in-memory, changelogs
  • Window operations: tumbling, hopping, session windows
  • KStream-KTable join: Enriching streams with reference data
  • Error handling and dead letter queues in Kafka Streams

Architecture: Topology, Tasks and Partitions

A Kafka Streams application defines a topology: a DAG (Directed Acyclic Graph) of processing operators. Each node is an operation (filter, map, aggregate), each edge is a stream of records. Kafka Streams compiles this topology into tasks, one for each partition of the source topic.

// Maven dependency
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-streams</artifactId>
//   <version>3.7.0</version>
// </dependency>

// Basic Kafka Streams configuration
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// Default serializer/deserializer for keys and values
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Exactly-once semantics (see Article 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// Number of threads per instance (local parallelism)
// Each thread handles one or more tasks (partitions)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

// Offset commit interval
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

// Directory for State Store data (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
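
With the configuration above, a topology can be built and inspected before starting the application. A minimal sketch (the topic names "orders" and "orders-valid" are illustrative):

```java
// Build a trivial topology and print its description: sub-topologies,
// source nodes, processors and sinks (one task per source partition)
StreamsBuilder builder = new StreamsBuilder();
builder.stream("orders")
       .filter((key, value) -> value != null)
       .to("orders-valid");

Topology topology = builder.build();
System.out.println(topology.describe());

// Start the application with the configuration above and close it cleanly
KafkaStreams streams = new KafkaStreams(topology, props);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
```

Printing `topology.describe()` is an easy way to see how the DSL operators map onto tasks before deploying.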

KStream vs KTable: The Fundamental Duality

The most important concept of Kafka Streams is the distinction between KStream and KTable. Understanding this difference is fundamental to designing correct topologies.

StreamsBuilder builder = new StreamsBuilder();

// KStream: represents a stream of immutable events
// Each record is an independent event, even records with the same key
// Semantics: "this thing happened at this moment"
// Example: every order is a new event, even if it has the same order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- a distinct event

// KTable: represents a mutable table (changelog)
// Each record with the same key UPDATES the previous value
// Semantics: "the current state of this entity is this"
// Example: the current status of each order
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- UPDATES the previous one
// Final state: order-123 = CONFIRMED

// GlobalKTable: like a KTable but replicated on every thread/instance
// Useful for reference data (products, users) that changes rarely
// Not partitioned: each instance holds the entire table
GlobalKTable<String, String> productsTable = builder.globalTable("products-catalog");
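
The duality goes both ways: a KStream can be materialized into a KTable, and a KTable's changelog can be read back as a KStream. A short sketch using the `ordersStream` and `ordersTable` defined above (the store name is illustrative):

```java
// KStream -> KTable: the last value per key wins, producing a table view
// of the event stream (materialized in a local state store)
KTable<String, String> latestStatus = ordersStream.toTable(
    Materialized.as("latest-status-store"));

// KTable -> KStream: emits one record per update, i.e. the table's changelog
KStream<String, String> statusChanges = ordersTable.toStream();
```

This conversion pair is what makes it possible to switch between event semantics and state semantics inside one topology.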

DSL Streams: Fundamental Operations

// Complete order-analytics pipeline
StreamsBuilder builder = new StreamsBuilder();

// Read the orders stream (JSON as String)
KStream<String, String> rawOrders = builder.stream("orders-raw");

// 1. FILTER: drop null or malformed records
KStream<String, String> validOrders = rawOrders
    .filter((key, value) -> value != null && value.contains("order_id"));

// 2. MAP: transform the record (key and/or value)
KStream<String, Order> parsedOrders = validOrders
    .mapValues(value -> parseOrder(value));

// 3. FLATMAP: produce 0 to N records from 1 input record
// Useful for expanding arrays or filtering with multiple outputs
KStream<String, OrderItem> orderItems = parsedOrders
    .flatMapValues(order -> order.getItems());

// 4. SELECTKEY: change the record key
// WARNING: triggers a repartition (data is shuffled through Kafka)
KStream<String, Order> ordersByUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId());

// 5. PEEK: side effect without modification (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
    .peek((userId, order) ->
        log.debug("Processing order {} for user {}", order.getOrderId(), userId));

// 6. SPLIT: divide the stream into multiple streams based on predicates
// (replaces branch(), deprecated since 2.8; predicates are evaluated in order)
Map<String, KStream<String, Order>> branches = ordersByUser
    .split(Named.as("branch-"))
    .branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
    .branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
    .defaultBranch(Branched.as("low-value"));

KStream<String, Order> highValueOrders = branches.get("branch-high-value");

// 7. TO: write the stream to a Kafka topic
parsedOrders
    .mapValues(Order::toJson)
    .to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));
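
A pipeline like this can be unit-tested without a broker using `TopologyTestDriver` from the `kafka-streams-test-utils` artifact. A minimal sketch exercising just the filter step (configuration values are illustrative):

```java
// Build a small topology: filter malformed records, forward the rest
Properties testProps = new Properties();
testProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
testProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("orders-raw",
        Consumed.with(Serdes.String(), Serdes.String()))
    .filter((k, v) -> v != null && v.contains("order_id"))
    .to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), testProps)) {
    TestInputTopic<String, String> in = driver.createInputTopic(
        "orders-raw", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> out = driver.createOutputTopic(
        "orders-processed", new StringDeserializer(), new StringDeserializer());

    in.pipeInput("k1", "{\"order_id\":\"123\"}");
    in.pipeInput("k2", "not an order");  // filtered out

    // Only the valid record reaches the output topic
    System.out.println(out.readKeyValuesToList());
}
```

The driver advances the topology synchronously, record by record, which makes assertions on topologies deterministic.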

Aggregations with State Store

Stateful aggregations are the heart of Kafka Streams. Each aggregation maintains a local State Store (RocksDB by default) that persists the intermediate values. Each State Store has a corresponding changelog topic on Kafka for fault tolerance: if an instance crashes, the state can be rebuilt from the changelog.

// Aggregation: count orders per user
KTable<String, Long> ordersPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()  // Group by the current key (user_id)
    .count(Materialized.as("orders-count-store"));
    // The result is a KTable: user_id -> count

// Aggregation: compute total spend per user
KTable<String, Double> totalSpendPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey(Grouped.with(Serdes.String(),
        // A custom Serde is needed for Order
        new OrderSerde()))
    .aggregate(
        () -> 0.0,                          // Initializer: starting value
        (userId, order, total) ->           // Aggregator: combines accumulator and new record
            total + order.getAmount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );

// Write the result to a topic for monitoring or downstream consumers
totalSpendPerUser
    .toStream()
    .mapValues(total -> String.format("{\"total\": %.2f}", total))
    .to("user-spend-summary");
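
When the aggregate and the value share the same type, `reduce()` is a lighter alternative to `aggregate()` that needs no initializer. A sketch building on `parsedOrders` and the `OrderSerde` from above (`getAmount()` as in the previous snippets; the store name is illustrative):

```java
// Keep the most expensive order seen so far for each user
KTable<String, Order> maxOrderPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey(Grouped.with(Serdes.String(), new OrderSerde()))
    .reduce(
        (current, next) ->
            next.getAmount() > current.getAmount() ? next : current,
        Materialized.as("max-order-store"));
```

Use `reduce()` when the result type equals the value type; reach for `aggregate()` when the accumulator is a different type (as with the Double totals above).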

Window Operations: Temporal Aggregations

Window operations allow you to aggregate records based on time intervals. Kafka Streams supports four window types, each with distinct semantics.

// 1. TUMBLING WINDOW: fixed-size, non-overlapping windows
// Example: count orders every 5 minutes (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("orders-per-5min-store"));

ordersPerWindow.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),  // user_id
        String.format(
            "{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
            windowedKey.key(),
            windowedKey.window().start(),
            count
        )
    ))
    .to("orders-per-window");

// 2. HOPPING WINDOW: fixed-size windows that overlap
// Example: count orders over the last 10 minutes, updated every 5 minutes
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> hoppingCount = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(10),   // Window size
            Duration.ofMinutes(1)     // Grace period for late arrivals
        ).advanceBy(Duration.ofMinutes(5))  // Advance by 5 min (hopping)
    )
    .count();

// 3. SESSION WINDOW: windows based on user activity
// Groups events of the same user that arrive within 30 minutes of each other
// Sessions close after 30 minutes of inactivity
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(
        Duration.ofMinutes(30),
        Duration.ofMinutes(5)
    ))
    .count(Materialized.as("session-store"));

// 4. SLIDING WINDOW: windows that move with every new event
// Each event defines a window of [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();
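
One caveat worth noting: windowed aggregations emit an updated result for every incoming record, not one final value per window. If downstream consumers only want the final result, `suppress()` can hold updates until the window closes (this requires a grace period on the window). A sketch, assuming the same `parsedOrders` stream:

```java
// Emit exactly one count per user per 5-minute window, after the window
// (plus its 1-minute grace period) has closed
KTable<Windowed<String>, Long> finalCounts = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(
        Duration.ofMinutes(5), Duration.ofMinutes(1)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
```

The unbounded buffer keeps all pending windows in memory; in production you may prefer a bounded buffer with an explicit overflow strategy.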

KStream-KTable Join: Stream Enrichment

One of the most common use cases in Kafka Streams is enriching an event stream with static or slowly changing reference data: enriching orders with product details, adding the username to a login event, and so on.

// Join between a KStream (orders) and a KTable (product catalog)
StreamsBuilder builder = new StreamsBuilder();

// Topic with the orders (event stream)
KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), new OrderSerde())
);

// Topic with the product catalog (state table)
// When a product is updated, the topic receives a new record
KTable<String, Product> products = builder.table(
    "products-catalog",
    Consumed.with(Serdes.String(), new ProductSerde())
);

// For the join, the order's key must be the product_id
// We use the first item of the order as an example
KStream<String, Order> ordersByProduct = orders
    .selectKey((orderId, order) ->
        order.getItems().get(0).getProductId()  // Repartition by product_id
    );

// LEFT JOIN: keeps all orders even if the product is missing from the catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
    products,
    (order, product) -> {
        if (product == null) {
            // Product not found: fall back to partial data
            return EnrichedOrder.fromOrder(order);
        }
        return EnrichedOrder.fromOrderAndProduct(order, product);
    }
);

enrichedOrders
    .selectKey((productId, enriched) -> enriched.getOrderId())
    .mapValues(EnrichedOrder::toJson)
    .to("orders-enriched");
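
When the reference data fits in memory on every instance, the same enrichment can use a GlobalKTable instead, avoiding the repartition caused by selectKey. A sketch reusing the types from the snippet above (`builder`, `orders`, `ProductSerde`, `EnrichedOrder`):

```java
// GlobalKTable: every instance holds the full catalog, so no re-keying
// of the stream is needed before the join
GlobalKTable<String, Product> productsGlobal = builder.globalTable(
    "products-catalog",
    Consumed.with(Serdes.String(), new ProductSerde()));

// The join key is derived per record via a KeyValueMapper
KStream<String, EnrichedOrder> enrichedNoRepartition = orders.join(
    productsGlobal,
    (orderId, order) -> order.getItems().get(0).getProductId(),  // key extractor
    (order, product) -> EnrichedOrder.fromOrderAndProduct(order, product));
```

The trade-off: a GlobalKTable is fully replicated on every instance, so it suits small-to-medium reference datasets, not large partitioned state.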

Error Handling in Kafka Streams

// Handler for deserialization errors
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
// Alternative: LogAndFailExceptionHandler (stops the app) - safer in production

// Custom handler for production errors (implemented below)
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DLQProductionExceptionHandler.class.getName());

// Custom handler implementation with a Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
    private final KafkaProducer<byte[], byte[]> dlqProducer;

    public DLQProductionExceptionHandler() {
        Properties dlqProps = new Properties();
        dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
        dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        this.dlqProducer = new KafkaProducer<>(dlqProps);
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No additional configuration needed
    }

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        log.error("Production error for record key={}",
            record.key() == null ? "null" : new String(record.key()), exception);

        // Send the problematic record to the DLQ with error metadata
        Headers headers = new RecordHeaders();
        headers.add("error-message", String.valueOf(exception.getMessage()).getBytes());
        headers.add("error-type", exception.getClass().getSimpleName().getBytes());
        headers.add("source-topic", record.topic().getBytes());
        headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
            record.topic() + ".dlq",  // {topic-name}.dlq
            null,
            record.key(),
            record.value(),
            headers
        );

        dlqProducer.send(dlqRecord);

        // CONTINUE: skip the record and move on (does not block the stream)
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}
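
The deserialization side can follow the same dead-letter pattern. A minimal sketch of a custom DeserializationExceptionHandler (the class name is illustrative; the DLQ send is omitted for brevity):

```java
// Handles "poison pill" records that fail deserialization:
// log the failure and continue instead of stopping the application
public class DLQDeserializationExceptionHandler
        implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
        log.error("Deserialization error on topic {} partition {} offset {}",
            record.topic(), record.partition(), record.offset(), exception);
        // Optionally forward record.value() to a DLQ topic here,
        // as done in the production handler above
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No additional configuration needed
    }
}
```

Registered via DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, this replaces the built-in LogAndContinueExceptionHandler when you also need DLQ forwarding.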

State Store: Access via Interactive Queries

// Interactive Queries: expose the local state, e.g. via HTTP
// Useful for real-time dashboards and direct lookups

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// (in production, wait until streams.state() == State.RUNNING before querying)

// Read from the local Key-Value Store
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "orders-count-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// Single lookup: order count for user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("User orders: " + userOrderCount);

// Scan over all values (warning: can be expensive)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
    KeyValue<String, Long> kv = allCounts.next();
    System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close();  // Important: always close the iterator

// Range query: users from "user-100" to "user-200"
KeyValueIterator<String, Long> rangeIter =
    keyValueStore.range("user-100", "user-200");
// use rangeIter..., then close()
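
Window stores from the earlier windowed aggregations can be queried the same way. A sketch assuming the "orders-per-5min-store" defined above and a running `streams` instance:

```java
// Query a window store: per-window values for one key over a time range
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "orders-per-5min-store",
        QueryableStoreTypes.windowStore()));

// Counts for "user-42" in the last hour, one entry per window
Instant now = Instant.now();
try (WindowStoreIterator<Long> iter =
        windowStore.fetch("user-42", now.minus(Duration.ofHours(1)), now)) {
    while (iter.hasNext()) {
        KeyValue<Long, Long> entry = iter.next();  // window start -> count
        System.out.printf("window %d: %d orders%n", entry.key, entry.value);
    }
}
```

The try-with-resources block closes the iterator automatically, the same hygiene the key-value scan above handles manually.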

Conclusions and When to Use Kafka Streams

Kafka Streams shines in scenarios where you want to add stream processing to an existing Java/Kotlin application without a dedicated cluster: real-time aggregations, joins between streams and reference tables, data transformation and enrichment. The library handles fault tolerance, partitioning and horizontal scaling transparently.

For more complex analytics pipelines that require cross-stream temporal joins, CEP (Complex Event Processing), or integration with a data lake (Iceberg, Delta), Article 08 shows how Kafka + Apache Flink is the de facto standard in 2026.

The Complete Series: Apache Kafka

  • Article 01 — Apache Kafka Fundamentals
  • Article 02 — KRaft in Kafka 4.0
  • Article 03 — Advanced Producer and Consumer
  • Article 04 — Exactly-Once Semantics in Kafka
  • Article 05 — Schema Registry: Avro and Protobuf
  • Article 06 (this) — Kafka Streams: Stream Processing Embedded in Java
  • Article 07 — Kafka Connect: Debezium CDC and DB Integration
  • Article 08 — Kafka + Apache Flink: Pipeline Analytics Real-Time