食品サプライチェーン: 農場から小売業者までの ETL パターン
毎年、およそ 人間の消費のために生産された食料の 3 分の 1 が失われるか廃棄されています 食品サプライチェーンに沿った総額:米国だけで4,730億ドルを超え、金額に換算すると 世界中で毎日10億食が廃棄されています。逆説的ですが、大部分の この無駄は気候や生物学的要因ではなく、可視性というデータの問題に依存しています。 不十分、レガシー システムが統合されていない、KPI の計算が遅く、情報に基づいて意思決定が行われる 不完全または不正確。
の世界市場 食品サプライチェーン管理 2025 年には 1,828 億 1,000 万ドルの価値がある 2034 年までに 3,593 億 9,000 万に増加すると予想されます (CAGR 7.8%)。しかし、食品供給業者の 48% は依然として 日常業務にはスプレッドシートを使用し、60% は消耗品を消費する反復的な手動タスクを報告しています。 貴重な時間。テクノロジーが可能にするものと農業食品企業が実現できるものとの間のギャップ 実際に実装すると巨大になります。
FoodTech シリーズの最新記事であるこの記事は、根本的な問題、つまり、 堅牢でスケーラブルな ETL/ELT パイプライン 農場から小売店までの異種データを統合するため、 Apache Airflow でフローを調整し、dbt でデータを変換し、Great で品質を確保します 食品セクターに固有のサプライ チェーン KPI を使用して期待し、パフォーマンスを測定します。
何を学ぶか
- 食品サプライチェーン(農場→加工→流通→小売)のエンドツーエンドのデータアーキテクチャ
- 異種データソース: ERP (SAP/Oracle)、MES、LIMS、WMS、TMS、POS、IoT、EDI
- ETL と ELT: 食品データに対してどちらのアプローチを選択するか
- エラー処理と再試行を備えた食品パイプライン用の完全な Apache Airflow DAG
- サプライチェーン KPI の DBT モデル: 廃棄率、OTIF、充填率、供給日数
- コールドチェーンと有効期限に大きな期待がかかるデータ品質
- レガシー システム統合: SAP RFC/BAPI、EDI EDIFACT、CDC と Debezium
- 事例: 農業協同組合 500 の農場、3 つの加工センター、200 の販売拠点
- FoodTechシリーズ全体のまとめ(全10記事)
FoodTech シリーズ: 私たちの居場所
これはシリーズの 10 番目で最後のエピソードです フードテック、主なものを調査しました 食品サプライチェーン全体に応用できるデジタル技術:現場でのデータ収集から 小売。全体像は次のとおりです。
| # | タイトル | テクノロジー | レベル |
|---|---|---|---|
| 00 | Python と MQTT を使用した精密農業用の IoT パイプライン | IoT、MQTT、Python、データレイク | 高度な |
| 01 | 作物の病気検出のための ML Edge: Raspberry Pi 上の TensorFlow Lite | TensorFlow Lite、Edge ML、Raspberry Pi | 高度な |
| 02 | アグリテック向けの衛星および天気 API: 予測データ | センチネル、プラネット、天気 API、NDVI | 高度な |
| 03 | 食品トレーサビリティシステム: ブロックチェーン、RFID、IoT | ブロックチェーン、RFID、IoT、コンプライアンス | 高度な |
| 04 | PyTorch YOLO を使用した食品品質管理のためのコンピューター ビジョン | YOLO、PyTorch、コンピューター ビジョン | 高度な |
| 05 | FSMA 204 自動化: Python による追跡、アラート、リコール | FSMA、コンプライアンス、Python、リコール | 高度な |
| 06 | 垂直農法の自動化: API を介したロボット制御 | ロボティクス、API、自動化 | 高度な |
| 07 | 廃棄物削減のための需要予測: ML 時系列 | LSTM、Prophet、時系列 ML | 高度な |
| 08 | Angular と Grafana を使用したファーム IoT のリアルタイム ダッシュボード | Angular、Grafana、InfluxDB、リアルタイム | 高度な |
| 09 | 食品サプライチェーン: 農場から小売業者までの ETL パターン <-- ここにいます | エアフロー、dbt、Great Expectations、ETL | 高度な |
この記事でシリーズは終了となるため、
以前の記事では、チェーン内の個々のノードについて説明しました: 現場での IoT データ収集、 ML エッジ モデル、ブロックチェーン トレーサビリティ、品質のためのコンピュータ ビジョン、FSMA 準拠、 垂直農業、需要予測、リアルタイム ダッシュボード。この記事では、それらすべてを結び付けます。 ETL パイプラインと各データ ソースを統合し、生の情報を変換するバックボーン 実行可能な KPI に変換し、農場からスーパーマーケットの棚までエンドツーエンドの可視性をもたらします。
食品サプライチェーンのデータアーキテクチャ
食品サプライチェーンは異なるノードで構成されており、それぞれが独自の情報システム、フォーマットを備えています。 データと更新頻度。 ETL パイプラインは、この異質性を堅牢に処理する必要があります。
# Architettura logica della supply chain alimentare
# (dati generati ad ogni step)
FARM SYSTEMS
|-- Sensori IoT (MQTT): temp suolo, umidita, pH, EC
|-- Sistema ERP agricolo: colture pianificate, costi input
|-- Registro trattamenti fitosanitari (LIMS)
|-- Geo-dati GPS macchinari agricoli
|-- API meteo (OpenMeteo, Copernicus)
|
v
PROCESSING PLANTS (Centri di Lavorazione)
|-- MES (Manufacturing Execution System): ordini produzione, rese
|-- LIMS: analisi chimiche, microbiologiche, allergeni
|-- ERP SAP/Oracle: distinta base, lotti, tracciabilita
|-- WMS (Warehouse Management): magazzino materie prime/finiti
|-- Sensori cold chain: temperatura/umidita durante lavorazione
|
v
DISTRIBUTION CENTERS
|-- WMS: gestione stock, picking, slotting
|-- TMS (Transport Management): spedizioni, veicoli, routing
|-- Sensori cold chain: logger temperatura nei camion frigoriferi
|-- EDI: ordini da retailer (EDIFACT ORDERS, DESADV)
|-- Tracker GPS: posizione veicoli in tempo reale
|
v
RETAIL POS
|-- POS data: vendite per SKU, store, ora
|-- Gestione shelf life: prodotti prossimi scadenza
|-- EDI: conferme ricezione (RECADV), fatture (INVOIC)
|-- Inventario store: stock disponibile per punto vendita
|
v
CONSUMER (segnali indiretti)
|-- App loyalty: acquisti per cliente, frequenza
|-- Reviews prodotto: qualità percepita
|-- Social sentiment: trend di consumo
各ノードは、異なる形式 (リレーショナル SQL、CSV/EDI フラット ファイル、REST API からの JSON、 ミリ秒からの範囲の頻度のバイナリ MQTT メッセージ、レガシー SFTP ファイル) (IoT センサー) 週次 (ERP レポート) まで。食品データ エンジニアの課題は、これらすべてを統合することです 単一の一貫した分析モデルに変換します。
異種データ ソース: 参照テーブル
パイプラインを設計する前に、ソースを調査することが不可欠です。主なものは次のとおりです 中程度の複雑さの食品サプライ チェーンの場合:
| システム | タイプ | 形式 | 一日あたりの量 | 頻度 | 許容可能な遅延 |
|---|---|---|---|---|---|
| SAP ERP (農場/工場) | ERP | RFC/BAPI、IDoc、SQL | 50,000 ~ 500,000 レコード | オーバーナイトバッチ/CDC | T+1h |
| オラクルEBS | ERP | SQL、REST API | 100K~1Mレコード | バッチ/CDC | T+1h |
| MES(生産) | MES | OPC-UA、SQL、REST | 100 万~1,000 万のイベント | ほぼリアルタイム(1分) | 5分 |
| LIMS(研究室) | リムズ | HL7、CSV、REST | 1K ~ 10K 分析 | 毎日 | T+4時間 |
| WMS(倉庫) | WMS | SQL、EDI、REST | 10,000 ~ 100,000 回の動き | 15分ごと | 15分 |
| TMS(輸送) | TMS | REST API、EDI | 5,000 ~ 50,000 個の出荷 | 5分ごと(GPS) | 5分 |
| 小売POS | POS | CSV、REST、SQL | 100 万~5,000 万のトランザクション | 毎時/夕方のバッチ | T+2時間 |
| IoTコールドチェーンセンサー | IoT | MQTT、JSON、プロトバッファ | 1億件以上の測定値 | 30秒~5分ごと | リアルタイム (<1 分) |
| EDIパートナー(EDIFACT) | EDI | EDIFACT、X12、GS1 XML | 100 ~ 10,000 メッセージ | イベント駆動型 / バッチ | T+30分 |
| 天気 API (OpenMeteo) | 外部API | JSON REST | 1,000 ~ 10,000 件の通話 | 毎時 | T+1h |
| AGEA(CAP補助金) | 公的機関 | XML、CSV SFTP | 1 ~ 100 ファイル | 毎月/季節ごと | T+24h |
| GPS車両追跡装置 | IoT/テレマティクス | REST、WebSocket | 1,000 万以上の GPS ポイント | 30秒ごと | 2分未満 |
注意: レガシー ERP と「抽出ウィンドウ」
多くの農業および農産物 ERP はネイティブ CDC をサポートしていません。多くの場合、抽出する唯一の方法 データ、テーブルをロックする夜間のバッチ ジョブ経由、または SAP RFC/BAPI インターフェイス経由 厳格なメンテナンスウィンドウ。抽出ウィンドウを事前に計画し、予測する 夜間のバッチ障害が発生した場合の回復メカニズム。
食品サプライチェーンにおけるETLとELTの比較
ETL と ELT の議論は、データが共存する食品分野に特に関係します。 高頻度 (IoT コールド チェーン)、従来のバッチ データ (SAP ERP) および規制要件を使用 厳格 (トレーサビリティ、FSMA 204)。
| 基準 | 従来のETL | 最新の ELT (dbt + データ レイク) |
|---|---|---|
| 変換 | アップロード前(ステージングサーバー) | データ ウェアハウス内 (ネイティブ SQL) |
| スケーラビリティ | ETLサーバーによる制限 | DWH によるスケール (Snowflake、BigQuery、DuckDB) |
| 生データの保持 | 多くの場合、そうではありません (データはすでに変換されています) | はい: ブロンズ層は生の状態を保存します |
| 監査可能性 (FSMA) | ハード: ぼやけた血統 | 素晴らしい: 完全な dbt リネージ グラフ |
| レイテンシ | バッチ (T+1h、T+1d) | マイクロバッチまたはストリーミング (Flink/Spark) |
| コールドチェーンエラー | ETL が失敗した場合のデータ損失 | Raw は常に保存されます。変換のみを再試行します |
| 必要なチームスキル | Java/Informatica/SSIS 開発者 | SQL + Python (dbt + エアフロー) |
| インフラストラクチャコスト | 常時稼働の専用サーバー | クエリごとの支払い(Snowflake/BigQuery) |
現代の食品サプライチェーンの推奨事項はアーキテクチャです メダリオンELT:
# Architettura Medallion per Food Supply Chain
BRONZE LAYER (Raw, immutabile)
|-- Tutti i dati sorgente caricati as-is
|-- Partitioned by: source_system, ingestion_date
|-- Retention: 7 anni (compliance FSMA/EU)
|-- Formato: Parquet su S3/GCS o Delta Lake
|
v
SILVER LAYER (Cleansed, deduplicato)
|-- Dbt models: normalizzazione schemi eterogenei
|-- Deduplicazione lotti, SKU, location codes
|-- Type casting: date, temperature (C/F), pesi (kg/lb)
|-- NULL handling: sensori offline, ERP null fields
|-- PII masking: dati operatori, conducenti
|
v
GOLD LAYER (KPI pronti per business)
|-- Aggregazioni: fill rate, waste rate, OTIF per SKU
|-- Metriche cold chain: violazioni temperatura per lotto
|-- Dashboard BI: Power BI, Tableau, Grafana
|-- API ML: feature store per demand forecasting
|
v
DATA MART
|-- Retail mart: sell-through, shelf life at receipt
|-- Operations mart: OEE impianti, rendimenti produzione
|-- Finance mart: costo per lotto, margine per SKU
|-- Compliance mart: tracciabilita FSMA, recall readiness
食品パイプラインオーケストレーションのための Apache Airflow
Apache Airflow は、複雑な ETL ワークフローを調整するための事実上の標準です。サプライチェーン内で 強力な場合、単一の DAG がレイテンシ、依存関係、ポリシーを使用して数十のソースを調整する必要があります さまざまな再試行を行います。毎日のパイプラインの完全な DAG を見てみましょう。
メイン DAG: 食品サプライ チェーンの日次 ETL
# food_supply_chain_dag.py
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.trigger_rule import TriggerRule
logger = logging.getLogger(__name__)
DEFAULT_ARGS = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
}
with DAG(
dag_id="food_supply_chain_daily_etl",
description="Pipeline ETL giornaliera: farm -> processing -> distribution -> retail",
default_args=DEFAULT_ARGS,
schedule_interval="0 2 * * *", # ogni notte alle 02:00 UTC
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["foodtech", "supply-chain", "etl", "production"],
max_active_runs=1,
doc_md="""
## Food Supply Chain Daily ETL
Estrae dati da: SAP ERP, MES impianti, WMS, TMS, POS retail.
Carica su Bronze S3, poi lancia dbt per Silver e Gold.
""",
) as dag:
# ----------------------------------------------------------------
# STEP 1: Health checks sulle sorgenti
# ----------------------------------------------------------------
check_sap_available = HttpSensor(
task_id="check_sap_api_available",
http_conn_id="sap_erp_api",
endpoint="/api/health",
timeout=30,
poke_interval=60,
mode="reschedule",
soft_fail=False,
)
check_mes_available = HttpSensor(
task_id="check_mes_api_available",
http_conn_id="mes_plant_api",
endpoint="/health",
timeout=30,
poke_interval=60,
mode="reschedule",
)
# ----------------------------------------------------------------
# STEP 2: Estrazione SAP ERP (lotti, ordini, inventario)
# ----------------------------------------------------------------
@task(task_id="extract_sap_erp", retries=3)
def extract_sap_erp(**context: Any) -> dict:
"""Estrae dati da SAP ERP via RFC/BAPI wrapper REST."""
from src.connectors.sap_connector import SAPConnector
execution_date = context["ds"] # 'YYYY-MM-DD'
sap = SAPConnector(conn_id="sap_erp_api")
# Estrai: lotti produzione del giorno precedente
batches = sap.get_production_batches(
date=execution_date,
plants=["PL001", "PL002", "PL003"],
)
# Estrai: movimenti magazzino
stock_movements = sap.get_stock_movements(
date=execution_date,
movement_types=["101", "261", "501", "601"],
)
# Estrai: ordini di vendita confermati
sales_orders = sap.get_sales_orders(
date=execution_date,
status=["CONF", "DLVR"],
)
# Salva su S3 Bronze layer
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/sap_erp/dt={execution_date}"
s3.load_string(
string_data=batches.to_json(orient="records", lines=True),
key=f"{prefix}/batches.jsonl",
bucket_name="food-data-lake",
replace=True,
)
s3.load_string(
string_data=stock_movements.to_json(orient="records", lines=True),
key=f"{prefix}/stock_movements.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"SAP extraction OK: %d batches, %d movements, %d orders",
len(batches),
len(stock_movements),
len(sales_orders),
)
return {
"batches_count": len(batches),
"movements_count": len(stock_movements),
"orders_count": len(sales_orders),
}
# ----------------------------------------------------------------
# STEP 3: Estrazione dati cold chain (IoT sensori temperatura)
# ----------------------------------------------------------------
@task(task_id="extract_cold_chain_iot")
def extract_cold_chain_iot(**context: Any) -> dict:
"""Estrae letture temperatura da InfluxDB (IoT cold chain)."""
from influxdb_client import InfluxDBClient
execution_date = context["ds"]
client = InfluxDBClient(
url="{{ var.value.influxdb_url }}",
token="{{ var.value.influxdb_token }}",
org="foodtech-org",
)
query_api = client.query_api()
# Query Flux: temperature readings per lotto di trasporto
flux_query = f"""
from(bucket: "cold-chain")
|> range(start: {execution_date}T00:00:00Z,
stop: {execution_date}T23:59:59Z)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "celsius")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> pivot(rowKey: ["_time", "lot_id"], columnKey: ["_field"], valueColumn: "_value")
"""
result = query_api.query_data_frame(flux_query)
# Calcola violazioni: temperatura fuori range per categoria prodotto
violations = result[
(result["celsius"] < result["min_temp_required"]) |
(result["celsius"] > result["max_temp_required"])
]
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/cold_chain/dt={execution_date}"
s3.load_string(
string_data=result.to_json(orient="records", lines=True),
key=f"{prefix}/temperature_readings.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"Cold chain extraction OK: %d readings, %d violations",
len(result),
len(violations),
)
if len(violations) > 0:
logger.warning(
"COLD CHAIN VIOLATIONS: %d eventi fuori temperatura!",
len(violations),
)
return {
"readings_count": len(result),
"violations_count": len(violations),
}
# ----------------------------------------------------------------
# STEP 4: Estrazione EDI dai partner retailer
# ----------------------------------------------------------------
@task(task_id="extract_edi_partners")
def extract_edi_partners(**context: Any) -> dict:
"""Processa file EDI EDIFACT da SFTP partners."""
from src.connectors.edi_connector import EDIFACTConnector
execution_date = context["ds"]
edi = EDIFACTConnector(sftp_conn_id="edi_sftp_partners")
# Download e parsing ORDERS (EDIFACT ORDERS D.01B)
orders = edi.process_orders(date=execution_date)
# Download e parsing DESADV (Despatch Advice)
despatch_advices = edi.process_desadv(date=execution_date)
# Download e parsing RECADV (Receiving Advice)
receiving_advices = edi.process_recadv(date=execution_date)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/edi_partners/dt={execution_date}"
s3.load_string(
string_data=orders.to_json(orient="records", lines=True),
key=f"{prefix}/orders.jsonl",
bucket_name="food-data-lake",
replace=True,
)
return {
"orders_count": len(orders),
"desadv_count": len(despatch_advices),
"recadv_count": len(receiving_advices),
}
# ----------------------------------------------------------------
# STEP 5: Estrazione POS retail
# ----------------------------------------------------------------
@task(task_id="extract_pos_data")
def extract_pos_data(**context: Any) -> dict:
"""Estrae dati vendite POS da database retailer partner."""
from src.connectors.retail_connector import RetailPOSConnector
execution_date = context["ds"]
pos = RetailPOSConnector(conn_id="retail_pos_db")
# Vendite per SKU/store con shelf life info
sales = pos.get_daily_sales(
date=execution_date,
include_promotions=True,
include_markdowns=True,
)
# Near-expiry: prodotti a <3 giorni scadenza ancora in shelf
near_expiry = pos.get_near_expiry_stock(
date=execution_date,
days_threshold=3,
)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/pos_retail/dt={execution_date}"
s3.load_string(
string_data=sales.to_json(orient="records", lines=True),
key=f"{prefix}/sales.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info("POS extraction OK: %d transazioni", len(sales))
return {
"sales_count": len(sales),
"near_expiry_skus": len(near_expiry),
}
# ----------------------------------------------------------------
# STEP 6: dbt run (Silver + Gold transformations)
# ----------------------------------------------------------------
dbt_run_silver = BashOperator(
task_id="dbt_run_silver_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:silver --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
dbt_run_gold = BashOperator(
task_id="dbt_run_gold_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:gold --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
# ----------------------------------------------------------------
# STEP 7: dbt test (data quality checks)
# ----------------------------------------------------------------
dbt_test = BashOperator(
task_id="dbt_test_all",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt test --select tag:silver tag:gold --target prod"
),
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# ----------------------------------------------------------------
# STEP 8: Great Expectations validation
# ----------------------------------------------------------------
@task(task_id="run_great_expectations_validation")
def run_ge_validation(**context: Any) -> dict:
"""Lancia Great Expectations per validare data quality cold chain."""
from great_expectations.data_context import get_context
context_ge = get_context(project_root_dir="/opt/great_expectations")
execution_date = context["ds"]
results = {}
suites_to_run = [
("cold_chain_temperature", "bronze_cold_chain_checkpoint"),
("lot_traceability", "silver_lots_checkpoint"),
("kpi_supply_chain", "gold_kpi_checkpoint"),
]
for suite_name, checkpoint_name in suites_to_run:
result = context_ge.run_checkpoint(
checkpoint_name=checkpoint_name,
batch_request={
"runtime_parameters": {"path": f"s3://food-data-lake/bronze/cold_chain/dt={execution_date}"},
"batch_identifiers": {"execution_date": execution_date},
},
)
results[suite_name] = result.success
failed = [name for name, ok in results.items() if not ok]
if failed:
raise ValueError(f"GE validation FAILED per suite: {failed}")
logger.info("Great Expectations: tutte le suite OK")
return results
# ----------------------------------------------------------------
# STEP 9: KPI alert (email se waste rate > soglia)
# ----------------------------------------------------------------
@task(task_id="check_kpi_alerts", trigger_rule=TriggerRule.ALL_SUCCESS)
def check_kpi_alerts(**context: Any) -> None:
"""Controlla KPI critici e invia alert se fuori soglia."""
from src.services.kpi_alert_service import KPIAlertService
execution_date = context["ds"]
alert_service = KPIAlertService(conn_id="snowflake_dw")
kpis = alert_service.get_daily_kpis(date=execution_date)
alerts_sent = []
# Waste rate > 5%: alert critico
if kpis["waste_rate_pct"] > 5.0:
alert_service.send_alert(
severity="HIGH",
message=f"Waste rate {kpis['waste_rate_pct']:.1f}% supera soglia 5%",
kpi="waste_rate",
value=kpis["waste_rate_pct"],
)
alerts_sent.append("waste_rate")
# OTIF < 95%: alert warning
if kpis["otif_pct"] < 95.0:
alert_service.send_alert(
severity="MEDIUM",
message=f"OTIF {kpis['otif_pct']:.1f}% sotto soglia 95%",
kpi="otif",
value=kpis["otif_pct"],
)
alerts_sent.append("otif")
# Cold chain violations > 0: alert immediato
if kpis["cold_chain_violations"] > 0:
alert_service.send_alert(
severity="CRITICAL",
message=f"{kpis['cold_chain_violations']} violazioni temperatura cold chain!",
kpi="cold_chain_violations",
value=kpis["cold_chain_violations"],
)
alerts_sent.append("cold_chain")
logger.info("KPI check completato. Alert inviati: %s", alerts_sent)
# ----------------------------------------------------------------
# DAG wiring: dipendenze tra task
# ----------------------------------------------------------------
extract_sap = extract_sap_erp()
extract_cold = extract_cold_chain_iot()
extract_edi = extract_edi_partners()
extract_pos = extract_pos_data()
ge_validation = run_ge_validation()
kpi_alerts = check_kpi_alerts()
# Health checks prima di tutto
[check_sap_available, check_mes_available] >> extract_sap
# Estrazione parallela di tutte le sorgenti
[extract_sap, extract_cold, extract_edi, extract_pos] >> dbt_run_silver
# Silver prima di Gold
dbt_run_silver >> dbt_run_gold
# Test dopo Gold
dbt_run_gold >> dbt_test
# GE validation e KPI alerts dopo i test
dbt_test >> ge_validation >> kpi_alerts
変換のための dbt: 食品 KPI の SQL モデル
dbt (データ構築ツール) は、シルバー層とゴールド層のデータを変換するための理想的なツールです。 統合されたテストと文書化された、食品サプライ チェーンの主なモデルを見てみましょう。
Schema.yml: ドキュメントとテスト DBT
# models/silver/schema.yml
version: 2
models:
- name: silver_lots
description: "Lotti di produzione normalizzati da SAP ERP"
config:
tags: ["silver", "lots", "traceability"]
materialized: incremental
incremental_strategy: merge
unique_key: lot_id
columns:
- name: lot_id
description: "Identificativo univoco lotto (GS1 SGTIN)"
tests:
- not_null
- unique
- name: production_date
tests:
- not_null
- name: expiry_date
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "expiry_date > production_date"
- name: plant_code
tests:
- not_null
- accepted_values:
values: ["PL001", "PL002", "PL003", "PL004"]
- name: temperature_class
tests:
- accepted_values:
values: ["ambient", "chilled", "frozen", "ultra_frozen"]
- name: net_weight_kg
tests:
- dbt_utils.expression_is_true:
expression: "net_weight_kg > 0 AND net_weight_kg < 50000"
- name: silver_shipments
description: "Spedizioni normalizzate da TMS e EDI DESADV"
config:
tags: ["silver", "logistics"]
materialized: incremental
unique_key: shipment_id
columns:
- name: shipment_id
tests: [not_null, unique]
- name: planned_delivery_date
tests: [not_null]
- name: actual_delivery_date
description: "NULL se non ancora consegnato"
- name: otif_flag
description: "True se consegnato On-Time and In-Full"
tests:
- accepted_values:
values: [true, false, null]
quote: false
- name: gold_kpi_supply_chain
description: "KPI giornalieri supply chain aggregati per SKU/plant/customer"
config:
tags: ["gold", "kpi", "dashboard"]
materialized: table
columns:
- name: kpi_date
tests: [not_null]
- name: waste_rate_pct
description: "% prodotto sprecato su prodotto totale"
tests:
- dbt_utils.expression_is_true:
expression: "waste_rate_pct >= 0 AND waste_rate_pct <= 100"
- name: otif_pct
tests:
- dbt_utils.expression_is_true:
expression: "otif_pct >= 0 AND otif_pct <= 100"
シルバーモデル: 生産バッチ
-- models/silver/silver_lots.sql
-- {{ config(tags=["silver", "lots"], materialized="incremental") }}
WITH source_sap AS (
SELECT
lot_number AS lot_id,
material_number AS sku_code,
plant AS plant_code,
production_date AS production_date,
-- Normalizza data scadenza (SAP usa formato YYYYMMDD)
TO_DATE(CAST(expiry_date_sap AS VARCHAR), 'YYYYMMDD') AS expiry_date,
quantity_kg AS net_weight_kg,
-- Classi temperatura da tabella materiali SAP
CASE
WHEN temp_cond_sap = '0001' THEN 'ambient'
WHEN temp_cond_sap = '0002' THEN 'chilled'
WHEN temp_cond_sap = '0003' THEN 'frozen'
WHEN temp_cond_sap = '0004' THEN 'ultra_frozen'
ELSE 'unknown'
END AS temperature_class,
-- Range temperatura richiesto (gradi Celsius)
CASE
WHEN temp_cond_sap = '0001' THEN STRUCT(15.0 AS min_c, 25.0 AS max_c)
WHEN temp_cond_sap = '0002' THEN STRUCT(2.0 AS min_c, 8.0 AS max_c)
WHEN temp_cond_sap = '0003' THEN STRUCT(-25.0 AS min_c, -15.0 AS max_c)
WHEN temp_cond_sap = '0004' THEN STRUCT(-60.0 AS min_c, -30.0 AS max_c)
END AS temp_range,
batch_status AS lot_status,
_ingestion_timestamp AS ingested_at
FROM {{ source('bronze', 'sap_erp_batches') }}
WHERE lot_number IS NOT NULL
AND quantity_kg > 0
{% if is_incremental() %}
AND _ingestion_timestamp > (
SELECT MAX(ingested_at) FROM {{ this }}
)
{% endif %}
),
deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY lot_id
ORDER BY ingested_at DESC
) AS row_num
FROM source_sap
),
final AS (
SELECT
lot_id,
sku_code,
plant_code,
production_date,
expiry_date,
net_weight_kg,
temperature_class,
temp_range,
lot_status,
-- Shelf life totale in giorni
DATEDIFF('day', production_date, expiry_date) AS shelf_life_days,
-- Shelf life residua rispetto a oggi
DATEDIFF('day', CURRENT_DATE, expiry_date) AS shelf_life_remaining_days,
ingested_at
FROM deduplicated
WHERE row_num = 1
AND expiry_date > production_date
)
SELECT * FROM final
ゴールド モデル: KPI サプライ チェーン
-- models/gold/gold_kpi_supply_chain.sql
-- {{ config(tags=["gold", "kpi"], materialized="table") }}
WITH lots AS (
SELECT * FROM {{ ref('silver_lots') }}
),
shipments AS (
SELECT * FROM {{ ref('silver_shipments') }}
),
pos_sales AS (
SELECT * FROM {{ ref('silver_pos_sales') }}
),
inventory AS (
SELECT * FROM {{ ref('silver_inventory_snapshot') }}
),
-- KPI 1: Waste Rate (% prodotto sprecato)
waste_kpi AS (
SELECT
production_date AS kpi_date,
plant_code,
-- Prodotto scaduto o smaltito come spreco
SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END) AS wasted_kg,
SUM(net_weight_kg) AS total_produced_kg,
ROUND(
100.0 * SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END)
/ NULLIF(SUM(net_weight_kg), 0),
2
) AS waste_rate_pct
FROM lots
GROUP BY 1, 2
),
-- KPI 2: OTIF (On-Time In-Full)
otif_kpi AS (
SELECT
DATE(planned_delivery_date) AS kpi_date,
customer_code,
COUNT(*) AS total_orders,
SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END) AS otif_orders,
ROUND(
100.0 * SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END)
/ NULLIF(COUNT(*), 0),
2
) AS otif_pct
FROM shipments
WHERE actual_delivery_date IS NOT NULL
GROUP BY 1, 2
),
-- KPI 3: Days of Supply (scorta in giorni)
dos_kpi AS (
SELECT
inv.snapshot_date AS kpi_date,
inv.sku_code,
inv.location_code,
inv.stock_qty_units,
COALESCE(avg_daily_sales.avg_daily_units, 0) AS avg_daily_demand,
ROUND(
inv.stock_qty_units
/ NULLIF(avg_daily_sales.avg_daily_units, 0),
1
) AS days_of_supply
FROM inventory inv
LEFT JOIN (
SELECT sku_code, location_code,
AVG(units_sold) AS avg_daily_units
FROM pos_sales
WHERE sale_date >= DATEADD('day', -30, CURRENT_DATE)
GROUP BY 1, 2
) avg_daily_sales
ON inv.sku_code = avg_daily_sales.sku_code
AND inv.location_code = avg_daily_sales.location_code
),
-- KPI 4: Shelf Life at Receipt (qualità al ricevimento)
slr_kpi AS (
SELECT
DATE(s.actual_delivery_date) AS kpi_date,
l.sku_code,
-- % shelf life residua al momento della consegna
ROUND(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0),
1
) AS shelf_life_pct_at_receipt,
-- Media per SKU
AVG(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0)
) OVER (PARTITION BY DATE(s.actual_delivery_date), l.sku_code)
AS avg_slr_pct
FROM shipments s
INNER JOIN lots l ON s.lot_id = l.lot_id
WHERE s.actual_delivery_date IS NOT NULL
)
-- Join finale per dashboard KPI unificato
SELECT
w.kpi_date,
w.plant_code,
w.waste_rate_pct,
w.wasted_kg,
w.total_produced_kg,
o.otif_pct,
o.total_orders,
d.days_of_supply,
d.avg_daily_demand,
s.avg_slr_pct AS shelf_life_at_receipt_pct,
-- Score complessivo supply chain (0-100)
ROUND(
(
(100.0 - COALESCE(w.waste_rate_pct, 0)) * 0.30 +
COALESCE(o.otif_pct, 0) * 0.40 +
LEAST(d.days_of_supply / 14.0 * 100, 100) * 0.15 +
COALESCE(s.avg_slr_pct, 0) * 0.15
),
1
) AS supply_chain_health_score
FROM waste_kpi w
LEFT JOIN otif_kpi o ON w.kpi_date = o.kpi_date
LEFT JOIN dos_kpi d ON w.kpi_date = d.kpi_date
LEFT JOIN slr_kpi s ON w.kpi_date = s.kpi_date
食品データのデータ品質: 大きな期待
食品業界では、データの品質は単に清潔さの問題ではなく、要件です。 規制的な。温度、有効期限、またはバッチコードに関するデータが不正確になる可能性があります。 FSMA の罰金、高額なリコール、風評被害。 Great Expectations を使用すると、 宣言的な検証ルールを定義し、Airflow パイプラインに統合します。
# great_expectations/expectations/cold_chain_temperature_suite.py
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
context = gx.get_context()
# Crea o aggiorna la suite di aspettative
suite = context.add_or_update_expectation_suite(
expectation_suite_name="cold_chain_temperature_suite"
)
validator = context.get_validator(
batch_request=BatchRequest(
datasource_name="s3_bronze_datasource",
data_connector_name="cold_chain_connector",
data_asset_name="temperature_readings",
),
expectation_suite_name="cold_chain_temperature_suite",
)
# REGOLA 1: Temperatura mai NULL per prodotti refrigerati/surgelati
validator.expect_column_values_to_not_be_null(
column="celsius",
mostly=0.99, # tolleranza 1% per sensori offline temporanei
meta={
"notes": "Sensori IoT cold chain: max 1% di letture mancanti",
"regulatory_reference": "EU Reg 37/2005 cold chain monitoring",
}
)
# REGOLA 2: Range temperatura prodotti refrigerati (2-8°C)
validator.expect_column_values_to_be_between(
column="celsius",
min_value=-30.0,
max_value=35.0,
meta={
"notes": "Range assoluto: include frozen (-25°C) e ambient (25°C)",
}
)
# REGOLA 3: Lot ID sempre presente e formato GS1 SGTIN valido
validator.expect_column_values_to_match_regex(
column="lot_id",
regex=r"^[0-9]{14}$", # GS1 GTIN-14 format
meta={"notes": "GS1 SGTIN-14 standard per tracciabilita alimentare"}
)
# REGOLA 4: Timestamp monotonicamente crescente per sensore
validator.expect_column_values_to_be_increasing(
column="reading_timestamp",
strictly=False, # allow duplicates (es. batch upload)
parse_strings_as_datetimes=True,
meta={"notes": "Verifico che i timestamp non siano retroattivi"}
)
# REGOLA 5: Numero di letture per lotto (almeno 1 ogni 30 min)
validator.expect_table_row_count_to_be_between(
min_value=48, # 24h * 2 letture/ora = 48 minimo
max_value=10000, # max ragionevole per 24h
meta={"notes": "Minimo 1 lettura ogni 30 min per cold chain attiva"}
)
# REGOLA 6: Distribuzione temperatura per classe prodotto
# Prodotti refrigerati: mediana deve essere 2-8°C
validator.expect_column_median_to_be_between(
column="celsius",
min_value=1.0,
max_value=9.0,
meta={
"notes": "Solo per lotti con temperature_class='chilled'",
"applies_to": "filtered_batches_chilled",
}
)
# Salva le aspettative
validator.save_expectation_suite(discard_failed_expectations=False)
# Checkpoint per Airflow
checkpoint = context.add_or_update_checkpoint(
name="bronze_cold_chain_checkpoint",
config_version=1,
template_name=None,
module_name="great_expectations.checkpoint",
class_name="Checkpoint",
run_name_template="%Y%m%d-%H%M%S-cold-chain",
expectation_suite_name="cold_chain_temperature_suite",
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "send_slack_notification_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "{{ var.value.slack_webhook_cold_chain }}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer",
"class_name": "SlackRenderer",
},
},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
],
)
食品サプライチェーンのKPIと指標
適切な KPI と、データに基づいた意思決定を行うための基礎を定義します。指標は次のとおりです 方式、セクターのベンチマーク、現実的な目標を含む、食品サプライチェーンの基本です。
| KPI | Formula | 業界のベンチマーク | クラス最高の目標を達成する | アラートしきい値 |
|---|---|---|---|---|
| 完全注文率 | 完全な注文数 / 合計注文数 * 100 | 85~92% | >95% | <90% |
| OTIF (オンタイム・フル) | (迅速かつ完全な配達) / 合計 * 100 | 88-94% | >97% | <92% |
| 廃棄率 % | 無駄になったkg / 製品合計のkg * 100 | 3~8% | <2% | >5% |
| 在庫回転率 | 年間原価率 / 平均在庫 | 12~20倍(新鮮) | >25x (フレッシュ) | <10x |
| 供給日数 (DOS) | 利用可能な在庫 / 1 日の平均需要 | 3~7日(生) | 2~4日 | >10日 |
| 充填率 | 納品された単位 / 注文された単位 * 100 | 92-96% | >98% | <94% |
| 受領時の保存期限 (SLR) | 配送残り日数 / 総賞味期限 * 100 | 60-75% | >80% | <60% |
| コールドチェーンのコンプライアンス | 一時違反のないロット / 合計ロット * 100 | 97-99% | >99.5% | <98% |
| 予測精度 | 1 - (MAE / 平均需要) * 100 | 75-85% | >90% | <75% |
| 現金から現金へのサイクル | DIO + DSO - DPO (日) | 15~35日 | 15日未満 | >45日 |
サプライチェーン健全性スコアの計算方法
単一の複合スコアで KPI を重み付けすることで、経営陣が健全性を監視できるようになります サプライチェーン全体を 1 つの数字で表示:
- OTIF: 重み 40% (顧客満足度に直接影響)
- 廃棄率: 重量 30% (利益と持続可能性への影響)
- 受領時の賞味期限: 重量 15% (消費者向け品質)
- コールド チェーン コンプライアンス: 重量 15% (食品の安全性とコンプライアンス)
スコア > 90: 優れた | 80-90: 良い | 70-80: 改善予定 | <70: クリティカル
リアルタイムとバッチ: いつどのアーキテクチャを使用するか
すべてがリアルタイムである必要はありません。ストリーミング アーキテクチャのコストは大幅に増加します 従来のバッチ アーキテクチャよりも優れています。食品サプライチェーンにおける選択 どちらが正しいかは、許容可能な遅延と運用上の結果によって決まります。
| 使用事例 | 建築 | レイテンシ | 楽器 | モチベーション |
|---|---|---|---|---|
| コールドチェーン監視(温度) | リアルタイムストリーミング | 1分未満 | カフカ + フリンク | 温度違反 = 即時のバッチ損失 |
| リコール管理 | リアルタイム/イベントドリブン | 5分未満 | Kafka + アラート | FSMA: 2 時間未満でバッチを追跡 |
| GPS車両追跡 | ほぼリアルタイム | 2分未満 | MQTT + InfluxDB | お客様向けに到着予定時刻を更新しました |
| 日次 KPI (OTIF、廃棄物) | 毎日のバッチ | T+2時間 | エアフロー+dbt | 朝の運行報告 |
| 需要予測 | 日次/週次バッチ | T+6h | エアフロー+MLフロー | 生産計画はリアルタイム性を必要としない |
| インベントリのスナップショット | マイクロバッチ(15分ごと) | 15分 | エアフロー + スノーフレーク | 倉庫オペレーター: 十分な可視性 |
| 毎週の廃棄物分析 | 毎週のバッチ | T+24h | dbt+BIツール | 運用上の決定ではなく、戦略的な決定 |
| FSMA コンプライアンス監査 | オンデマンドでバッチ処理 | リクエストに応じて | DBT + データ保管庫 | 毎日の検査ではなく計画的な検査 |
# Architettura Lambda per Food Supply Chain
# Speed layer (real-time) + Batch layer (accuracy)
# SPEED LAYER: Kafka + Apache Flink
# Processa eventi in milliseconds: cold chain, recall alerts
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("cold-chain-readings") \
.set_group_id("flink-cold-chain-consumer") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
cold_chain_stream = env.from_source(
kafka_source,
watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(
Duration.of_seconds(30)
),
source_name="cold-chain-kafka",
)
# Filtra violazioni temperatura in tempo reale
violations_stream = cold_chain_stream \
.map(lambda msg: json.loads(msg)) \
.filter(lambda r: (
r.get("temperature_class") == "chilled" and
(r["celsius"] < 2.0 or r["celsius"] > 8.0)
)) \
.map(lambda r: {
**r,
"violation_severity": "CRITICAL" if r["celsius"] > 12.0 else "WARNING",
"violation_detected_at": datetime.utcnow().isoformat(),
})
# Scrivi violations su Kafka alert topic
violations_stream.sink_to(
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("cold-chain-violations")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.build()
)
env.execute("food-cold-chain-monitoring")
レガシーシステムとの統合
ほとんどの農産食品会社はレガシー システムで運用されています: SAP R/3 オブ ザ イヤー '90、SFTP 経由の EDIFACT EDI ファイル、REST API を使用しない Oracle データベース。これらを統合すると、 システムには特定のアプローチが必要です。
SAP コネクタ: RFC および BAPI
# src/connectors/sap_connector.py
"""
Connettore SAP ERP via pyrfc (SAP RFC/BAPI wrapper).
Richiede: SAP NetWeaver RFC SDK + pyrfc library.
"""
import pyrfc
import pandas as pd
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class SAPConnectionConfig:
ashost: str
sysnr: str
client: str
user: str
passwd: str
lang: str = "IT"
class SAPConnector:
"""Wrapper per SAP RFC/BAPI calls per dati supply chain."""
def __init__(self, config: SAPConnectionConfig):
self._config = config
self._conn: Optional[pyrfc.Connection] = None
def __enter__(self):
self._conn = pyrfc.Connection(
ashost=self._config.ashost,
sysnr=self._config.sysnr,
client=self._config.client,
user=self._config.user,
passwd=self._config.passwd,
lang=self._config.lang,
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.close()
def get_production_batches(
self,
date_from: str,
date_to: str,
plants: list[str],
) -> pd.DataFrame:
"""
Estrae lotti di produzione da SAP tramite BAPI_BATCH_GET_DETAIL.
Args:
date_from: Data inizio formato YYYYMMDD
date_to: Data fine formato YYYYMMDD
plants: Lista codici stabilimento SAP
"""
all_batches = []
for plant in plants:
try:
# Chiama BAPI SAP per lista lotti
result = self._conn.call(
"BAPI_BATCH_GET_DETAIL",
PLANT=plant,
DATE_FROM=date_from,
DATE_TO=date_to,
BATCH_STATUS="", # tutti gli stati
)
batches = result.get("BATCH_DETAIL_LIST", [])
logger.info(
"SAP plant %s: %d lotti estratti",
plant,
len(batches),
)
for batch in batches:
all_batches.append({
"lot_id": f"{batch['MATNR']}-{batch['CHARG']}",
"material_number": batch["MATNR"].strip(),
"lot_number": batch["CHARG"].strip(),
"plant": plant,
"production_date": batch.get("HSDAT"),
"expiry_date_sap": batch.get("VFDAT"),
"batch_status": batch.get("ZUSTD"),
"quantity_kg": float(batch.get("CLABS", 0)),
"temp_cond_sap": batch.get("MHDRZ", "0001"),
"_source": "SAP_ERP",
"_extracted_at": pd.Timestamp.utcnow().isoformat(),
})
except pyrfc.ABAPApplicationError as e:
logger.error(
"SAP BAPI error per plant %s: %s",
plant,
str(e),
)
# Non fallisce tutta l'estrazione per un singolo plant
continue
return pd.DataFrame(all_batches)
def get_stock_movements(
self,
date_from: str,
date_to: str,
movement_types: list[str],
) -> pd.DataFrame:
"""
Estrae movimenti di magazzino SAP via MB51 BAPI equivalent.
movement_types: ['101'=GR, '261'=GI to production, '601'=GI to customer]
"""
result = self._conn.call(
"BAPI_GOODSMVT_GETDETAIL",
GOODSMVT_DATE_FROM=date_from,
GOODSMVT_DATE_TO=date_to,
)
movements = []
for item in result.get("GOODSMVT_ITEMS", []):
if item["BWART"] in movement_types:
movements.append({
"movement_id": f"{item['MBLNR']}-{item['ZEILE']}",
"material": item["MATNR"].strip(),
"plant": item["WERKS"],
"movement_type": item["BWART"],
"quantity": float(item.get("MENGE", 0)),
"unit": item.get("MEINS", "KG"),
"lot_number": item.get("CHARG", ""),
"posting_date": item["BUDAT"],
"_source": "SAP_MOVEMENTS",
})
return pd.DataFrame(movements)
CDC と Debezium によるレガシー データベースからのストリーミング
# debezium-config/oracle-erp-connector.json
# Configurazione Debezium per CDC da Oracle EBS (ERP legacy)
{
"name": "oracle-erp-food-supply-chain",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "oracle-erp.internal",
"database.port": "1521",
"database.user": "debezium_cdc",
"database.password": "${ORACLE_CDC_PASSWORD}",
"database.dbname": "FOOD_ERP",
"database.pdb.name": "FOOD_PDB",
"database.server.name": "oracle-erp",
"table.include.list": "FOOD_ERP.INV_TRANSACTIONS,FOOD_ERP.MTL_LOT_NUMBERS,FOOD_ERP.WSH_DELIVERY_DETAILS",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.oracle.food_erp",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.oracle.food_erp",
"transforms": "route,addFields",
"transforms.route.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.addFields.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addFields.static.field": "_cdc_source",
"transforms.addFields.static.value": "oracle-erp",
"include.schema.changes": "true",
"snapshot.mode": "initial_only",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"topic.prefix": "food-cdc"
}
}
小売注文用の EDIFACT EDI パーサー
# src/connectors/edi_connector.py
"""
Parser EDI EDIFACT per messaggi ORDERS, DESADV, RECADV (GS1 subset).
Usa pydifact library per parsing EDIFACT messages.
"""
import pandas as pd
from pydifact.segmentcollection import SegmentCollection
from pydifact.segments import Segment
from pathlib import Path
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def parse_edifact_orders(file_path: str | Path) -> pd.DataFrame:
"""
Parsa un file EDI EDIFACT ORDERS D.01B (GS1 EANCOM).
Restituisce DataFrame con righe ordine normalizzate.
Struttura messaggio ORDERS EDIFACT:
UNB (Interchange header)
UNH (Message header)
BGM (Order message identifier)
DTM (Date/time)
NAD+BY (Buyer = retailer)
NAD+SU (Supplier = nostro sistema)
LIN (Line item)
+QTY (Quantity ordered)
+DTM (Requested delivery date)
+PIA (Product code GTIN)
UNT (Message trailer)
"""
file_content = Path(file_path).read_text(encoding="latin-1")
collection = SegmentCollection.from_str(file_content)
orders = []
current_order: dict = {}
current_line: dict = {}
for segment in collection.segments:
tag = segment.tag
if tag == "BGM":
# Inizio nuovo ordine
current_order = {
"order_number": segment.elements[1] if len(segment.elements) > 1 else None,
"document_type": segment.elements[0][0] if segment.elements else None,
"lines": [],
}
elif tag == "DTM" and current_order:
# Data ordine o data consegna richiesta
qualifier = segment.elements[0][0]
date_str = segment.elements[0][1]
date_fmt = segment.elements[0][2]
if qualifier == "137": # Document date
try:
current_order["order_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
logger.warning("DTM parse error: %s", date_str)
elif qualifier == "2": # Requested delivery date
try:
current_order["requested_delivery_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
pass
elif tag == "NAD":
qualifier = segment.elements[0]
party_id = segment.elements[1][0] if len(segment.elements) > 1 else None
party_name = segment.elements[3] if len(segment.elements) > 3 else None
if qualifier == "BY": # Buyer (retailer)
current_order["buyer_gln"] = party_id
current_order["buyer_name"] = party_name
elif qualifier == "SU": # Supplier
current_order["supplier_gln"] = party_id
elif tag == "LIN":
# Salva riga precedente se esiste
if current_line and current_order:
current_order["lines"].append(current_line)
current_line = {
"line_number": segment.elements[0],
"gtin": None,
"qty_ordered": None,
"delivery_date": None,
}
elif tag == "PIA" and current_line:
# Product identification: GTIN-13 o GTIN-14
if len(segment.elements) > 1:
qualifier = segment.elements[1][1] if len(segment.elements[1]) > 1 else ""
if qualifier in ["SRV", "GE1"]: # GTIN qualifiers
current_line["gtin"] = segment.elements[1][0]
elif tag == "QTY" and current_line:
qualifier = segment.elements[0][0]
if qualifier == "21": # Ordered quantity
current_line["qty_ordered"] = float(segment.elements[0][1])
current_line["qty_unit"] = segment.elements[0][2] if len(segment.elements[0]) > 2 else "PCE"
# Normalizza output
for order in orders:
for line in order.get("lines", []):
orders.append({
"order_number": order.get("order_number"),
"order_date": order.get("order_date"),
"requested_delivery_date": order.get("requested_delivery_date"),
"buyer_gln": order.get("buyer_gln"),
"buyer_name": order.get("buyer_name"),
"supplier_gln": order.get("supplier_gln"),
"line_number": line.get("line_number"),
"gtin": line.get("gtin"),
"qty_ordered": line.get("qty_ordered"),
"qty_unit": line.get("qty_unit"),
"_source": "EDI_EDIFACT_ORDERS",
"_parsed_at": datetime.utcnow().isoformat(),
})
return pd.DataFrame(orders)
ケーススタディ: 500 の農家を持つ農業協同組合
これらのパターンが実際のケースにどのように適用されるかを見てみましょう: ある大規模な農業協同組合 500の関連農業会社、3つの加工および加工センターを擁する北イタリアの 大規模な小売取引と地元の市場の間には 200 の販売拠点があります。
協力概要
- 500 の関連農場: 果物と野菜、シリアル、マメ科植物 - 8,000 個のセンサーからの IoT MQTT データ
- マシニングセンター3台: SAP R/3 レガシー (2005)、カスタム MES、LIMS
- 200 販売ポイント: 異種 POS (3 つの異なるベンダー)、15 の小売店による EDI EDIFACT
- 開始状況: Excel シート、サイロ化されたデータ、エンドツーエンドの可視性なし
- 目標: 廃棄物を 40% 削減、OTIF を 95% に改善、FSMA 準拠
フェーズ 1: 評価とソース インベントリ (第 1 ~ 4 週)
# Inventario sorgenti dati: output assessment
SOURCE_INVENTORY = {
"farm_iot": {
"count": 8000,
"protocol": "MQTT v3.1.1",
"broker": "Mosquitto on-premise",
"format": "JSON custom",
"issues": [
"Schema non standardizzato tra vendor IoT diversi",
"15% sensori con timestamp errati (clock drift)",
"3 farm su 500 senza connettivita stabile (4G intermittente)",
],
"recommended_fix": "Normalizzazione schema + edge buffer (Mosquitto local)",
},
"sap_erp": {
"version": "SAP R/3 4.7 (anno 2005)",
"tables_relevant": ["MCHA", "MCH1", "MSEG", "MKPF", "VBAK", "VBAP"],
"issues": [
"No REST API: solo RFC/BAPI",
"Batch extraction window: 01:00-03:00 (finestra ristretta)",
"Codifica caratteri: ISO-8859-1 (non UTF-8)",
"Date in formato SAP (YYYYMMDD come intero)",
],
"recommended_fix": "pyrfc + ETL incremental con delta timestamp",
},
"edi_partners": {
"partners": 15,
"standards": ["EANCOM D.01B", "X12 850/856", "GS1 XML 3.1"],
"transport": "SFTP (12 partner), AS2 (3 partner)",
"issues": [
"3 retailer usano EDIFACT non-standard (varianti proprietarie)",
"Mancanza di RECADV da 5 retailer: no conferma ricezione",
],
},
"pos_retail": {
"vendors": ["Cassa Easy", "NCR Aloha", "Custom PHP legacy"],
"issues": [
"3 formati CSV diversi",
"Nessun campo shelf_life nei dati POS",
"Granularità: ora (non transazione)",
],
},
}
フェーズ 2: テクニカル スタックの実装 (第 5 週から第 16 週)
# Stack tecnico implementato per la cooperativa
INFRASTRUCTURE = {
"data_lake": "AWS S3 (Medallion: Bronze/Silver/Gold)",
"compute": "AWS EMR Serverless (Spark per bulk load iniziale)",
"warehouse": "Snowflake (pay-per-query, snowpark per ML features)",
"orchestration": "Apache Airflow 2.9 su AWS MWAA (managed)",
"transformation": "dbt Cloud (Team plan: 8 developers)",
"data_quality": "Great Expectations + Slack alerts",
"streaming": "Apache Kafka su MSK (managed) per cold chain",
"monitoring": "Grafana + Prometheus (Airflow metrics, DWH queries)",
"ci_cd": "GitHub Actions (dbt CI: test su ogni PR)",
"secret_management": "AWS Secrets Manager",
}
TIMELINE = [
{"week": "1-4", "milestone": "Assessment sorgenti + architettura design"},
{"week": "5-7", "milestone": "Bronze layer: connettori SAP + IoT MQTT"},
{"week": "8-10", "milestone": "Bronze layer: EDI + POS + LIMS"},
{"week": "11-12","milestone": "Silver layer: dbt models + data quality"},
{"week": "13-14","milestone": "Gold layer: KPI dashboard + alerting"},
{"week": "15-16","milestone": "UAT + go-live + team training"},
}
TEAM = {
"data_engineers": 3,
"dbt_developers": 2,
"sap_consultant": 1, # part-time per RFC/BAPI
"project_manager": 1,
}
6か月後に得られた結果
| KPI | 前 (ベースライン) | 6か月後 | 改善 |
|---|---|---|---|
| 廃棄率 | 7.2% | 4.3% | -40% 廃棄物 (-2.9pp) |
| OTIF | 88% | 94% | +6pp |
| 受領時の保存期限 | 58% | 74% | +16pp |
| コールドチェーンのコンプライアンス | 94% (推定) | 99.1% (測定値) | 実際の視認性で +5.1pp |
| リコール解決時間 | 48~72時間 | 2~4時間 | -93% の応答時間 |
| ディレクターマニュアルレポート | 週に 3 ~ 4 時間 | 自動ダッシュボード | -100%手作業 |
| 予測精度に関する質問 | 71% | 86% | +15pp |
| 供給日数(生鮮) | 8~12日 | 4~6日 | -50% 余剰在庫 |
ケーススタディから学んだ教訓
- SAP RFC/BAPI は思っているよりも遅いです。 夜間のバッチウィンドウ が最も重大なボトルネックでした。解決策は IT 部門と窓口を交渉することでした 幅を広くし、増分抽出を使用してボリュームを最小限に抑えます。
- EDI は決して標準ではありません。 小売パートナー 15 社のうち 11 社にばらつきがありました EANCOM 標準の所有者。 EDIFACT パーサーの完成には 2 週間かかりました 各パートナーの仕様に合わせた校正。
- データ品質は単なる技術的な問題ではなく、文化的な問題です。 30% 品質問題は、オペレーターによる SAP への誤った手動入力によって発生しました。 技術的解決策 (Great Expectations) は問題を浮き彫りにしましたが、解決策はまだ残っています。 トレーニングと変更管理が必要です。
- ブロンズ層をすべて保存: 半年に3回必要だった Silver 変換を再実行して、dbt モデルのバグを修正します。おかげで ブロンズ層は不変で、データは失われません。
- dbt CI/CD は作業方法を変えます。 すべてのモデルをすべてのモデルでテストする PR により、最初の 1 か月で推定 4 件の本番インシデントが防止されました。
FoodTechシリーズ総まとめ
業界を変革するテクノロジーを乗り越えるには長い時間がかかりました 食べ物。この 10 回の記事シリーズで私たちがまとめたものは次のとおりです。
| # | アイテム | 学んだ重要な概念 | 技術スタック |
|---|---|---|---|
| 00 | 精密農業向けの IoT パイプライン | MQTT ブローカー、フィールド センサー、データ レイクの取り込み、QoS レベル | Python、MQTT、モスキート、MinIO |
| 01 | 作物の病気を検出するための ML Edge | TensorFlow Lite 量子化、エッジ推論、ARM 上でのモデル展開 | TFLite、Raspberry Pi、Python、OpenCV |
| 02 | アグリテック向けの衛星および気象 API | Sentinel-2 NDVI、Planet Labs API、ML 用の気象特徴エンジニアリング | Sentinel Hub、OpenMeteo、rasterio、GeoPandas |
| 03 | 食品トレーサビリティシステム | プライベートブロックチェーン (Hyperledger)、RFID EPCIS、GS1 標準 | ハイパーレジャーファブリック、RFID、EPCIS、Python |
| 04 | 品質管理のためのコンピュータビジョン | 製品データセットのYOLO微調整、産業用パイプライン推論 | PyTorch、YOLO、FastAPI、OpenCV |
| 05 | FSMA 204 自動化 | KDE、CTE が必要とする食品、ロットのトレーサビリティ、リコールの自動化 | Python、FastAPI、PostgreSQL、アラート |
| 06 | 垂直農法の自動化 | 環境パラメータ制御、ロボットスケジューリング、農場用REST API | Python、FastAPI、InfluxDB、MQTT |
| 07 | 廃棄物削減のための需要予測 | LSTM、Prophet、食品の季節性特徴エンジニアリング、バックテスト | PyTorch、Prophet、MLflow、パンダ |
| 08 | ファームIoT用のリアルタイムダッシュボード | Angular シグナル、Grafana プロビジョニング、InfluxDB クエリ、アラート | Angular 21、Grafana、InfluxDB、WebSocket |
| 09 | 農場から小売業者までのサプライチェーン ETL | メダリオン アーキテクチャ、エアフロー DAG、dbt モデル、Great Expectations、KPI | エアフロー、dbt、スノーフレーク、GE、SAP RFC、EDI |
さらに詳しく知りたい人のためのロードマップ
シリーズ全体を読んだ場合は、実際の FoodTech プロジェクトに取り組むための強固な基盤ができたことになります。 各ドメインをさらに深く掘り下げるための構造化されたロードマップは次のとおりです。
おすすめの徹底講座
- IoT とエッジ コンピューティング: MQTT 5.0 (今回使用されている 3.1.1 と比較) を検討してください 記事)、マネージド ソリューションには Azure IoT Hub または AWS IoT Core、データには Apache NiFi 異種の IoT ソースからの取り込み。
- 本番環境での ML: このブログの MLOps シリーズでは、その方法を説明します。 バージョン モデル、A/B テストを実施し、MLflow と Seldon を使用して本番環境のドリフトを監視します。
- データアーキテクチャ: Data Lakehouse について詳しく見る (Data & AI シリーズ) Business ) Snowflake、Databricks、Apache Iceberg がどのように比較されるかを理解するため サプライチェーン分析プラットフォーム。
- サプライチェーン向けの LLM と生成 AI: 大規模な言語モデルでは、 コンプライアンス文書から構造化された情報を抽出し、サプライヤー契約を分析する 自動レポートを生成します。ブログの AI エンジニアリング シリーズでは、RAG と微調整について説明します。
- 検討すべき業界標準: GS1 グローバル規格 (GTIN、GLN、SSCC、 EPCIS)、FSMA 204 (FDA)、EU Reg. 178/2002 (食品トレーサビリティ)、ISO 22000 (HACCP)。
- 関連する認定: AWS 認定データエンジニア、dbt 認定 開発者、天文学者認定 Apache Airflow 開発者、Databricks データ エンジニア アソシエイト。
食品ETLのベストプラクティスとアンチパターン
ベストプラクティス
- ブロンズ層は不変です: 生データは決して変更しないでください。バグを見つけたら シルバー dbt モデルでは、モデルを修正して再実行します。ブロンズはそのまま残ります。 これは FSMA および EU Reg にとって基本的なものです。 178/2002 監査証跡。
-
エアフロー DAG の冪等性: 各タスクは再実行可能でなければなりません
副作用なしで。アメリカ合衆国
replace=TrueS3 アップロードの場合、MERGEの代わりにINSERTデータベースの場合、および増分 dbt モデルの一意のキー。 -
各タスクの明示的なタイムアウト: SAP RFC のようなレガシー システムでは、
無期限にブロックします。常に設定する
execution_timeout=timedelta(hours=2)各エアフロータスクで。 - 取り込みラグのモニタリング: 02:00に開始されるパイプライン しかし、06:00(業務シフト前)までに完了しないと役に立ちません。モニターi 完了時間を設定し、Airflow で SLA アラートを設定します。
- パートナー向けの EDI スキームの分離: 専用の EDI パーサーを維持する 各小売パートナーごとに。独自のバリアントによりパーサーが不可能になる ユニークな普遍性。
- 文書化された血統の日付: dbt は自動的に次のグラフを生成します。 血統。これを使用して、「この OTIF KPI はどこから来たのですか?」という監査の質問に答えることができます。
避けるべきアンチパターン
- 抽出レイヤーでの変換 (クラシック ETL): 変換する ブロンズ レイヤーに保存する前のデータは、実行できる可能性が失われることを意味します。 バックフィルまたはエラーを修正します。常に生のままロードし、後で dbt で変換します。
- すべてに対応するモノリシック DAG: 50 以上のタスクを含む単一の DAG は次のようになります。 デバッグ不可能。ドメイン(SAP DAG、IoT DAG、EDI DAG)ごとに分けて利用する DAG 間の依存関係の Airflow TriggerDagRunOperator。
- タイムゾーンを無視する: シチリア島の農場、配送センター ミラノの小売業者とドイツの小売業者は異なるタイムゾーンを使用しています。常に UTC に変換します ブロンズ層にあり、あらゆる場所でタイムゾーン対応のタイムスタンプを使用します。
- DAG 内のハードコーディングされた認証情報: SAP パスワードや API トークンは決して入力しないでください エアフローコード内。 Airflow 接続と変数、または AWS Secrets Manager を使用します。
- ブロンズでデータ品質をスキップ: ゴールドレイヤーのみを検証しすぎます 遅い。問題はできるだけ早く特定する必要があります: 温度測定値が正しくない ブロンズでは、ゴールドでは誤検知リコールになります。
- エグゼキュータとしてのエアフロー スケジューラ: タスクは実行してはなりません Airflow スケジューラ自体から。常に別の Executor (CeleryExecutor、 KubernetesExecutor) を使用して、重いワークロードを分離します。
結論: 道の終わり、別の道の始まり
フードテックシリーズもこれで終わりです。 10 回の記事で、私たちは部分を構築しました 食品サプライチェーンのビジョンを一つずつ 完全にデータドリブン: リアルタイムでデータを送信する現場のセンサー、病気を検出する ML モデル 人間の目の前で、不変のトレーサビリティのためのブロックチェーン、 品質管理、自動化されたFSMAコンプライアンス、API経由で制御される垂直農場、 無駄を削減する需要予測、管理用のリアルタイム ダッシュボード、 この最新の記事では、すべてをまとめる ETL パイプラインについて説明します。
農産物・食品分野は、デジタル化が最も複雑な分野の 1 つです。予測できない季節性、 生鮮食品、厳しい規制、数十年にわたって統合されたレガシー システム、サプライ チェーン 数百人のプレイヤーがいるグローバルなゲームです。しかし、だからこそ、価値を生み出す機会が生まれるのです。 テクノロジーを使えば、それらは巨大になります。協同組合の廃棄物率を 7% から 4% に削減 500 の農場では、それは単なる経営改善ではなく、お金、食料、環境への影響です。
2030 年の食料サプライチェーンは次のように定義されます。 リアルタイムデータ (各ロットは播種からフォークまで追跡されます)、 予測AI (過剰在庫ゼロ、欠品ゼロ、廃棄ゼロ)、 自動化されたコンプライアンス (FSMA 監査は 2 週間ではなく 2 時間で完了します) e 測定可能な持続可能性 (区画ごとの二酸化炭素排出量、ヘクタールごとの水使用量)。
テクノロジーはすべてそこにあります。この記事で学んだ ETL パターン — Airflow オーケストレーション用、変換用の dbt、データ品質に対する大きな期待、メダリオン 構造のためのアーキテクチャ — あらゆる農産食品会社にすぐに適用可能 中型または大型のサイズ。
他の関連ブログ シリーズを調べる
- データ&AIビジネス(シリーズ14): データウェアハウスを詳しく調べる (Snowflake、Databricks、BigQuery)、dbt および Airbyte を使用した ETL/ELT、MLOps エンタープライズ データ ガバナンス — この FoodTech シリーズを自然に補完するものです。
- MLOps (シリーズ 5): オンデマンドの ML モデルを本番環境に導入する方法 記事 01、04、07 で構築した予測とコンピューター ビジョン。
- AI エンジニアリング/RAG (シリーズ 6): LLM を使用してクエリを実行する方法 コンプライアンス文書、HACCP 証明書の分析、自動レポートの生成 サプライチェーンのために。
- PostgreSQL AI (10 シリーズ): セマンティック検索に pgvector を使用する方法 製品の説明、レシピ、食品の技術仕様など、次のような場合に役立ちます。 食品分野における製品カタログの管理。
- エナジーテック(シリーズ): 多くの IoT テクノロジーとデータ パイプライン FoodTechシリーズはエネルギーモニタリングに使用されるものと同じです 工業用食品工場の様子。
FoodTechシリーズを最後までご覧いただきありがとうございます。コーディングを楽しんでください。そして食欲をそそります。







