Why Kafka + Flink in 2026

In 2026, the combination of Apache Kafka and Apache Flink has become the reference stack for real-time analytics pipelines in mature tech companies. Kafka handles ingestion and buffering with its durability and scalability guarantees; Flink handles stateful processing with exactly-once semantics, time windows, and complex joins that Kafka Streams does not natively support.

The crucial change in 2025 was the widespread adoption of Flink SQL as the main interface for analytics pipelines: instead of writing DataStream API code in Java, data engineers write standard SQL over streams and tables. Confluent has launched its managed Flink service (Apache Flink as a service) with native integration with the Confluent Schema Registry and Apache Iceberg, making the stack zero-ops for many teams.

What You Will Learn

  • Flink architecture: JobManager, TaskManager, checkpointing with Kafka
  • Flink SQL: Create Kafka source and sink tables with DDL
  • Temporal operations: TUMBLE, HOP, SESSION windows in Flink SQL
  • Streaming join: stream-stream join and temporal join with lookup table
  • Apache Iceberg: table format for data lake, ACID on object storage
  • End-to-end pipeline: Kafka -> Flink -> Iceberg on S3
  • Confluent Managed Flink: configuration and differences from self-hosted

Flink architecture with Kafka

Apache Flink is a distributed stream processing framework. Its main strengths compared to Kafka Streams are the ability to manage very large state (terabytes) with RocksDB and checkpointing to distributed storage (S3/GCS), complex joins between multiple streams, and CEP (Complex Event Processing).

# Flink architecture:
# JobManager: coordinates job execution and manages checkpoints
#   - a single instance (or 2 with HA via ZooKeeper/Kubernetes leader election)
#   - scheduler, checkpoint coordinator, metastore

# TaskManager: runs the parallel tasks (the equivalent of workers)
#   - each TaskManager has N "slots" (units of parallelism)
#   - slot = dedicated thread with local RocksDB state

# Checkpoint: periodic snapshot of the state on S3/GCS
#   - if a TaskManager crashes, the job resumes from the last checkpoint
#   - native Kafka integration: the Kafka offsets are stored in the checkpoint
#   - guarantees end-to-end exactly-once (with a transactional or idempotent sink)

# Deployment on Kubernetes (Flink Operator)
# helm repo add flink-operator-repo \
#   https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

# FlinkDeployment CRD:
cat <<'EOF' | kubectl apply -f -
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: analytics-pipeline
spec:
  image: apache/flink:1.19-scala_2.12-java17
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: "rocksdb"
    state.backend.incremental: "true"
    state.checkpoints.dir: "s3://my-flink-checkpoints/analytics-pipeline"
    execution.checkpointing.interval: "60000"    # checkpoint every 60s
    execution.checkpointing.mode: "EXACTLY_ONCE"
    # Kafka source: committed offset = checkpoint offset
    execution.checkpointing.timeout: "300000"
  serviceAccount: flink-sa
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 3
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: s3://my-flink-jars/analytics-pipeline-1.0.jar
    parallelism: 8
    upgradeMode: savepoint   # valid values: stateless, savepoint, last-state
EOF
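
The recovery behaviour described in the comments above (state and Kafka offsets snapshotted together, then restored together after a crash) can be illustrated with a small Python sketch. This is a toy model under stated assumptions, not Flink code: `CountingJob`, `Checkpoint` and the in-memory `log` list are hypothetical names standing in for a job, its snapshot and a single Kafka partition.

```python
from dataclasses import dataclass, field


@dataclass
class Checkpoint:
    """Snapshot taken atomically: operator state plus the source offset."""
    offset: int
    state: dict


@dataclass
class CountingJob:
    """Toy job that counts records per key from an in-memory 'partition'."""
    log: list                      # stands in for one Kafka partition
    offset: int = 0                # next record to read
    counts: dict = field(default_factory=dict)

    def process(self, n: int) -> None:
        """Consume up to n records and update the keyed state."""
        for key in self.log[self.offset:self.offset + n]:
            self.counts[key] = self.counts.get(key, 0) + 1
        self.offset = min(self.offset + n, len(self.log))

    def checkpoint(self) -> Checkpoint:
        # Copy the state so later processing cannot mutate the snapshot.
        return Checkpoint(self.offset, dict(self.counts))

    def restore(self, cp: Checkpoint) -> None:
        # Rewind the source position and the state together.
        self.offset = cp.offset
        self.counts = dict(cp.state)


log = ["a", "b", "a", "c", "a", "b"]
job = CountingJob(log)
job.process(3)
cp = job.checkpoint()        # state {"a": 2, "b": 1}, offset 3
job.process(2)               # work done after the checkpoint...
job.restore(cp)              # ...is discarded when the job "crashes"
job.process(len(log))        # replay from offset 3 to the end
print(job.counts)            # {'a': 3, 'b': 2, 'c': 1}
```

Because the offset and the counts are rewound as a pair, every record contributes to the state exactly once even though some records were read twice.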

Flink SQL: Stream Processing with Standard SQL

Flink SQL lets you treat Kafka streams as SQL tables. The key is the concept of event time: Flink uses the timestamp carried in the data itself (not the arrival time) for windows, which allows late arrivals to be handled correctly.
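
To make the event-time idea concrete, here is a minimal Python sketch of a bounded-out-of-orderness watermark: the watermark trails the largest event timestamp seen so far by a fixed delay, and an event is treated as late when its timestamp is already behind the watermark. `WatermarkTracker` is a hypothetical helper, and the lateness rule is a simplification of what Flink does per window.

```python
class WatermarkTracker:
    """Tracks a watermark that lags the max event time by a fixed delay."""

    def __init__(self, max_delay_ms: int):
        self.max_delay_ms = max_delay_ms
        self.max_event_time_ms = None

    @property
    def watermark_ms(self):
        if self.max_event_time_ms is None:
            return None
        return self.max_event_time_ms - self.max_delay_ms

    def observe(self, event_time_ms: int) -> bool:
        """Return True if the event is late, then advance the watermark."""
        wm = self.watermark_ms
        is_late = wm is not None and event_time_ms < wm
        if self.max_event_time_ms is None or event_time_ms > self.max_event_time_ms:
            self.max_event_time_ms = event_time_ms
        return is_late


tracker = WatermarkTracker(max_delay_ms=5_000)
events = [1_000, 4_000, 9_000, 3_500, 8_000]   # event timestamps in arrival order
late = [ts for ts in events if tracker.observe(ts)]
print(late)                  # [3500]
print(tracker.watermark_ms)  # 4000
```

The event at 8000 arrives out of order but within the 5-second bound, so it is still accepted; the event at 3500 arrives after the watermark has reached 4000 and is flagged as late.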

DDL: Define Kafka Source Tables

-- Flink SQL: create a table that reads from the Kafka topic "orders"

CREATE TABLE kafka_orders (
    order_id    VARCHAR,
    user_id     VARCHAR,
    product_id  VARCHAR,           -- referenced by the joins with products_table
    amount      DECIMAL(10, 2),
    currency    VARCHAR,
    status      VARCHAR,
    created_at  BIGINT,            -- timestamp in milliseconds (from the payload)
    -- Computed column: converts the BIGINT to a TIMESTAMP for the windows
    event_time  AS TO_TIMESTAMP_LTZ(created_at, 3),
    -- Watermark: tolerates events arriving up to 5 seconds out of order
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
    'properties.group.id' = 'flink-analytics-consumer',
    'scan.startup.mode' = 'latest-offset',  -- or 'earliest-offset' for backfill
    -- Avro with Schema Registry
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);

-- Reference table: product catalog (changelog topic from Debezium)
CREATE TABLE products_table (
    product_id    VARCHAR,
    product_name  VARCHAR,
    category      VARCHAR,
    base_price    DECIMAL(10, 2),
    -- PRIMARY KEY: marks this as a lookup table (upsert semantics)
    PRIMARY KEY (product_id) NOT ENFORCED
)
WITH (
    -- upsert-kafka: handles insert/update/delete based on the record key
    -- (assumes flattened records keyed by product_id, not the raw Debezium envelope)
    'connector' = 'upsert-kafka',
    'topic' = 'cdc.public.products',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    -- upsert-kafka reads the topic from the beginning and needs
    -- separate key and value formats
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://schema-registry:8081',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081'
);
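
The upsert semantics behind `products_table` can be sketched in a few lines of Python: each record is applied by key, the latest value wins, and a record with an empty value (a tombstone) deletes the row. This is only an illustration of the idea; `apply_changelog` and the sample records are hypothetical and not part of the connector.

```python
def apply_changelog(records):
    """Materialize a keyed changelog into the current table contents."""
    table = {}
    for key, value in records:
        if value is None:
            # Tombstone: the row for this key is deleted.
            table.pop(key, None)
        else:
            # Insert or update: the latest value for the key wins.
            table[key] = value
    return table


changelog = [
    ("p1", {"category": "books"}),
    ("p2", {"category": "toys"}),
    ("p1", {"category": "ebooks"}),   # update for p1
    ("p2", None),                     # delete for p2
]
products = apply_changelog(changelog)
print(products)  # {'p1': {'category': 'ebooks'}}
```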

Window Aggregations in Flink SQL

-- Aggregation: total sales per category every 10 minutes (TUMBLE window)
-- TUMBLE: fixed, non-overlapping windows

SELECT
    window_start,
    window_end,
    p.category,
    COUNT(*)            AS order_count,
    SUM(o.amount)       AS total_revenue,
    AVG(o.amount)       AS avg_order_value,
    MAX(o.amount)       AS max_order_value
FROM TABLE(
    TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
JOIN products_table AS p ON o.product_id = p.product_id
GROUP BY window_start, window_end, p.category;

-- HOP window: sliding window of 1 hour, advancing every 15 minutes
-- Useful for moving averages
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*)     AS orders_in_window,
    SUM(amount)  AS spend_in_window
FROM TABLE(
    HOP(TABLE kafka_orders, DESCRIPTOR(event_time),
        INTERVAL '15' MINUTE,   -- slide interval
        INTERVAL '1' HOUR       -- window size
    )
)
GROUP BY window_start, window_end, user_id;

-- SESSION window: groups events of the same user into sessions
-- (PARTITION BY user_id makes the sessions per user; requires Flink 1.19+)
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*)     AS orders_in_session,
    SUM(amount)  AS session_value
FROM TABLE(
    SESSION(TABLE kafka_orders PARTITION BY user_id, DESCRIPTOR(event_time),
        INTERVAL '30' MINUTE   -- gap: inactivity > 30 min = new session
    )
)
GROUP BY window_start, window_end, user_id;
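
How the three window types assign events to windows can be sketched in plain Python. This is an illustrative model, not Flink's implementation: the function names are hypothetical, timestamps are plain integers (minutes here), and boundary handling for sessions is a simplification.

```python
def tumble_window(ts: int, size: int) -> tuple:
    """Return the single [start, end) tumbling window that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def hop_windows(ts: int, slide: int, size: int) -> list:
    """Return every [start, end) hopping window that contains ts."""
    windows = []
    start = ts - ts % slide          # latest window start at or before ts
    while start > ts - size:         # the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps: list, gap: int) -> list:
    """Merge events into sessions; a pause longer than gap starts a new one."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts <= sessions[-1][1]:
            # The event falls inside the open session: extend its end.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            sessions.append((ts, ts + gap))
    return sessions


print(tumble_window(25, 10))                     # (20, 30)
print(hop_windows(70, 15, 60))                   # [(15, 75), (30, 90), (45, 105), (60, 120)]
print(session_windows([0, 10, 50, 55, 120], 30)) # [(0, 40), (50, 85), (120, 150)]
```

With a 1-hour size and a 15-minute slide, each event belongs to four hopping windows, which is why HOP aggregations emit more rows than TUMBLE for the same input.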

Stream-Stream Join

-- Join between two streams within a time interval
-- Example: join orders with payment events (which must arrive within 5 minutes)

CREATE TABLE payment_events (
    payment_id   VARCHAR,
    order_id     VARCHAR,
    payment_time BIGINT,
    status       VARCHAR,
    event_time   AS TO_TIMESTAMP_LTZ(payment_time, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'payment-events',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    'properties.group.id' = 'flink-analytics-consumer',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- INTERVAL JOIN: join between streams with a time condition
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    p.status         AS payment_status,
    p.payment_time   AS paid_at
FROM kafka_orders AS o
JOIN payment_events AS p
ON o.order_id = p.order_id
-- Time condition: the payment must arrive within 5 minutes of the order
AND p.event_time BETWEEN o.event_time
    AND o.event_time + INTERVAL '5' MINUTE;
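
The matching rule of the interval join can be reproduced on in-memory data with a short Python sketch. It only shows which pairs match; it does not model how Flink keeps and expires state for each side. `Order`, `Payment` and `interval_join` are hypothetical names introduced for the example.

```python
from collections import defaultdict
from typing import NamedTuple


class Order(NamedTuple):
    order_id: str
    event_time_ms: int


class Payment(NamedTuple):
    order_id: str
    event_time_ms: int
    status: str


def interval_join(orders, payments, upper_bound_ms):
    """Pair each order with payments in [order time, order time + bound]."""
    payments_by_order = defaultdict(list)
    for p in payments:
        payments_by_order[p.order_id].append(p)

    matches = []
    for o in orders:
        for p in payments_by_order[o.order_id]:
            # Same inclusive bounds as the SQL BETWEEN condition.
            if o.event_time_ms <= p.event_time_ms <= o.event_time_ms + upper_bound_ms:
                matches.append((o.order_id, p.status))
    return matches


orders = [Order("o1", 0), Order("o2", 60_000)]
payments = [
    Payment("o1", 120_000, "PAID"),    # 2 minutes after o1: matches
    Payment("o2", 361_001, "PAID"),    # just over 5 minutes after o2: no match
    Payment("o1", -1_000, "FAILED"),   # before the order: no match
]
print(interval_join(orders, payments, upper_bound_ms=5 * 60 * 1000))
# [('o1', 'PAID')]
```

The bounded interval is what lets a streaming engine eventually discard old rows from both sides instead of keeping them forever, as a regular join would require.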

Apache Iceberg: The Data Lake with ACID

Apache Iceberg is an open-source table format for the data lake that brings ACID guarantees (transactions, snapshot isolation) to object storage such as S3 and GCS. In 2026 it is the standard data lakehouse format, supported by Snowflake, Databricks (as an alternative to Delta), AWS Glue, and practically every managed query engine.

Its integration with Flink lets you write Kafka data to Iceberg continuously with transactional guarantees: Parquet files on S3 are organized into immutable snapshots that can be queried with concurrent SELECTs without affecting ongoing writes.
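
The snapshot idea can be illustrated with a toy Python model: every commit produces a new immutable list of data files, and a reader pinned to an older snapshot keeps seeing exactly the files it started with. `ToyTable` and the file names are hypothetical; this is not the Iceberg API and it ignores manifests, metadata files and commit conflicts.

```python
class ToyTable:
    """Toy table whose state is a list of immutable snapshots."""

    def __init__(self):
        self.snapshots = [()]            # snapshot 0: empty table

    def current_snapshot_id(self) -> int:
        return len(self.snapshots) - 1

    def commit_append(self, new_files) -> int:
        """Create a new snapshot; earlier snapshots are never modified."""
        current = self.snapshots[-1]
        self.snapshots.append(current + tuple(new_files))
        return self.current_snapshot_id()

    def scan(self, snapshot_id=None) -> tuple:
        """List the data files visible in a snapshot (latest by default)."""
        if snapshot_id is None:
            snapshot_id = self.current_snapshot_id()
        return self.snapshots[snapshot_id]


table = ToyTable()
reader_snapshot = table.commit_append(["f1.parquet"])   # a query starts here
table.commit_append(["f2.parquet"])                     # the writer keeps committing

print(table.scan(reader_snapshot))  # ('f1.parquet',)
print(table.scan())                 # ('f1.parquet', 'f2.parquet')
```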

-- Create an Iceberg catalog in Flink SQL
-- (uses the Hive Metastore or the Iceberg REST Catalog)

CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'rest',
    'uri' = 'http://iceberg-rest-catalog:8181',
    'warehouse' = 's3://data-lake/warehouse',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS analytics;

-- Create the destination Iceberg table
CREATE TABLE IF NOT EXISTS analytics.orders_enriched (
    order_id        VARCHAR,
    user_id         VARCHAR,
    amount          DECIMAL(10, 2),
    currency        VARCHAR,
    category        VARCHAR,
    product_name    VARCHAR,
    payment_status  VARCHAR,
    order_hour      TIMESTAMP(3),     -- start of the 10-minute window
    created_date    DATE              -- for partitioning
)
PARTITIONED BY (created_date, category)
WITH (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'snappy',
    -- Target size for data files; merging small files still needs a compaction job
    'write.target-file-size-bytes' = '134217728',  -- 128MB
    -- Retention defaults applied when snapshot expiration runs (7 days)
    'history.expire.min-snapshots-to-keep' = '10',
    'history.expire.max-snapshot-age-ms' = '604800000'
);
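
The two numeric table properties are easier to review when derived rather than typed by hand. A quick Python check, assuming the 128MB in the comment means 128 MiB and the retention period is 7 days:

```python
MIB = 1024 * 1024

target_file_size_bytes = 128 * MIB               # write.target-file-size-bytes
max_snapshot_age_ms = 7 * 24 * 60 * 60 * 1000    # history.expire.max-snapshot-age-ms

print(target_file_size_bytes)  # 134217728
print(max_snapshot_age_ms)     # 604800000
```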

Complete Pipeline: Kafka to Iceberg

-- End-to-end pipeline: read orders from Kafka, enrich them with product and
-- payment data, assign each order to a 10-minute window, write to Iceberg on S3

-- Continuous Flink SQL job (run with the Flink SQL client, sql-client.sh)
INSERT INTO analytics.orders_enriched
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    o.currency,
    COALESCE(p.category, 'unknown')       AS category,
    COALESCE(p.product_name, 'unknown')   AS product_name,
    pay.status                             AS payment_status,
    window_start                           AS order_hour,
    CAST(window_start AS DATE)             AS created_date
FROM TABLE(
    TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
LEFT JOIN products_table AS p ON o.product_id = p.product_id
LEFT JOIN payment_events AS pay
    ON o.order_id = pay.order_id
    AND pay.event_time BETWEEN o.event_time
        AND o.event_time + INTERVAL '5' MINUTE;

-- Query Iceberg from Athena/Spark/Trino (after Flink has written):
-- SELECT
--     created_date,
--     category,
--     COUNT(*) as orders,
--     SUM(amount) as revenue
-- FROM analytics.orders_enriched
-- WHERE created_date >= CURRENT_DATE - INTERVAL '7' DAY
-- GROUP BY created_date, category
-- ORDER BY created_date DESC, revenue DESC;

Confluent Managed Flink

Confluent offers managed Apache Flink integrated into the Confluent Cloud platform. The main advantages: no infrastructure to manage, native autoscaling, and direct integration with the Confluent Schema Registry and Kafka topics without manual configuration.

# Confluent Cloud Flink: workflow

# 1. Log in to the Confluent Cloud Console
# 2. Go to Flink -> Compute pools -> Create pool
# Choose: Cloud (AWS/Azure/GCP), Region, Max CFUs

# 3. In the Flink SQL shell (Confluent Cloud or CLI):
confluent flink shell

# 4. Existing Kafka topics are automatically available as tables
-- Show the topics as tables
SHOW TABLES IN kafka_cluster;

-- Avro schemas from the Schema Registry are converted automatically
-- No need to configure 'connector', 'format', 'bootstrap.servers'
-- Everything is preconfigured by the managed service
SELECT * FROM `orders` LIMIT 10;

-- 5. Create a Flink job (continuous query):
INSERT INTO `orders-enriched`
SELECT
    order_id,
    user_id,
    amount,
    category
FROM `orders` o
JOIN `products-catalog` p ON o.product_id = p.product_id;

-- 6. Monitor the jobs from the Confluent UI
-- Available metrics: records/sec, checkpoint lag, watermark lag

Monitoring and Troubleshooting Flink

# Key Flink metrics to monitor (via Prometheus)

# 1. Checkpoint latency and size
# flink_jobmanager_job_lastCheckpointDuration (ms)
# flink_jobmanager_job_lastCheckpointSize (bytes)
# If duration > 60s or size > 10GB: state or throughput problems

# 2. Watermark lag (delay in event processing)
# flink_taskmanager_job_task_operator_currentInputWatermark
# Compare with System.currentTimeMillis()
# Lag > 30s: the consumer cannot keep up with Kafka

# 3. Back pressure
# flink_taskmanager_job_task_isBackPressured (0 or 1)
# The Flink UI highlights tasks under back pressure in red

# 4. Kafka consumer lag (via Flink metrics)
# flink_taskmanager_job_task_operator_KafkaSourceReader_kafkaConsumerMetrics_records_lag_max
# If the lag keeps growing: increase the parallelism of the source operator

# Open the Flink Web UI:
kubectl port-forward svc/analytics-pipeline-rest 8081:8081
# http://localhost:8081 -> Jobs, Task Managers, Checkpoints

# Trigger a manual checkpoint (for debugging)
curl -X POST http://flink-jobmanager:8081/jobs/{job-id}/checkpoints

# Stop a job safely (with a savepoint to resume from later)
# "drain": false keeps the savepoint resumable; use true only for a permanent shutdown
curl -X POST "http://flink-jobmanager:8081/jobs/{job-id}/stop" \
  -H "Content-Type: application/json" \
  -d '{"targetDirectory": "s3://my-savepoints/analytics", "drain": false}'
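
The thresholds listed above can be turned into a simple check. The Python sketch below assumes the metric values have already been fetched (for example from Prometheus) into a dictionary keyed by the short metric names; `evaluate_health`, the dictionary layout and the alert labels are hypothetical, and 10GB is interpreted as 10 GiB.

```python
CHECKPOINT_DURATION_LIMIT_MS = 60_000       # duration > 60s
CHECKPOINT_SIZE_LIMIT_BYTES = 10 * 1024**3  # size > 10GB (assumed GiB)
WATERMARK_LAG_LIMIT_MS = 30_000             # lag > 30s


def evaluate_health(metrics: dict, now_ms: int) -> list:
    """Return the list of threshold violations for one metrics sample."""
    alerts = []
    if metrics["lastCheckpointDuration"] > CHECKPOINT_DURATION_LIMIT_MS:
        alerts.append("checkpoint_duration")
    if metrics["lastCheckpointSize"] > CHECKPOINT_SIZE_LIMIT_BYTES:
        alerts.append("checkpoint_size")
    # Watermark lag: wall-clock time minus the current input watermark.
    if now_ms - metrics["currentInputWatermark"] > WATERMARK_LAG_LIMIT_MS:
        alerts.append("watermark_lag")
    if metrics["isBackPressured"]:
        alerts.append("back_pressure")
    return alerts


now_ms = 1_700_000_000_000
sample = {
    "lastCheckpointDuration": 75_000,          # 75s: too slow
    "lastCheckpointSize": 2 * 1024**3,         # 2 GiB: fine
    "currentInputWatermark": now_ms - 45_000,  # 45s behind: lagging
    "isBackPressured": 0,
}
print(evaluate_health(sample, now_ms))  # ['checkpoint_duration', 'watermark_lag']
```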

Kafka Streams vs Flink: When to Choose Which

  • Use Kafka Streams for: simple aggregations embedded in a Java application, KStream-KTable joins (stream + lookup table), small teams with no experience running distributed clusters, and simple deployment (it is a library, not a cluster).
  • Use Flink for: temporal joins between multiple streams, very large state (GB/TB), CEP (Complex Event Processing), pipelines to an Iceberg/Delta data lake, Flink SQL as an interface for non-Java data engineers, scaling independently of the application microservices, and sub-second latencies with exactly-once guarantees.

Conclusions

The Kafka + Flink + Iceberg combination represents the modern real-time lakehouse stack in 2026: Kafka as the event streaming backbone, Flink for stateful processing with transactional guarantees, and Iceberg as an ACID table format on object storage. With Confluent's managed Flink, this stack is now accessible even to teams without operational expertise in distributed clusters.

The Complete Series: Apache Kafka

  • Article 01 — Apache Kafka Fundamentals: Topics, Partitions and Consumer Groups
  • Article 02 — KRaft in Kafka 4.0: Goodbye ZooKeeper
  • Article 03 — Advanced Producer and Consumer
  • Article 04 — Exactly-Once Semantics in Kafka
  • Article 05 — Schema Registry: Avro and Protobuf
  • Article 06 — Kafka Streams: KTable and Windowing
  • Article 07 — Kafka Connect: Debezium CDC and DB Integration
  • Article 08 (this) — Kafka + Apache Flink: Pipeline Analytics Real-Time and Iceberg Sink
  • Article 09 — Monitoring Kafka: JMX Exporter, Prometheus and Grafana
  • Article 10 — Dead Letter Queue and Error Handling in Kafka
  • Article 11 — Kafka in Production: Sizing, Retention and Disaster Recovery