2026 年に Kafka + Flink を使用する理由

2026 年には、Apache Kafka + Apache Flink の組み合わせがリファレンス スタックになりました 成熟したテクノロジー企業のリアルタイム分析パイプライン向け。 Kafka が取り込みを処理します 耐久性と拡張性を保証するデータ バッファリング。フリンクセール 1 回限りのセマンティクス、時間ウィンドウ、複雑な結合を使用したステートフル処理 Kafka Streams はネイティブにサポートしていません。

2025 年の決定的な変化は、 フリンク SQL 分析パイプラインのメイン インターフェイスとして: DataStream API を記述する代わりに Java では、データ エンジニアがストリームとテーブルに標準 SQL を記述します。 Confluent がリリースされました 彼の マネージドフリンク (サービスとしての Apache Flink) とネイティブ統合 Confluent Schema Registry と Apache Iceberg に接続し、多くのチームにとってスタックをゼロオペレーションにします。

何を学ぶか

  • Flink アーキテクチャ: JobManager、TaskManager、Kafka によるチェックポイント処理
  • Flink SQL: DDL を使用して Kafka ソース テーブルとシンク テーブルを作成する
  • 一時的な操作: Flink SQL の TUMBLE、HOP、SESSION ウィンドウ
  • ストリーミング結合: ストリーム間結合およびルックアップ テーブルによる一時的な結合
  • Apache Iceberg: データ レイクのテーブル形式、オブジェクト ストレージの ACID
  • エンドツーエンドのパイプライン: S3 上の Kafka -> Flink -> Iceberg
  • Confluent マネージド Flink: 構成と自己ホスト型との違い

Kafka を使用した Flink アーキテクチャ

Apache Flink は、分散ストリーム処理フレームワークです。その強み Kafka ストリームと管理機能との比較 あなたはとても素晴らしいです (TB) RocksDB と分散ストレージ (S3/GCS) 上のチェックポイント、間の複雑な結合 複数のストリーム、および CEP (Complex Event Processing)。

# Architettura Flink:
# JobManager: coordina l'esecuzione del job, gestisce i checkpoint
#   - unico (o 2 con HA tramite ZooKeeper/Kubernetes leader election)
#   - scheduler, checkpoint coordinator, metastore

# TaskManager: esegue i task paralleli (equivalente dei worker)
#   - ogni TaskManager ha N "slots" (unita di parallelismo)
#   - slot = thread dedicato con stato RocksDB locale

# Checkpoint: snapshot periodico dello stato su S3/GCS
#   - se un TaskManager crasha, il job riprende dall'ultimo checkpoint
#   - integrazione nativa con Kafka: salva l'offset Kafka nel checkpoint
#   - garantisce esattamente-once end-to-end

# Deployment su Kubernetes (Flink Operator)
# helm repo add flink-operator-repo \
#   https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

# FlinkDeployment CRD:
cat <<'EOF' | kubectl apply -f -
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: analytics-pipeline
spec:
  image: apache/flink:1.19-scala_2.12-java17
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: "rocksdb"
    state.backend.incremental: "true"
    state.checkpoints.dir: "s3://my-flink-checkpoints/analytics-pipeline"
    execution.checkpointing.interval: "60000"    # checkpoint ogni 60s
    execution.checkpointing.mode: "EXACTLY_ONCE"
    # Kafka source: committed offset = checkpoint offset
    execution.checkpointing.timeout: "300000"
  serviceAccount: flink-sa
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 3
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: s3://my-flink-jars/analytics-pipeline-1.0.jar
    parallelism: 8
    upgradeMode: stateful
EOF

Flink SQL: SQL 標準によるストリーム処理

Flink SQL を使用すると、Kafka ストリームを SQL テーブルとして扱うことができます。魔法と概念 の イベント時間 (イベント時間): Flink はデータ自体のタイムスタンプを使用します (到着時間ではありません) 窓の場合、遅れた到着を正しく管理できるようになります。

DDL: Kafka ソース テーブルを定義する

-- Flink SQL: crea una tabella che legge dal topic Kafka "orders"

CREATE TABLE kafka_orders (
    order_id    VARCHAR,
    user_id     VARCHAR,
    amount      DECIMAL(10, 2),
    currency    VARCHAR,
    status      VARCHAR,
    created_at  BIGINT,            -- timestamp in milliseconds (dal payload)
    -- Campo virtuale: converte il BIGINT in TIMESTAMP per le window
    event_time  AS TO_TIMESTAMP_LTZ(created_at, 3),
    -- Watermark: permette late arrivals fino a 5 secondi
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
    'properties.group.id' = 'flink-analytics-consumer',
    'scan.startup.mode' = 'latest-offset',  -- o 'earliest-offset' per backfill
    -- Schema Registry Avro
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);

-- Tabella di riferimento: catalogo prodotti (changelog topic da Debezium)
CREATE TABLE products_table (
    product_id    VARCHAR,
    product_name  VARCHAR,
    category      VARCHAR,
    base_price    DECIMAL(10, 2),
    -- PRIMARY KEY: indica che e una tabella di lookup (upsert semantics)
    PRIMARY KEY (product_id) NOT ENFORCED
)
WITH (
    'connector' = 'kafka',
    'topic' = 'cdc.public.products',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081',
    -- upsert-kafka: gestisce insert/update/delete basandosi sulla chiave
    'connector' = 'upsert-kafka'
);

Flink SQL のウィンドウ集計

-- Aggregazione: totale vendite per categoria ogni 10 minuti (TUMBLE window)
-- TUMBLE: finestre fisse non sovrapposte

SELECT
    window_start,
    window_end,
    p.category,
    COUNT(*)            AS order_count,
    SUM(o.amount)       AS total_revenue,
    AVG(o.amount)       AS avg_order_value,
    MAX(o.amount)       AS max_order_value
FROM TABLE(
    TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
JOIN products_table AS p ON o.product_id = p.product_id
GROUP BY window_start, window_end, p.category;

-- HOP window: sliding window di 1 ora, avanza ogni 15 minuti
-- Utile per medie mobili
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*)     AS orders_in_window,
    SUM(amount)  AS spend_in_window
FROM TABLE(
    HOP(TABLE kafka_orders, DESCRIPTOR(event_time),
        INTERVAL '15' MINUTE,   -- slide interval
        INTERVAL '1' HOUR       -- window size
    )
)
GROUP BY window_start, window_end, user_id;

-- SESSION window: raggruppa eventi dello stesso utente per sessione
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*)     AS orders_in_session,
    SUM(amount)  AS session_value
FROM TABLE(
    SESSION(TABLE kafka_orders, DESCRIPTOR(event_time),
        INTERVAL '30' MINUTE   -- gap: inattivita > 30 min = nuova sessione
    )
)
GROUP BY window_start, window_end, user_id;

ストリーム-ストリーム結合

-- Join tra due stream entro una finestra temporale
-- Esempio: join ordini con eventi di pagamento (devono arrivare entro 5 minuti)

CREATE TABLE payment_events (
    payment_id   VARCHAR,
    order_id     VARCHAR,
    payment_time BIGINT,
    status       VARCHAR,
    event_time   AS TO_TIMESTAMP_LTZ(payment_time, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'payment-events',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    'format' = 'json'
);

-- INTERVAL JOIN: join tra stream con condizione temporale
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    p.status         AS payment_status,
    p.payment_time   AS paid_at
FROM kafka_orders AS o
JOIN payment_events AS p
ON o.order_id = p.order_id
-- Condizione temporale: il pagamento deve arrivare entro 5 minuti dall'ordine
AND p.event_time BETWEEN o.event_time
    AND o.event_time + INTERVAL '5' MINUTE;

Apache Iceberg: ACID を使用したデータ レイク

アパッチ・アイスバーグ データレイク用のオープンソースのテーブル形式 これにより、S3 などのオブジェクト ストレージに ACID (トランザクション、スナップショット分離) 保証がもたらされます。 そしてGCS。 2026 年、標準のデータ レイクハウス形式 - Snowflake、Databricks で使用 (Delta の代替として)、AWS Glue、および実質的にすべてのマネージド クエリ エンジン。

Flink との統合により、Kafka データを Iceberg にシームレスに書き込むことができます トランザクション保証による継続性: S3 上の Parquet ファイルはスナップショットに編成されます 不変で、進行中の書き込みに影響を与えることなく SELECT CURRENT でクエリ可能。

-- Crea un Iceberg catalog in Flink SQL
-- (usa Hive Metastore o REST Catalog di Iceberg)

CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'rest',
    'uri' = 'http://iceberg-rest-catalog:8181',
    'warehouse' = 's3://data-lake/warehouse',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS analytics;

-- Crea la tabella Iceberg di destinazione
CREATE TABLE IF NOT EXISTS analytics.orders_enriched (
    order_id        VARCHAR,
    user_id         VARCHAR,
    amount          DECIMAL(10, 2),
    currency        VARCHAR,
    category        VARCHAR,
    product_name    VARCHAR,
    payment_status  VARCHAR,
    order_hour      TIMESTAMP(3),     -- per partitioning
    created_date    DATE              -- per partitioning
)
PARTITIONED BY (created_date, category)
WITH (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'snappy',
    -- Compaction automatica: merge dei file piccoli
    'write.target-file-size-bytes' = '134217728',  -- 128MB
    -- Snapshot expiration automatica dopo 7 giorni
    'history.expire.min-snapshots-to-keep' = '10',
    'history.expire.max-snapshot-age-ms' = '604800000'
);

完全なパイプライン: Kafka から Iceberg まで

-- Pipeline end-to-end: leggi ordini da Kafka, arricchisci con prodotti,
-- aggrega per 10 min, scrivi su Iceberg in S3

-- Job Flink SQL continuo (eseguito con flink sql-client)
INSERT INTO analytics.orders_enriched
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    o.currency,
    COALESCE(p.category, 'unknown')       AS category,
    COALESCE(p.product_name, 'unknown')   AS product_name,
    pay.status                             AS payment_status,
    window_start                           AS order_hour,
    CAST(window_start AS DATE)             AS created_date
FROM TABLE(
    TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
LEFT JOIN products_table AS p ON o.product_id = p.product_id
LEFT JOIN payment_events AS pay
    ON o.order_id = pay.order_id
    AND pay.event_time BETWEEN o.event_time
        AND o.event_time + INTERVAL '5' MINUTE;

-- Query su Iceberg da Athena/Spark/Trino (dopo che Flink ha scritto):
-- SELECT
--     created_date,
--     category,
--     COUNT(*) as orders,
--     SUM(amount) as revenue
-- FROM analytics.orders_enriched
-- WHERE created_date >= CURRENT_DATE - INTERVAL '7' DAY
-- GROUP BY created_date, category
-- ORDER BY created_date DESC, revenue DESC;

Confluent マネージド Flink

Confluent は、 Apache Flink 管理 統合された Confluent クラウド プラットフォームで。主な利点: インフラストラクチャの管理が不要 ネイティブ自動スケーリング、Confluent Schema Registry および Kafka トピックとの直接統合 手動構成なしで。

# Confluent Cloud Flink: workflow

# 1. Accedi alla Confluent Cloud Console
# 2. Naviga in Flink -> Compute pools -> Create pool
# Scegli: Cloud (AWS/Azure/GCP), Region, Max CFUs

# 3. In Flink SQL shell (Confluent Cloud o CLI):
confluent flink shell

# 4. I topic Kafka esistenti sono automaticamente disponibili come tabelle
-- Visualizza i topic come tabelle
SHOW TABLES IN kafka_cluster;

-- Gli schemi Avro dal Schema Registry vengono convertiti automaticamente
-- Non serve configurare 'connector', 'format', 'bootstrap.servers'
-- Tutto e pre-configurato dal managed service
SELECT * FROM `orders` LIMIT 10;

-- 5. Crea un Flink job (continuous query):
INSERT INTO `orders-enriched`
SELECT
    order_id,
    user_id,
    amount,
    category
FROM `orders` o
JOIN `products-catalog` p ON o.product_id = p.product_id;

-- 6. Monitora i job dalla UI Confluent
-- Metriche disponibili: records/sec, checkpoint lag, watermark lag

Flinkの監視とトラブルシューティング

# Metriche Flink chiave da monitorare (via Prometheus)

# 1. Checkpoint latency e dimensione
# flink_jobmanager_job_lastCheckpointDuration (ms)
# flink_jobmanager_job_lastCheckpointSize (bytes)
# Se duration > 60s o size > 10GB: problemi di stato o throughput

# 2. Watermark lag (ritardo nel processing degli eventi)
# flink_taskmanager_job_task_operator_currentInputWatermark
# Confronta con System.currentTimeMillis()
# Lag > 30s: il consumer non riesce a tenere il passo con Kafka

# 3. Back pressure
# flink_taskmanager_job_task_isBackPressured (0 o 1)
# Flink UI mostra le frecce rosse per i task sotto pressure

# 4. Kafka consumer lag (via Flink metrics)
# flink_taskmanager_job_task_operator_KafkaSourceReader_kafkaConsumerMetrics_records_lag_max
# Se lag in crescita: aumenta il parallelismo del source operator

# Accedi alla Flink Web UI:
kubectl port-forward svc/analytics-pipeline-rest 8081:8081
# http://localhost:8081 -> Jobs, Task Managers, Checkpoints

# Trigger checkpoint manuale (per debug)
curl -X POST http://flink-jobmanager:8081/jobs/{job-id}/checkpoints

# Cancella un job in modo sicuro (con savepoint per resume)
curl -X POST "http://flink-jobmanager:8081/jobs/{job-id}/stop" \
  -H "Content-Type: application/json" \
  -d '{"targetDirectory": "s3://my-savepoints/analytics", "drain": true}'

Kafka ストリームと Flink: どちらを選択するか

  • Kafka ストリームを使用して次のことを行います。 Java アプリに埋め込まれた単純な集計、 KStream-KTable (ストリーム + ルックアップ テーブル)、経験のない小規模チームに参加 分散クラスター、シンプルなデプロイメント (クラスターではなくライブラリー)。
  • フリンクを使用して次のことを行います。 複数のストリーム間の一時的な結合、状態が非常に大きい (GB/TB)、CEP (複雑なイベント処理)、Iceberg/Delta データ レイクへのパイプライン、 スケーリングに依存しない非 Java データ エンジニア向けのインターフェイスとしての Flink SQL アプリケーション マイクロサービス、1 秒未満のレイテンシー、1 回限りの保証。

結論

Kafka + Flink + Iceberg の組み合わせは最新のデータ スタックを表します 2026 年のレイクハウス リアルタイム: イベント ストリーミング バックボーンとしての Kafka、Flink の トランザクション保証を備えたステートフル処理、ACID ストレージ形式としての Iceberg オブジェクトストレージ上で。 Confluent のマネージド Flink を使用して、このスタックにアクセスできるようになりました 分散クラスターの運用専門知識を持たないチームでも利用できます。

完全なシリーズ: Apache Kafka

  • 第01条 — Apache Kafka の基礎: トピック、パーティション、コンシューマー グループ
  • 第02条 — Kafka 4.0 の KRaft: さようなら ZooKeeper
  • 第03条 — 先進的なプロデューサーとコンシューマー
  • 第04条 — Kafka の 1 回限りのセマンティクス
  • 第05条 — スキーマ レジストリ: Avro および Protobuf
  • 第06条 — Kafka ストリーム: KTable とウィンドウ処理
  • 第07条 — Kafka Connect: Debezium CDC と DB の統合
  • 第08条(本) — Kafka + Apache Flink: パイプライン分析リアルタイムと氷山シンク
  • 第09条 — Kafka のモニタリング: JMX エクスポーター、Prometheus、Grafana
  • 第10条 — Kafka でのデッドレターキューとエラー処理
  • 第11条 — 本番環境の Kafka: サイジング、保持、障害回復