식품 공급망: 농장에서 소매점까지 ETL 패턴
매년, 대략 인간이 소비하기 위해 생산된 모든 식품의 3분의 1이 손실되거나 낭비됩니다. 식품 공급망을 따라: 미국에서만 4,730억 달러를 초과하는 가치 전 세계적으로 매일 10억 끼의 식사가 버려지고 있습니다. 역설적이게도 상당한 비중을 차지하고 있다. 이 폐기물은 기후나 생물학적 요인이 아니라 데이터 문제인 가시성에 달려 있습니다. 부족함, 레거시 시스템이 통합되지 않음, KPI 계산이 늦어짐, 정보에 기반한 의사결정 불완전하거나 부정확합니다.
글로벌 시장 식품 공급망 관리 2025년에는 1,828억 1천만 달러의 가치가 있습니다. 2034년에는 3,593억 9천만 달러(CAGR 7.8%)로 성장할 것입니다. 그러나 식품 공급업체의 48%는 여전히 일상 업무용 스프레드시트를 사용하고 있으며 60%는 시간이 많이 소요되는 반복적인 수동 작업을 보고했습니다. 소중한 시간. 기술이 허용하는 것과 농식품 기업이 허용하는 것 사이의 격차 그들은 실제로 구현하고 거대합니다.
FoodTech 시리즈의 최신 기사인 이 기사는 근본적인 문제를 해결합니다. 강력하고 확장 가능한 ETL/ELT 파이프라인 농장에서 소매업체까지 이기종 데이터를 통합하고, Apache Airflow로 흐름을 조정하고, dbt로 데이터를 변환하고, Great로 품질을 보장하세요. 식품 부문별 공급망 KPI를 통해 기대치를 예측하고 성과를 측정합니다.
무엇을 배울 것인가
- 식품 공급망(농장 → 가공 → 유통 → 소매)의 End-to-End 데이터 아키텍처
- 이기종 데이터 소스: ERP(SAP/Oracle), MES, LIMS, WMS, TMS, POS, IoT, EDI
- ETL과 ELT: 식품 데이터에 대한 접근 방식을 선택하는 시기
- 오류 처리 및 재시도 기능을 갖춘 푸드 파이프라인용 Apache Airflow DAG 완성
- 공급망 KPI를 위한 DBT 모델: 폐기물 비율, OTIF, 채우기 비율, 공급 일수
- 콜드체인 및 만료일에 대한 큰 기대를 지닌 데이터 품질
- 레거시 시스템 통합: SAP RFC/BAPI, EDI EDIFACT, Debezium을 사용한 CDC
- 사례 연구: 농업협동조합 500개 농장, 3개 가공센터, 200개 판매점
- 전체 FoodTech 시리즈 전체 요약(10개 기사)
FoodTech 시리즈: 우리가 있는 곳
이것은 시리즈의 열 번째이자 마지막 에피소드입니다. 푸드테크, 주요 내용을 살펴보았습니다. 식품 공급망 전체에 적용 가능한 디지털 기술: 현장 데이터 수집부터 소매. 전체 그림은 다음과 같습니다.
| # | 제목 | 기술 | 수준 |
|---|---|---|---|
| 00 | Python 및 MQTT를 사용한 정밀 농업용 IoT 파이프라인 | IoT, MQTT, Python, 데이터 레이크 | 고급의 |
| 01 | 농작물 질병 감지를 위한 ML Edge: Raspberry Pi의 TensorFlow Lite | TensorFlow Lite, Edge ML, 라즈베리 파이 | 고급의 |
| 02 | AgriTech용 위성 및 날씨 API: 예측 데이터 | 센티넬, 플래닛, 날씨 API, NDVI | 고급의 |
| 03 | 식품 추적 시스템: 블록체인, RFID 및 IoT | 블록체인, RFID, IoT, 규정 준수 | 고급의 |
| 04 | PyTorch YOLO를 사용한 식품 품질 관리용 컴퓨터 비전 | YOLO, PyTorch, 컴퓨터 비전 | 고급의 |
| 05 | FSMA 204 자동화: Python을 통한 추적, 경고 및 회수 | FSMA, 규정 준수, Python, 리콜 | 고급의 |
| 06 | 수직 농업 자동화: API를 통한 로봇 제어 | 로봇공학, API, 자동화 | 고급의 |
| 07 | 폐기물 감소를 위한 수요 예측: ML 시계열 | LSTM, Prophet, 시계열 ML | 고급의 |
| 08 | Angular 및 Grafana를 사용한 농장 IoT용 실시간 대시보드 | Angular, Grafana, InfluxDB, 실시간 | 고급의 |
| 09 | 식품 공급망: 농장에서 소매점까지 ETL 패턴 <-- 당신은 여기에 있습니다 | 공기 흐름, dbt, 위대한 기대, ETL | 고급의 |
이 기사가 시리즈를 마무리하기 때문에
이전 기사에서는 체인의 개별 노드를 다루었습니다. 현장에서 IoT 데이터 수집, ML 에지 모델, 블록체인 추적성, 품질을 위한 컴퓨터 비전, FSMA 규정 준수, 수직 농업, 수요 예측 및 실시간 대시보드. 이 기사는 이들 모두를 연결합니다. ETL 파이프라인과 각 데이터 소스를 통합하는 백본은 원시 정보를 변환합니다. 실행 가능한 KPI로 전환하고 농장에서 슈퍼마켓 선반까지 엔드투엔드 가시성을 제공합니다.
식품 공급망의 데이터 아키텍처
식품 공급망은 각각 고유한 정보 시스템, 형식을 갖춘 별개의 노드로 구성됩니다. 데이터 및 업데이트 빈도. ETL 파이프라인은 이러한 이질성을 강력하게 처리해야 합니다.
# Architettura logica della supply chain alimentare
# (dati generati ad ogni step)
FARM SYSTEMS
|-- Sensori IoT (MQTT): temp suolo, umidita, pH, EC
|-- Sistema ERP agricolo: colture pianificate, costi input
|-- Registro trattamenti fitosanitari (LIMS)
|-- Geo-dati GPS macchinari agricoli
|-- API meteo (OpenMeteo, Copernicus)
|
v
PROCESSING PLANTS (Centri di Lavorazione)
|-- MES (Manufacturing Execution System): ordini produzione, rese
|-- LIMS: analisi chimiche, microbiologiche, allergeni
|-- ERP SAP/Oracle: distinta base, lotti, tracciabilita
|-- WMS (Warehouse Management): magazzino materie prime/finiti
|-- Sensori cold chain: temperatura/umidita durante lavorazione
|
v
DISTRIBUTION CENTERS
|-- WMS: gestione stock, picking, slotting
|-- TMS (Transport Management): spedizioni, veicoli, routing
|-- Sensori cold chain: logger temperatura nei camion frigoriferi
|-- EDI: ordini da retailer (EDIFACT ORDERS, DESADV)
|-- Tracker GPS: posizione veicoli in tempo reale
|
v
RETAIL POS
|-- POS data: vendite per SKU, store, ora
|-- Gestione shelf life: prodotti prossimi scadenza
|-- EDI: conferme ricezione (RECADV), fatture (INVOIC)
|-- Inventario store: stock disponibile per punto vendita
|
v
CONSUMER (segnali indiretti)
|-- App loyalty: acquisti per cliente, frequenza
|-- Reviews prodotto: qualità percepita
|-- Social sentiment: trend di consumo
각 노드는 다양한 형식(관계형 SQL, CSV/EDI 플랫 파일, REST API의 JSON, 밀리초 단위의 주파수를 갖는 바이너리 MQTT 메시지, 레거시 SFTP 파일(IoT 센서) 주간(ERP 보고서)으로. 식품 데이터 엔지니어의 과제는 이 모든 것을 통합하는 것입니다. 하나의 일관된 분석 모델로 전환합니다.
이기종 데이터 소스: 참조 테이블
파이프라인을 설계하기 전에 소스를 조사하는 것이 필수적입니다. 주요 내용은 다음과 같습니다. 중간 정도의 복잡성을 지닌 식품 공급망의 경우:
| 체계 | 유형 | 체재 | 거래량/일 | 빈도 | 허용 가능한 지연 시간 |
|---|---|---|---|---|---|
| SAP ERP(농장/플랜트) | ERP | RFC/BAPI, IDoc, SQL | 50,000~500,000개의 레코드 | 야간 배치/CDC | T+1시간 |
| 오라클 EBS | ERP | SQL, REST API | 10만~100만 개의 레코드 | 배치/CDC | T+1시간 |
| MES(생산) | MES | OPC-UA, SQL, REST | 100만~1000만 이벤트 | 거의 실시간(1분) | 5분 |
| LIMS (실험실) | 림스 | HL7, CSV, REST | 1K~10K 분석 | 일일 | T+4시간 |
| WMS(창고) | WMS | SQL, EDI, REST | 10K~100K 움직임 | 15분마다 | 15분 |
| TMS(운송) | TMS | REST API, EDI | 5,000~50,000개 배송 | 5분마다(GPS) | 5분 |
| 소매 POS | POS | CSV, REST, SQL | 100만~5000만 건의 거래 | 시간별/저녁 배치 | 티+2시간 |
| IoT 콜드체인 센서 | IoT | MQTT, JSON, 프로토부프 | 1억 개 이상의 판독값 | 30초~5분마다 | 실시간(<1분) |
| EDI 파트너(EDIFACT) | EDI | EDIFACT, X12, GS1 XML | 메시지 100~10,000개 | 이벤트 기반/배치 | T+30분 |
| 날씨 API(OpenMeteo) | 외부 API | JSON REST | 통화 1,000~10,000개 | 시간별 | T+1시간 |
| AGEA(CAP 보조금) | 공공기관 | XML, CSV SFTP | 1~100개 파일 | 월간/계절 | T+24시간 |
| GPS 차량 추적기 | IoT/텔레매틱스 | REST, 웹소켓 | 1천만 개 이상의 GPS 포인트 | 30초마다 | <2분 |
주의: 레거시 ERP 및 "추출 창"
많은 농업 및 농식품 ERP는 기본 CDC를 지원하지 않습니다. 추출하는 유일한 방법인 경우가 많습니다. 데이터와 테이블을 잠그는 야간 일괄 작업 또는 SAP RFC/BAPI 인터페이스를 통해 엄격한 유지 관리 기간. 추출 기간을 미리 계획하고 예측하세요. 야간 배치 실패 시 복구 메커니즘.
식품 공급망의 ETL과 ELT
ETL 대 ELT 논쟁은 특히 데이터가 공존하는 식품 부문과 관련이 있습니다. 기존 배치 데이터(SAP ERP) 및 규제 요구 사항을 갖춘 고주파수(IoT 콜드 체인) 엄격함(추적성, FSMA 204).
| 표준 | 기존 ETL | 최신 ELT(dbt + 데이터 레이크) |
|---|---|---|
| 변환 | 업로드 전(스테이징 서버) | 데이터 웨어하우스(네이티브 SQL) |
| 확장성 | ETL 서버에 의해 제한됨 | DWH로 확장(Snowflake, BigQuery, DuckDB) |
| 원시 데이터 보존 | 그렇지 않은 경우가 많음(데이터가 이미 변환됨) | 예: 청동층은 원시 상태를 보존합니다. |
| 감사 가능성(FSMA) | 어려움: 흐릿한 계보 | 우수: 완전한 DBT 계보 그래프 |
| 숨어 있음 | 배치(T+1h, T+1d) | 마이크로 배치 또는 스트리밍(Flink/Spark) |
| 콜드체인 오류 | ETL 실패 시 데이터 손실 | Raw는 항상 저장됩니다. 변환만 재시도 |
| 필요한 팀 기술 | Java/Informatica/SSIS 개발자 | SQL + Python(dbt + Airflow) |
| 인프라 비용 | 상시 전용 서버 | 쿼리당 지불(Snowflake/BigQuery) |
현대 식품 공급망의 경우 권장되는 아키텍처는 다음과 같습니다. 메달리온 ELT:
# Architettura Medallion per Food Supply Chain
BRONZE LAYER (Raw, immutabile)
|-- Tutti i dati sorgente caricati as-is
|-- Partitioned by: source_system, ingestion_date
|-- Retention: 7 anni (compliance FSMA/EU)
|-- Formato: Parquet su S3/GCS o Delta Lake
|
v
SILVER LAYER (Cleansed, deduplicato)
|-- Dbt models: normalizzazione schemi eterogenei
|-- Deduplicazione lotti, SKU, location codes
|-- Type casting: date, temperature (C/F), pesi (kg/lb)
|-- NULL handling: sensori offline, ERP null fields
|-- PII masking: dati operatori, conducenti
|
v
GOLD LAYER (KPI pronti per business)
|-- Aggregazioni: fill rate, waste rate, OTIF per SKU
|-- Metriche cold chain: violazioni temperatura per lotto
|-- Dashboard BI: Power BI, Tableau, Grafana
|-- API ML: feature store per demand forecasting
|
v
DATA MART
|-- Retail mart: sell-through, shelf life at receipt
|-- Operations mart: OEE impianti, rendimenti produzione
|-- Finance mart: costo per lotto, margine per SKU
|-- Compliance mart: tracciabilita FSMA, recall readiness
식품 파이프라인 오케스트레이션을 위한 Apache Airflow
Apache Airflow는 복잡한 ETL 워크플로를 조정하기 위한 사실상의 표준입니다. 공급망에서 단일 DAG는 지연 시간, 종속성, 정책을 통해 수십 개의 소스를 조정해야 합니다. 다양한 재시도. 일일 파이프라인에 대한 전체 DAG를 살펴보겠습니다.
주요 DAG: 식품 공급망 일일 ETL
# food_supply_chain_dag.py
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.trigger_rule import TriggerRule
logger = logging.getLogger(__name__)
DEFAULT_ARGS = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
}
with DAG(
dag_id="food_supply_chain_daily_etl",
description="Pipeline ETL giornaliera: farm -> processing -> distribution -> retail",
default_args=DEFAULT_ARGS,
schedule_interval="0 2 * * *", # ogni notte alle 02:00 UTC
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["foodtech", "supply-chain", "etl", "production"],
max_active_runs=1,
doc_md="""
## Food Supply Chain Daily ETL
Estrae dati da: SAP ERP, MES impianti, WMS, TMS, POS retail.
Carica su Bronze S3, poi lancia dbt per Silver e Gold.
""",
) as dag:
# ----------------------------------------------------------------
# STEP 1: Health checks sulle sorgenti
# ----------------------------------------------------------------
check_sap_available = HttpSensor(
task_id="check_sap_api_available",
http_conn_id="sap_erp_api",
endpoint="/api/health",
timeout=30,
poke_interval=60,
mode="reschedule",
soft_fail=False,
)
check_mes_available = HttpSensor(
task_id="check_mes_api_available",
http_conn_id="mes_plant_api",
endpoint="/health",
timeout=30,
poke_interval=60,
mode="reschedule",
)
# ----------------------------------------------------------------
# STEP 2: Estrazione SAP ERP (lotti, ordini, inventario)
# ----------------------------------------------------------------
@task(task_id="extract_sap_erp", retries=3)
def extract_sap_erp(**context: Any) -> dict:
"""Estrae dati da SAP ERP via RFC/BAPI wrapper REST."""
from src.connectors.sap_connector import SAPConnector
execution_date = context["ds"] # 'YYYY-MM-DD'
sap = SAPConnector(conn_id="sap_erp_api")
# Estrai: lotti produzione del giorno precedente
batches = sap.get_production_batches(
date=execution_date,
plants=["PL001", "PL002", "PL003"],
)
# Estrai: movimenti magazzino
stock_movements = sap.get_stock_movements(
date=execution_date,
movement_types=["101", "261", "501", "601"],
)
# Estrai: ordini di vendita confermati
sales_orders = sap.get_sales_orders(
date=execution_date,
status=["CONF", "DLVR"],
)
# Salva su S3 Bronze layer
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/sap_erp/dt={execution_date}"
s3.load_string(
string_data=batches.to_json(orient="records", lines=True),
key=f"{prefix}/batches.jsonl",
bucket_name="food-data-lake",
replace=True,
)
s3.load_string(
string_data=stock_movements.to_json(orient="records", lines=True),
key=f"{prefix}/stock_movements.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"SAP extraction OK: %d batches, %d movements, %d orders",
len(batches),
len(stock_movements),
len(sales_orders),
)
return {
"batches_count": len(batches),
"movements_count": len(stock_movements),
"orders_count": len(sales_orders),
}
# ----------------------------------------------------------------
# STEP 3: Estrazione dati cold chain (IoT sensori temperatura)
# ----------------------------------------------------------------
@task(task_id="extract_cold_chain_iot")
def extract_cold_chain_iot(**context: Any) -> dict:
"""Estrae letture temperatura da InfluxDB (IoT cold chain)."""
from influxdb_client import InfluxDBClient
execution_date = context["ds"]
client = InfluxDBClient(
url="{{ var.value.influxdb_url }}",
token="{{ var.value.influxdb_token }}",
org="foodtech-org",
)
query_api = client.query_api()
# Query Flux: temperature readings per lotto di trasporto
flux_query = f"""
from(bucket: "cold-chain")
|> range(start: {execution_date}T00:00:00Z,
stop: {execution_date}T23:59:59Z)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "celsius")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> pivot(rowKey: ["_time", "lot_id"], columnKey: ["_field"], valueColumn: "_value")
"""
result = query_api.query_data_frame(flux_query)
# Calcola violazioni: temperatura fuori range per categoria prodotto
violations = result[
(result["celsius"] < result["min_temp_required"]) |
(result["celsius"] > result["max_temp_required"])
]
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/cold_chain/dt={execution_date}"
s3.load_string(
string_data=result.to_json(orient="records", lines=True),
key=f"{prefix}/temperature_readings.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"Cold chain extraction OK: %d readings, %d violations",
len(result),
len(violations),
)
if len(violations) > 0:
logger.warning(
"COLD CHAIN VIOLATIONS: %d eventi fuori temperatura!",
len(violations),
)
return {
"readings_count": len(result),
"violations_count": len(violations),
}
# ----------------------------------------------------------------
# STEP 4: Estrazione EDI dai partner retailer
# ----------------------------------------------------------------
@task(task_id="extract_edi_partners")
def extract_edi_partners(**context: Any) -> dict:
"""Processa file EDI EDIFACT da SFTP partners."""
from src.connectors.edi_connector import EDIFACTConnector
execution_date = context["ds"]
edi = EDIFACTConnector(sftp_conn_id="edi_sftp_partners")
# Download e parsing ORDERS (EDIFACT ORDERS D.01B)
orders = edi.process_orders(date=execution_date)
# Download e parsing DESADV (Despatch Advice)
despatch_advices = edi.process_desadv(date=execution_date)
# Download e parsing RECADV (Receiving Advice)
receiving_advices = edi.process_recadv(date=execution_date)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/edi_partners/dt={execution_date}"
s3.load_string(
string_data=orders.to_json(orient="records", lines=True),
key=f"{prefix}/orders.jsonl",
bucket_name="food-data-lake",
replace=True,
)
return {
"orders_count": len(orders),
"desadv_count": len(despatch_advices),
"recadv_count": len(receiving_advices),
}
# ----------------------------------------------------------------
# STEP 5: Estrazione POS retail
# ----------------------------------------------------------------
@task(task_id="extract_pos_data")
def extract_pos_data(**context: Any) -> dict:
"""Estrae dati vendite POS da database retailer partner."""
from src.connectors.retail_connector import RetailPOSConnector
execution_date = context["ds"]
pos = RetailPOSConnector(conn_id="retail_pos_db")
# Vendite per SKU/store con shelf life info
sales = pos.get_daily_sales(
date=execution_date,
include_promotions=True,
include_markdowns=True,
)
# Near-expiry: prodotti a <3 giorni scadenza ancora in shelf
near_expiry = pos.get_near_expiry_stock(
date=execution_date,
days_threshold=3,
)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/pos_retail/dt={execution_date}"
s3.load_string(
string_data=sales.to_json(orient="records", lines=True),
key=f"{prefix}/sales.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info("POS extraction OK: %d transazioni", len(sales))
return {
"sales_count": len(sales),
"near_expiry_skus": len(near_expiry),
}
# ----------------------------------------------------------------
# STEP 6: dbt run (Silver + Gold transformations)
# ----------------------------------------------------------------
dbt_run_silver = BashOperator(
task_id="dbt_run_silver_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:silver --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
dbt_run_gold = BashOperator(
task_id="dbt_run_gold_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:gold --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
# ----------------------------------------------------------------
# STEP 7: dbt test (data quality checks)
# ----------------------------------------------------------------
dbt_test = BashOperator(
task_id="dbt_test_all",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt test --select tag:silver tag:gold --target prod"
),
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# ----------------------------------------------------------------
# STEP 8: Great Expectations validation
# ----------------------------------------------------------------
@task(task_id="run_great_expectations_validation")
def run_ge_validation(**context: Any) -> dict:
"""Lancia Great Expectations per validare data quality cold chain."""
from great_expectations.data_context import get_context
context_ge = get_context(project_root_dir="/opt/great_expectations")
execution_date = context["ds"]
results = {}
suites_to_run = [
("cold_chain_temperature", "bronze_cold_chain_checkpoint"),
("lot_traceability", "silver_lots_checkpoint"),
("kpi_supply_chain", "gold_kpi_checkpoint"),
]
for suite_name, checkpoint_name in suites_to_run:
result = context_ge.run_checkpoint(
checkpoint_name=checkpoint_name,
batch_request={
"runtime_parameters": {"path": f"s3://food-data-lake/bronze/cold_chain/dt={execution_date}"},
"batch_identifiers": {"execution_date": execution_date},
},
)
results[suite_name] = result.success
failed = [name for name, ok in results.items() if not ok]
if failed:
raise ValueError(f"GE validation FAILED per suite: {failed}")
logger.info("Great Expectations: tutte le suite OK")
return results
# ----------------------------------------------------------------
# STEP 9: KPI alert (email se waste rate > soglia)
# ----------------------------------------------------------------
@task(task_id="check_kpi_alerts", trigger_rule=TriggerRule.ALL_SUCCESS)
def check_kpi_alerts(**context: Any) -> None:
"""Controlla KPI critici e invia alert se fuori soglia."""
from src.services.kpi_alert_service import KPIAlertService
execution_date = context["ds"]
alert_service = KPIAlertService(conn_id="snowflake_dw")
kpis = alert_service.get_daily_kpis(date=execution_date)
alerts_sent = []
# Waste rate > 5%: alert critico
if kpis["waste_rate_pct"] > 5.0:
alert_service.send_alert(
severity="HIGH",
message=f"Waste rate {kpis['waste_rate_pct']:.1f}% supera soglia 5%",
kpi="waste_rate",
value=kpis["waste_rate_pct"],
)
alerts_sent.append("waste_rate")
# OTIF < 95%: alert warning
if kpis["otif_pct"] < 95.0:
alert_service.send_alert(
severity="MEDIUM",
message=f"OTIF {kpis['otif_pct']:.1f}% sotto soglia 95%",
kpi="otif",
value=kpis["otif_pct"],
)
alerts_sent.append("otif")
# Cold chain violations > 0: alert immediato
if kpis["cold_chain_violations"] > 0:
alert_service.send_alert(
severity="CRITICAL",
message=f"{kpis['cold_chain_violations']} violazioni temperatura cold chain!",
kpi="cold_chain_violations",
value=kpis["cold_chain_violations"],
)
alerts_sent.append("cold_chain")
logger.info("KPI check completato. Alert inviati: %s", alerts_sent)
# ----------------------------------------------------------------
# DAG wiring: dipendenze tra task
# ----------------------------------------------------------------
extract_sap = extract_sap_erp()
extract_cold = extract_cold_chain_iot()
extract_edi = extract_edi_partners()
extract_pos = extract_pos_data()
ge_validation = run_ge_validation()
kpi_alerts = check_kpi_alerts()
# Health checks prima di tutto
[check_sap_available, check_mes_available] >> extract_sap
# Estrazione parallela di tutte le sorgenti
[extract_sap, extract_cold, extract_edi, extract_pos] >> dbt_run_silver
# Silver prima di Gold
dbt_run_silver >> dbt_run_gold
# Test dopo Gold
dbt_run_gold >> dbt_test
# GE validation e KPI alerts dopo i test
dbt_test >> ge_validation >> kpi_alerts
변환을 위한 dbt: 식품 KPI용 SQL 모델
dbt(데이터 구축 도구)는 Silver 및 Gold 레이어의 데이터를 변환하는 데 이상적인 도구입니다. 통합 테스트 및 문서화를 통해 식품 공급망의 주요 모델을 살펴보겠습니다.
Schema.yml: 문서화 및 테스트 DBT
# models/silver/schema.yml
version: 2
models:
- name: silver_lots
description: "Lotti di produzione normalizzati da SAP ERP"
config:
tags: ["silver", "lots", "traceability"]
materialized: incremental
incremental_strategy: merge
unique_key: lot_id
columns:
- name: lot_id
description: "Identificativo univoco lotto (GS1 SGTIN)"
tests:
- not_null
- unique
- name: production_date
tests:
- not_null
- name: expiry_date
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "expiry_date > production_date"
- name: plant_code
tests:
- not_null
- accepted_values:
values: ["PL001", "PL002", "PL003", "PL004"]
- name: temperature_class
tests:
- accepted_values:
values: ["ambient", "chilled", "frozen", "ultra_frozen"]
- name: net_weight_kg
tests:
- dbt_utils.expression_is_true:
expression: "net_weight_kg > 0 AND net_weight_kg < 50000"
- name: silver_shipments
description: "Spedizioni normalizzate da TMS e EDI DESADV"
config:
tags: ["silver", "logistics"]
materialized: incremental
unique_key: shipment_id
columns:
- name: shipment_id
tests: [not_null, unique]
- name: planned_delivery_date
tests: [not_null]
- name: actual_delivery_date
description: "NULL se non ancora consegnato"
- name: otif_flag
description: "True se consegnato On-Time and In-Full"
tests:
- accepted_values:
values: [true, false, null]
quote: false
- name: gold_kpi_supply_chain
description: "KPI giornalieri supply chain aggregati per SKU/plant/customer"
config:
tags: ["gold", "kpi", "dashboard"]
materialized: table
columns:
- name: kpi_date
tests: [not_null]
- name: waste_rate_pct
description: "% prodotto sprecato su prodotto totale"
tests:
- dbt_utils.expression_is_true:
expression: "waste_rate_pct >= 0 AND waste_rate_pct <= 100"
- name: otif_pct
tests:
- dbt_utils.expression_is_true:
expression: "otif_pct >= 0 AND otif_pct <= 100"
실버 모델: 생산 배치
-- models/silver/silver_lots.sql
-- {{ config(tags=["silver", "lots"], materialized="incremental") }}
WITH source_sap AS (
SELECT
lot_number AS lot_id,
material_number AS sku_code,
plant AS plant_code,
production_date AS production_date,
-- Normalizza data scadenza (SAP usa formato YYYYMMDD)
TO_DATE(CAST(expiry_date_sap AS VARCHAR), 'YYYYMMDD') AS expiry_date,
quantity_kg AS net_weight_kg,
-- Classi temperatura da tabella materiali SAP
CASE
WHEN temp_cond_sap = '0001' THEN 'ambient'
WHEN temp_cond_sap = '0002' THEN 'chilled'
WHEN temp_cond_sap = '0003' THEN 'frozen'
WHEN temp_cond_sap = '0004' THEN 'ultra_frozen'
ELSE 'unknown'
END AS temperature_class,
-- Range temperatura richiesto (gradi Celsius)
CASE
WHEN temp_cond_sap = '0001' THEN STRUCT(15.0 AS min_c, 25.0 AS max_c)
WHEN temp_cond_sap = '0002' THEN STRUCT(2.0 AS min_c, 8.0 AS max_c)
WHEN temp_cond_sap = '0003' THEN STRUCT(-25.0 AS min_c, -15.0 AS max_c)
WHEN temp_cond_sap = '0004' THEN STRUCT(-60.0 AS min_c, -30.0 AS max_c)
END AS temp_range,
batch_status AS lot_status,
_ingestion_timestamp AS ingested_at
FROM {{ source('bronze', 'sap_erp_batches') }}
WHERE lot_number IS NOT NULL
AND quantity_kg > 0
{% if is_incremental() %}
AND _ingestion_timestamp > (
SELECT MAX(ingested_at) FROM {{ this }}
)
{% endif %}
),
deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY lot_id
ORDER BY ingested_at DESC
) AS row_num
FROM source_sap
),
final AS (
SELECT
lot_id,
sku_code,
plant_code,
production_date,
expiry_date,
net_weight_kg,
temperature_class,
temp_range,
lot_status,
-- Shelf life totale in giorni
DATEDIFF('day', production_date, expiry_date) AS shelf_life_days,
-- Shelf life residua rispetto a oggi
DATEDIFF('day', CURRENT_DATE, expiry_date) AS shelf_life_remaining_days,
ingested_at
FROM deduplicated
WHERE row_num = 1
AND expiry_date > production_date
)
SELECT * FROM final
골드 모델: KPI 공급망
-- models/gold/gold_kpi_supply_chain.sql
-- {{ config(tags=["gold", "kpi"], materialized="table") }}
WITH lots AS (
SELECT * FROM {{ ref('silver_lots') }}
),
shipments AS (
SELECT * FROM {{ ref('silver_shipments') }}
),
pos_sales AS (
SELECT * FROM {{ ref('silver_pos_sales') }}
),
inventory AS (
SELECT * FROM {{ ref('silver_inventory_snapshot') }}
),
-- KPI 1: Waste Rate (% prodotto sprecato)
waste_kpi AS (
SELECT
production_date AS kpi_date,
plant_code,
-- Prodotto scaduto o smaltito come spreco
SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END) AS wasted_kg,
SUM(net_weight_kg) AS total_produced_kg,
ROUND(
100.0 * SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END)
/ NULLIF(SUM(net_weight_kg), 0),
2
) AS waste_rate_pct
FROM lots
GROUP BY 1, 2
),
-- KPI 2: OTIF (On-Time In-Full)
otif_kpi AS (
SELECT
DATE(planned_delivery_date) AS kpi_date,
customer_code,
COUNT(*) AS total_orders,
SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END) AS otif_orders,
ROUND(
100.0 * SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END)
/ NULLIF(COUNT(*), 0),
2
) AS otif_pct
FROM shipments
WHERE actual_delivery_date IS NOT NULL
GROUP BY 1, 2
),
-- KPI 3: Days of Supply (scorta in giorni)
dos_kpi AS (
SELECT
inv.snapshot_date AS kpi_date,
inv.sku_code,
inv.location_code,
inv.stock_qty_units,
COALESCE(avg_daily_sales.avg_daily_units, 0) AS avg_daily_demand,
ROUND(
inv.stock_qty_units
/ NULLIF(avg_daily_sales.avg_daily_units, 0),
1
) AS days_of_supply
FROM inventory inv
LEFT JOIN (
SELECT sku_code, location_code,
AVG(units_sold) AS avg_daily_units
FROM pos_sales
WHERE sale_date >= DATEADD('day', -30, CURRENT_DATE)
GROUP BY 1, 2
) avg_daily_sales
ON inv.sku_code = avg_daily_sales.sku_code
AND inv.location_code = avg_daily_sales.location_code
),
-- KPI 4: Shelf Life at Receipt (qualità al ricevimento)
slr_kpi AS (
SELECT
DATE(s.actual_delivery_date) AS kpi_date,
l.sku_code,
-- % shelf life residua al momento della consegna
ROUND(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0),
1
) AS shelf_life_pct_at_receipt,
-- Media per SKU
AVG(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0)
) OVER (PARTITION BY DATE(s.actual_delivery_date), l.sku_code)
AS avg_slr_pct
FROM shipments s
INNER JOIN lots l ON s.lot_id = l.lot_id
WHERE s.actual_delivery_date IS NOT NULL
)
-- Join finale per dashboard KPI unificato
SELECT
w.kpi_date,
w.plant_code,
w.waste_rate_pct,
w.wasted_kg,
w.total_produced_kg,
o.otif_pct,
o.total_orders,
d.days_of_supply,
d.avg_daily_demand,
s.avg_slr_pct AS shelf_life_at_receipt_pct,
-- Score complessivo supply chain (0-100)
ROUND(
(
(100.0 - COALESCE(w.waste_rate_pct, 0)) * 0.30 +
COALESCE(o.otif_pct, 0) * 0.40 +
LEAST(d.days_of_supply / 14.0 * 100, 100) * 0.15 +
COALESCE(s.avg_slr_pct, 0) * 0.15
),
1
) AS supply_chain_health_score
FROM waste_kpi w
LEFT JOIN otif_kpi o ON w.kpi_date = o.kpi_date
LEFT JOIN dos_kpi d ON w.kpi_date = d.kpi_date
LEFT JOIN slr_kpi s ON w.kpi_date = s.kpi_date
식품 데이터의 데이터 품질: 큰 기대
식품 산업에서 데이터 품질은 단순히 청결도의 문제가 아니라 필수 사항입니다. 규제. 온도, 유효 기간 또는 배치 코드에 대한 잘못된 데이터가 발생할 수 있습니다. FSMA 벌금, 매우 비싼 리콜 및 평판 손상. 큰 기대를 통해 다음을 수행할 수 있습니다. 선언적 검증 규칙을 정의하고 이를 Airflow 파이프라인에 통합합니다.
# great_expectations/expectations/cold_chain_temperature_suite.py
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
context = gx.get_context()
# Crea o aggiorna la suite di aspettative
suite = context.add_or_update_expectation_suite(
expectation_suite_name="cold_chain_temperature_suite"
)
validator = context.get_validator(
batch_request=BatchRequest(
datasource_name="s3_bronze_datasource",
data_connector_name="cold_chain_connector",
data_asset_name="temperature_readings",
),
expectation_suite_name="cold_chain_temperature_suite",
)
# REGOLA 1: Temperatura mai NULL per prodotti refrigerati/surgelati
validator.expect_column_values_to_not_be_null(
column="celsius",
mostly=0.99, # tolleranza 1% per sensori offline temporanei
meta={
"notes": "Sensori IoT cold chain: max 1% di letture mancanti",
"regulatory_reference": "EU Reg 37/2005 cold chain monitoring",
}
)
# REGOLA 2: Range temperatura prodotti refrigerati (2-8°C)
validator.expect_column_values_to_be_between(
column="celsius",
min_value=-30.0,
max_value=35.0,
meta={
"notes": "Range assoluto: include frozen (-25°C) e ambient (25°C)",
}
)
# REGOLA 3: Lot ID sempre presente e formato GS1 SGTIN valido
validator.expect_column_values_to_match_regex(
column="lot_id",
regex=r"^[0-9]{14}$", # GS1 GTIN-14 format
meta={"notes": "GS1 SGTIN-14 standard per tracciabilita alimentare"}
)
# REGOLA 4: Timestamp monotonicamente crescente per sensore
validator.expect_column_values_to_be_increasing(
column="reading_timestamp",
strictly=False, # allow duplicates (es. batch upload)
parse_strings_as_datetimes=True,
meta={"notes": "Verifico che i timestamp non siano retroattivi"}
)
# REGOLA 5: Numero di letture per lotto (almeno 1 ogni 30 min)
validator.expect_table_row_count_to_be_between(
min_value=48, # 24h * 2 letture/ora = 48 minimo
max_value=10000, # max ragionevole per 24h
meta={"notes": "Minimo 1 lettura ogni 30 min per cold chain attiva"}
)
# REGOLA 6: Distribuzione temperatura per classe prodotto
# Prodotti refrigerati: mediana deve essere 2-8°C
validator.expect_column_median_to_be_between(
column="celsius",
min_value=1.0,
max_value=9.0,
meta={
"notes": "Solo per lotti con temperature_class='chilled'",
"applies_to": "filtered_batches_chilled",
}
)
# Salva le aspettative
validator.save_expectation_suite(discard_failed_expectations=False)
# Checkpoint per Airflow
checkpoint = context.add_or_update_checkpoint(
name="bronze_cold_chain_checkpoint",
config_version=1,
template_name=None,
module_name="great_expectations.checkpoint",
class_name="Checkpoint",
run_name_template="%Y%m%d-%H%M%S-cold-chain",
expectation_suite_name="cold_chain_temperature_suite",
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "send_slack_notification_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "{{ var.value.slack_webhook_cold_chain }}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer",
"class_name": "SlackRenderer",
},
},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
],
)
식품 공급망 KPI 및 지표
데이터 기반 의사결정을 내리기 위한 올바른 KPI와 기초를 정의하세요. 측정항목은 다음과 같습니다. 공식, 부문 벤치마크 및 현실적인 목표를 통해 식품 공급망의 기본이 됩니다.
| KPI | 공식 | 업계 벤치마크 | 최고 수준의 목표 | 경고 임계값 |
|---|---|---|---|---|
| 완벽한 주문율 | 완벽한 주문 / 총 주문 * 100 | 85-92% | >95% | <90% |
| OTIF(정시 전체) | (신속하고 완벽한 배송) / 합계 * 100 | 88-94% | >97% | <92% |
| 폐기물 비율 % | Kg 낭비 / Kg 총 제품 * 100 | 3~8% | <2% | >5% |
| 재고 회전율 | 연간매출원가/평균재고 | 12-20x (신선한) | >25x(신선) | <10배 |
| 공급 일수(DOS) | 재고 있음 / 일일 평균 수요 | 3~7일(신선) | 2~4일 | >10일 |
| 채우기 비율 | 배송된 수량 / 주문된 수량 * 100 | 92-96% | >98% | <94% |
| 수령 시 보관 기간(SLR) | 남은 배송일수 / 총 유통기한 * 100 | 60-75% | >80% | <60% |
| 콜드체인 규정 준수 | 임시 위반이 없는 로트 / 총 로트 * 100 | 97-99% | >99.5% | <98% |
| 예측 정확도 | 1 - (MAE / 평균 수요) * 100 | 75-85% | >90% | <75% |
| 현금-현금 순환 | DIO + DSO - DPO(일) | 15~35일 | <15일 | >45일 |
공급망 상태 점수를 계산하는 방법
단일 종합 점수로 KPI에 가중치를 부여하면 경영진이 상태를 모니터링하는 데 도움이 됩니다. 단일 번호로 구성된 전체 공급망:
- OTIF: 가중치 40%(고객 만족도에 직접적인 영향)
- 폐기물 비율: 중량 30%(마진 및 지속 가능성에 영향)
- 수령 시 유통기한: 중량 15%(소비자 품질)
- 콜드체인 규정 준수: 중량 15%(식품 안전 및 규정 준수)
점수 > 90: 훌륭함 | 80-90: 좋음 | 70~80: 개선 예정 | <70: 크리티컬
실시간 vs 배치: 언제 어떤 아키텍처를 사용해야 할까요?
모든 것이 실시간일 필요는 없습니다. 스트리밍 아키텍처의 비용은 상당히 높습니다. 기존 배치 아키텍처보다 우수합니다. 식품 공급망에서 선택 허용 가능한 대기 시간과 운영 결과에 따라 달라질 수 있습니다.
| 사용 사례 | 건축학 | 숨어 있음 | 악기 | 동기 부여 |
|---|---|---|---|---|
| 콜드체인 모니터링(온도) | 실시간 스트리밍 | <1분 | 카프카 + 플링크 | 임시 위반 = 즉각적인 배치 손실 |
| 리콜 관리 | 실시간/이벤트 중심 | <5분 | Kafka + 경고 | FSMA: 2시간 이내에 배치 추적 |
| GPS 차량 추적 | 거의 실시간 | <2분 | MQTT + InfluxDB | 고객을 위한 업데이트된 ETA |
| 일일 KPI(OTIF, 낭비) | 일일 배치 | 티+2시간 | 기류 + DBT | 오전 업무 보고 |
| 수요 예측 | 일일/주간 배치 | T+6시간 | 공기 흐름 + MLflow | 생산 계획에는 실시간이 필요하지 않습니다. |
| 재고 스냅샷 | 마이크로 배치(15분마다) | 15분 | 공기 흐름 + 눈송이 | 창고 운영자: 충분한 가시성 |
| 주간 폐기물 분석 | 주간 배치 | T+24시간 | DBT + BI 도구 | 운영 결정이 아닌 전략적 결정 |
| FSMA 규정 준수 감사 | 주문형 배치 | 요청 시 | DBT + 데이터 저장소 | 일일 점검이 아닌 정기 점검 |
# Architettura Lambda per Food Supply Chain
# Speed layer (real-time) + Batch layer (accuracy)
# SPEED LAYER: Kafka + Apache Flink
# Processa eventi in milliseconds: cold chain, recall alerts
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("cold-chain-readings") \
.set_group_id("flink-cold-chain-consumer") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
cold_chain_stream = env.from_source(
kafka_source,
watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(
Duration.of_seconds(30)
),
source_name="cold-chain-kafka",
)
# Filtra violazioni temperatura in tempo reale
violations_stream = cold_chain_stream \
.map(lambda msg: json.loads(msg)) \
.filter(lambda r: (
r.get("temperature_class") == "chilled" and
(r["celsius"] < 2.0 or r["celsius"] > 8.0)
)) \
.map(lambda r: {
**r,
"violation_severity": "CRITICAL" if r["celsius"] > 12.0 else "WARNING",
"violation_detected_at": datetime.utcnow().isoformat(),
})
# Scrivi violations su Kafka alert topic
violations_stream.sink_to(
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("cold-chain-violations")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.build()
)
env.execute("food-cold-chain-monitoring")
레거시 시스템과의 통합
대부분의 농식품 회사는 레거시 시스템으로 운영됩니다: SAP R/3 '90, SFTP를 통한 EDIFACT EDI 파일, REST API가 없는 Oracle 데이터베이스. 이들의 통합 시스템에는 구체적인 접근 방식이 필요합니다.
SAP 커넥터: RFC 및 BAPI
# src/connectors/sap_connector.py
"""
Connettore SAP ERP via pyrfc (SAP RFC/BAPI wrapper).
Richiede: SAP NetWeaver RFC SDK + pyrfc library.
"""
import pyrfc
import pandas as pd
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class SAPConnectionConfig:
ashost: str
sysnr: str
client: str
user: str
passwd: str
lang: str = "IT"
class SAPConnector:
"""Wrapper per SAP RFC/BAPI calls per dati supply chain."""
def __init__(self, config: SAPConnectionConfig):
self._config = config
self._conn: Optional[pyrfc.Connection] = None
def __enter__(self):
self._conn = pyrfc.Connection(
ashost=self._config.ashost,
sysnr=self._config.sysnr,
client=self._config.client,
user=self._config.user,
passwd=self._config.passwd,
lang=self._config.lang,
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.close()
def get_production_batches(
self,
date_from: str,
date_to: str,
plants: list[str],
) -> pd.DataFrame:
"""
Estrae lotti di produzione da SAP tramite BAPI_BATCH_GET_DETAIL.
Args:
date_from: Data inizio formato YYYYMMDD
date_to: Data fine formato YYYYMMDD
plants: Lista codici stabilimento SAP
"""
all_batches = []
for plant in plants:
try:
# Chiama BAPI SAP per lista lotti
result = self._conn.call(
"BAPI_BATCH_GET_DETAIL",
PLANT=plant,
DATE_FROM=date_from,
DATE_TO=date_to,
BATCH_STATUS="", # tutti gli stati
)
batches = result.get("BATCH_DETAIL_LIST", [])
logger.info(
"SAP plant %s: %d lotti estratti",
plant,
len(batches),
)
for batch in batches:
all_batches.append({
"lot_id": f"{batch['MATNR']}-{batch['CHARG']}",
"material_number": batch["MATNR"].strip(),
"lot_number": batch["CHARG"].strip(),
"plant": plant,
"production_date": batch.get("HSDAT"),
"expiry_date_sap": batch.get("VFDAT"),
"batch_status": batch.get("ZUSTD"),
"quantity_kg": float(batch.get("CLABS", 0)),
"temp_cond_sap": batch.get("MHDRZ", "0001"),
"_source": "SAP_ERP",
"_extracted_at": pd.Timestamp.utcnow().isoformat(),
})
except pyrfc.ABAPApplicationError as e:
logger.error(
"SAP BAPI error per plant %s: %s",
plant,
str(e),
)
# Non fallisce tutta l'estrazione per un singolo plant
continue
return pd.DataFrame(all_batches)
def get_stock_movements(
self,
date_from: str,
date_to: str,
movement_types: list[str],
) -> pd.DataFrame:
"""
Estrae movimenti di magazzino SAP via MB51 BAPI equivalent.
movement_types: ['101'=GR, '261'=GI to production, '601'=GI to customer]
"""
result = self._conn.call(
"BAPI_GOODSMVT_GETDETAIL",
GOODSMVT_DATE_FROM=date_from,
GOODSMVT_DATE_TO=date_to,
)
movements = []
for item in result.get("GOODSMVT_ITEMS", []):
if item["BWART"] in movement_types:
movements.append({
"movement_id": f"{item['MBLNR']}-{item['ZEILE']}",
"material": item["MATNR"].strip(),
"plant": item["WERKS"],
"movement_type": item["BWART"],
"quantity": float(item.get("MENGE", 0)),
"unit": item.get("MEINS", "KG"),
"lot_number": item.get("CHARG", ""),
"posting_date": item["BUDAT"],
"_source": "SAP_MOVEMENTS",
})
return pd.DataFrame(movements)
레거시 데이터베이스에서 스트리밍하기 위해 Debezium을 사용하는 CDC
# debezium-config/oracle-erp-connector.json
# Configurazione Debezium per CDC da Oracle EBS (ERP legacy)
{
"name": "oracle-erp-food-supply-chain",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "oracle-erp.internal",
"database.port": "1521",
"database.user": "debezium_cdc",
"database.password": "${ORACLE_CDC_PASSWORD}",
"database.dbname": "FOOD_ERP",
"database.pdb.name": "FOOD_PDB",
"database.server.name": "oracle-erp",
"table.include.list": "FOOD_ERP.INV_TRANSACTIONS,FOOD_ERP.MTL_LOT_NUMBERS,FOOD_ERP.WSH_DELIVERY_DETAILS",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.oracle.food_erp",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.oracle.food_erp",
"transforms": "route,addFields",
"transforms.route.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.addFields.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addFields.static.field": "_cdc_source",
"transforms.addFields.static.value": "oracle-erp",
"include.schema.changes": "true",
"snapshot.mode": "initial_only",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"topic.prefix": "food-cdc"
}
}
소매 주문용 EDIFACT EDI 파서
# src/connectors/edi_connector.py
"""
Parser EDI EDIFACT per messaggi ORDERS, DESADV, RECADV (GS1 subset).
Usa pydifact library per parsing EDIFACT messages.
"""
import pandas as pd
from pydifact.segmentcollection import SegmentCollection
from pydifact.segments import Segment
from pathlib import Path
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def parse_edifact_orders(file_path: str | Path) -> pd.DataFrame:
"""
Parsa un file EDI EDIFACT ORDERS D.01B (GS1 EANCOM).
Restituisce DataFrame con righe ordine normalizzate.
Struttura messaggio ORDERS EDIFACT:
UNB (Interchange header)
UNH (Message header)
BGM (Order message identifier)
DTM (Date/time)
NAD+BY (Buyer = retailer)
NAD+SU (Supplier = nostro sistema)
LIN (Line item)
+QTY (Quantity ordered)
+DTM (Requested delivery date)
+PIA (Product code GTIN)
UNT (Message trailer)
"""
file_content = Path(file_path).read_text(encoding="latin-1")
collection = SegmentCollection.from_str(file_content)
orders = []
current_order: dict = {}
current_line: dict = {}
for segment in collection.segments:
tag = segment.tag
if tag == "BGM":
# Inizio nuovo ordine
current_order = {
"order_number": segment.elements[1] if len(segment.elements) > 1 else None,
"document_type": segment.elements[0][0] if segment.elements else None,
"lines": [],
}
elif tag == "DTM" and current_order:
# Data ordine o data consegna richiesta
qualifier = segment.elements[0][0]
date_str = segment.elements[0][1]
date_fmt = segment.elements[0][2]
if qualifier == "137": # Document date
try:
current_order["order_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
logger.warning("DTM parse error: %s", date_str)
elif qualifier == "2": # Requested delivery date
try:
current_order["requested_delivery_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
pass
elif tag == "NAD":
qualifier = segment.elements[0]
party_id = segment.elements[1][0] if len(segment.elements) > 1 else None
party_name = segment.elements[3] if len(segment.elements) > 3 else None
if qualifier == "BY": # Buyer (retailer)
current_order["buyer_gln"] = party_id
current_order["buyer_name"] = party_name
elif qualifier == "SU": # Supplier
current_order["supplier_gln"] = party_id
elif tag == "LIN":
# Salva riga precedente se esiste
if current_line and current_order:
current_order["lines"].append(current_line)
current_line = {
"line_number": segment.elements[0],
"gtin": None,
"qty_ordered": None,
"delivery_date": None,
}
elif tag == "PIA" and current_line:
# Product identification: GTIN-13 o GTIN-14
if len(segment.elements) > 1:
qualifier = segment.elements[1][1] if len(segment.elements[1]) > 1 else ""
if qualifier in ["SRV", "GE1"]: # GTIN qualifiers
current_line["gtin"] = segment.elements[1][0]
elif tag == "QTY" and current_line:
qualifier = segment.elements[0][0]
if qualifier == "21": # Ordered quantity
current_line["qty_ordered"] = float(segment.elements[0][1])
current_line["qty_unit"] = segment.elements[0][2] if len(segment.elements[0]) > 2 else "PCE"
# Normalizza output
for order in orders:
for line in order.get("lines", []):
orders.append({
"order_number": order.get("order_number"),
"order_date": order.get("order_date"),
"requested_delivery_date": order.get("requested_delivery_date"),
"buyer_gln": order.get("buyer_gln"),
"buyer_name": order.get("buyer_name"),
"supplier_gln": order.get("supplier_gln"),
"line_number": line.get("line_number"),
"gtin": line.get("gtin"),
"qty_ordered": line.get("qty_ordered"),
"qty_unit": line.get("qty_unit"),
"_source": "EDI_EDIFACT_ORDERS",
"_parsed_at": datetime.utcnow().isoformat(),
})
return pd.DataFrame(orders)
사례 연구: 500개 농장이 있는 농업 협동조합
이러한 패턴이 실제 사례(대규모 농업 협동조합)에 어떻게 적용되는지 살펴보겠습니다. 500개의 관련 농업 회사, 3개의 가공 및 가공 센터를 보유한 북부 이탈리아 대규모 소매업과 현지 시장 간 200개 매장이 있습니다.
협력 프로필
- 연결된 팜 500개: 과일 및 야채, 시리얼, 콩과 식물 - 8,000개 센서의 IoT MQTT 데이터
- 3개의 머시닝 센터: SAP R/3 레거시(2005), 맞춤형 MES, LIMS
- 200개 판매 지점: 이기종 POS(3개 공급업체), 15개 소매업체가 있는 EDI EDIFACT
- 시작 상황: Excel 시트, 고립된 데이터, 엔드투엔드 가시성 없음
- 목표: 폐기물 40% 감소, OTIF 95% 개선, FSMA 준수
1단계: 평가 및 소스 인벤토리(1~4주)
# Inventario sorgenti dati: output assessment
SOURCE_INVENTORY = {
"farm_iot": {
"count": 8000,
"protocol": "MQTT v3.1.1",
"broker": "Mosquitto on-premise",
"format": "JSON custom",
"issues": [
"Schema non standardizzato tra vendor IoT diversi",
"15% sensori con timestamp errati (clock drift)",
"3 farm su 500 senza connettivita stabile (4G intermittente)",
],
"recommended_fix": "Normalizzazione schema + edge buffer (Mosquitto local)",
},
"sap_erp": {
"version": "SAP R/3 4.7 (anno 2005)",
"tables_relevant": ["MCHA", "MCH1", "MSEG", "MKPF", "VBAK", "VBAP"],
"issues": [
"No REST API: solo RFC/BAPI",
"Batch extraction window: 01:00-03:00 (finestra ristretta)",
"Codifica caratteri: ISO-8859-1 (non UTF-8)",
"Date in formato SAP (YYYYMMDD come intero)",
],
"recommended_fix": "pyrfc + ETL incremental con delta timestamp",
},
"edi_partners": {
"partners": 15,
"standards": ["EANCOM D.01B", "X12 850/856", "GS1 XML 3.1"],
"transport": "SFTP (12 partner), AS2 (3 partner)",
"issues": [
"3 retailer usano EDIFACT non-standard (varianti proprietarie)",
"Mancanza di RECADV da 5 retailer: no conferma ricezione",
],
},
"pos_retail": {
"vendors": ["Cassa Easy", "NCR Aloha", "Custom PHP legacy"],
"issues": [
"3 formati CSV diversi",
"Nessun campo shelf_life nei dati POS",
"Granularità: ora (non transazione)",
],
},
}
2단계: 기술 스택 구현(5~16주)
# Stack tecnico implementato per la cooperativa
INFRASTRUCTURE = {
"data_lake": "AWS S3 (Medallion: Bronze/Silver/Gold)",
"compute": "AWS EMR Serverless (Spark per bulk load iniziale)",
"warehouse": "Snowflake (pay-per-query, snowpark per ML features)",
"orchestration": "Apache Airflow 2.9 su AWS MWAA (managed)",
"transformation": "dbt Cloud (Team plan: 8 developers)",
"data_quality": "Great Expectations + Slack alerts",
"streaming": "Apache Kafka su MSK (managed) per cold chain",
"monitoring": "Grafana + Prometheus (Airflow metrics, DWH queries)",
"ci_cd": "GitHub Actions (dbt CI: test su ogni PR)",
"secret_management": "AWS Secrets Manager",
}
TIMELINE = [
{"week": "1-4", "milestone": "Assessment sorgenti + architettura design"},
{"week": "5-7", "milestone": "Bronze layer: connettori SAP + IoT MQTT"},
{"week": "8-10", "milestone": "Bronze layer: EDI + POS + LIMS"},
{"week": "11-12","milestone": "Silver layer: dbt models + data quality"},
{"week": "13-14","milestone": "Gold layer: KPI dashboard + alerting"},
{"week": "15-16","milestone": "UAT + go-live + team training"},
}
TEAM = {
"data_engineers": 3,
"dbt_developers": 2,
"sap_consultant": 1, # part-time per RFC/BAPI
"project_manager": 1,
}
6개월 후 얻은 결과
| KPI | 이전(기준) | 6개월 후 | 개선 |
|---|---|---|---|
| 폐기물 비율 | 7.2% | 4.3% | -40% 폐기물 (-2.9pp) |
| 오티프 | 88% | 94% | +6pp |
| 수령 시 유통기한 | 58% | 74% | +16pp |
| 콜드체인 규정 준수 | 94%(추정) | 99.1%(측정) | 실제 가시성을 갖춘 +5.1pp |
| 해결 시간 회상 | 48~72시간 | 2~4시간 | -93% 응답 시간 |
| 이사 매뉴얼 보고서 | 주 3~4시간 | 자동 대시보드 | -100% 수작업 |
| 예측 정확도 질문 | 71% | 86% | +15pp |
| 공급 일수(신선) | 8~12일 | 4~6일 | -50% 초과 재고 |
사례 연구에서 얻은 교훈
- SAP RFC/BAPI는 생각보다 느립니다. 야간 배치 창 가장 심각한 병목 현상이었습니다. 해결책은 IT와 기간을 협상하는 것이었습니다. 더 넓게 확장하고 증분 추출을 사용하여 볼륨을 최소화합니다.
- EDI는 결코 표준이 아닙니다. 15개의 소매업체 파트너 중 11개에 변형이 있었습니다. EANCOM 표준의 소유자입니다. EDIFACT 파서를 완료하는 데 2주가 걸렸습니다. 각 파트너의 사양에 대한 교정.
- 데이터 품질은 단지 기술적인 문제가 아니라 문화적인 문제입니다. 30%의 운영자가 SAP에 잘못된 수동 입력으로 인해 품질 문제가 발생했습니다. 기술적인 해결책(Great Expectations)은 문제를 부각시켰지만 해결책은 교육 및 변경 관리가 필요합니다.
- 브론즈 레이어 모두 저장: 6개월 동안 세 번이나 필요했다 dbt 모델의 버그를 수정하려면 Silver 변환을 다시 실행하세요. 덕분에 브론즈 레이어는 변경할 수 없으며 데이터가 손실되지 않습니다.
- dbt CI/CD는 작업 방식을 변화시킵니다. 모든 모델을 매번 테스트 PR은 첫 달에 약 4건의 생산 사고를 예방했습니다.
FoodTech 시리즈 전체 요약
이는 업계를 변화시키는 기술을 통과하는 데 오랜 시간이 걸렸습니다. 음식. 10개 기사로 구성된 이 시리즈에서 우리가 함께 구축한 내용은 다음과 같습니다.
| # | Articolo | 학습한 주요 개념 | 기술 스택 |
|---|---|---|---|
| 00 | 정밀 농업을 위한 IoT 파이프라인 | MQTT 브로커, 필드 센서, 데이터 레이크 수집, QoS 수준 | Python, MQTT, 모기, MinIO |
| 01 | 농작물 질병 감지를 위한 ML Edge | TensorFlow Lite 양자화, 에지 추론, ARM에서의 모델 배포 | TFLite, 라즈베리 파이, Python, OpenCV |
| 02 | AgriTech용 위성 및 기상 API | Sentinel-2 NDVI, Planet Labs API, ML을 위한 날씨 특성 엔지니어링 | 센티넬 허브, OpenMeteo, rasterio, GeoPandas |
| 03 | 식품 추적 시스템 | 프라이빗 블록체인(Hyperledger), RFID EPCIS, GS1 표준 | 하이퍼레저 패브릭, RFID, EPCIS, Python |
| 04 | 품질 관리를 위한 컴퓨터 비전 | 제품 데이터 세트에 대한 YOLO 미세 조정, 산업 파이프라인 추론 | PyTorch, YOLO, FastAPI, OpenCV |
| 05 | FSMA 204 자동화 | KDE, CTE 필수 식품, 로트 추적성, 리콜 자동화 | Python, FastAPI, PostgreSQL, 경고 |
| 06 | 수직 농업 자동화 | 환경 매개변수 제어, 로봇 스케줄링, 농장용 REST API | 파이썬, FastAPI, InfluxDB, MQTT |
| 07 | 폐기물 감량을 위한 수요예측 | LSTM, Prophet, 음식 계절성 특성 엔지니어링, 백테스팅 | PyTorch, Prophet, MLflow, 팬더 |
| 08 | 농장 IoT를 위한 실시간 대시보드 | 각도 신호, Grafana 프로비저닝, InfluxDB 쿼리, 경고 | Angular 21, Grafana, InfluxDB, WebSocket |
| 09 | 농장에서 소매업체까지 공급망 ETL | 메달리온 아키텍처, Airflow DAG, dbt 모델, 큰 기대, KPI | 공기 흐름, dbt, Snowflake, GE, SAP RFC, EDI |
더 자세히 알고 싶은 사람들을 위한 로드맵
전체 시리즈를 읽었다면 이제 실제 FoodTech 프로젝트를 다루기 위한 탄탄한 기반을 갖추게 되었습니다. 다음은 각 도메인을 더 깊이 탐구하기 위한 구조화된 로드맵입니다.
추천 심화과정
- IoT 및 엣지 컴퓨팅: MQTT 5.0 연구(이번에 사용된 3.1.1과 비교) 문서), 관리형 솔루션의 경우 Azure IoT Hub 또는 AWS IoT Core, 데이터의 경우 Apache NiFi 이기종 IoT 소스로부터의 수집.
- 프로덕션 중인 ML: 이 블로그의 MLOps 시리즈에서 방법을 알려드립니다. 버전 모델을 사용하고 MLflow 및 Seldon을 사용하여 A/B 테스트를 수행하고 프로덕션 드리프트를 모니터링합니다.
- 데이터 아키텍처: Data Lakehouse(데이터 및 AI 시리즈)에 대해 자세히 알아보세요. 비즈니스) Snowflake, Databricks 및 Apache Iceberg가 어떻게 비교되는지 이해합니다. 공급망 분석 플랫폼.
- 공급망을 위한 LLM 및 생성 AI: 대규모 언어 모델은 다음을 수행할 수 있습니다. 규정 준수 문서에서 구조화된 정보 추출, 공급업체 계약 분석 자동 보고서를 생성합니다. 블로그의 AI 엔지니어링 시리즈에서는 RAG 및 미세 조정을 다룹니다.
- 연구해야 할 업계 표준: GS1 글로벌 표준(GTIN, GLN, SSCC, EPCIS), FSMA 204(FDA), EU Reg. 178/2002(식품 추적성), ISO 22000(HACCP).
- 관련 인증: AWS 공인 데이터 엔지니어, dbt 공인 개발자, 천문학자 인증 Apache Airflow 개발자, Databricks 데이터 엔지니어 어소시에이트.
식품 ETL에 대한 모범 사례 및 안티 패턴
모범 사례
- 브론즈 레이어 불변: 원시 데이터를 수정하지 마십시오. 버그를 발견한 경우 Silver dbt 모델에서는 모델을 수정하고 다시 실행합니다. Bronze는 그대로 유지됩니다. 이는 FSMA 및 EU Reg의 기본입니다. 178/2002 감사 추적.
-
Airflow DAG의 멱등성: 각 작업은 다시 실행될 수 있어야 합니다.
부작용 없이. 미국
replace=TrueS3 업로드의 경우,MERGE대신에INSERT데이터베이스의 경우 및 증분 DBT 모델의 고유 키입니다. -
각 작업에 대한 명시적인 시간 초과: SAP RFC와 같은 레거시 시스템은
무기한 차단합니다. 항상 설정
execution_timeout=timedelta(hours=2)각 Airflow 작업에 대해 - 수집 지연 모니터링: 02:00에 시작하는 파이프라인 그러나 06:00(운영 교대 전)까지 완료되지 않아 쓸모가 없습니다. 모니터 i 완료 시간을 확인하고 Airflow에 SLA 알림을 설정하세요.
- 파트너를 위한 EDI 체계 분리: 전용 EDI 파서를 유지 관리 각 소매업체 파트너별로. 독점 변형으로 인해 파서가 불가능해집니다. 독특한 보편적.
- 문서화된 계보 날짜: dbt는 자동으로 그래프를 생성합니다. 혈통. 이를 사용하여 "이 OTIF KPI는 어디에서 왔습니까?"라는 감사 질문에 답하십시오.
피해야 할 안티패턴
- 추출 계층의 변환(기본 ETL): 변환 브론즈 레이어에 데이터를 저장하기 전에 데이터를 사용할 수 있는 가능성이 상실됨을 의미합니다. 백필을 수행하거나 오류를 수정하세요. 항상 원시를 로드하고 나중에 dbt로 변환하세요.
- 모든 것을 위한 모놀리식 DAG: 50개 이상의 작업이 포함된 단일 DAG는 디버깅이 불가능합니다. 도메인(SAP DAG, IoT DAG, EDI DAG)별로 나누어 사용 DAG 간 종속성을 위한 Airflow TriggerDagRunOperator입니다.
- 시간대 무시: 시칠리아의 한 농장, 유통 센터 밀라노에 있는 소매업체와 독일에 있는 소매점은 서로 다른 시간대를 사용합니다. 항상 UTC로 변환 Bronze 레이어에 있으며 모든 곳에서 시간대 인식 타임스탬프를 사용합니다.
- DAG에 하드 코딩된 사용자 인증 정보: 절대로 SAP 비밀번호나 API 토큰을 입력하지 마세요. Airflow 코드에서. Airflow 연결 및 변수 또는 AWS Secrets Manager를 사용하십시오.
- Bronze의 데이터 품질 건너뛰기: Gold 레이어만 검증하고 너무 많은 레이어를 검증합니다. 늦었어. 문제는 가능한 한 빨리 식별되어야 합니다. 잘못된 온도 판독값 브론즈에서는 골드에서 거짓 긍정 리콜이 됩니다.
- 실행자로서의 Airflow Scheduler: 작업이 실행되어서는 안 됩니다. Airflow 스케줄러 자체에서. 항상 별도의 Executor(CeleryExecutor, KubernetesExecutor)를 사용하여 과도한 워크로드를 격리합니다.
결론: 길의 끝, 또 다른 길의 시작
푸드테크 시리즈가 끝났습니다. 10개의 기사에서 우리는 작품을 만들었습니다. 식품 공급망의 비전을 하나씩 완전히 데이터 기반: 실시간으로 데이터를 전송하는 현장 센서, 질병을 감지하는 ML 모델 인간의 눈 앞에, 불변의 추적성을 위한 블록체인, 품질 관리, 자동화된 FSMA 규정 준수, API를 통해 제어되는 수직 농장, 낭비를 줄이는 수요 예측, 관리를 위한 실시간 대시보드, 이 최신 기사는 이 모든 것을 하나로 묶는 ETL 파이프라인입니다.
농식품 부문은 예측할 수 없는 계절성, 부패하기 쉬운 제품, 엄격한 규정, 수십 년 동안 통합된 레거시 시스템, 공급망 수백 명의 플레이어가 있는 글로벌. 그러나 바로 이러한 이유로 가치를 창출할 수 있는 기회는 기술로 그들은 거대합니다. 협동조합에서 낭비율을 7%에서 4%로 줄입니다. 500개 농장의 경우 이는 단순한 운영 개선이 아닙니다. 돈, 식량, 환경에 미치는 영향입니다.
2030년의 식품 공급망은 다음과 같이 정의됩니다. 실시간 데이터 (각 로트는 파종부터 포크까지 추적됨) 예측 AI (과잉재고 제로, 품절 제로, 낭비 제로), 자동화된 규정 준수 (2주가 아닌 2시간 안에 FSMA 심사) e 측정 가능한 지속 가능성 (로트당 탄소 배출량, 헥타르당 물 사용량).
기술은 다 있습니다. 이 기사에서 배운 ETL 패턴 — Airflow 오케스트레이션용, 변환용 DBT, 데이터 품질에 대한 큰 기대, 메달리온 구조를 위한 아키텍처 - 모든 농식품 회사에 즉시 적용 가능 중간 또는 큰 크기.
다른 관련 블로그 시리즈 살펴보기
- 데이터 & AI 비즈니스(시리즈 14): 데이터 웨어하우스를 탐구하다 (Snowflake, Databricks, BigQuery), dbt 및 Airbyte를 사용한 ETL/ELT, MLOps 엔터프라이즈 및 데이터 거버넌스 — 이 FoodTech 시리즈를 자연스럽게 보완합니다.
- MLOps(시리즈 5): 주문형 ML 모델을 프로덕션으로 가져오는 방법 기사 01, 04 및 07에서 구축한 예측 및 컴퓨터 비전.
- AI 엔지니어링/RAG(시리즈 6): LLM을 사용하여 쿼리하는 방법 규정 준수 문서, HACCP 인증서 분석 및 자동 보고서 생성 공급망을 위해.
- PostgreSQL AI(10 시리즈): 의미론적 검색을 위해 pgVector를 사용하는 방법 제품 설명, 레시피 및 식품 기술 사양에 대한 정보 - 다음과 같은 경우에 유용합니다. 식품 부문의 제품 카탈로그 관리.
- EnergyTech(시리즈): 다양한 IoT 기술과 데이터 파이프라인 FoodTech 시리즈는 에너지 모니터링에 사용되는 시리즈와 동일합니다. 산업 식품 공장의.
FoodTech 시리즈를 끝까지 시청해 주셔서 감사합니다. 즐거운 코딩을 즐기시고 맛있게 드세요.







