パイプライン オーケストレーション: Airflow、Dagster、Prefect
完璧なデータスタックがあることを想像してみてください: Apache Iceberg の湖畔、変換 エレガントな dbt、多数のソースからのデータを同期する Airbyte コネクタ。すべて 誰かがあなたにこう尋ねるまでは、「しかし、誰がこれをすべて実行しているのですか? 誰が 取り込みジョブは変換の前に開始されますか?何か問題が起こったときに誰が警告してくれるのか 午前3時に?」答えは、パイプライン オーケストレーター.
オーケストレーションと データスタックの目に見えない接着剤: コンポーネント 何百もの相互依存タスクの実行を調整し、次のような場合の再試行を管理します。 障害を監視し、パイプラインの状態を監視し、ビジネスに必要な監査証跡を作成します。 オーケストレーターがなければ、データ パイプラインは希望によってまとめられた cron スクリプトになります。
2025 年には、オーケストレーターの状況は 3 つの主要なプラットフォームによって支配されます。 Apache エアフロー 3.0、DAG の概念自体を定義したベテラン オーケストレーション。 ダグスター 1.xというコンセプトを導入した挑戦者 ソフトウェア定義資産の。そして 知事3、Python ファーストで設計されたソリューション 開発者のエクスペリエンスのために。この記事では、コードを使用してそれらを詳しく分析します。 実際の正直な比較により、状況に応じて適切なものを選択できます。
この記事で学べること
- Apache Airflow 3.0 内部アーキテクチャ: スケジューラ、エグゼキュータ、ワーカー、メタデータ DB
- エンドツーエンド ETL パイプライン用の完全な Python DAG を作成する方法
- Dagster のソフトウェア デファインド アセット パラダイムと、それがパイプラインに対する考え方を変える理由
- Prefect 3 のフロー/タスク モデルとその簡略化された展開モデル
- ユースケース別の比較表による 3 つのツール間の詳細な比較
- 耐久性があり、長時間実行されるワークフローの代替としての Temporal
- dbt、Airbyte、データ レイクハウスとオーケストレーションを統合するためのリファレンス アーキテクチャ
- 本番環境における冪等パイプラインの監視、アラート、ベスト プラクティス
データ ウェアハウス、AI、デジタル トランスフォーメーション シリーズの記事
| # | アイテム | 集中 |
|---|---|---|
| 1 | データ ウェアハウスの進化: SQL Server からデータ レイクハウスへ | アーキテクチャとプラットフォーム |
| 2 | データメッシュと分散型アーキテクチャ | ガバナンスと所有権 |
| 3 | ETL 対モダン ELT: dbt、Airbyte、Fivetran | 変換パイプライン |
| 4 | 現在位置 - パイプライン オーケストレーション | エアフロー、ダグスター、プリフェクト |
| 5 | 製造における AI: 予知保全 | IoT、ML、デジタルツイン |
| 6 | 金融における AI: 不正行為の検出と信用スコアリング | リアルタイム ML |
| 7 | 小売における AI: 需要予測と推奨 | ML 適用 |
オーケストレーションが基本であるため
技術的な詳細に入る前に、それがどのような問題を解決するのかを正確に理解する価値があります。 オーケストレーター。中規模から大企業の一般的なデータ パイプラインには次のものが含まれます。
- CRM、ERP、トランザクション データベース、外部 API からの抽出 (10 ~ 50 のソース)
- データレイクへのアップロード (Airflow トリガー Airbyte または Fivetran)
- dbt 変換 (複雑な依存関係を持つ 30 ~ 200 のモデル)
- データマートと集計テーブルの更新
- BI ツール (メタベース、Tableau、Power BI) へのエクスポート
- ML モデルの更新 (特徴量エンジニアリング + 再トレーニング)
これらの各ステップには、 依存症 (B は A が出発する前に出発することはできません) 完了)、 ALS (レポートは 08:00 までに準備ができていなければなりません)、e 品質要件 (ソースデータが空の場合は、 変換)。これらすべてを個別の cron スクリプトと保証された災害で処理します。
パイプライン オーケストレーターの機能
| 機能性 | 説明 |
|---|---|
| 依存関係の管理 | タスクの実行順序を定義し、パイプラインの依存関係を管理します。 |
| スケジュール設定 | Cron スケジューリング、イベント駆動型、データ対応 (資産更新時のトリガー) |
| 再試行とエラー処理 | 指数バックオフによる自動再試行、部分的な障害の管理 |
| 平行度 | 時間を最適化するための独立したタスクの並列実行 |
| 監視 | 一元化されたダッシュボード、タスクログ、SLAモニタリング、アラート |
| 埋め戻し | パイプラインのロジックが変更された場合の履歴実行の再実行 |
| 監査証跡 | コンプライアンスとデバッグのための各実行の完全な履歴 |
| パラメータ化 | 環境 (開発、ステージング、本番) および手動実行の変数構成 |
Apache Airflow 3.0: 改良されたベテラン
2014 年に Airbnb によって作成され、2016 年に Apache Software Foundation に寄付されました。 アパッチ エアフロー そして、データ パイプライン オーケストレーションの事実上の標準となっています。 超えて GitHub 上の 35,000 個のスター そして何百人ものコミュニティ Airflow の寄稿者は、スタートアップから世界中の何千もの企業によって使用されています。 大企業へ。
2025 年、Airflow は最も重要な進化を遂げ、 バージョン 3.0 では、クライアント/サーバー アーキテクチャが導入されています。 タスクの実行 インターフェース、ネイティブのアセット認識スケジューリング (Dagster から継承)、DAG バージョン管理と完全に再設計された UI。そしてもはや単なるジョブ スケジューラではありません。 最新のオーケストレーション プラットフォーム。
Apache Airflow アーキテクチャ
Airflow を適切に導入して診断するには、Airflow のアーキテクチャを理解することが重要です 生産上の問題。主要なコンポーネントは次の 5 つです。
エアフロー建築コンポーネント
| 成分 | 役割 | テクノロジー |
|---|---|---|
| ウェブサーバー | DAG のモニタリング、デバッグ、手動トリガーのための Web UI | Flask + Gunicorn (2.x)、FastAPI (3.0) |
| スケジューラ | DAG を分析し、タスクをスケジュールし、それらを実行キューに配置します。 | Python デーモン、複数のインスタンスによる HA |
| 執行者 | タスクを実行します: LocalExecutor (単一ノード)、CeleryExecutor、KubernetesExecutor | Celery + Redis/RabbitMQ、または K8s |
| 労働者 | 実際にタスクを処理する (CeleryExecutor/KubernetesExecutor のみ) | セロリワーカーまたはK8sポッド |
| メタデータ データベース | すべての DAG 実行、タスク インスタンス、変数、接続のステータス | PostgreSQL (本番環境で推奨)、MySQL |
| DAGプロセッサ | Airflow 3.0 の新機能: スケジューラから分離された DAG パーサー | Python プロセス プール |
DAGのコンセプト
Airflow の心臓部と DAG (有向非巡回グラフ): 有向グラフ タスクの実行順序を定義するサイクルはありません。グラフの各ノードはタスクです。 各アークは依存関係です。グラフは次のとおりである必要があります 非周期的な (サイクルなし)、したがって 循環依存関係を作成することはできません。
タスクは次の方法で実装されます。 オペレーター: 抽象化する Python クラス 仕事の一種。 Airflow には何百もの組み込みオペレーター (PythonOperator、 BashOperator、PostgresOperator、SparkSubmitOperator、dbtCloudOperator...) とコミュニティ プロバイダー パッケージを介して多数公開します。
コード例: セールス パイプラインの完全な ETL DAG
ETL パイプラインをオーケストレーションする完全かつ現実的な DAG を見てみましょう。 PostgreSQL、ステージングへのロード、dbt 変換、およびチームへの通知。
# dags/pipeline_vendite.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)
# Configurazione DAG con default_args per tutti i task
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"email": ["data-team@azienda.it"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True, # 5, 10, 20 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(hours=2),
}
with DAG(
dag_id="pipeline_vendite_giornaliera",
default_args=default_args,
description="Estrai vendite da ERP, carica in DWH, aggrega con dbt",
schedule="0 5 * * *", # Ogni giorno alle 05:00 UTC
start_date=datetime(2025, 1, 1),
end_date=None,
catchup=False, # Non eseguire run passate
max_active_runs=1, # Una sola esecuzione alla volta
tags=["vendite", "etl", "produzione"],
doc_md="""
## Pipeline Vendite Giornaliera
Estrae ordini dall'ERP (PostgreSQL), li carica in staging,
lancia le trasformazioni dbt e notifica il team su Slack.
**SLA**: Completamento entro le 07:30 UTC
**Owner**: Team Data Engineering
""",
) as dag:
# ============ TASK 1: VERIFICA DISPONIBILITA SORGENTE ============
def check_source_data(**context):
"""Verifica che ci siano dati nella finestra di esecuzione."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
execution_date = context["ds"] # "2025-01-15"
count = pg_hook.get_first("""
SELECT COUNT(*)
FROM orders
WHERE DATE(created_at) = %s
""", parameters=[execution_date])[0]
logger.info(f"Trovati {count} ordini per {execution_date}")
if count == 0:
logger.warning("Nessun dato trovato, skip pipeline")
return "notify_no_data" # Branch: salta elaborazione
return "extract_task_group.extract_orders"
check_source = BranchPythonOperator(
task_id="check_source_data",
python_callable=check_source_data,
)
# ============ TASK GROUP: ESTRAZIONE ============
with TaskGroup("extract_task_group") as extract_group:
def extract_orders(**context):
"""Estrae ordini dall'ERP e li salva in staging."""
execution_date = context["ds"]
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
df = pg_hook.get_pandas_df("""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price * (1 - COALESCE(oi.discount, 0)))
AS importo_totale,
COUNT(oi.id) AS num_righe
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = %(exec_date)s
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""", parameters={"exec_date": execution_date})
# Validazione dati estratti
assert df["importo_totale"].ge(0).all(), "Importi negativi trovati!"
assert df["ordine_id"].is_unique, "Ordini duplicati nell'estrazione!"
# Salva in XCom per passare al task successivo
# Per dataset grandi, usa staging table invece di XCom
context["ti"].xcom_push(key="row_count", value=len(df))
# Carica in tabella staging del DWH
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
dwh_hook.insert_rows(
table="staging.stg_ordini_raw",
rows=df.values.tolist(),
target_fields=df.columns.tolist(),
replace=True,
replace_index=["ordine_id"],
)
logger.info(f"Caricati {len(df)} ordini in staging")
extract_orders_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders,
)
def extract_customers(**context):
"""Estrae snapshot clienti aggiornati."""
pg_hook = PostgresHook(postgres_conn_id="erp_postgres")
dwh_hook = PostgresHook(postgres_conn_id="dwh_postgres")
df_customers = pg_hook.get_pandas_df("""
SELECT id, email, nome, cognome, citta, segmento,
DATE(updated_at) AS data_aggiornamento
FROM customers
WHERE DATE(updated_at) >= CURRENT_DATE - INTERVAL '1 day'
""")
if len(df_customers) > 0:
dwh_hook.insert_rows(
table="staging.stg_clienti_raw",
rows=df_customers.values.tolist(),
target_fields=df_customers.columns.tolist(),
replace=True,
replace_index=["id"],
)
extract_customers_task = PythonOperator(
task_id="extract_customers",
python_callable=extract_customers,
)
# I due extract girano in PARALLELO
[extract_orders_task, extract_customers_task]
# ============ TASK GROUP: TRASFORMAZIONI DBT ============
with TaskGroup("dbt_task_group") as dbt_group:
# dbt run sullo staging layer
dbt_staging = BashOperator(
task_id="dbt_run_staging",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:staging --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt run sul marts layer (dipende dallo staging)
dbt_marts = BashOperator(
task_id="dbt_run_marts",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt run --select tag:marts --target prod \
--vars '{"execution_date": "{{ ds }}"}'
""",
)
# dbt test per validare la qualità
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="""
cd /opt/airflow/dbt/dwh_project &&
dbt test --select tag:staging tag:marts --target prod
""",
)
dbt_staging >> dbt_marts >> dbt_test
# ============ NOTIFICHE ============
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_data_team",
message="""
:white_check_mark: *Pipeline Vendite Completata*
Data: {{ ds }}
Ordini elaborati: {{ ti.xcom_pull(task_ids='extract_task_group.extract_orders', key='row_count') }}
Durata: {{ dag_run.get_task_instance('dbt_task_group.dbt_test').duration | round(1) }}s
""",
trigger_rule="all_success",
)
notify_no_data = SlackWebhookOperator(
task_id="notify_no_data",
slack_webhook_conn_id="slack_data_team",
message=":information_source: *Pipeline Vendite* - Nessun dato per {{ ds }}, esecuzione saltata",
)
notify_failure = SlackWebhookOperator(
task_id="notify_failure",
slack_webhook_conn_id="slack_data_team",
message=":x: *Pipeline Vendite FALLITA* - Data: {{ ds }} - Controlla Airflow UI!",
trigger_rule="one_failed",
)
# ============ DIPENDENZE DAG ============
check_source >> [extract_group, notify_no_data]
extract_group >> dbt_group >> notify_success
dbt_group >> notify_failure
この DAG は、Airflow の基本的なパターンを示しています。 TaskGroup のために
関連タスクをグループ化する、 BranchPythonOperator 条件付きロジックにより、
XCom タスク間でデータを渡し、管理ルールをトリガーする
成功と失敗に関する通知の数。
Apache Airflow 3.0 (2025) の新機能
- タスク実行インターフェース: ワーカーを Airflow API サーバーから分離し、セキュリティとスケーラビリティを向上させる新しいクライアントサーバー API
- 資産を意識したスケジューリング: DAG は、cron スケジュールだけでなく、アセット (データセット) の更新によってトリガーできるようになりました
- DAG のバージョン管理: 各実行は起動時に DAG バージョンに関連付けられるため、デプロイメント中の不一致が排除されます。
- 人間参加型: Airflow 3.1 では、続行する前に人間の承認を待つワークフローが導入されています
- 反応UI: インターフェースを完全に再設計し、より高速かつ直感的に
- 個別の DAG プロセッサ: DAG 解析は専用プロセスで行われるため、スケジューラの負荷が軽減されます。
Dagster: Software-Defined Assets パラダイム
2018 年に Elementl (現 Dagster Labs) から誕生した Dagster はパラダイムシフトをもたらしました オーケストレーションにおけるラディカル: 考える代わりに 実行するタスク、私たちは考えます 生み出す資産。アセットおよびあらゆるデータ成果物: DWH 内のテーブル、 ML モデル、Parquet ファイル、生成されたレポート。
このアプローチは、 ソフトウェア定義資産 (SDA)、革命を起こした
開発者のエクスペリエンス。 Airflow で「このスクリプトを 5:00 に実行する」を定義します。ダグスターで
「テーブルが欲しい」と定義する gold.fatturato_mensile 常に最新の状態であり、
そして彼女は自分自身を更新する方法を知っています。」その違いは微妙に見えますが、それはすべてを変えます:自動的な血統、
簡単なテストで、データスタックに何が存在するかを即座に理解できます。
2025 年に、Dagster 1.9 はコンポーネント フレームワークで完成度に達しました (2025 年 10 月に一般提供)。 これにより、パイプライン全体を宣言型構成として記述できるようになり、カタログ システム内のすべての資産のステータスに対する前例のない可視性を提供する高度な機能です。
Dagster の主要な概念
ダグスターの用語
| コンセプト | 説明 | 風量換算 |
|---|---|---|
| 資産 | パイプラインが生成するデータ アーティファクト (テーブル、モデル、ファイル) | 直接的に相当するものはありません |
| @assets | アセットの生成方法を定義する Python デコレータ | PythonOperator (より限定的) |
| 求人 | 一緒に実行するアセット/オペレーションの選択 | DAG |
| Op | 明示的なデータ出力のない一般的なタスク | オペレーター |
| Iマネージャー | アセットの保存および読み取り方法を管理します (S3、BigQuery、Snowflake など)。 | 同等のものはありません |
| リソース | 共有接続とクライアント (データベース、API) | 接続+フック |
| センサー | イベント駆動型トリガー (ファイルシステムポーリング、API、イベント) | センサーオペレーター |
| スケジュール | ジョブの時間ベースのトリガー | DAG スケジュール |
| パーティション | 日付、カテゴリ、地域による資産の分割 | PartitionedSchedule (より複雑) |
コード例: Dagster を使用したアセットベースのパイプライン
当社は Airflow と同じ販売パイプラインを実装していますが、Dagster パラダイムを使用しています。 アセット間の依存関係がどのように明示的であるか、また、 テストが簡単になります。
# pipeline_vendite/assets.py
import pandas as pd
from dagster import (
asset,
AssetIn,
AssetExecutionContext,
MetadataValue,
Output,
DailyPartitionsDefinition,
MaterializeResult,
)
from dagster_dbt import dbt_assets, DbtCliResource
from resources import postgres_erp, postgres_dwh, slack_resource
# Partizionamento giornaliero - Dagster gestisce il backfill automaticamente
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
# ============ ASSET 1: ORDINI GREZZI DALL'ERP ============
@asset(
name="raw_ordini",
group_name="ingestione",
partitions_def=daily_partitions,
description="Ordini grezzi estratti dall'ERP PostgreSQL",
metadata={
"source": "ERP PostgreSQL",
"owner": "data-engineering@azienda.it",
},
compute_kind="python",
)
def raw_ordini(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae gli ordini per la partizione corrente."""
partition_date = context.partition_key # "2025-01-15"
df = postgres_erp.execute_query(f"""
SELECT
o.id AS ordine_id,
o.customer_id,
o.created_at AS data_ordine,
o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = '{partition_date}'
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
""")
# Metadata allegati all'asset: visibili nella UI Dagster
return Output(
df,
metadata={
"num_records": MetadataValue.int(len(df)),
"importo_totale": MetadataValue.float(df["importo_totale"].sum()),
"preview": MetadataValue.md(df.head(5).to_markdown()),
},
)
# ============ ASSET 2: CLIENTI AGGIORNATI ============
@asset(
name="raw_clienti",
group_name="ingestione",
partitions_def=daily_partitions,
compute_kind="python",
)
def raw_clienti(
context: AssetExecutionContext,
postgres_erp: PostgresResource,
) -> Output[pd.DataFrame]:
"""Estrae snapshot clienti aggiornati."""
partition_date = context.partition_key
df = postgres_erp.execute_query(f"""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = '{partition_date}'
""")
return Output(df, metadata={"num_records": MetadataValue.int(len(df))})
# ============ ASSET 3: ORDINI ARRICCHITI (Silver Layer) ============
@asset(
name="silver_ordini_arricchiti",
group_name="silver",
partitions_def=daily_partitions,
ins={
"raw_ordini": AssetIn("raw_ordini"),
"raw_clienti": AssetIn("raw_clienti"),
},
description="Ordini joinati con info cliente, validati",
compute_kind="python",
)
def silver_ordini_arricchiti(
context: AssetExecutionContext,
raw_ordini: pd.DataFrame,
raw_clienti: pd.DataFrame,
postgres_dwh: PostgresResource,
) -> MaterializeResult:
"""Arricchisce e valida gli ordini."""
df = raw_ordini.merge(
raw_clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
# Validazioni
assert df["importo_totale"].ge(0).all(), "Importi negativi!"
assert df["ordine_id"].is_unique, "Ordini duplicati!"
# Carica nel DWH
postgres_dwh.load_dataframe(
df=df,
table="silver.ordini_arricchiti",
partition_col="data_ordine",
partition_value=context.partition_key,
)
return MaterializeResult(
metadata={
"num_records": MetadataValue.int(len(df)),
"clienti_senza_match": MetadataValue.int(df["segmento"].isna().sum()),
}
)
# ============ ASSET 4: MODELLI DBT (Gold Layer) ============
# Dagster ha integrazione nativa con dbt via dagster-dbt
@dbt_assets(
manifest=dbt_manifest_path, # manifest.json generato da dbt
select="tag:marts", # Solo i modelli gold
)
def dbt_marts_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Esegue i modelli dbt del layer gold."""
yield from dbt.cli(["run", "--select", "tag:marts"], context=context).stream()
yield from dbt.cli(["test", "--select", "tag:marts"], context=context).stream()
# ============ JOB: COMPOSIZIONE ============
# In Dagster, un Job seleziona quali asset materializzare
from dagster import define_asset_job, ScheduleDefinition
pipeline_vendite_job = define_asset_job(
name="pipeline_vendite_giornaliera",
selection=[
"raw_ordini",
"raw_clienti",
"silver_ordini_arricchiti",
"dbt_marts_assets*", # Tutti gli asset dbt marts
],
)
pipeline_vendite_schedule = ScheduleDefinition(
job=pipeline_vendite_job,
cron_schedule="0 5 * * *",
default_status=DefaultScheduleStatus.RUNNING,
)
このアプローチのすぐにわかる利点は、各アセットが明示的に宣言されているということです。
あなたの入力(AssetIn)、Dagster は自動的に次のグラフを構築します。
依存関係が表示され、UI にはソースから最終製品までの完全な系統が表示されます。テスト
すべての関数は純粋な Python 関数であるため、これは自明になります。
IO マネージャー: Dagster 統合の鍵
Dagster の IO マネージャーは、資産の調達方法を定義します 保存してロードした タスクの合間に。 Dagster には、S3 (Parquet、CSV)、Snowflake、BigQuery、DuckDB 用の IO Manager が含まれています。 パンダなどなど。必要に応じて、さまざまな環境に異なる IO マネージャーをセットアップできます。 資産コードを変更します。
# resources.py - Configurazione Resources e IO Managers
from dagster import EnvVar
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
# Configurazione multi-environment
resources_by_env = {
"dev": {
# In dev: salva su DuckDB locale (costo zero)
"io_manager": DuckDBPandasIOManager(
database="./dev_lakehouse.duckdb",
),
"postgres_erp": PostgresResource(
host="localhost",
port=5432,
database="erp_dev",
),
},
"staging": {
# In staging: usa S3 + Parquet
"io_manager": S3PickleIOManager(
s3_resource=S3Resource(),
s3_bucket="my-staging-lakehouse",
s3_prefix="dagster/",
),
},
"prod": {
# In prod: usa Snowflake
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
database="DWH_PROD",
schema="DAGSTER_ASSETS",
),
},
}
完璧 3: 開発者エクスペリエンスのための Python ファースト
2018 年に設立された Prefect は、当初から異なる哲学を受け入れてきました。 オーケストレーションは可能な限りシンプルかつ Python 的です。バージョン 3、2024 年にリリース 2025 年に成熟し、このビジョンが実現します: あらゆる Python 関数 デコレータを 1 人追加することで、調整されたタスクになる可能性があります。
Il フロー/タスクモデル Prefect では設定の大部分が不要になります Airflow と Dagster からのリクエスト。 DAG を明示的に定義する必要はなく、ファイルも必要ありません 個別の構成ファイルを使用するため、独自の DSL を学ぶ必要はありません。プレーンな Python を記述します。 そして残りは知事がやってくれます。
2025 年、Prefect 3 が導入 知事の事件 管理用 構造化されたサービス中断、メトリクスベースのトリガーによる自動化 (実行イベントだけでなく)、実行のためのモーダルとのネイティブ統合 スケーラブルなサーバーレス。 Prefect Cloud は、豊富な無料プランを提供しています。 小規模なチームでもアクセス可能。
コード例: 販売パイプラインのフロー担当者
# flows/pipeline_vendite.py
import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from prefect.filesystems import S3
from prefect_slack import SlackWebhook
from datetime import timedelta, date
from typing import Optional
# ============ TASK (funzioni Python con decoratore) ============
@task(
name="Estrai Ordini ERP",
description="Estrae ordini completati per la data specificata",
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash, # Cache basata su input
cache_expiration=timedelta(hours=1), # Valida per 1 ora
tags=["estrazione", "erp"],
)
def estrai_ordini(data: date) -> pd.DataFrame:
"""Estrae ordini dall'ERP per la data specificata."""
logger = get_run_logger()
# Leggi credenziali da Prefect Blocks (gestione sicura secrets)
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
query = """
SELECT o.id AS ordine_id, o.customer_id,
o.created_at, o.status,
SUM(oi.qty * oi.unit_price) AS importo_totale
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE DATE(o.created_at) = :data
AND o.status IN ('completed', 'shipped')
GROUP BY o.id, o.customer_id, o.created_at, o.status
"""
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} ordini per {data}")
return df
@task(
name="Estrai Clienti ERP",
retries=2,
retry_delay_seconds=30,
tags=["estrazione", "erp"],
)
def estrai_clienti(data: date) -> pd.DataFrame:
"""Estrae clienti aggiornati."""
logger = get_run_logger()
conn_string = Secret.load("erp-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df = pd.read_sql("""
SELECT id, email, nome, cognome, citta, segmento
FROM customers
WHERE DATE(updated_at) = :data
""", conn, params={"data": str(data)})
logger.info(f"Estratti {len(df)} clienti aggiornati")
return df
@task(
name="Valida e Arricchisci Ordini",
retries=1,
)
def valida_e_arricchisci(
ordini: pd.DataFrame,
clienti: pd.DataFrame,
) -> pd.DataFrame:
"""Valida e arricchisce gli ordini con dati cliente."""
logger = get_run_logger()
# Validazioni
if ordini["importo_totale"].lt(0).any():
raise ValueError("Trovati importi negativi nel dataset ordini")
if not ordini["ordine_id"].is_unique:
raise ValueError("Trovati ordini duplicati")
# Join
df = ordini.merge(
clienti[["id", "citta", "segmento"]],
left_on="customer_id",
right_on="id",
how="left",
)
match_rate = (1 - df["segmento"].isna().mean()) * 100
logger.info(f"Match rate clienti: {match_rate:.1f}%")
return df
@task(name="Carica nel DWH", retries=2)
def carica_dwh(df: pd.DataFrame, schema: str, table: str):
"""Carica il dataframe nel DWH."""
conn_string = Secret.load("dwh-db-url").get()
import sqlalchemy as sa
engine = sa.create_engine(conn_string)
with engine.connect() as conn:
df.to_sql(
name=table,
con=conn,
schema=schema,
if_exists="replace",
index=False,
)
return len(df)
@task(name="Esegui dbt", retries=1)
def esegui_dbt(target: str = "prod"):
"""Lancia dbt run e test sui modelli marts."""
import subprocess
result = subprocess.run(
["dbt", "run", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if result.returncode != 0:
raise RuntimeError(f"dbt run fallito:\n{result.stderr}")
test_result = subprocess.run(
["dbt", "test", "--select", "tag:marts", "--target", target],
capture_output=True, text=True, cwd="/opt/dbt/dwh_project"
)
if test_result.returncode != 0:
raise RuntimeError(f"dbt test fallito:\n{test_result.stderr}")
# ============ FLOW PRINCIPALE ============
@flow(
name="Pipeline Vendite Giornaliera",
description="Orchestrazione ETL vendite: ERP -> DWH -> dbt marts",
flow_run_name="vendite-{data}", # Nome run dinamico
retries=0, # Retry a livello task, non flow
timeout_seconds=7200, # 2 ore max
log_prints=True,
)
def pipeline_vendite(data: Optional[date] = None):
"""
Flow principale per la pipeline vendite.
Può essere lanciato manualmente con una data specifica
o schedulato per girare ogni giorno.
"""
logger = get_run_logger()
if data is None:
data = date.today()
logger.info(f"Avvio pipeline vendite per {data}")
# Estrazione parallela (Prefect gestisce la concorrenza automaticamente)
# submit() esegue il task in modo asincrono
future_ordini = estrai_ordini.submit(data)
future_clienti = estrai_clienti.submit(data)
# Attende entrambi i task e recupera i risultati
ordini = future_ordini.result()
clienti = future_clienti.result()
if len(ordini) == 0:
logger.warning(f"Nessun ordine per {data}, skip elaborazione")
return {"status": "skipped", "data": str(data)}
# Task sequenziali
df_arricchito = valida_e_arricchisci(ordini, clienti)
num_record = carica_dwh(df_arricchito, schema="silver", table="ordini_arricchiti")
esegui_dbt(target="prod")
logger.info(f"Pipeline completata: {num_record} record elaborati")
return {"status": "success", "records": num_record, "data": str(data)}
# ============ DEPLOYMENT ============
# Prefect 3: il deployment definisce schedule e infrastruttura
if __name__ == "__main__":
from prefect.deployments import DeploymentImage
pipeline_vendite.deploy(
name="prod-giornaliero",
cron="0 5 * * *",
work_pool_name="k8s-work-pool", # Esegui su Kubernetes
image=DeploymentImage(
name="myregistry/pipeline-vendite:latest",
platform="linux/amd64",
),
parameters={}, # Parametri default
)
Prefect Blocks: シークレットと設定の一元管理
I パーフェクトブロック これらは Prefect に保存されている構成可能なオブジェクトです。 接続とシークレットを一元管理するバックエンド (クラウドまたはセルフホスト) インフラストラクチャ構成。環境変数が不要になります コンテナ内に分散されており、コードを再デプロイすることなく更新できます。
エアフローとダグスターとプリフェクトの比較: どちらが優れていますか?
正直な答えは、「それは状況による」です。各ツールには理想的なユースケースとポイントがあります 具体的な強さ。ここでは、選択に役立つ構造化された比較を示します。
詳細な比較: エアフロー vs ダグスター vs プリフェクト (2025)
| 基準 | エアフロー 3.0 | ダグスター 1.9 | 知事3 |
|---|---|---|---|
| パラダイム | タスク中心 (DAG) | 資産中心 (SDA) | タスク中心 (Python ファースト) |
| 学習曲線 | 高 (DAG セマンティクス、プロバイダー、フック) | 中~高 (資産モデル、IO マネージャー) | Low (既存の関数のデコレータ) |
| データリネージュ | 限定的 (タスクの依存関係) | 優れた (ネイティブ、ビジュアル) | 限定(最近追加されました) |
| 開発者エクスペリエンス (ローカル) | まあまあ (Docker 構成が重い) | 優れた (dagster 開発、ローカル UI) | 優れています (pip インストール、ローカルで実行) |
| テスト | 複雑 (インフラに依存) | 優れている(資産も機能も) | 優れています(タスクと機能) |
| DBTの統合 | 良い (dbt クラウド オペレーター) | 優れた (dagster-dbt、ネイティブ アセット) | 良い (prefect-dbt) |
| スケジュール設定 | 優れた (cron、イベント、アセット、期限) | 優れた (cron、イベント、アセット、センサー) | 良好 (cron、イベント、自動化) |
| 埋め戻し | 優れています (3.0 でスケジューラー管理) | 優れた (ネイティブ パーティショニング) | ディスクリート (手動または自動) |
| スケーラビリティ | 高 (CeleryExecutor、KubernetesExecutor) | 高 (Kubernetes、ECS、Docker) | 高 (作業プール、K8、モーダル) |
| UIモニタリング | 優れています (3.0 の React UI) | 優れた(カタログ、系統図) | 優れています (Prefect Cloud ダッシュボード) |
| 導入モデル | 複合体 (ヘルム チャート、構成) | 中 (dagster-webserver、k8s) | シンプル (.deploy()、ワークプール) |
| コミュニティ/エコシステム | 巨大 (35,000 以上の GitHub スター、数百のプロバイダー) | 昇順 (10,000 つ星以上、エンタープライズ重視) | 大規模 (15,000 つ星以上、開発者重視) |
| マネージドクラウド | 天文学者、MWAA、クラウド コンポーザー | Dagster クラウド (サーバーレス + ハイブリッド) | Prefect Cloud (寛大な無料枠) |
| 自己ホスト型のコスト | オープンソース (管理するインフラ) | オープンソース (管理するインフラ) | オープンソース (管理するインフラ) |
| 管理コスト | 天文学者は月額 500 ドルから | Dagster クラウドは月額 500 ドルから | Prefect Cloud の寛大な無料利用枠 |
| に最適 | Airflow の経験、従来の ETL パイプラインを備えたチーム | データ駆動型チーム、dbt 統合、ML パイプライン | チーム Python、ラピッド プロトタイピング、イベント駆動型 |
クイック選択ガイド
- 次の場合はエアフローを選択してください。 あなたのチームはすでに Airflow スキルを持っています。 複雑なエコシステム (Spark、Hadoop、多くのプロバイダー) を操作する場合、次のことが必要です。 利用可能なオペレーターの巨大なエコシステムのうち、AWS (MWAA) または GCP を使用している場合 (Cloud Composer) をネイティブに管理します。
- 次の場合は Dagster を選択してください。 データの品質とリネージが優先事項であるため、 スタックに dbt が含まれており、緊密な統合が必要な場合、チームはそれに慣れています。 タスクではなくアセットで考えるか、好きな場所に ML パイプラインを構築します それぞれの成果物をトレースします。
- 次の場合は「知事」を選択してください。 学習曲線を最小限に抑えたい場合は、 チームと Python ファーストでオーケストレーションの経験がない場合は、ワークフローが必要です イベント駆動型で運用可能 (データだけでなく)、またはすぐに使い始めたい場合 Prefect Cloud の無料枠。
Temporal: 複雑なワークフローの永続的な実行
Airflow、Dagster、Prefect は、 日付 パイプライン、 時間的 別の問題、つまり実行を解決します 耐久性のある 存続する必要がある長時間実行アプリケーションのワークフロー クラッシュ、再起動、ネットワーク中断など。
元 Uber エンジニア (その前身である Cadence を作成した人) によって開発されました。 Temporal は各ワークフローを 1 つとして扱います 耐久性のあるステートフル関数: 場合 ワークフローの実行中にサーバーが再起動され、正確な場所から再開されます。 状態を失うことなく停止していました。これは数時間続くワークフローには不可欠です。 数日または数週間 (例: オンボーディング プロセス、AI エージェントのオーケストレーション、ストーリー 分散トランザクションのパターン)。
テンポラル vs エアフロー/ダグスター: いつ選択するか
| シナリオ | 推奨ツール | 理由 |
|---|---|---|
| 毎日の ETL パイプライン | エアフロー / ダグスター / プリフェクト | バッチデータオーケストレーション用に最適化 |
| DBT変換 | ダグスター (改良版) / エアフロー | ネイティブ統合、アセットリネージ |
| ML トレーニング パイプライン | ダグスター / 知事 | アーティファクトの追跡、テストの促進 |
| ユーザーのオンボーディング プロセス (複数のステップ) | 時間的 | 耐久性、アプリケーションの状態管理 |
| AI / LLM エージェントのオーケストレーション | 時間的/知事 | 長期にわたる複雑な障害管理 |
| Saga パターン / 分散トランザクション | 時間的 | 補償トランザクション、耐久性 |
| リアルタイム IoT パイプライン | Kafka + Flink (非バッチ オーケストレーター) | ストリーミングには別のアーキテクチャが必要です |
多くの企業は、オーケストレーションに Airflow または Dagster というハイブリッド アプローチを採用しています。 データ パイプライン、耐久性を必要とするアプリケーション ワークフロー向けの Temporal。 2つのツール それらは互いに完全に補完し合い、直接競合するものではありません。
リファレンス アーキテクチャ: 最新のデータ スタックのオーケストレーション
オーケストレーターは残りのデータ スタックとどのように統合されますか?アーキテクチャを見てみよう このシリーズで紹介したすべてのコンポーネントを組み合わせた完全なリファレンス: 取り込みには Airbyte、変換には dbt、テーブル形式として Apache Iceberg、 そして中央のオーケストレーターとしてDagster。
オーケストレーターとして Dagster を使用したデータ スタック アーキテクチャ
| レイヤー | 成分 | ツール | ダグスターによるオーケストレーション? |
|---|---|---|---|
| 摂取 | ソース同期 | エアバイト / ファイブトラン | はい (dagster-airbyte) |
| ストレージ | オブジェクトストレージ + テーブル形式 | S3 + アパッチ アイスバーグ | はい (IO マネージャー) |
| 変換 | 宣言型 SQL モデル | dbtコア/クラウド | はい (dagster-dbt、ネイティブ アセット) |
| 品質 | データのテストと検証 | dbt テスト、大きな期待 | はい (資産チェック) |
| 給仕 | クエリエンジン | トリノ / アテナ / ダックDB | いいえ (オンデマンド クエリ) |
| 分析 | ダッシュボードとレポート | メタベース / タブロー | いいえ(データを消費します) |
| ML | 特徴量エンジニアリング + トレーニング | Python + MLflow | はい (ML アセット) |
# definitions.py - Dagster Definitions: composizione del data stack completo
from dagster import Definitions, load_assets_from_modules
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_dbt import DbtCliResource
from dagster_aws.s3 import S3Resource
# Import dei moduli asset
from assets import ingestione, silver, gold, ml_features
# ============ CARICAMENTO ASSET ============
# Asset Airbyte: ogni connessione Airbyte diventa un Dagster asset
airbyte_assets = load_assets_from_airbyte_instance(
airbyte=AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
connection_filter=lambda meta: meta.name.startswith("prod_"),
)
# Asset dbt: ogni modello dbt diventa un Dagster asset con lineage completa
@dbt_assets(
manifest=Path("dbt_project/target/manifest.json"),
)
def dbt_all_assets(context, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Asset Python custom
python_assets = load_assets_from_modules([ingestione, silver, gold, ml_features])
# ============ RISORSE ============
resources_prod = {
"airbyte": AirbyteResource(
host="http://airbyte-server:8001",
username="admin",
password=EnvVar("AIRBYTE_PASSWORD"),
),
"dbt": DbtCliResource(
project_dir="dbt_project",
profiles_dir="dbt_project",
target="prod",
),
"s3": S3Resource(region_name="eu-west-1"),
"io_manager": SnowflakePandasIOManager(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
"slack": SlackResource(token=EnvVar("SLACK_BOT_TOKEN")),
}
# ============ COMPOSIZIONE FINALE ============
defs = Definitions(
assets=[
*airbyte_assets,
dbt_all_assets,
*python_assets,
],
resources=resources_prod,
jobs=[pipeline_vendite_job, pipeline_clienti_job, pipeline_ml_job],
schedules=[pipeline_vendite_schedule, pipeline_ml_weekly_schedule],
sensors=[nuovo_file_s3_sensor, airbyte_sync_sensor],
)
監視、アラート、可観測性
モニタリングのないオーケストレーターは、搭載機器のない飛行機のようなものです。モニタリング パイプラインの数を増やし、SLA の確保、問題の診断、コミュニケーションに不可欠です ビジネスのデータ プラットフォームのステータス。
推奨される監視スタック
オーケストレーションされたパイプラインの監視ツール
| レイヤー | ツール | 監視対象 |
|---|---|---|
| インフラストラクチャのメトリクス | プロメテウス + グラファナ | CPU/RAM ワーカー、タスク キュー、スケジューラー レイテンシー |
| パイプラインのメトリクス | エアフローメトリクス/Dagsterイベント/Prefect API | タスクの成功率、期間、SLA 違反 |
| 一元化されたログ | ELK スタック / CloudWatch / Loki | デバッグ用の各タスクインスタンスのログ |
| 警告 | PagerDuty / OpsGenie | 重大な障害、SLA 違反に対するエスカレーション |
| チーム通知 | スラック / Microsoft Teams | 成功/失敗通知、日報 |
| データ品質 | 初級 / DBT テスト / 大きな期待 | データの異常、鮮度、行数 |
# monitoring/alerting_setup.py
# Configurazione alerting Airflow con Slack e PagerDuty
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
def task_failure_alert(context):
"""
Callback chiamato su ogni task failure.
Inviamo notifica Slack con dettagli del fallimento.
"""
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
run_id = context["run_id"]
execution_date = context["execution_date"]
log_url = context["task_instance"].log_url
exception = context.get("exception", "N/A")
message = f"""
:x: *Task Fallito*
*DAG:* {dag_id}
*Task:* {task_id}
*Run ID:* {run_id}
*Data:* {execution_date.strftime('%Y-%m-%d %H:%M')}
*Errore:* `{str(exception)[:200]}`
*Log:* <{log_url}|Vedi Log Airflow>
"""
slack_op = SlackWebhookOperator(
task_id="slack_alert",
slack_webhook_conn_id="slack_data_ops",
message=message,
channel="#data-ops-alerts",
dag=context["dag"],
)
slack_op.execute(context)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback per SLA breach: invia alert critico su PagerDuty."""
import requests
sla_tasks = ", ".join([f"{s.task_id}" for s in slas])
message = f"SLA BREACH su DAG {dag.dag_id}: task {sla_tasks} non completati in tempo"
# PagerDuty Events API
payload = {
"routing_key": "YOUR_PAGERDUTY_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": message,
"severity": "critical",
"source": "airflow",
"component": dag.dag_id,
"custom_details": {
"sla_tasks": sla_tasks,
"dag_id": dag.dag_id,
},
},
}
requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)
# Utilizzo nel DAG
with DAG(
dag_id="pipeline_critica",
default_args={
"on_failure_callback": task_failure_alert, # Per ogni task
},
sla_miss_callback=sla_miss_callback, # Per SLA violations
schedule="0 5 * * *",
# ...
) as dag:
task_etl = PythonOperator(
task_id="etl_principale",
python_callable=esegui_etl,
sla=timedelta(hours=1), # Deve completare entro 1 ora
)
パイプラインメトリクス用の Grafana ダッシュボード
Airflow は、StatsD 経由 (または airflow-prometheus-exporter を使用して直接 Prometheus) でメトリクスを公開します。 Grafana ダッシュボードで監視する最も重要な指標は次のとおりです。
Grafana で監視するエアフロー メトリクス
- airflow.scheduler.tasks.starving: キューに入れられたタスクに使用可能なワーカーがありません (スケーラビリティが不十分であることを示します)
- airflow.dagrun.duration.success: DAG 実行時間の分布 (SLA 指標としての P95 パーセンタイル)
- airflow.task_instance.failures: タスク別の障害の傾向 (異常はソースの問題を示します)
- airflow.scheduler.heartbeat: スケジューラーのハートビート頻度 (停止した場合、タスクはスケジュールされません)
- airflow.pool.open_slots: プール内の空きスロット (飽和状態を示します)
- airflow.dagrun.first_task_scheduling_delay: スケジュールから最初のタスクが開始されるまでの遅延
本番環境での堅牢なパイプラインのベスト プラクティス
1. 冪等性: 黄金律
すべてのタスクは次のようにする必要があります 冪等: 同じものを数回実行します パラメータは同じ結果を生成する必要があります。このプロパティは基本的なものであるため、 実稼働環境では再試行 (自動または手動) が避けられません。
# SBAGLIATO: non idempotente (doppio insert)
def carica_dati_wrong(**context):
df = estrai_dati()
pg_hook.insert_rows("staging.ordini", df.values.tolist()) # APPEND!
# CORRETTO: idempotente (DELETE + INSERT per la data specifica)
def carica_dati_correct(**context):
execution_date = context["ds"] # "2025-01-15"
df = estrai_dati(execution_date)
with pg_hook.get_conn() as conn:
with conn.cursor() as cur:
# 1. Elimina i dati esistenti per questa partizione
cur.execute(
"DELETE FROM staging.ordini WHERE DATE(data_ordine) = %s",
[execution_date]
)
# 2. Inserisci i nuovi dati
execute_values(cur, """
INSERT INTO staging.ordini (ordine_id, data_ordine, importo)
VALUES %s
""", df[["ordine_id", "data_ordine", "importo"]].values.tolist())
conn.commit()
# OPPURE: usa INSERT ... ON CONFLICT per UPSERT atomico
def carica_dati_upsert(**context):
execution_date = context["ds"]
df = estrai_dati(execution_date)
pg_hook.insert_rows(
table="staging.ordini",
rows=df.values.tolist(),
target_fields=["ordine_id", "data_ordine", "importo"],
replace=True,
replace_index=["ordine_id"], # PK: upsert su ordine_id
)
2. 再試行ポリシーと指数バックオフ
常に適切な再試行ポリシーを設定してください。一時的な障害 (ネットワークのタイムアウト、 データベースのロック、サービスが一時的に利用できないなど)は、多くの場合、次の方法で解決されます。 数分後に再試行してください。指数関数的バックオフにより、既存システムの過負荷を回避します 困難に陥っています。
# Retry policy consigliata per diversi tipi di task
from datetime import timedelta
# Task di estrazione da API esterne: retry aggressivi con backoff
default_args_api = {
"retries": 5,
"retry_delay": timedelta(minutes=1),
"retry_exponential_backoff": True, # 1, 2, 4, 8, 16 min
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(minutes=30),
}
# Task di trasformazione dbt: retry conservativi
default_args_dbt = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2),
}
# Task di notifica: no retry (evita spam di alert)
default_args_notify = {
"retries": 0,
"execution_timeout": timedelta(minutes=2),
}
3. センサーによる依存関係管理
固定時刻でパイプラインをスケジュールできるとは限りません。待たなければならないこともあります ファイルが S3 に到着するか、別のパイプラインが終了するか、または外部データが 利用可能です。ザ センサーオペレーター Airflow (および Dagster) が解決策です。
# Sensor: attendi che il file di input sia disponibile prima di procedere
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
with DAG("pipeline_dipendente", ...) as dag:
# Attendi file S3 (polling ogni 5 minuti, max 2 ore)
wait_for_file = S3KeySensor(
task_id="wait_for_input_file",
bucket_name="my-data-lake",
bucket_key="inputs/vendite_{{ ds_nodash }}.csv",
aws_conn_id="aws_default",
poke_interval=300, # Controlla ogni 5 min
timeout=7200, # Timeout dopo 2 ore
mode="reschedule", # Libera il worker mentre aspetta
)
# Attendi che un'altra DAG abbia finito
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_erp_sync",
external_dag_id="sincronizzazione_erp",
external_task_id="verify_sync_complete",
allowed_states=["success"],
failed_states=["failed", "skipped"],
poke_interval=120,
timeout=3600,
mode="reschedule",
)
estrai = PythonOperator(task_id="estrai_dati", ...)
# Pipeline partira solo quando entrambi i sensor sono verdi
[wait_for_file, wait_for_upstream] >> estrai
アンチパターン: 避けるべきよくある間違い
- 変更可能なグローバル状態を持つタスク: Python グローバル変数を使用しないでください DAG 内で。 DAG はスケジューラによって頻繁に解析されます。グローバル変数 予測できない動作を引き起こします。 XCom、変数、またはタスクフロー API を使用します。
- DAG ファイルのビジネス ロジック: DAG ファイルには、 オーケストレーションの定義。ビジネス ロジックは別の Python モジュールに組み込まれます。 タスクからインポートされました。テストが容易になり、解析時間が短縮されます。
- 大規模なデータセット用の XCom: XCom は小規模なメタデータ用に設計されています (カウント、ファイルパス、フラグ)。タスク間で DataFrame を渡すためにこれを使用しないでください。 ステージング テーブルまたは S3 ファイル。
- 粒度のないモノリシック パイプライン: すべてを行う単一のタスク (抽出 + 変換 + ロード) ではデバッグが不可能になります。あらゆる操作 論理的に異なるものは別個のタスクである必要があります。
-
厳しいスケジュールの場合、Catchup=True: キャッチアップを有効にすると、
時間ごとのスケジュールを持つパイプラインを作成し、30 日間の休憩後にオンにすると、720 が起動されます。
同時に処刑される。常に使用する
catchup=Falseデフォルトでは バックフィルを明示的に管理します。 - ハードコードされたシークレット: パスワード、API キー、または接続を入力しないでください 文字列をコードに直接入力します。エアフロー接続、Dagster リソース、または Prefect Blocks は AWS Secrets Manager または HashiCorp Vault と統合されています。
4. パイプラインのテスト
未テストのパイプラインと確保された技術的負債。テストを構成する方法は次のとおりです Airflow DAG と Dagster アセットの場合。
# tests/test_pipeline_vendite.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import date
import pandas as pd
# ============ TEST AIRFLOW ============
from airflow.models import DagBag
def test_dag_integrity():
"""Verifica che i DAG siano validi e non abbiano cicli."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dagbag.import_errors) == 0, \
f"Errori di import: {dagbag.import_errors}"
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert dag is not None, "DAG non trovato"
assert dag.schedule == "0 5 * * *"
def test_dag_task_count():
"""Verifica il numero di task nel DAG."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("pipeline_vendite_giornaliera")
assert len(dag.tasks) >= 5, "Troppo pochi task nel DAG"
# ============ TEST DAGSTER ============
from dagster import materialize, build_asset_context
from assets import raw_ordini, silver_ordini_arricchiti
def test_raw_ordini_asset():
"""Test dell'asset raw_ordini con mock delle risorse."""
mock_df = pd.DataFrame({
"ordine_id": [1, 2, 3],
"customer_id": [101, 102, 103],
"importo_totale": [150.0, 89.99, 220.5],
"data_ordine": ["2025-01-15"] * 3,
})
with patch("assets.ingestione.PostgresResource") as mock_pg:
mock_pg.return_value.execute_query.return_value = mock_df
result = materialize(
[raw_ordini],
resources={"postgres_erp": mock_pg.return_value},
partition_key="2025-01-15",
)
assert result.success
asset_value = result.output_for_node("raw_ordini")
assert len(asset_value) == 3
assert (asset_value["importo_totale"] >= 0).all()
# ============ TEST PREFECT ============
from flows.pipeline_vendite import estrai_ordini, valida_e_arricchisci
from prefect.testing.utilities import prefect_test_harness
def test_valida_e_arricchisci():
"""Test del task di validazione con dati di test."""
ordini = pd.DataFrame({
"ordine_id": [1, 2],
"customer_id": [101, 102],
"importo_totale": [100.0, 200.0],
})
clienti = pd.DataFrame({
"id": [101, 102],
"citta": ["Milano", "Roma"],
"segmento": ["Premium", "Standard"],
})
with prefect_test_harness():
result = valida_e_arricchisci.fn(ordini, clienti)
assert len(result) == 2
assert "segmento" in result.columns
assert result["segmento"].isna().sum() == 0
def test_valida_importi_negativi():
"""Test che importi negativi generino eccezione."""
ordini = pd.DataFrame({
"ordine_id": [1],
"importo_totale": [-50.0], # NEGATIVO
"customer_id": [101],
})
clienti = pd.DataFrame({"id": [101], "citta": ["Milano"], "segmento": ["Standard"]})
with prefect_test_harness():
with pytest.raises(ValueError, match="Trovati importi negativi"):
valida_e_arricchisci.fn(ordini, clienti)
結論と推奨事項
成熟したデータ スタックのオーケストレーションとバックボーン。オーケストレーターなし 信頼性が高くなると、最も洗練されたパイプラインでさえもろくなり、不透明になり、取り扱いが困難になります。 管理する。私たちは 2025 年の市場の 3 つの主要な手段を調査し、どのように変化するかを見てきました。 それぞれが異なるニーズに応えます。
シナリオの推奨事項
- これから始まるPMI: あなたが選択します 知事3 プリフェクトクラウドを使って 無料枠。価値を評価するまでの時間が最小限で、学習曲線が短く、合格できる 複雑さが必要な場合には、より多くのエンタープライズ ソリューションを利用できます。
- dbt スタックを使用したチーム: ダグスター そして自然な選択。 dagster-dbt を介した dbt とのネイティブ統合およびモデル間の自動リネージ dbt とasset Dagster は、チームの認知負荷を大幅に軽減します。
- 既存の Airflow スキルを持つエンタープライズ: エアフロー 3.0 Astronomer (管理対象) または MWAA で。チームがすでに生産性を高めている場合、移行する理由はありません バージョン 3.0 では、Dagster や Prefect との差の多くが埋められました。
- 長時間実行されるアプリケーションのワークフロー: 考慮する 時間的 お気に入りのデータ オーケストレーターと組み合わせて使用できます。これらは補完的なツールであり、 代替品ではありません。
シリーズの次の記事では、以下について詳しく説明します製造業への AI の適用: IoT センサー、Apache Kafka を使用して予知保全システムを構築する方法を見ていきます。 マシンデータのストリーミングと、障害が発生する前に予測するための ML モデル。 私たちが構築しているデータスタックがどのようにして 企業における人工知能の基礎。
オーケストレーターを本番環境に導入するためのチェックリスト
- すべてのタスクはべき等です。タスクを再実行しても重複は生成されません。
- 各タスクの指数バックオフを使用して構成された再試行ポリシー
- シークレットはハードコーディングされず、接続/ブロック/リソース経由で管理されます
- Grafana または同等のインフラストラクチャ メトリックを使用したアクティブなモニタリング
- 障害や SLA 違反について Slack または PagerDuty でアラートを送信する
- CI/CD パイプライン内の DAG/アセットの整合性テスト
- キャッチアップはデフォルトで無効になっており、バックフィルは明示的に処理されます
- ビジネス ロジックをパイプライン定義ファイルから分離する
- 作業者が直接アクセスせずに一元化されたログにアクセス可能
- インラインドキュメント: 各 DAG/アセットには明確な説明があります
詳細については役立つリンク
- 前の記事: ETL 対モダン ELT: dbt、Airbyte、Fivetran
- 次の記事: 製造における AI: 予知保全とデジタル ツイン
- 関連シリーズ: ビジネス向け MLOps - MLflow を使用した本番環境での AI モデル
- 関連シリーズ: ビジネスにおける LLM: RAG Enterprise、微調整、ガードレール







