Food Supply Chain: ETL Patterns from Farm to Retailer
Every year, roughly one third of all food produced for human consumption is lost or wasted along the supply chain: in the US alone the value exceeds $473 billion, and worldwide the equivalent of one billion meals is discarded every day. Paradoxically, a significant share of this waste stems not from climate or biological factors, but from a data problem: insufficient visibility, unintegrated legacy systems, KPIs calculated with delays, and decisions made on incomplete or inaccurate information.
The global food supply chain management market, valued at $182.81 billion in 2025, is projected to reach $359.39 billion by 2034 (a 7.8% CAGR). Yet 48% of food suppliers still run daily operations on spreadsheets, and 60% report repetitive manual tasks that consume valuable time. The gap between what technology enables and what agri-food companies actually implement is enormous.
This article — the last in the FoodTech series — addresses the problem at its root: how to build a robust, scalable ETL/ELT pipeline to integrate heterogeneous data from farm to retailer, orchestrate workflows with Apache Airflow, transform data with dbt, ensure quality with Great Expectations, and measure performance with food-industry-specific supply chain KPIs.
What You Will Learn
- End-to-end data architecture for the food supply chain (farm → processing → distribution → retail)
- Heterogeneous data sources: ERP (SAP/Oracle), MES, LIMS, WMS, TMS, POS, IoT, EDI
- ETL vs ELT: when to choose which approach for food data
- Complete Apache Airflow DAGs for food pipelines with error handling and retry logic
- dbt models for supply chain KPIs: waste rate, OTIF, fill rate, days of supply
- Data quality with Great Expectations for cold chain and expiration dates
- Legacy system integration: SAP RFC/BAPI, EDI EDIFACT, CDC with Debezium
- Case study: agricultural cooperative with 500 farms, 3 processing plants, 200 retail points
- Full recap of the entire FoodTech series (10 articles)
The FoodTech Series: Where We Stand
This is the tenth and final installment of the FoodTech series, which has explored the key digital technologies applicable across the entire food supply chain: from field data collection all the way to retail. Here is the full picture:
| # | Title | Technologies | Level |
|---|---|---|---|
| 00 | IoT Pipeline for Precision Agriculture with Python and MQTT | IoT, MQTT, Python, Data Lake | Advanced |
| 01 | Edge ML for Crop Disease Detection: TensorFlow Lite on Raspberry Pi | TensorFlow Lite, Edge ML, Raspberry Pi | Advanced |
| 02 | Satellite and Weather APIs for AgriTech: Predictive Data | Sentinel, Planet, Weather API, NDVI | Advanced |
| 03 | Food Traceability System: Blockchain, RFID and IoT | Blockchain, RFID, IoT, Compliance | Advanced |
| 04 | Computer Vision for Food Quality Control with PyTorch YOLO | YOLO, PyTorch, Computer Vision | Advanced |
| 05 | FSMA 204 Automation: Tracking, Alerts and Recall via Python | FSMA, Compliance, Python, Recall | Advanced |
| 06 | Vertical Farming Automation: Robotic Control via API | Robotics, API, Automation | Advanced |
| 07 | Demand Forecasting for Waste Reduction: ML Time-Series | LSTM, Prophet, Time-Series ML | Advanced |
| 08 | Real-Time Dashboard for Farm IoT with Angular and Grafana | Angular, Grafana, InfluxDB, Real-Time | Advanced |
| 09 | Food Supply Chain: ETL Patterns from Farm to Retailer <-- You are here | Airflow, dbt, Great Expectations, ETL | Advanced |
Why this article closes the series
Previous articles tackled individual nodes of the chain: IoT data collection in the field, edge ML models, blockchain traceability, computer vision for quality, FSMA compliance, vertical farming, demand forecasting and real-time dashboards. This article connects them all: the ETL pipeline is the backbone that integrates every data source, transforms raw information into actionable KPIs and brings end-to-end visibility from the farm to the supermarket shelf.
Food Supply Chain Data Architecture
The food supply chain is composed of distinct nodes, each with its own information systems, data formats and update frequencies. An ETL pipeline must handle this heterogeneity robustly.
# Logical architecture of the food supply chain
# (data generated at each step)
FARM SYSTEMS
|-- IoT Sensors (MQTT): soil temp, humidity, pH, EC
|-- Agricultural ERP: planned crops, input costs
|-- Pesticide/treatment registry (LIMS)
|-- GPS geo-data from farm machinery
|-- Weather APIs (OpenMeteo, Copernicus)
|
v
PROCESSING PLANTS
|-- MES (Manufacturing Execution System): production orders, yields
|-- LIMS: chemical, microbiological, allergen analyses
|-- ERP SAP/Oracle: bill of materials, lots, traceability
|-- WMS (Warehouse Management): raw/finished goods stock
|-- Cold chain sensors: temperature/humidity during processing
|
v
DISTRIBUTION CENTERS
|-- WMS: stock management, picking, slotting
|-- TMS (Transport Management): shipments, vehicles, routing
|-- Cold chain sensors: temperature loggers in refrigerated trucks
|-- EDI: orders from retailers (EDIFACT ORDERS, DESADV)
|-- GPS trackers: vehicle positions in real time
|
v
RETAIL POS
|-- POS data: sales by SKU, store, hour
|-- Shelf life management: products near expiration
|-- EDI: receipt confirmations (RECADV), invoices (INVOIC)
|-- Store inventory: available stock per point of sale
|
v
CONSUMER (indirect signals)
|-- Loyalty app: purchases per customer, frequency
|-- Product reviews: perceived quality
|-- Social sentiment: consumption trends
Each node generates data in different formats (relational SQL, flat CSV/EDI files, JSON from REST APIs, binary MQTT messages, legacy SFTP files) with frequencies ranging from milliseconds (IoT sensors) to weekly (ERP reports). The challenge for a food data engineer is to unify all of this into a single coherent analytical model.
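One practical way to tame this heterogeneity is to wrap every payload in a common envelope before landing it in the Bronze layer, so downstream code never depends on a source-specific shape. The sketch below is illustrative: `IngestionRecord` and `normalize_mqtt_reading` are hypothetical names, not part of any library.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class IngestionRecord:
    """Common envelope for every payload landed in the Bronze layer."""
    source_system: str       # e.g. "sap_erp", "cold_chain_iot", "edi_partners"
    entity: str              # logical entity: "lot", "temperature_reading", ...
    payload: dict[str, Any]  # raw source fields, untouched
    ingested_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def partition_key(self) -> str:
        """S3-style partition path: source_system + ingestion date."""
        return f"{self.source_system}/dt={self.ingested_at[:10]}"


def normalize_mqtt_reading(raw: bytes) -> IngestionRecord:
    """Wrap a JSON MQTT message from a cold chain sensor, as-is."""
    return IngestionRecord(
        source_system="cold_chain_iot",
        entity="temperature_reading",
        payload=json.loads(raw),
    )
```

The envelope deliberately carries the raw payload untouched: cleaning and casting belong to the Silver layer, not ingestion.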
Heterogeneous Data Sources: Reference Table
Before designing any pipeline, it is essential to inventory the sources. Here are the main ones for a moderately complex food supply chain:
| System | Type | Format | Volume/day | Frequency | Acceptable Latency |
|---|---|---|---|---|---|
| SAP ERP (farm/plant) | ERP | RFC/BAPI, IDoc, SQL | 50K–500K records | Nightly batch / CDC | T+1h |
| Oracle EBS | ERP | SQL, REST API | 100K–1M records | Batch / CDC | T+1h |
| MES (production) | MES | OPC-UA, SQL, REST | 1M–10M events | Near real-time (1 min) | 5 min |
| LIMS (laboratory) | LIMS | HL7, CSV, REST | 1K–10K analyses | Daily | T+4h |
| WMS (warehouse) | WMS | SQL, EDI, REST | 10K–100K movements | Every 15 min | 15 min |
| TMS (transport) | TMS | REST API, EDI | 5K–50K shipments | Every 5 min (GPS) | 5 min |
| Retail POS | POS | CSV, REST, SQL | 1M–50M transactions | Hourly / evening batch | T+2h |
| IoT cold chain sensors | IoT | MQTT, JSON, Protobuf | 100M+ readings | Every 30 sec–5 min | Real-time (<1 min) |
| EDI partners (EDIFACT) | EDI | EDIFACT, X12, GS1 XML | 100–10K messages | Event-driven / batch | T+30 min |
| Weather API (OpenMeteo) | External API | JSON REST | 1K–10K calls | Hourly | T+1h |
| CAP subsidy bodies | Public agency | XML, CSV SFTP | 1–100 files | Monthly / seasonal | T+24h |
| Vehicle GPS trackers | IoT/Telematics | REST, WebSocket | 10M+ GPS points | Every 30 sec | <2 min |
Warning: Legacy ERPs and "Extraction Windows"
Many agricultural and food ERPs do not support native CDC. Often the only way to extract data is through nightly batch jobs that lock tables, or through SAP RFC/BAPI interfaces with rigid maintenance windows. Plan extraction windows in advance and provide recovery mechanisms in case the nightly batch fails.
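A minimal guard for such windows can be sketched in Python. `in_extraction_window` and `should_extract` are hypothetical helpers showing one way to respect an agreed batch window (including windows that cross midnight) and to backfill missed dates after a failed nightly run:

```python
from datetime import datetime, time


def in_extraction_window(now: datetime, start: time, end: time) -> bool:
    """True if `now` falls inside the agreed batch window.

    Handles windows that cross midnight (e.g. 23:00 -> 04:00),
    the common case for SAP nightly jobs.
    """
    t = now.time()
    if start <= end:
        return start <= t <= end
    return t >= start or t <= end


def should_extract(now: datetime, last_success_date: str, target_date: str,
                   start: time = time(1, 0), end: time = time(5, 0)) -> bool:
    """Extract only inside the window, and re-run missed dates first.

    Dates are ISO 'YYYY-MM-DD' strings, so string comparison is
    chronological. If a nightly batch failed, target_date stays ahead
    of last_success_date and the caller backfills dates in order.
    """
    if not in_extraction_window(now, start, end):
        return False
    return target_date > last_success_date
```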
ETL vs ELT for the Food Supply Chain
The ETL vs ELT debate is particularly relevant in the food industry, where high-frequency data (IoT cold chain) coexists with traditional batch data (SAP ERP) and stringent regulatory requirements (traceability, FSMA 204).
| Criterion | Traditional ETL | Modern ELT (dbt + Data Lake) |
|---|---|---|
| Transformation | Before loading (staging server) | Inside the data warehouse (native SQL) |
| Scalability | Limited by the ETL server | Scales with the DWH (Snowflake, BigQuery, DuckDB) |
| Raw data retention | Often no (data already transformed) | Yes: Bronze layer preserves raw data |
| Auditability (FSMA) | Difficult: obscured lineage | Excellent: complete dbt lineage graph |
| Latency | Batch (T+1h, T+1d) | Micro-batch or streaming (Flink/Spark) |
| Cold chain failures | Data loss if ETL fails | Raw always saved; retry only on transformation |
| Required team skills | Java/Informatica/SSIS developer | SQL + Python (dbt + Airflow) |
| Infrastructure cost | Dedicated always-on server | Pay-per-query (Snowflake/BigQuery) |
For the modern food supply chain, the recommendation is a Medallion ELT architecture:
# Medallion Architecture for Food Supply Chain
BRONZE LAYER (Raw, immutable)
|-- All source data loaded as-is
|-- Partitioned by: source_system, ingestion_date
|-- Retention: 7 years (FSMA/EU compliance)
|-- Format: Parquet on S3/GCS or Delta Lake
|
v
SILVER LAYER (Cleansed, deduplicated)
|-- dbt models: heterogeneous schema normalization
|-- Deduplication of lots, SKUs, location codes
|-- Type casting: dates, temperatures (C/F), weights (kg/lb)
|-- NULL handling: offline sensors, ERP null fields
|-- PII masking: operator and driver data
|
v
GOLD LAYER (Business-ready KPIs)
|-- Aggregations: fill rate, waste rate, OTIF per SKU
|-- Cold chain metrics: temperature violations per lot
|-- BI dashboards: Power BI, Tableau, Grafana
|-- ML API: feature store for demand forecasting
|
v
DATA MART
|-- Retail mart: sell-through, shelf life at receipt
|-- Operations mart: plant OEE, production yields
|-- Finance mart: cost per lot, margin per SKU
|-- Compliance mart: FSMA traceability, recall readiness
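The Bronze layer's append-only, Hive-style partitioned layout can be sketched as follows. This illustrative `write_bronze` helper writes JSONL to a local path for clarity; in production you would typically write Parquet to S3/GCS as described above.

```python
import json
from datetime import date
from pathlib import Path
from typing import Any, Iterable


def write_bronze(records: Iterable[dict[str, Any]], *, source_system: str,
                 ingestion_date: date, root: Path) -> Path:
    """Append-only write into the Bronze layer, partitioned by
    source_system and ingestion_date (Hive-style directory layout).

    Records are written as-is: no cleaning, no casting -- that is
    the Silver layer's job.
    """
    part_dir = (root
                / f"source_system={source_system}"
                / f"dt={ingestion_date.isoformat()}")
    part_dir.mkdir(parents=True, exist_ok=True)
    out = part_dir / "data.jsonl"
    with out.open("a", encoding="utf-8") as fh:
        for rec in records:
            fh.write(json.dumps(rec, default=str) + "\n")
    return out
```

Because partitions encode source and date in the path, reprocessing a single failed day means re-reading one directory, never the whole lake.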
Apache Airflow for Food Pipeline Orchestration
Apache Airflow is the de facto standard for orchestrating complex ETL workflows. In the food supply chain, a single DAG must coordinate dozens of sources with different latencies, dependencies and retry policies. Let us look at a complete DAG for the daily pipeline.
Main DAG: Food Supply Chain Daily ETL
# food_supply_chain_dag.py
from __future__ import annotations

import logging
from datetime import datetime, timedelta
from typing import Any

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.trigger_rule import TriggerRule

logger = logging.getLogger(__name__)

DEFAULT_ARGS = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}
with DAG(
    dag_id="food_supply_chain_daily_etl",
    description="Daily ETL pipeline: farm -> processing -> distribution -> retail",
    default_args=DEFAULT_ARGS,
    schedule_interval="0 2 * * *",  # every night at 02:00 UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["foodtech", "supply-chain", "etl", "production"],
    max_active_runs=1,
    doc_md="""
    ## Food Supply Chain Daily ETL
    Extracts data from: SAP ERP, plant MES, WMS, TMS, retail POS.
    Loads to Bronze S3, then runs dbt for Silver and Gold layers.
    """,
) as dag:

    # ----------------------------------------------------------------
    # STEP 1: Health checks on data sources
    # ----------------------------------------------------------------
    check_sap_available = HttpSensor(
        task_id="check_sap_api_available",
        http_conn_id="sap_erp_api",
        endpoint="/api/health",
        timeout=30,
        poke_interval=60,
        mode="reschedule",
        soft_fail=False,
    )

    check_mes_available = HttpSensor(
        task_id="check_mes_api_available",
        http_conn_id="mes_plant_api",
        endpoint="/health",
        timeout=30,
        poke_interval=60,
        mode="reschedule",
    )
    # ----------------------------------------------------------------
    # STEP 2: SAP ERP extraction (lots, orders, inventory)
    # ----------------------------------------------------------------
    @task(task_id="extract_sap_erp", retries=3)
    def extract_sap_erp(**context: Any) -> dict:
        """Extracts data from SAP ERP via RFC/BAPI REST wrapper."""
        from src.connectors.sap_connector import SAPConnector

        execution_date = context["ds"]  # 'YYYY-MM-DD'
        sap = SAPConnector(conn_id="sap_erp_api")

        # Extract: production lots from the previous day
        batches = sap.get_production_batches(
            date=execution_date,
            plants=["PL001", "PL002", "PL003"],
        )
        # Extract: stock movements
        stock_movements = sap.get_stock_movements(
            date=execution_date,
            movement_types=["101", "261", "501", "601"],
        )
        # Extract: confirmed sales orders
        sales_orders = sap.get_sales_orders(
            date=execution_date,
            status=["CONF", "DLVR"],
        )

        # Save to S3 Bronze layer
        s3 = S3Hook(aws_conn_id="aws_s3")
        prefix = f"bronze/sap_erp/dt={execution_date}"
        s3.load_string(
            string_data=batches.to_json(orient="records", lines=True),
            key=f"{prefix}/batches.jsonl",
            bucket_name="food-data-lake",
            replace=True,
        )
        s3.load_string(
            string_data=stock_movements.to_json(orient="records", lines=True),
            key=f"{prefix}/stock_movements.jsonl",
            bucket_name="food-data-lake",
            replace=True,
        )
        s3.load_string(
            string_data=sales_orders.to_json(orient="records", lines=True),
            key=f"{prefix}/sales_orders.jsonl",
            bucket_name="food-data-lake",
            replace=True,
        )

        logger.info(
            "SAP extraction OK: %d batches, %d movements, %d orders",
            len(batches),
            len(stock_movements),
            len(sales_orders),
        )
        return {
            "batches_count": len(batches),
            "movements_count": len(stock_movements),
            "orders_count": len(sales_orders),
        }
    # ----------------------------------------------------------------
    # STEP 3: Cold chain data extraction (IoT temperature sensors)
    # ----------------------------------------------------------------
    @task(task_id="extract_cold_chain_iot")
    def extract_cold_chain_iot(**context: Any) -> dict:
        """Extracts temperature readings from InfluxDB (IoT cold chain)."""
        from airflow.models import Variable
        from influxdb_client import InfluxDBClient

        execution_date = context["ds"]
        # Jinja templates are not rendered inside task code:
        # read Airflow Variables directly instead
        client = InfluxDBClient(
            url=Variable.get("influxdb_url"),
            token=Variable.get("influxdb_token"),
            org="foodtech-org",
        )
        query_api = client.query_api()

        # Flux query: temperature readings per transport lot
        flux_query = f"""
        from(bucket: "cold-chain")
          |> range(start: {execution_date}T00:00:00Z,
                   stop: {execution_date}T23:59:59Z)
          |> filter(fn: (r) => r._measurement == "temperature")
          |> filter(fn: (r) => r._field == "celsius")
          |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
          |> pivot(rowKey: ["_time", "lot_id"], columnKey: ["_field"], valueColumn: "_value")
        """
        result = query_api.query_data_frame(flux_query)

        # Calculate violations: temperature out of range per product category
        # (assumes per-lot thresholds are present as columns on each reading)
        violations = result[
            (result["celsius"] < result["min_temp_required"]) |
            (result["celsius"] > result["max_temp_required"])
        ]

        s3 = S3Hook(aws_conn_id="aws_s3")
        prefix = f"bronze/cold_chain/dt={execution_date}"
        s3.load_string(
            string_data=result.to_json(orient="records", lines=True),
            key=f"{prefix}/temperature_readings.jsonl",
            bucket_name="food-data-lake",
            replace=True,
        )

        logger.info(
            "Cold chain extraction OK: %d readings, %d violations",
            len(result),
            len(violations),
        )
        if len(violations) > 0:
            logger.warning(
                "COLD CHAIN VIOLATIONS: %d out-of-temperature events detected!",
                len(violations),
            )
        return {
            "readings_count": len(result),
            "violations_count": len(violations),
        }
    # ----------------------------------------------------------------
    # STEP 4: EDI extraction from retailer partners
    # ----------------------------------------------------------------
    @task(task_id="extract_edi_partners")
    def extract_edi_partners(**context: Any) -> dict:
        """Processes EDI EDIFACT files from SFTP partners."""
        from src.connectors.edi_connector import EDIFACTConnector

        execution_date = context["ds"]
        edi = EDIFACTConnector(sftp_conn_id="edi_sftp_partners")

        # Download and parse ORDERS (EDIFACT ORDERS D.01B)
        orders = edi.process_orders(date=execution_date)
        # Download and parse DESADV (Despatch Advice)
        despatch_advices = edi.process_desadv(date=execution_date)
        # Download and parse RECADV (Receiving Advice)
        receiving_advices = edi.process_recadv(date=execution_date)

        s3 = S3Hook(aws_conn_id="aws_s3")
        prefix = f"bronze/edi_partners/dt={execution_date}"
        s3.load_string(
            string_data=orders.to_json(orient="records", lines=True),
            key=f"{prefix}/orders.jsonl",
            bucket_name="food-data-lake",
            replace=True,
        )
        return {
            "orders_count": len(orders),
            "desadv_count": len(despatch_advices),
            "recadv_count": len(receiving_advices),
        }

    # ----------------------------------------------------------------
    # STEP 5: Retail POS extraction
    # ----------------------------------------------------------------
    @task(task_id="extract_pos_data")
    def extract_pos_data(**context: Any) -> dict:
        """Extracts POS sales data from retailer partner database."""
        from src.connectors.retail_connector import RetailPOSConnector

        execution_date = context["ds"]
        pos = RetailPOSConnector(conn_id="retail_pos_db")

        # Sales by SKU/store with shelf life info
        sales = pos.get_daily_sales(
            date=execution_date,
            include_promotions=True,
            include_markdowns=True,
        )
        # Near-expiry: products with <3 days remaining still on shelf
        near_expiry = pos.get_near_expiry_stock(
            date=execution_date,
            days_threshold=3,
        )

        s3 = S3Hook(aws_conn_id="aws_s3")
        prefix = f"bronze/pos_retail/dt={execution_date}"
        s3.load_string(
            string_data=sales.to_json(orient="records", lines=True),
            key=f"{prefix}/sales.jsonl",
            bucket_name="food-data-lake",
            replace=True,
        )
        logger.info("POS extraction OK: %d transactions", len(sales))
        return {
            "sales_count": len(sales),
            "near_expiry_skus": len(near_expiry),
        }
    # ----------------------------------------------------------------
    # STEP 6: dbt run (Silver + Gold transformations)
    # ----------------------------------------------------------------
    dbt_run_silver = BashOperator(
        task_id="dbt_run_silver_layer",
        bash_command=(
            "cd /opt/dbt/food_supply_chain && "
            "dbt run --select tag:silver --target prod "
            "--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
        ),
        retries=2,
    )

    dbt_run_gold = BashOperator(
        task_id="dbt_run_gold_layer",
        bash_command=(
            "cd /opt/dbt/food_supply_chain && "
            "dbt run --select tag:gold --target prod "
            "--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
        ),
        retries=2,
    )

    # ----------------------------------------------------------------
    # STEP 7: dbt test (data quality checks)
    # ----------------------------------------------------------------
    dbt_test = BashOperator(
        task_id="dbt_test_all",
        bash_command=(
            "cd /opt/dbt/food_supply_chain && "
            "dbt test --select tag:silver tag:gold --target prod"
        ),
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    # ----------------------------------------------------------------
    # STEP 8: Great Expectations validation
    # ----------------------------------------------------------------
    @task(task_id="run_great_expectations_validation")
    def run_ge_validation(**context: Any) -> dict:
        """Runs Great Expectations to validate cold chain data quality."""
        from great_expectations.data_context import get_context

        context_ge = get_context(project_root_dir="/opt/great_expectations")
        execution_date = context["ds"]
        results = {}
        suites_to_run = [
            ("cold_chain_temperature", "bronze_cold_chain_checkpoint"),
            ("lot_traceability", "silver_lots_checkpoint"),
            ("kpi_supply_chain", "gold_kpi_checkpoint"),
        ]
        for suite_name, checkpoint_name in suites_to_run:
            result = context_ge.run_checkpoint(
                checkpoint_name=checkpoint_name,
                batch_request={
                    "runtime_parameters": {
                        "path": f"s3://food-data-lake/bronze/cold_chain/dt={execution_date}"
                    },
                    "batch_identifiers": {"execution_date": execution_date},
                },
            )
            results[suite_name] = result.success

        failed = [name for name, ok in results.items() if not ok]
        if failed:
            raise ValueError(f"GE validation FAILED for suites: {failed}")
        logger.info("Great Expectations: all suites passed")
        return results
    # ----------------------------------------------------------------
    # STEP 9: KPI alerts (email if waste rate > threshold)
    # ----------------------------------------------------------------
    @task(task_id="check_kpi_alerts", trigger_rule=TriggerRule.ALL_SUCCESS)
    def check_kpi_alerts(**context: Any) -> None:
        """Checks critical KPIs and sends alerts if out of threshold."""
        from src.services.kpi_alert_service import KPIAlertService

        execution_date = context["ds"]
        alert_service = KPIAlertService(conn_id="snowflake_dw")
        kpis = alert_service.get_daily_kpis(date=execution_date)
        alerts_sent = []

        # Waste rate > 5%: critical alert
        if kpis["waste_rate_pct"] > 5.0:
            alert_service.send_alert(
                severity="HIGH",
                message=f"Waste rate {kpis['waste_rate_pct']:.1f}% exceeds 5% threshold",
                kpi="waste_rate",
                value=kpis["waste_rate_pct"],
            )
            alerts_sent.append("waste_rate")

        # OTIF < 95%: warning alert
        if kpis["otif_pct"] < 95.0:
            alert_service.send_alert(
                severity="MEDIUM",
                message=f"OTIF {kpis['otif_pct']:.1f}% below 95% threshold",
                kpi="otif",
                value=kpis["otif_pct"],
            )
            alerts_sent.append("otif")

        # Cold chain violations > 0: immediate alert
        if kpis["cold_chain_violations"] > 0:
            alert_service.send_alert(
                severity="CRITICAL",
                message=f"{kpis['cold_chain_violations']} cold chain temperature violations detected!",
                kpi="cold_chain_violations",
                value=kpis["cold_chain_violations"],
            )
            alerts_sent.append("cold_chain")

        logger.info("KPI check complete. Alerts sent: %s", alerts_sent)
    # ----------------------------------------------------------------
    # DAG wiring: task dependencies
    # ----------------------------------------------------------------
    extract_sap = extract_sap_erp()
    extract_cold = extract_cold_chain_iot()
    extract_edi = extract_edi_partners()
    extract_pos = extract_pos_data()
    ge_validation = run_ge_validation()
    kpi_alerts = check_kpi_alerts()

    # Health checks before everything else
    [check_sap_available, check_mes_available] >> extract_sap
    # Parallel extraction of all sources
    [extract_sap, extract_cold, extract_edi, extract_pos] >> dbt_run_silver
    # Silver before Gold
    dbt_run_silver >> dbt_run_gold
    # Tests after Gold
    dbt_run_gold >> dbt_test
    # GE validation and KPI alerts after tests
    dbt_test >> ge_validation >> kpi_alerts
dbt for Transformations: SQL Models for Food KPIs
dbt (data build tool) is the ideal tool for transforming data in the Silver and Gold layers. Let us look at the main models for the food supply chain, with integrated tests and documentation.
Schema.yml: dbt Documentation and Tests
# models/silver/schema.yml
version: 2

models:
  - name: silver_lots
    description: "Production lots normalized from SAP ERP"
    config:
      tags: ["silver", "lots", "traceability"]
      materialized: incremental
      incremental_strategy: merge
      unique_key: lot_id
    columns:
      - name: lot_id
        description: "Unique lot identifier (GS1 SGTIN)"
        tests:
          - not_null
          - unique
      - name: production_date
        tests:
          - not_null
      - name: expiry_date
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "expiry_date > production_date"
      - name: plant_code
        tests:
          - not_null
          - accepted_values:
              values: ["PL001", "PL002", "PL003", "PL004"]
      - name: temperature_class
        tests:
          - accepted_values:
              values: ["ambient", "chilled", "frozen", "ultra_frozen"]
      - name: net_weight_kg
        tests:
          - dbt_utils.expression_is_true:
              expression: "net_weight_kg > 0 AND net_weight_kg < 50000"

  - name: silver_shipments
    description: "Shipments normalized from TMS and EDI DESADV"
    config:
      tags: ["silver", "logistics"]
      materialized: incremental
      unique_key: shipment_id
    columns:
      - name: shipment_id
        tests: [not_null, unique]
      - name: planned_delivery_date
        tests: [not_null]
      - name: actual_delivery_date
        description: "NULL if not yet delivered"
      - name: otif_flag
        description: "True if delivered On-Time and In-Full; NULL if not yet delivered"
        tests:
          # accepted_values skips NULLs, so pending shipments pass
          - accepted_values:
              values: [true, false]
              quote: false

  - name: gold_kpi_supply_chain
    description: "Daily supply chain KPIs aggregated by SKU/plant/customer"
    config:
      tags: ["gold", "kpi", "dashboard"]
      materialized: table
    columns:
      - name: kpi_date
        tests: [not_null]
      - name: waste_rate_pct
        description: "% product wasted out of total product"
        tests:
          - dbt_utils.expression_is_true:
              expression: "waste_rate_pct >= 0 AND waste_rate_pct <= 100"
      - name: otif_pct
        tests:
          - dbt_utils.expression_is_true:
              expression: "otif_pct >= 0 AND otif_pct <= 100"
Silver Model: Production Lots
-- models/silver/silver_lots.sql
{{ config(tags=["silver", "lots"], materialized="incremental", unique_key="lot_id") }}

WITH source_sap AS (

    SELECT
        lot_number       AS lot_id,
        material_number  AS sku_code,
        plant            AS plant_code,
        production_date  AS production_date,
        -- Normalize expiry date (SAP stores dates as YYYYMMDD)
        TO_DATE(CAST(expiry_date_sap AS VARCHAR), 'YYYYMMDD') AS expiry_date,
        quantity_kg      AS net_weight_kg,
        -- Temperature classes from the SAP material master
        CASE
            WHEN temp_cond_sap = '0001' THEN 'ambient'
            WHEN temp_cond_sap = '0002' THEN 'chilled'
            WHEN temp_cond_sap = '0003' THEN 'frozen'
            WHEN temp_cond_sap = '0004' THEN 'ultra_frozen'
            ELSE 'unknown'
        END AS temperature_class,
        -- Required temperature range in Celsius
        -- (OBJECT_CONSTRUCT is Snowflake; on BigQuery use STRUCT instead)
        CASE
            WHEN temp_cond_sap = '0001' THEN OBJECT_CONSTRUCT('min_c', 15.0, 'max_c', 25.0)
            WHEN temp_cond_sap = '0002' THEN OBJECT_CONSTRUCT('min_c', 2.0, 'max_c', 8.0)
            WHEN temp_cond_sap = '0003' THEN OBJECT_CONSTRUCT('min_c', -25.0, 'max_c', -15.0)
            WHEN temp_cond_sap = '0004' THEN OBJECT_CONSTRUCT('min_c', -60.0, 'max_c', -30.0)
        END AS temp_range,
        batch_status         AS lot_status,
        _ingestion_timestamp AS ingested_at
    FROM {{ source('bronze', 'sap_erp_batches') }}
    WHERE lot_number IS NOT NULL
      AND quantity_kg > 0
    {% if is_incremental() %}
      AND _ingestion_timestamp > (SELECT MAX(ingested_at) FROM {{ this }})
    {% endif %}

),

deduplicated AS (

    -- Keep only the most recent version of each lot
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY lot_id
            ORDER BY ingested_at DESC
        ) AS row_num
    FROM source_sap

),

final AS (

    SELECT
        lot_id,
        sku_code,
        plant_code,
        production_date,
        expiry_date,
        net_weight_kg,
        temperature_class,
        temp_range,
        lot_status,
        -- Total shelf life in days
        DATEDIFF('day', production_date, expiry_date) AS shelf_life_days,
        -- Remaining shelf life relative to today
        DATEDIFF('day', CURRENT_DATE, expiry_date) AS shelf_life_remaining_days,
        ingested_at
    FROM deduplicated
    WHERE row_num = 1
      AND expiry_date > production_date

)

SELECT * FROM final
Gold Model: Supply Chain KPIs
-- models/gold/gold_kpi_supply_chain.sql
{{ config(tags=["gold", "kpi"], materialized="table") }}

WITH lots AS (
    SELECT * FROM {{ ref('silver_lots') }}
),

shipments AS (
    SELECT * FROM {{ ref('silver_shipments') }}
),

pos_sales AS (
    SELECT * FROM {{ ref('silver_pos_sales') }}
),

inventory AS (
    SELECT * FROM {{ ref('silver_inventory_snapshot') }}
),

-- KPI 1: Waste Rate (% product wasted), by production date and plant
waste_kpi AS (
    SELECT
        production_date AS kpi_date,
        plant_code,
        -- Product expired or disposed as waste
        SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
                 THEN net_weight_kg ELSE 0 END) AS wasted_kg,
        SUM(net_weight_kg) AS total_produced_kg,
        ROUND(
            100.0 * SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
                             THEN net_weight_kg ELSE 0 END)
            / NULLIF(SUM(net_weight_kg), 0),
            2
        ) AS waste_rate_pct
    FROM lots
    GROUP BY 1, 2
),

-- KPI 2: OTIF (On-Time In-Full), aggregated to day level
-- so the final join on kpi_date does not multiply rows
otif_kpi AS (
    SELECT
        DATE(planned_delivery_date) AS kpi_date,
        COUNT(*) AS total_orders,
        SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END) AS otif_orders,
        ROUND(
            100.0 * SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END)
            / NULLIF(COUNT(*), 0),
            2
        ) AS otif_pct
    FROM shipments
    WHERE actual_delivery_date IS NOT NULL
    GROUP BY 1
),

-- KPI 3: Days of Supply, averaged over SKU/location to day level
dos_kpi AS (
    SELECT
        inv.snapshot_date AS kpi_date,
        AVG(
            inv.stock_qty_units
            / NULLIF(avg_daily_sales.avg_daily_units, 0)
        ) AS days_of_supply,
        AVG(COALESCE(avg_daily_sales.avg_daily_units, 0)) AS avg_daily_demand
    FROM inventory inv
    LEFT JOIN (
        SELECT sku_code, location_code,
               AVG(units_sold) AS avg_daily_units
        FROM pos_sales
        WHERE sale_date >= DATEADD('day', -30, CURRENT_DATE)
        GROUP BY 1, 2
    ) avg_daily_sales
      ON inv.sku_code = avg_daily_sales.sku_code
     AND inv.location_code = avg_daily_sales.location_code
    GROUP BY 1
),

-- KPI 4: Shelf Life at Receipt, measured at the actual delivery date
-- (not relative to today, which would drift as the table ages)
slr_kpi AS (
    SELECT
        DATE(s.actual_delivery_date) AS kpi_date,
        AVG(
            100.0 * DATEDIFF('day', s.actual_delivery_date, l.expiry_date)
            / NULLIF(l.shelf_life_days, 0)
        ) AS avg_slr_pct
    FROM shipments s
    INNER JOIN lots l ON s.lot_id = l.lot_id
    WHERE s.actual_delivery_date IS NOT NULL
    GROUP BY 1
)

-- Final join for the unified KPI dashboard: every CTE is already
-- aggregated to day level, so joining on kpi_date is one-to-one
SELECT
    w.kpi_date,
    w.plant_code,
    w.waste_rate_pct,
    w.wasted_kg,
    w.total_produced_kg,
    o.otif_pct,
    o.total_orders,
    d.days_of_supply,
    d.avg_daily_demand,
    s.avg_slr_pct AS shelf_life_at_receipt_pct,
    -- Overall supply chain score (0-100)
    ROUND(
        (
            (100.0 - COALESCE(w.waste_rate_pct, 0)) * 0.30 +
            COALESCE(o.otif_pct, 0) * 0.40 +
            LEAST(d.days_of_supply / 14.0 * 100, 100) * 0.15 +
            COALESCE(s.avg_slr_pct, 0) * 0.15
        ),
        1
    ) AS supply_chain_health_score
FROM waste_kpi w
LEFT JOIN otif_kpi o ON w.kpi_date = o.kpi_date
LEFT JOIN dos_kpi d ON w.kpi_date = d.kpi_date
LEFT JOIN slr_kpi s ON w.kpi_date = s.kpi_date
Data Quality for Food Data: Great Expectations
In the food industry, data quality is not merely a cleanliness concern: it is a regulatory requirement. Incorrect data on temperatures, expiration dates or lot codes can result in FSMA penalties, costly recalls and reputational damage. Great Expectations allows you to define declarative validation rules and integrate them directly into the Airflow pipeline.
# great_expectations/expectations/cold_chain_temperature_suite.py
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
context = gx.get_context()
# Create or update the expectations suite
suite = context.add_or_update_expectation_suite(
expectation_suite_name="cold_chain_temperature_suite"
)
validator = context.get_validator(
batch_request=BatchRequest(
datasource_name="s3_bronze_datasource",
data_connector_name="cold_chain_connector",
data_asset_name="temperature_readings",
),
expectation_suite_name="cold_chain_temperature_suite",
)
# RULE 1: Temperature never NULL for refrigerated/frozen products
validator.expect_column_values_to_not_be_null(
column="celsius",
mostly=0.99, # 1% tolerance for temporarily offline sensors
meta={
"notes": "IoT cold chain sensors: max 1% missing readings",
"regulatory_reference": "EU Reg 37/2005 cold chain monitoring",
}
)
# RULE 2: Absolute temperature range (covers all product classes)
validator.expect_column_values_to_be_between(
column="celsius",
min_value=-30.0,
max_value=35.0,
meta={
"notes": "Absolute range: covers frozen (-25°C) and ambient (25°C)",
}
)
# RULE 3: Lot ID always present and valid GS1 SGTIN format
validator.expect_column_values_to_match_regex(
column="lot_id",
regex=r"^[0-9]{14}$", # GS1 GTIN-14 format
meta={"notes": "GS1 SGTIN-14 standard for food traceability"}
)
# RULE 4: Monotonically increasing timestamps per sensor
validator.expect_column_values_to_be_increasing(
column="reading_timestamp",
strictly=False, # allow duplicates (e.g. batch upload)
parse_strings_as_datetimes=True,
meta={"notes": "Verify timestamps are not backdated"}
)
# RULE 5: Number of readings per lot (at least 1 every 30 min)
validator.expect_table_row_count_to_be_between(
min_value=48, # 24h * 2 readings/hour = 48 minimum
max_value=10000, # reasonable max for 24h
meta={"notes": "Minimum 1 reading every 30 min for active cold chain"}
)
# RULE 6: Temperature distribution for refrigerated product class
# Chilled products: median must be 2-8°C
validator.expect_column_median_to_be_between(
column="celsius",
min_value=1.0,
max_value=9.0,
meta={
"notes": "Only for lots with temperature_class='chilled'",
"applies_to": "filtered_batches_chilled",
}
)
# Save the expectations
validator.save_expectation_suite(discard_failed_expectations=False)
# Checkpoint for Airflow
checkpoint = context.add_or_update_checkpoint(
name="bronze_cold_chain_checkpoint",
config_version=1,
template_name=None,
module_name="great_expectations.checkpoint",
class_name="Checkpoint",
run_name_template="%Y%m%d-%H%M%S-cold-chain",
expectation_suite_name="cold_chain_temperature_suite",
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "send_slack_notification_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "{{ var.value.slack_webhook_cold_chain }}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer",
"class_name": "SlackRenderer",
},
},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
],
)
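The checkpoint is meant to be triggered from Airflow. A minimal sketch of the PythonOperator callable (the function name and error message are ours; `run_checkpoint` is the Great Expectations DataContext API for executing a named checkpoint) — raising on failure marks the task failed, so Airflow retries and the Slack action both kick in:

```python
# Sketch: invoke the GE checkpoint from an Airflow PythonOperator callable.
# Raising on failure fails the task, which triggers Airflow's retry logic on
# top of the Slack notification configured in the checkpoint itself.
def run_cold_chain_validation(context) -> bool:
    """context: a Great Expectations DataContext (e.g. gx.get_context())."""
    result = context.run_checkpoint(
        checkpoint_name="bronze_cold_chain_checkpoint",
    )
    if not result.success:
        raise ValueError(
            "Cold chain validation failed - inspect GE Data Docs before "
            "promoting the bronze batch to silver."
        )
    return True
```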
Food Supply Chain KPIs and Metrics
Defining the right KPIs is the prerequisite for making data-driven decisions. Here are the fundamental metrics for the food supply chain, with formulas, industry benchmarks, and realistic targets.
| KPI | Formula | Industry Benchmark | Best-in-Class Target | Alert Threshold |
|---|---|---|---|---|
| Perfect Order Rate | Perfect orders / Total orders * 100 | 85-92% | >95% | <90% |
| OTIF (On-Time In-Full) | (On-time and complete deliveries) / Total * 100 | 88-94% | >97% | <92% |
| Waste Rate % | Wasted kg / Total produced kg * 100 | 3-8% | <2% | >5% |
| Inventory Turnover | Annual COGS / Average inventory | 12-20x (fresh) | >25x (fresh) | <10x |
| Days of Supply (DOS) | Available stock / Average daily demand | 3-7 days (fresh) | 2-4 days | >10 days |
| Fill Rate | Units delivered / Units ordered * 100 | 92-96% | >98% | <94% |
| Shelf Life at Receipt (SLR) | Remaining days at delivery / Total shelf life * 100 | 60-75% | >80% | <60% |
| Cold Chain Compliance | Lots without temp violations / Total lots * 100 | 97-99% | >99.5% | <98% |
| Forecast Accuracy | 1 - (MAE / Average demand) * 100 | 75-85% | >90% | <75% |
| Cash-to-Cash Cycle | DIO + DSO - DPO (days) | 15-35 days | <15 days | >45 days |
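The table's formulas translate directly into code. A hypothetical helper module (function names and signatures are ours, not from any library) that a reporting job or dbt post-processing step could reuse:

```python
# Reference implementations of the KPI formulas in the table above.
# Names are illustrative; each function guards against zero denominators.

def otif_pct(on_time_in_full: int, total_orders: int) -> float:
    """OTIF = on-time AND in-full deliveries / total deliveries * 100."""
    return on_time_in_full / total_orders * 100 if total_orders else 0.0

def waste_rate_pct(wasted_kg: float, produced_kg: float) -> float:
    """Waste Rate = wasted kg / total produced kg * 100."""
    return wasted_kg / produced_kg * 100 if produced_kg else 0.0

def days_of_supply(stock_units: float, avg_daily_demand: float) -> float:
    """DOS = available stock / average daily demand."""
    return stock_units / avg_daily_demand if avg_daily_demand else float("inf")

def forecast_accuracy_pct(mae: float, avg_demand: float) -> float:
    """Forecast Accuracy = (1 - MAE / average demand) * 100."""
    return (1 - mae / avg_demand) * 100 if avg_demand else 0.0
```

For example, `otif_pct(940, 1000)` gives 94.0 — the upper edge of the 88-94% industry benchmark band, but still short of the >97% best-in-class target.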
How to Calculate the Supply Chain Health Score
Weighting KPIs into a single composite score helps management monitor the overall health of the supply chain with a single number:
- OTIF: 40% weight (direct impact on customer satisfaction)
- Waste Rate: 30% weight (impact on margins and sustainability)
- Shelf Life at Receipt: 15% weight (quality to the consumer)
- Cold Chain Compliance: 15% weight (food safety and compliance)
Score > 90: excellent | 80-90: good | 70-80: needs improvement | <70: critical
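As a sketch, the composite score can be computed as below. Note one assumption of ours: waste rate is a "lower is better" metric, so it must be inverted onto a 0-100 scale before weighting — here 0% waste scores 100 points and 10%+ scores zero; calibrate that mapping to your own baseline.

```python
# Composite Supply Chain Health Score from the four weighted KPIs above.
# OTIF, SLR and cold chain compliance are already 0-100 percentages
# (higher = better); waste rate is inverted (assumption: 10% waste -> 0).

def supply_chain_health_score(
    otif_pct: float,
    waste_rate_pct: float,
    slr_pct: float,
    cold_chain_pct: float,
) -> float:
    waste_score = max(0.0, 100.0 - waste_rate_pct * 10.0)  # 0% -> 100, 10% -> 0
    return (
        0.40 * otif_pct
        + 0.30 * waste_score
        + 0.15 * slr_pct
        + 0.15 * cold_chain_pct
    )

def health_band(score: float) -> str:
    if score > 90:
        return "excellent"
    if score >= 80:
        return "good"
    if score >= 70:
        return "needs improvement"
    return "critical"
```

A plant running 95% OTIF, 2% waste, 80% SLR and 99% cold chain compliance scores about 88.9 — "good", with waste reduction as the highest-leverage path to "excellent".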
Real-Time vs Batch: When to Use Which Architecture
Not everything needs to be real-time. The cost of a streaming architecture is significantly higher than a traditional batch architecture. In the food supply chain, the right choice depends on acceptable latency and operational consequences.
| Use Case | Architecture | Latency | Tools | Rationale |
|---|---|---|---|---|
| Cold chain monitoring (temperature) | Real-time streaming | <1 min | Kafka + Flink | Temp violation = immediate lot loss |
| Recall management | Real-time / event-driven | <5 min | Kafka + alerting | FSMA: trace lots in <2 hours |
| Vehicle GPS tracking | Near real-time | <2 min | MQTT + InfluxDB | Updated ETA for customers |
| Daily KPIs (OTIF, waste) | Daily batch | T+2h | Airflow + dbt | Morning operational report |
| Demand forecasting | Daily/weekly batch | T+6h | Airflow + MLflow | Production planning does not require real-time |
| Inventory snapshot | Micro-batch (every 15 min) | 15 min | Airflow + Snowflake | Warehouse operators: sufficient visibility |
| Weekly waste analysis | Weekly batch | T+24h | dbt + BI tool | Strategic decisions, not operational |
| FSMA compliance audit | On-demand batch | On request | dbt + data vault | Planned inspections, not daily |
# Lambda Architecture for Food Supply Chain
# Speed layer (real-time) + Batch layer (accuracy)
# SPEED LAYER: Kafka + Apache Flink
# Processes events in milliseconds: cold chain, recall alerts
import json
from datetime import datetime

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("cold-chain-readings") \
.set_group_id("flink-cold-chain-consumer") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
cold_chain_stream = env.from_source(
kafka_source,
watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(
Duration.of_seconds(30)
),
source_name="cold-chain-kafka",
)
# Filter temperature violations in real time
violations_stream = cold_chain_stream \
.map(lambda msg: json.loads(msg)) \
.filter(lambda r: (
r.get("temperature_class") == "chilled" and
(r["celsius"] < 2.0 or r["celsius"] > 8.0)
)) \
    .map(lambda r: json.dumps({
        **r,
        "violation_severity": "CRITICAL" if r["celsius"] > 12.0 else "WARNING",
        "violation_detected_at": datetime.utcnow().isoformat(),
    }))  # serialize back to a JSON string: the Kafka sink below expects strings
# Write violations to Kafka alert topic
violations_stream.sink_to(
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("cold-chain-violations")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.build()
)
env.execute("food-cold-chain-monitoring")
Legacy System Integration
Most agri-food companies operate with legacy systems: SAP R/3 from the 1990s, EDI EDIFACT files over SFTP, Oracle databases without a REST API. Integrating these systems requires specific approaches for each connector type.
SAP Connector: RFC and BAPI
# src/connectors/sap_connector.py
"""
SAP ERP connector via pyrfc (SAP RFC/BAPI wrapper).
Requires: SAP NetWeaver RFC SDK + pyrfc library.
"""
import pyrfc
import pandas as pd
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class SAPConnectionConfig:
ashost: str
sysnr: str
client: str
user: str
passwd: str
lang: str = "EN"
class SAPConnector:
"""Wrapper for SAP RFC/BAPI calls for supply chain data."""
def __init__(self, config: SAPConnectionConfig):
self._config = config
self._conn: Optional[pyrfc.Connection] = None
def __enter__(self):
self._conn = pyrfc.Connection(
ashost=self._config.ashost,
sysnr=self._config.sysnr,
client=self._config.client,
user=self._config.user,
passwd=self._config.passwd,
lang=self._config.lang,
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.close()
def get_production_batches(
self,
date_from: str,
date_to: str,
plants: list[str],
) -> pd.DataFrame:
"""
Extracts production lots from SAP via BAPI_BATCH_GET_DETAIL.
Args:
date_from: Start date in YYYYMMDD format
date_to: End date in YYYYMMDD format
plants: List of SAP plant codes
"""
all_batches = []
for plant in plants:
try:
                # Call SAP BAPI for the lot list. Note: the standard
                # BAPI_BATCH_GET_DETAIL reads a single material/batch;
                # plant + date-range extraction like this typically needs
                # a custom Z-RFC wrapper on the SAP side.
                result = self._conn.call(
                    "BAPI_BATCH_GET_DETAIL",
                    PLANT=plant,
                    DATE_FROM=date_from,
                    DATE_TO=date_to,
                    BATCH_STATUS="",  # empty = all statuses
                )
batches = result.get("BATCH_DETAIL_LIST", [])
logger.info(
"SAP plant %s: %d lots extracted",
plant,
len(batches),
)
for batch in batches:
all_batches.append({
                        "lot_id": f"{batch['MATNR'].strip()}-{batch['CHARG'].strip()}",
"material_number": batch["MATNR"].strip(),
"lot_number": batch["CHARG"].strip(),
"plant": plant,
"production_date": batch.get("HSDAT"),
"expiry_date_sap": batch.get("VFDAT"),
"batch_status": batch.get("ZUSTD"),
"quantity_kg": float(batch.get("CLABS", 0)),
"temp_cond_sap": batch.get("MHDRZ", "0001"),
"_source": "SAP_ERP",
"_extracted_at": pd.Timestamp.utcnow().isoformat(),
})
except pyrfc.ABAPApplicationError as e:
logger.error(
"SAP BAPI error for plant %s: %s",
plant,
str(e),
)
# Do not fail the entire extraction for a single plant
continue
return pd.DataFrame(all_batches)
def get_stock_movements(
self,
date_from: str,
date_to: str,
movement_types: list[str],
) -> pd.DataFrame:
        """
        Extracts SAP stock movements (MB51 equivalent).
        movement_types: ['101'=GR, '261'=GI to production, '601'=GI to customer]
        Note: the standard BAPI_GOODSMVT_GETDETAIL reads a single material
        document; date-range extraction like this usually goes through a
        custom Z-RFC querying MSEG/MKPF on the SAP side.
        """
        result = self._conn.call(
            "BAPI_GOODSMVT_GETDETAIL",
            GOODSMVT_DATE_FROM=date_from,
            GOODSMVT_DATE_TO=date_to,
        )
movements = []
for item in result.get("GOODSMVT_ITEMS", []):
if item["BWART"] in movement_types:
movements.append({
"movement_id": f"{item['MBLNR']}-{item['ZEILE']}",
"material": item["MATNR"].strip(),
"plant": item["WERKS"],
"movement_type": item["BWART"],
"quantity": float(item.get("MENGE", 0)),
"unit": item.get("MEINS", "KG"),
"lot_number": item.get("CHARG", ""),
"posting_date": item["BUDAT"],
"_source": "SAP_MOVEMENTS",
})
return pd.DataFrame(movements)
CDC with Debezium for Streaming from Legacy Databases
# debezium-config/oracle-erp-connector.json
# Debezium CDC configuration for Oracle EBS (legacy ERP)
{
"name": "oracle-erp-food-supply-chain",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "oracle-erp.internal",
"database.port": "1521",
"database.user": "debezium_cdc",
    "database.password": "${file:/secrets/oracle-cdc.properties:password}",
    "database.dbname": "ERPPROD",
    "topic.prefix": "oracle-erp",
    "table.include.list": "INV.MTL_MATERIAL_TRANSACTIONS,ONT.OE_ORDER_LINES_ALL",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-history.oracle-erp",
    "log.mining.strategy": "online_catalog",
    "snapshot.mode": "initial"
  }
}

Three choices matter here: credentials are injected through Kafka Connect's file config provider rather than stored in plain text; table.include.list limits CDC to the supply-chain tables you actually need (shown with two typical Oracle EBS tables — adapt to your schema); and the online_catalog mining strategy avoids costly LogMiner dictionary extraction on a busy production ERP.
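Downstream, each Debezium change event lands on Kafka as a JSON envelope with `before`/`after` row images and an `op` code (`c` = create, `u` = update, `d` = delete, `r` = snapshot read). A minimal parser for routing these events into the bronze layer (the envelope field names follow Debezium's default format; the function itself is our sketch):

```python
import json

def parse_debezium_event(raw: str) -> dict:
    """Flatten a Debezium change event into a bronze-layer record."""
    event = json.loads(raw)
    # Unwrap the payload if the JSON converter embeds the schema
    payload = event.get("payload", event)
    op = payload["op"]
    return {
        "op": op,
        # deletes carry the row image in "before", everything else in "after"
        "row": payload["before"] if op == "d" else payload["after"],
        "source_table": payload["source"]["table"],
        "source_ts_ms": payload["ts_ms"],
    }
```

A consumer loop can then route deletes to soft-delete logic and upserts to the bronze merge, keyed on the source table name.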