FSMA 204 Automation: Tracking, Alerts and Recall via Python
January 20, 2026 was supposed to mark a watershed moment in the history of American food safety. FSMA Rule 204, the Food Traceability Final Rule issued by the FDA, required all food supply chain operators to implement granular traceability systems for dozens of high-risk food categories. It was the most significant regulatory update since the Food Safety Modernization Act of 2011, with global implications: anyone exporting foods on the Food Traceability List to the US market, including Italian producers of fresh cheeses, was in scope.
In March 2025, the FDA extended the compliance deadline by 30 months (to July 20, 2028), but the direction is unambiguous: granular traceability for high-risk foods is no longer optional. Companies that begin implementation today will have a decisive head start when compliance becomes mandatory. In this article we build a complete Python system, from database design to REST API, from alert engine to recall management, that addresses the core FSMA 204 requirements.
What You Will Learn
- The full structure of FSMA 204: Food Traceability List, CTEs and KDEs
- PostgreSQL database design for multi-level traceability
- FastAPI REST API with Pydantic models for CTE registration and one-up/one-down queries
- Immutable event sourcing with Apache Kafka for a complete audit trail
- Alert engine for anomaly detection (cold chain breaks, suspect lots) with multi-channel notifications
- Recall management workflow: traceback/traceforward in under 24 hours (FDA requirement)
- Mock recall drill with performance metrics
- FSMA 204 vs EU Reg. 178/2002 comparison and impact on Italian exporters
The FoodTech Series: Where We Are
This is the sixth article in the FoodTech series on federicocalo.dev. Here is the full map:
| # | Title | Level | Status |
|---|---|---|---|
| 01 | IoT Pipeline for Precision Agriculture | Intermediate | Published |
| 02 | Computer Vision for Food Quality Control | Intermediate | Published |
| 03 | ML and Edge Computing for Crop Prediction | Advanced | Published |
| 04 | Blockchain and Transparent Food Supply Chain | Advanced | Published |
| 05 | Demand Forecasting for Retail with Prophet and LSTM | Advanced | Published |
| 06 | FSMA 204 Automation: Tracking, Alerts and Recall via Python | Advanced | YOU ARE HERE |
| 07 | Vertical Farming: Dashboard and Automated Control | Intermediate | Coming Soon |
| 08 | Satellite APIs for Crop Monitoring | Advanced | Coming Soon |
| 09 | Farm-to-Fork Dashboard with Streamlit | Intermediate | Coming Soon |
| 10 | Supply Chain Resilience: Optimization with OR-Tools | Advanced | Coming Soon |
FSMA Rule 204: The Most Significant Regulatory Change Since 2011
The Food Safety Modernization Act (FSMA) of 2011 transformed the FDA's approach to food safety: from reactive (responding to contaminations) to preventive (preventing them). Section 204 delegated to the FDA the authority to identify high-risk foods and impose additional traceability requirements. The result is the Food Traceability Final Rule, published in November 2022.
The objective is ambitious: in the event of an outbreak or contamination, operators must be able to identify the full distribution chain of a lot, from field to fork, and deliver records to the FDA in an electronic, sortable format within 24 hours of the request. Before FSMA 204, this process took an average of 7–10 days. The FDA estimates the impact at hundreds of prevented illnesses and deaths from contaminated food per year.
Food Traceability List (FTL): Covered Foods
The FTL includes foods historically associated with serious outbreaks. The main categories are:
| Category | Specific examples | Primary risk |
|---|---|---|
| Fresh cheeses | Soft/semi-soft unaged, fresh mozzarella, ricotta | Listeria monocytogenes |
| Shell eggs | Unpasteurized chicken eggs | Salmonella Enteritidis |
| Seafood — finfish | Tuna, salmon, cod, swordfish (fresh/frozen) | Scombroid, Listeria |
| Crustaceans | Shrimp, crab, lobster | Vibrio, Norovirus |
| Bivalve mollusks | Oysters, clams, mussels | Norovirus, Vibrio |
| RTE deli salads | Potato salad, egg salad, seafood salad | Listeria, Salmonella |
| Fresh produce | Cucumbers, herbs, leafy greens, melons, peppers, sprouts, tomatoes, tropical fruits | E. coli O157:H7, Salmonella |
| Nut butters | Peanut butter, almond butter, cashew butter | Salmonella |
Important Note for Italian Exporters
Fresh cheeses (mozzarella, burrata, ricotta, stracchino) are explicitly on the FTL. Italian companies exporting these products to the US must implement FSMA 204 traceability for all lots destined for the American market. Hard aged cheeses (Parmigiano Reggiano, Grana Padano, Pecorino Romano) are not on the current FTL.
Critical Tracking Events (CTEs)
CTEs are the moments in the supply chain at which traceability must be recorded. FSMA 204 defines seven main ones, each with specific KDEs:
| CTE | Description | Applies to |
|---|---|---|
| Growing | Harvesting fresh produce directly from the field | Produce, herbs |
| Harvesting | Gathering non-cultivated foods (shellfish, wild-caught fish) | Wild-caught seafood |
| Cooling | First cooling operation after harvest | Produce, seafood |
| Initial Packing | First time the product is packed in its final form | All FTL items |
| Shipping | Every transfer of ownership or custody | All FTL items |
| Receiving | Receipt of an FTL item from another operator | All FTL items |
| Transformation | When an FTL item is incorporated into a new product | Processors/manufacturers |
Key Data Elements (KDEs) per CTE
For each CTE the rule defines mandatory KDEs. Example for Shipping CTE:
- Traceability lot code (TLC) — unique lot identifier
- Quantity and unit of measure shipped
- Ship date
- Location description of the shipping point (TLC source reference)
- Transport document reference number
- Recipient name and address
- Food description (including FDA Food Facility Registration number)
For Receiving CTE:
- TLC received (from the preceding shipment)
- Quantity received
- Receive date
- Location of the receiving point
- Reference to the shipping document
- TLC source (lot provenance)
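To make the one-up/one-down linkage concrete, here is a sketch with illustrative payloads (field names follow the Pydantic models used later in this article, values are invented): the receiver's KDEs point back to the shipper's record through the TLC and the transport document number.

```python
# Illustrative KDE payloads for one shipment: the receiving record points
# back to the shipping record via the TLC and the transport document number.
shipping_kde = {
    "tlc": "TRC-2199-A47",
    "quantity_shipped": 500.0,
    "unit_of_measure": "kg",
    "ship_date": "2026-03-01",
    "transport_document_number": "BOL-88123",
    "receiver_name": "Acme Distribution",
}

receiving_kde = {
    "tlc_received": "TRC-2199-A47",
    "quantity_received": 500.0,
    "unit_of_measure": "kg",
    "receive_date": "2026-03-03",
    "reference_document_number": "BOL-88123",
}

def records_link(ship: dict, recv: dict) -> bool:
    """True when the receiving record matches the shipment one step up."""
    return (
        recv["tlc_received"] == ship["tlc"]
        and recv["reference_document_number"] == ship["transport_document_number"]
    )

print(records_link(shipping_kde, receiving_kde))  # True
```

This pairing is exactly what lets an investigator walk the chain hop by hop: each receiving record names the document and lot code of the shipment that produced it.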
FSMA 204 Compliance System Architecture
A production-grade FSMA 204 compliance system must handle three main flows: real-time data capture during operations, processing and storage of events in an immutable fashion, and rapid response when an FDA request or recall occurs. The architecture proposed here is event-driven and cloud-native:
Technology Stack
- API Layer: FastAPI (Python 3.11+) — REST endpoints for CTE registration
- Primary database: PostgreSQL 16 — relational schema for traceability
- Event streaming: Apache Kafka — CTE events as immutable messages
- Graph traversal: NetworkX or Neo4j — supply chain traceback/traceforward
- Alert engine: Python + Celery + Redis — async multi-channel rules
- Recall management: FastAPI + PostgreSQL stored procedures
- Reporting: automatic FDA recall notice generation in JSON/XML
The end-to-end flow:
- Data Capture: operators register CTEs via mobile/web API or ERP integration
- Event Processing: each CTE is validated, enriched and published to Kafka
- Traceability Graph: the lot genealogy graph is updated in real time
- Alert Engine: event rules detect anomalies and send notifications
- Recall Management: on a critical alert, automatically triggers the recall workflow
- FDA Reporting: generates the sortable list in electronic format within 24 hours
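The first four steps of that flow can be sketched in a few lines of pure Python; the names and the single alert rule are illustrative, not the production modules.

```python
import hashlib
import json

def capture(event: dict) -> dict:
    """Steps 1-2: validate required fields, then enrich with a content hash."""
    required = {"event_type", "lot_id", "kde", "event_timestamp"}
    missing = required - event.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    canonical = json.dumps(event, sort_keys=True, default=str)
    event["event_hash"] = hashlib.sha256(canonical.encode()).hexdigest()
    return event

def process(event: dict, graph: dict, alerts: list) -> None:
    """Steps 3-4: index the event by lot, then run alert rules."""
    graph.setdefault(event["lot_id"], []).append(event)
    # Toy rule: a shipping event with a non-positive quantity is suspicious
    if event["event_type"] == "shipping" and event.get("quantity", 1) <= 0:
        alerts.append({"lot_id": event["lot_id"], "severity": "high"})

graph, alerts = {}, []
evt = capture({
    "event_type": "shipping",
    "lot_id": "LOT-1",
    "kde": {"tlc": "LOT-1"},
    "event_timestamp": "2026-03-01T10:00:00Z",
    "quantity": 0,
})
process(evt, graph, alerts)
print(len(alerts))  # 1
```

The real system distributes these steps across FastAPI, Kafka and Celery, but the data flow is the same: validate, hash, index, evaluate rules.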
PostgreSQL Database Design for FSMA 204 Traceability
The schema must satisfy two opposing requirements: performance for day-to-day operations (CTE inserts, business queries) and extreme speed for traceback queries during a recall. We use PostgreSQL with a combination of relational tables and JSONB indexes for CTE-variable KDEs.
-- ============================================================
-- FSMA 204 Compliance Database Schema
-- PostgreSQL 16
-- ============================================================
-- UUID generation extension
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "ltree"; -- For path-based genealogy queries
-- ============================================================
-- TABLE: locations
-- All supply chain locations (companies, warehouses, ports)
-- ============================================================
CREATE TABLE locations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
fda_facility_id VARCHAR(12) UNIQUE, -- FDA Food Facility Registration Number
name VARCHAR(255) NOT NULL,
address_line1 VARCHAR(255) NOT NULL,
city VARCHAR(100) NOT NULL,
state_province VARCHAR(100),
country_code CHAR(2) NOT NULL, -- ISO 3166-1 alpha-2
postal_code VARCHAR(20),
location_type VARCHAR(50) NOT NULL
CHECK (location_type IN (
'farm', 'packing_house', 'processor',
'distributor', 'retailer', 'port', 'cold_storage'
)),
contact_email VARCHAR(255),
contact_phone VARCHAR(50),
coordinates POINT, -- lat/lon for geo-queries
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABLE: food_items
-- FTL food catalog with FSMA classification
-- ============================================================
CREATE TABLE food_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
ftl_category VARCHAR(50) NOT NULL
CHECK (ftl_category IN (
'fresh_cheese', 'shell_eggs', 'finfish',
'crustaceans', 'bivalve_mollusks', 'nut_butter',
'rte_deli_salads', 'fresh_produce'
)),
name VARCHAR(255) NOT NULL,
description TEXT,
fda_product_code VARCHAR(7), -- FDA product code
unit_of_measure VARCHAR(20) NOT NULL DEFAULT 'kg',
storage_temp_min DECIMAL(5,2), -- Celsius
storage_temp_max DECIMAL(5,2),
shelf_life_days INTEGER,
is_ftl_covered BOOLEAN DEFAULT true,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABLE: traceability_lots
-- Core: each traceable lot with TLC (Traceability Lot Code)
-- ============================================================
CREATE TABLE traceability_lots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tlc VARCHAR(50) UNIQUE NOT NULL, -- Traceability Lot Code
food_item_id UUID NOT NULL REFERENCES food_items(id),
origin_location_id UUID NOT NULL REFERENCES locations(id),
-- Quantity and lot size
initial_quantity DECIMAL(12,4) NOT NULL,
remaining_quantity DECIMAL(12,4) NOT NULL,
unit_of_measure VARCHAR(20) NOT NULL,
-- Critical dates
production_date DATE,
harvest_date DATE,
best_before_date DATE,
expiry_date DATE,
-- Genealogy: which parent lots this was derived from
parent_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
genealogy_path ltree, -- e.g. "LOT_A.LOT_B.LOT_C"
-- Lot status
status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (status IN (
'active', 'recalled', 'quarantined',
'consumed', 'disposed', 'archived'
)),
-- FDA-specific attributes
tlc_source_reference TEXT, -- TLC provenance description
-- Extra metadata (variable per food type)
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes for fast genealogy queries
CREATE INDEX idx_lots_genealogy ON traceability_lots USING GIST (genealogy_path);
CREATE INDEX idx_lots_status ON traceability_lots(status);
CREATE INDEX idx_lots_tlc ON traceability_lots(tlc);
CREATE INDEX idx_lots_food_item ON traceability_lots(food_item_id);
CREATE INDEX idx_lots_origin ON traceability_lots(origin_location_id);
CREATE INDEX idx_lots_attributes ON traceability_lots USING GIN (attributes);
-- ============================================================
-- TABLE: cte_events
-- Immutable register of all Critical Tracking Events
-- This table must NEVER receive UPDATE or DELETE operations
-- ============================================================
CREATE TABLE cte_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'growing', 'harvesting', 'cooling',
'initial_packing', 'shipping', 'receiving',
'transformation'
)),
-- Lots involved
lot_id UUID NOT NULL REFERENCES traceability_lots(id),
related_lot_ids UUID[] DEFAULT ARRAY[]::UUID[], -- for transformation
-- Event location
location_id UUID NOT NULL REFERENCES locations(id),
-- Event timestamp (when it happened, not when it was recorded)
event_timestamp TIMESTAMPTZ NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW(),
recorded_by VARCHAR(255) NOT NULL, -- user/system that recorded it
-- KDEs as JSONB (structure varies per CTE type)
kde JSONB NOT NULL,
-- For shipments: recipient location
destination_location_id UUID REFERENCES locations(id),
-- Reference document (bill of lading, PO, etc.)
reference_document_number VARCHAR(100),
reference_document_type VARCHAR(50),
-- Quantity moved
quantity DECIMAL(12,4),
unit_of_measure VARCHAR(20),
-- Hash for immutability (SHA-256 of payload)
event_hash CHAR(64) NOT NULL,
-- Optional digital signature
digital_signature TEXT,
-- Metadata
notes TEXT,
metadata JSONB DEFAULT '{}'
-- NO updated_at: this record is immutable
);
-- Events must not be modified: protection rules
-- (note: rules silently swallow the statement; a trigger that RAISEs
-- an exception is a stricter alternative in production)
CREATE RULE no_update_cte AS ON UPDATE TO cte_events DO INSTEAD NOTHING;
CREATE RULE no_delete_cte AS ON DELETE TO cte_events DO INSTEAD NOTHING;
CREATE INDEX idx_cte_lot ON cte_events(lot_id);
CREATE INDEX idx_cte_type ON cte_events(event_type);
CREATE INDEX idx_cte_timestamp ON cte_events(event_timestamp DESC);
CREATE INDEX idx_cte_location ON cte_events(location_id);
CREATE INDEX idx_cte_destination ON cte_events(destination_location_id);
CREATE INDEX idx_cte_kde ON cte_events USING GIN (kde);
-- ============================================================
-- TABLE: recall_events
-- Recall management with full workflow
-- ============================================================
CREATE TABLE recall_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
recall_number VARCHAR(20) UNIQUE NOT NULL, -- e.g. "RECALL-2026-001"
recall_type VARCHAR(20) NOT NULL
CHECK (recall_type IN ('class_i', 'class_ii', 'class_iii', 'market_withdrawal')),
-- Root cause
trigger_lot_ids UUID[] NOT NULL, -- Lots that triggered the recall
trigger_reason VARCHAR(50) NOT NULL
CHECK (trigger_reason IN (
'pathogen_contamination', 'allergen_undeclared',
'foreign_material', 'chemical_contamination',
'mislabeling', 'temperature_abuse', 'other'
)),
trigger_description TEXT NOT NULL,
-- Automatically computed scope
affected_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
affected_quantity DECIMAL(12,4),
affected_unit VARCHAR(20),
-- Workflow status
status VARCHAR(30) NOT NULL DEFAULT 'identified'
CHECK (status IN (
'identified', 'scope_determined', 'notifications_sent',
'removal_in_progress', 'effectiveness_check',
'closed', 'fda_reported'
)),
-- Workflow timestamps
identified_at TIMESTAMPTZ DEFAULT NOW(),
scope_determined_at TIMESTAMPTZ,
notifications_sent_at TIMESTAMPTZ,
fda_reported_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ,
-- FDA notification (within 24h of request)
fda_notified BOOLEAN DEFAULT false,
fda_report_json JSONB,
-- Responsible party
initiated_by VARCHAR(255) NOT NULL,
-- Drill metrics
traceback_seconds INTEGER,
traceforward_seconds INTEGER,
lots_traced_pct DECIMAL(5,2),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABLE: alert_rules
-- Alert engine rule configuration
-- ============================================================
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
description TEXT,
rule_type VARCHAR(30) NOT NULL
CHECK (rule_type IN (
'temperature_breach', 'lot_age', 'missing_cte',
'quantity_discrepancy', 'supplier_blacklist',
'pathogen_alert', 'custom'
)),
condition_json JSONB NOT NULL,
severity VARCHAR(10) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
channels TEXT[] NOT NULL, -- ['email', 'sms', 'webhook', 'slack']
recipients JSONB NOT NULL, -- {"email": [...], "phone": [...]}
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABLE: alerts
-- Alerts generated by the engine
-- ============================================================
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
rule_id UUID REFERENCES alert_rules(id),
lot_id UUID REFERENCES traceability_lots(id),
cte_event_id UUID REFERENCES cte_events(id),
severity VARCHAR(10) NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NOT NULL,
data JSONB DEFAULT '{}',
status VARCHAR(20) NOT NULL DEFAULT 'open'
CHECK (status IN ('open', 'acknowledged', 'resolved', 'escalated')),
acknowledged_by VARCHAR(255),
acknowledged_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
auto_recall_triggered BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_alerts_lot ON alerts(lot_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_status ON alerts(status);
CREATE INDEX idx_alerts_created ON alerts(created_at DESC);
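During a recall, traceback and traceforward reduce to ltree containment queries on `genealogy_path` (`@>` for ancestors, `<@` for descendants). The same semantics can be sketched in pure Python, which is handy for unit-testing recall logic without a database; the paths below are illustrative.

```python
def is_ancestor(a: str, b: str) -> bool:
    """Pure-Python equivalent of ltree 'a @> b': a is an ancestor-or-self of b."""
    return b == a or b.startswith(a + ".")

# Dot-separated paths mirror the genealogy_path column
lots = ["FARM01", "FARM01.PACK07", "FARM01.PACK07.DIST22", "FARM02"]

# Traceback: every ancestor of the distributor-level lot
target = "FARM01.PACK07.DIST22"
ancestors = [p for p in lots if is_ancestor(p, target) and p != target]
print(ancestors)  # ['FARM01', 'FARM01.PACK07']

# Traceforward: every descendant of the farm-level lot
descendants = [p for p in lots if is_ancestor("FARM01", p) and p != "FARM01"]
print(descendants)  # ['FARM01.PACK07', 'FARM01.PACK07.DIST22']
```

In PostgreSQL the GiST index on `genealogy_path` makes both directions index-backed, which is what keeps traceback queries fast even with millions of lots.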
REST API with FastAPI: Pydantic Models and CTE Endpoints
The API is the entry point for all supply chain operators. We use FastAPI for its speed, automatic validation with Pydantic, and automatic OpenAPI documentation generation — invaluable during FDA audits. Pydantic models mirror the KDEs required by the rule.
# fsma204/models.py
# Pydantic v2 models for the FSMA 204 API
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime, date
from uuid import UUID
from enum import Enum
class CTEEventType(str, Enum):
GROWING = "growing"
HARVESTING = "harvesting"
COOLING = "cooling"
INITIAL_PACKING = "initial_packing"
SHIPPING = "shipping"
RECEIVING = "receiving"
TRANSFORMATION = "transformation"
class FTLCategory(str, Enum):
FRESH_CHEESE = "fresh_cheese"
SHELL_EGGS = "shell_eggs"
FINFISH = "finfish"
CRUSTACEANS = "crustaceans"
BIVALVE_MOLLUSKS = "bivalve_mollusks"
NUT_BUTTER = "nut_butter"
RTE_DELI_SALADS = "rte_deli_salads"
FRESH_PRODUCE = "fresh_produce"
class LotStatus(str, Enum):
ACTIVE = "active"
RECALLED = "recalled"
QUARANTINED = "quarantined"
CONSUMED = "consumed"
DISPOSED = "disposed"
# ── KDE Models per CTE type ──────────────────────────────────
class ShippingKDE(BaseModel):
"""Key Data Elements for Shipping CTE - FSMA 204 §1.1330"""
tlc: str = Field(..., description="Traceability Lot Code")
quantity_shipped: float = Field(..., gt=0)
unit_of_measure: str
ship_date: date
tlc_source_reference: str = Field(..., description="TLC provenance description")
transport_document_number: str
transport_document_type: str = Field(default="bill_of_lading")
receiver_fda_facility_id: str = Field(..., description="Recipient FDA registration")
receiver_name: str
receiver_address: str
class ReceivingKDE(BaseModel):
"""Key Data Elements for Receiving CTE - FSMA 204 §1.1335"""
tlc_received: str = Field(..., description="TLC as it appeared in the shipping document")
quantity_received: float = Field(..., gt=0)
unit_of_measure: str
receive_date: date
location_description: str
reference_document_number: str
reference_document_type: str
lot_code_generator_location: Optional[str] = None
class GrowingKDE(BaseModel):
"""Key Data Elements for Growing CTE"""
tlc: str
harvest_date: date
field_id: str # Field/plot identifier
grower_fda_facility_id: str
commodity: str
growing_method: str = "conventional" # conventional, organic, hydroponic
class InitialPackingKDE(BaseModel):
"""Key Data Elements for Initial Packing CTE"""
tlc: str
pack_date: date
quantity_packed: float
unit_of_measure: str
packaging_type: str
facility_fda_id: str
lot_code_generator_location: str
class TransformationKDE(BaseModel):
"""Key Data Elements for Transformation CTE"""
new_tlc: str # TLC of the new product
input_tlcs: List[str] # TLCs of input ingredients
transformation_date: date
facility_fda_id: str
product_description: str
quantity_produced: float
unit_of_measure: str
# ── Request/Response Models ──────────────────────────────────
class CreateLotRequest(BaseModel):
food_item_id: UUID
origin_location_id: UUID
tlc: str = Field(..., min_length=3, max_length=50)
initial_quantity: float = Field(..., gt=0)
unit_of_measure: str = Field(default="kg")
production_date: Optional[date] = None
harvest_date: Optional[date] = None
best_before_date: Optional[date] = None
expiry_date: Optional[date] = None
parent_lot_ids: List[UUID] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
    @field_validator('tlc')
    @classmethod
    def validate_tlc(cls, v: str) -> str:
        """TLC must not contain visually ambiguous characters"""
        # Check the raw value: a lowercase 'l' or 'o' would otherwise
        # slip through if we only inspected v.upper()
        forbidden = set('IiOo0l')  # ambiguous characters, either case
        if any(c in forbidden for c in v):
            raise ValueError("TLC contains ambiguous characters (I, O, 0, l)")
        return v.upper()
class RegisterCTERequest(BaseModel):
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_by: str = Field(..., min_length=3)
kde: Dict[str, Any] = Field(..., description="KDEs specific to the CTE type")
destination_location_id: Optional[UUID] = None
reference_document_number: Optional[str] = None
reference_document_type: Optional[str] = None
quantity: Optional[float] = None
unit_of_measure: Optional[str] = None
notes: Optional[str] = None
class CTEEventResponse(BaseModel):
id: UUID
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_at: datetime
recorded_by: str
kde: Dict[str, Any]
event_hash: str
class TracebackRequest(BaseModel):
lot_id: UUID
depth: int = Field(default=10, ge=1, le=50)
class TracebackResult(BaseModel):
lot_id: UUID
tlc: str
ancestors: List[Dict[str, Any]] # upstream lots
descendants: List[Dict[str, Any]] # downstream lots
events: List[Dict[str, Any]]
total_ancestors: int
total_descendants: int
trace_depth: int
computation_ms: int
class SortableListResponse(BaseModel):
"""Sortable list format required by FDA (§1.1375)"""
generated_at: datetime
requesting_lots: List[str] # Requested TLCs
records: List[Dict[str, Any]] # Sortable records
total_records: int
format_version: str = "FSMA204_v1"
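The TLC validator is easy to exercise in isolation. Here is a standalone sketch of the intended check (no Pydantic needed; note that the raw value is inspected before upper-casing, so a lowercase `l` cannot slip through):

```python
def validate_tlc(v: str) -> str:
    """Mirror of the CreateLotRequest validator's intent: reject glyphs that
    are easy to misread on printed labels, then store the TLC uppercase."""
    forbidden = set("IiOo0l")  # I/1, O/0, l/1 confusions, either case
    if any(c in forbidden for c in v):
        raise ValueError("TLC contains ambiguous characters (I, O, 0, l)")
    return v.upper()

print(validate_tlc("trc-2199-a47"))  # TRC-2199-A47
try:
    validate_tlc("LOT-0001")  # 'O' and '0' are both rejected
except ValueError as exc:
    print(exc)
```

A side effect of this rule is that TLCs cannot contain the digit zero, so lot numbering schemes must avoid it (e.g. years written as "26" rather than "2026").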
# fsma204/api.py
# FastAPI application for FSMA 204 compliance
import hashlib
import json
import time
from uuid import UUID
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .models import (
CreateLotRequest, RegisterCTERequest,
CTEEventResponse, TracebackRequest,
TracebackResult, SortableListResponse
)
from .database import get_db
from .kafka_producer import publish_cte_event
from .alert_engine import check_alert_rules
app = FastAPI(
title="FSMA 204 Traceability API",
description="Food Traceability Compliance System for FDA Rule 204",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # NOTE: restrict to known origins in production
    allow_methods=["*"],
    allow_headers=["*"],
)
def compute_event_hash(payload: dict) -> str:
"""SHA-256 hash of the CTE payload for immutability verification"""
canonical = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
# ── Lot Management ──────────────────────────────────────────
@app.post("/api/v1/lots", status_code=201)
async def create_lot(
request: CreateLotRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new traceable lot.
The TLC must be unique in the system.
"""
# Check TLC uniqueness
result = await db.execute(
text("SELECT id FROM traceability_lots WHERE tlc = :tlc"),
{"tlc": request.tlc}
)
if result.fetchone():
raise HTTPException(
status_code=409,
detail=f"TLC '{request.tlc}' already exists in the system"
)
    # Build genealogy_path (the ltree path tracks only the primary parent
    # chain; full multi-parent lineage lives in parent_lot_ids)
    genealogy_path = request.tlc
    if request.parent_lot_ids:
        parent_result = await db.execute(
            text("SELECT genealogy_path, tlc FROM traceability_lots WHERE id = :id"),
            {"id": str(request.parent_lot_ids[0])}
        )
        parent = parent_result.fetchone()
        if parent and parent.genealogy_path:
            genealogy_path = f"{parent.genealogy_path}.{request.tlc}"
await db.execute(
text("""
INSERT INTO traceability_lots (
food_item_id, origin_location_id, tlc,
initial_quantity, remaining_quantity, unit_of_measure,
production_date, harvest_date, best_before_date, expiry_date,
parent_lot_ids, genealogy_path, attributes
) VALUES (
:food_item_id, :origin_location_id, :tlc,
:initial_quantity, :initial_quantity, :unit_of_measure,
:production_date, :harvest_date, :best_before_date, :expiry_date,
:parent_lot_ids::uuid[], :genealogy_path::ltree, :attributes::jsonb
)
"""),
{
"food_item_id": str(request.food_item_id),
"origin_location_id": str(request.origin_location_id),
"tlc": request.tlc,
"initial_quantity": request.initial_quantity,
"unit_of_measure": request.unit_of_measure,
"production_date": request.production_date,
"harvest_date": request.harvest_date,
"best_before_date": request.best_before_date,
"expiry_date": request.expiry_date,
"parent_lot_ids": [str(x) for x in request.parent_lot_ids],
"genealogy_path": genealogy_path,
"attributes": json.dumps(request.attributes)
}
)
await db.commit()
return {"message": "Lot created", "tlc": request.tlc}
# ── CTE Registration ────────────────────────────────────────
@app.post("/api/v1/cte-events", response_model=CTEEventResponse, status_code=201)
async def register_cte_event(
request: RegisterCTERequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
Register a Critical Tracking Event.
The record is IMMUTABLE once created (no update/delete).
"""
lot_result = await db.execute(
text("SELECT id, status, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.lot_id)}
)
lot = lot_result.fetchone()
if not lot:
raise HTTPException(status_code=404, detail="Lot not found")
if lot.status in ('recalled', 'disposed'):
raise HTTPException(
status_code=422,
detail=f"Cannot register CTE on lot with status '{lot.status}'"
)
payload = {
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp.isoformat(),
"kde": request.kde
}
event_hash = compute_event_hash(payload)
result = await db.execute(
text("""
INSERT INTO cte_events (
event_type, lot_id, location_id,
event_timestamp, recorded_by, kde,
destination_location_id, reference_document_number,
reference_document_type, quantity, unit_of_measure,
event_hash, notes
) VALUES (
:event_type, :lot_id, :location_id,
:event_timestamp, :recorded_by, :kde::jsonb,
:destination_location_id, :reference_document_number,
:reference_document_type, :quantity, :unit_of_measure,
:event_hash, :notes
)
RETURNING id, recorded_at
"""),
{
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp,
"recorded_by": request.recorded_by,
"kde": json.dumps(request.kde),
"destination_location_id": (
str(request.destination_location_id)
if request.destination_location_id else None
),
"reference_document_number": request.reference_document_number,
"reference_document_type": request.reference_document_type,
"quantity": request.quantity,
"unit_of_measure": request.unit_of_measure,
"event_hash": event_hash,
"notes": request.notes
}
)
    row = result.fetchone()
    # Decrement remaining_quantity for shipping events in the SAME
    # transaction as the event insert, then commit once: the event and
    # the stock adjustment must be atomic
    if request.event_type == "shipping" and request.quantity:
        await db.execute(
            text("""
                UPDATE traceability_lots
                SET remaining_quantity = remaining_quantity - :qty,
                    updated_at = NOW()
                WHERE id = :id
            """),
            {"qty": request.quantity, "id": str(request.lot_id)}
        )
    await db.commit()
event_data = {
"id": str(row.id),
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"tlc": lot.tlc,
"kde": request.kde,
"event_timestamp": request.event_timestamp.isoformat()
}
background_tasks.add_task(publish_cte_event, event_data)
background_tasks.add_task(check_alert_rules, event_data, str(request.lot_id))
return CTEEventResponse(
id=row.id,
event_type=request.event_type,
lot_id=request.lot_id,
location_id=request.location_id,
event_timestamp=request.event_timestamp,
recorded_at=row.recorded_at,
recorded_by=request.recorded_by,
kde=request.kde,
event_hash=event_hash
)
# ── Traceback / Traceforward ────────────────────────────────
@app.get("/api/v1/lots/{lot_id}/traceback")
async def traceback(lot_id: UUID, depth: int = 10, db: AsyncSession = Depends(get_db)):
"""
One-step-up AND full traceback: traverses the supply chain upstream.
FDA requirement: complete within 24h of request.
"""
start_ms = time.monotonic()
root_result = await db.execute(
text("SELECT id, tlc, genealogy_path FROM traceability_lots WHERE id = :id"),
{"id": str(lot_id)}
)
root = root_result.fetchone()
if not root:
raise HTTPException(status_code=404, detail="Lot not found")
ancestors_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id, initial_quantity
FROM traceability_lots
WHERE genealogy_path @> :path::ltree
AND id != :lot_id
ORDER BY nlevel(genealogy_path) ASC
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
ancestors = [dict(r._mapping) for r in ancestors_result.fetchall()]
descendants_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id
FROM traceability_lots
WHERE genealogy_path <@ :path::ltree
AND id != :lot_id
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
descendants = [dict(r._mapping) for r in descendants_result.fetchall()]
events_result = await db.execute(
text("""
SELECT event_type, event_timestamp, recorded_by,
kde, location_id, event_hash
FROM cte_events
WHERE lot_id = :lot_id
ORDER BY event_timestamp ASC
"""),
{"lot_id": str(lot_id)}
)
events = [dict(r._mapping) for r in events_result.fetchall()]
elapsed_ms = int((time.monotonic() - start_ms) * 1000)
return TracebackResult(
lot_id=lot_id,
tlc=root.tlc,
ancestors=ancestors,
descendants=descendants,
events=events,
total_ancestors=len(ancestors),
total_descendants=len(descendants),
trace_depth=depth,
computation_ms=elapsed_ms
)
# ── FDA Sortable List ───────────────────────────────────────
@app.get("/api/v1/lots/sortable-list", response_model=SortableListResponse)
async def fda_sortable_list(
tlcs: str, # comma-separated TLC list
db: AsyncSession = Depends(get_db)
):
"""
Generate the electronic sortable list required by the FDA.
Must be producible within 24h of an FDA request (§1.1375).
"""
tlc_list = [t.strip() for t in tlcs.split(",")]
records_result = await db.execute(
text("""
SELECT
tl.tlc,
ce.event_type,
ce.event_timestamp,
ce.recorded_by,
ce.kde,
ce.quantity,
ce.unit_of_measure,
l.name AS location_name,
l.fda_facility_id,
ce.reference_document_number,
ce.event_hash
FROM cte_events ce
JOIN traceability_lots tl ON ce.lot_id = tl.id
JOIN locations l ON ce.location_id = l.id
WHERE tl.tlc = ANY(:tlcs)
ORDER BY tl.tlc, ce.event_timestamp ASC
"""),
{"tlcs": tlc_list}
)
records = []
for row in records_result.fetchall():
r = dict(row._mapping)
r['event_timestamp'] = r['event_timestamp'].isoformat()
records.append(r)
    from datetime import datetime, timezone  # local import: api.py has no module-level datetime
    return SortableListResponse(
        generated_at=datetime.now(timezone.utc),
        requesting_lots=tlc_list,
        records=records,
        total_records=len(records)
    )
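Because each `cte_events` row stores a SHA-256 of its canonical payload, an auditor can recompute hashes offline and flag tampering. A minimal sketch, assuming events are exported as dicts carrying the same fields that were hashed at insert time:

```python
import hashlib
import json

def compute_event_hash(payload: dict) -> str:
    # Same canonicalization as the API: sorted keys, values stringified
    canonical = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()

HASHED_FIELDS = ("event_type", "lot_id", "location_id", "event_timestamp", "kde")

def verify_events(events: list) -> list:
    """Return the indices of events whose stored hash no longer matches."""
    tampered = []
    for i, e in enumerate(events):
        payload = {k: e[k] for k in HASHED_FIELDS}
        if compute_event_hash(payload) != e["event_hash"]:
            tampered.append(i)
    return tampered

payload = {
    "event_type": "shipping", "lot_id": "L1", "location_id": "LOC1",
    "event_timestamp": "2026-03-01T10:00:00", "kde": {"tlc": "TRC-99"},
}
evt = dict(payload, event_hash=compute_event_hash(payload))
evt["kde"] = {"tlc": "TRC-98"}  # simulate tampering after the fact
print(verify_events([evt]))  # [0]
```

Running this check as a periodic job (or during a mock recall drill) gives evidence that the append-only guarantee actually held.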
Immutable Event Sourcing with Apache Kafka
The immutable nature of CTEs maps perfectly to the Event Sourcing pattern. Every event is an immutable fact about what happened in the supply chain. Kafka ensures these events are:
- Durable: configurable retention (FSMA 204 requires records to be kept for 2 years; many operators retain longer)
- Replayable: to reconstruct state at any historical point
- Ordered per partition: guaranteed ordering per lot (partition by TLC)
- Distributed: scalable to millions of events per day
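"Replayable" deserves a concrete illustration: the current state of a lot is a pure fold over its event stream, so any consumer can rebuild it from offset zero. A sketch with illustrative event shapes (the `recall_notice` marker is an internal status event, not an FSMA CTE):

```python
def replay(events: list) -> dict:
    """Rebuild lot state as a pure fold over the event stream, in offset order."""
    state = {"status": "active", "remaining_quantity": 0.0}
    for e in events:
        if e["event_type"] in ("initial_packing", "receiving"):
            state["remaining_quantity"] += e["quantity"]
        elif e["event_type"] == "shipping":
            state["remaining_quantity"] -= e["quantity"]
        elif e["event_type"] == "recall_notice":  # internal marker, not an FSMA CTE
            state["status"] = "recalled"
    return state

stream = [
    {"event_type": "initial_packing", "quantity": 1000.0},
    {"event_type": "shipping", "quantity": 400.0},
    {"event_type": "shipping", "quantity": 100.0},
]
print(replay(stream))  # {'status': 'active', 'remaining_quantity': 500.0}
```

Partitioning by TLC guarantees that all events for one lot land on one partition, so this fold sees them in the order they were produced.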
# fsma204/kafka_producer.py
# Kafka producer for CTE events
import json
import logging
from confluent_kafka import Producer
from datetime import datetime
logger = logging.getLogger(__name__)
# Avro schema for CTE event (for Schema Registry)
CTE_EVENT_SCHEMA = """
{
"type": "record",
"name": "CTEEvent",
"namespace": "com.fsma204.traceability",
"fields": [
{"name": "id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "lot_id", "type": "string"},
{"name": "tlc", "type": "string"},
{"name": "event_timestamp", "type": "string"},
{"name": "kde", "type": "string"},
{"name": "event_hash", "type": "string"}
]
}
"""
kafka_config = {
'bootstrap.servers': 'kafka:9092',
'client.id': 'fsma204-producer',
'acks': 'all', # Durability guarantee
'retries': 5,
'retry.backoff.ms': 200,
'compression.type': 'snappy'
}
producer = Producer(kafka_config)
def delivery_callback(err, msg):
if err:
logger.error(f"Kafka delivery failed: {err}")
else:
logger.debug(
f"CTE event delivered to {msg.topic()} "
f"partition {msg.partition()} offset {msg.offset()}"
)
async def publish_cte_event(event_data: dict):
"""
Publish a CTE event to Kafka.
Uses the TLC as partition key to guarantee ordering per lot.
"""
try:
message = json.dumps(event_data, default=str).encode('utf-8')
partition_key = event_data.get('tlc', event_data['lot_id'])
producer.produce(
topic='fsma204.cte.events',
key=partition_key.encode('utf-8'),
value=message,
callback=delivery_callback
)
        producer.poll(0)  # Serve delivery callbacks without blocking (not a flush)
except Exception as e:
logger.error(f"Failed to publish CTE event: {e}", exc_info=True)
# ── Consumer for Alert Engine ───────────────────────────────
# fsma204/kafka_consumer.py
# Kafka consumer for CTE events (companion to the producer module)
import asyncio
import json
import logging

from confluent_kafka import Consumer, KafkaError

logger = logging.getLogger(__name__)
async def start_cte_consumer():
"""
Kafka consumer for real-time alert processing.
Runs as a separate background service from the API.
"""
consumer_config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'fsma204-alert-engine',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Manual commit for at-least-once delivery
}
consumer = Consumer(consumer_config)
consumer.subscribe(['fsma204.cte.events'])
logger.info("CTE Consumer started, listening on fsma204.cte.events")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Kafka consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
await process_cte_for_alerts(event)
consumer.commit(msg)
except Exception as e:
logger.error(f"Error processing CTE event: {e}", exc_info=True)
# Do not commit: the event will be reprocessed
finally:
consumer.close()
async def process_cte_for_alerts(event: dict):
"""Processing pipeline for each CTE event"""
event_type = event['event_type']
    kde = event.get('kde') or {}
    if isinstance(kde, str):
        kde = json.loads(kde)  # KDEs may arrive serialized or already as a dict
# 1. Check cold chain for shipping/receiving events
if event_type in ('shipping', 'receiving'):
await check_cold_chain_continuity(event, kde)
# 2. Verify KDE completeness for the event type
await validate_kde_completeness(event_type, kde, event['lot_id'])
# 3. Check if the supplier is blacklisted
if 'supplier_fda_id' in kde:
await check_supplier_blacklist(kde['supplier_fda_id'], event['lot_id'])
# 4. Update in-memory traceability graph (NetworkX)
await update_traceability_graph(event)
Alert Engine: Real-Time Anomaly Detection
The alert engine monitors the continuous stream of CTE events and applies configurable rules. Notifications are sent across multiple channels based on severity. A critical alert can automatically trigger a recall workflow.
# fsma204/alert_engine.py
# Alert engine for FSMA 204 compliance monitoring
import asyncio
import json
import logging
from datetime import datetime
from typing import Optional
import httpx
import smtplib
from email.mime.text import MIMEText
from twilio.rest import Client as TwilioClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .database import get_db_session
logger = logging.getLogger(__name__)
class AlertSeverity:
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertRule:
"""Base class for alert rules"""
def __init__(self, rule_id: str, name: str, severity: str):
self.rule_id = rule_id
self.name = name
self.severity = severity
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
"""Return a dict with alert details, or None if everything is OK"""
raise NotImplementedError
class ColdChainBreachRule(AlertRule):
"""
Detects cold chain breaks.
Rule: if the temperature recorded in KDEs exceeds the threshold
expected for the food category.
"""
TEMP_THRESHOLDS = {
'fresh_cheese': {'min': 1.0, 'max': 7.0},
'finfish': {'min': -2.0, 'max': 4.0},
'crustaceans': {'min': 0.0, 'max': 4.0},
'shell_eggs': {'min': 5.0, 'max': 7.2}, # FDA: < 45°F
'fresh_produce': {'min': 1.0, 'max': 10.0},
}
def __init__(self):
super().__init__("cold_chain_breach", "Cold Chain Breach", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
temp_recorded = kde.get('temperature_celsius')
if temp_recorded is None:
return None
result = await db.execute(
text("""
SELECT fi.ftl_category
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
WHERE tl.id = :lot_id
"""),
{"lot_id": event['lot_id']}
)
row = result.fetchone()
if not row:
return None
category = row.ftl_category
threshold = self.TEMP_THRESHOLDS.get(category)
if not threshold:
return None
breach = False
breach_detail = ""
if temp_recorded < threshold['min']:
breach = True
breach_detail = (
f"Temperature {temp_recorded}°C below minimum {threshold['min']}°C"
)
elif temp_recorded > threshold['max']:
breach = True
breach_detail = (
f"Temperature {temp_recorded}°C above maximum {threshold['max']}°C"
)
if breach:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"Cold Chain Breach - TLC {event.get('tlc', 'N/A')}",
"description": (
f"{breach_detail} for category {category}. "
f"CTE: {event['event_type']}, "
f"timestamp: {event['event_timestamp']}"
),
"lot_id": event['lot_id'],
"data": {
"temperature": temp_recorded,
"threshold": threshold,
"category": category,
"event_type": event['event_type']
}
}
return None
class MissingKDERule(AlertRule):
"""
Verifies that all mandatory KDEs are present for the CTE type.
Missing KDEs = FSMA 204 non-compliance.
"""
REQUIRED_KDE = {
'shipping': ['tlc', 'quantity_shipped', 'ship_date', 'receiver_name'],
'receiving': ['tlc_received', 'quantity_received', 'receive_date'],
'initial_packing': ['tlc', 'pack_date', 'quantity_packed', 'facility_fda_id'],
'growing': ['tlc', 'harvest_date', 'field_id', 'grower_fda_facility_id'],
'transformation': ['new_tlc', 'input_tlcs', 'transformation_date'],
}
def __init__(self):
super().__init__("missing_kde", "Missing KDE Fields", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
event_type = event['event_type']
required = self.REQUIRED_KDE.get(event_type, [])
if not required:
return None
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
missing = [f for f in required if f not in kde or kde[f] is None]
if missing:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"Missing KDEs - CTE {event_type}",
"description": (
f"CTE {event_type} for TLC {event.get('tlc')} "
f"is missing mandatory KDEs: {', '.join(missing)}"
),
"lot_id": event['lot_id'],
"data": {"missing_fields": missing, "event_type": event_type}
}
return None
# ── Notification System ──────────────────────────────────────
class MultiChannelNotifier:
def __init__(self, config: dict):
self.config = config
self.twilio = TwilioClient(
config['twilio_account_sid'],
config['twilio_auth_token']
)
async def send_alert(self, alert_data: dict, rule_config: dict):
channels = rule_config.get('channels', ['email'])
severity = alert_data['severity']
tasks = []
if 'email' in channels:
tasks.append(self._send_email(alert_data, rule_config))
if 'sms' in channels and severity in ('high', 'critical'):
tasks.append(self._send_sms(alert_data, rule_config))
if 'webhook' in channels:
tasks.append(self._send_webhook(alert_data, rule_config))
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
logger.error(f"Notification failed: {r}")
    async def _send_email(self, alert: dict, config: dict):
        # Note: smtplib is blocking; at production volume, offload via
        # asyncio.to_thread so the event loop is not stalled
recipients = config.get('recipients', {}).get('email', [])
msg = MIMEText(
f"FSMA 204 Alert\n\n"
f"Severity: {alert['severity'].upper()}\n"
f"Title: {alert['title']}\n"
f"Description: {alert['description']}\n"
f"Lot ID: {alert['lot_id']}\n"
f"Timestamp: {datetime.utcnow().isoformat()}"
)
msg['Subject'] = f"[FSMA204-{alert['severity'].upper()}] {alert['title']}"
msg['From'] = self.config['smtp_from']
msg['To'] = ', '.join(recipients)
with smtplib.SMTP(self.config['smtp_host'], self.config['smtp_port']) as server:
server.starttls()
server.login(self.config['smtp_user'], self.config['smtp_password'])
server.sendmail(self.config['smtp_from'], recipients, msg.as_string())
async def _send_sms(self, alert: dict, config: dict):
phones = config.get('recipients', {}).get('phone', [])
message_body = (
f"FSMA204 ALERT {alert['severity'].upper()}: "
f"{alert['title']} - Lot: {alert['lot_id']}"
)
for phone in phones:
self.twilio.messages.create(
body=message_body,
from_=self.config['twilio_from'],
to=phone
)
async def _send_webhook(self, alert: dict, config: dict):
webhook_url = config.get('recipients', {}).get('webhook_url')
if not webhook_url:
return
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json={
"event": "fsma204_alert",
"severity": alert['severity'],
"data": alert
},
timeout=10.0
)
# ── Main check function ──────────────────────────────────────
ACTIVE_RULES = [
ColdChainBreachRule(),
MissingKDERule(),
# Add: SupplierBlacklistRule(), LotAgeRule(), etc.
]
async def check_alert_rules(event: dict, lot_id: str):
"""Called as a background task after each CTE registration"""
async with get_db_session() as db:
for rule in ACTIVE_RULES:
try:
alert_data = await rule.evaluate(event, db)
if alert_data:
await db.execute(
text("""
INSERT INTO alerts (rule_id, lot_id, severity, title, description, data)
SELECT ar.id, :lot_id, :severity, :title, :description, :data::jsonb
FROM alert_rules ar WHERE ar.name = :rule_name
"""),
{
"lot_id": lot_id,
"severity": alert_data['severity'],
"title": alert_data['title'],
"description": alert_data['description'],
"data": json.dumps(alert_data.get('data', {})),
"rule_name": rule.name
}
)
await db.commit()
# Auto-trigger recall if critical
if alert_data['severity'] == AlertSeverity.CRITICAL:
await auto_trigger_recall(lot_id, alert_data, db)
logger.warning(
f"Alert generated: {alert_data['severity']} - {alert_data['title']}"
)
except Exception as e:
logger.error(f"Error evaluating rule {rule.rule_id}: {e}", exc_info=True)
Recall Management: Complete Workflow in Under 24 Hours
The most demanding requirement of FSMA 204 is the ability to complete traceback and traceforward and deliver the sortable list to the FDA within 24 hours of the request. An automated recall workflow is the only way to guarantee this SLA at scale. In 2024, the FDA recorded 241 food recalls, with an average completion time of 73 hours — well beyond the new rule's requirements.
# fsma204/recall_manager.py
# Recall Management with automated workflow
import asyncio
import json
import logging
from datetime import datetime
from uuid import UUID, uuid4
from typing import List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import networkx as nx
logger = logging.getLogger(__name__)
class RecallWorkflow:
"""
Manages the complete FSMA 204 recall workflow.
Phases:
1. IDENTIFIED - Problem identification
2. SCOPE_DETERMINED - Scope calculation (traceback + traceforward)
3. NOTIFICATIONS_SENT - Notifications to all affected operators
4. REMOVAL_IN_PROGRESS - Market removal in progress
5. EFFECTIVENESS_CHECK - Effectiveness verification
6. CLOSED / FDA_REPORTED
"""
def __init__(self, db: AsyncSession, notifier):
self.db = db
self.notifier = notifier
async def initiate_recall(
self,
trigger_lot_ids: List[UUID],
trigger_reason: str,
trigger_description: str,
initiated_by: str,
recall_type: str = "class_i"
) -> dict:
"""
Initiates a recall. This method must complete
traceback + traceforward + scope in < 30 minutes
to meet the overall 24h FDA requirement.
"""
recall_number = self._generate_recall_number()
start_time = datetime.utcnow()
logger.info(
f"Recall {recall_number} initiated by {initiated_by} "
f"for {len(trigger_lot_ids)} lots"
)
# Phase 1: Create recall record
recall_id = await self._create_recall_record(
recall_number, trigger_lot_ids, trigger_reason,
trigger_description, initiated_by, recall_type
)
# Phase 2: Determine scope (traceback + traceforward)
scope_start = datetime.utcnow()
affected_lots = await self._determine_scope(trigger_lot_ids)
scope_seconds = int((datetime.utcnow() - scope_start).total_seconds())
# Phase 3: Update recall with scope and statuses
await self._update_recall_scope(recall_id, affected_lots, scope_seconds)
# Phase 4: Generate FDA Recall Notice
fda_report = await self._generate_fda_report(
recall_number, trigger_lot_ids, affected_lots,
trigger_reason, trigger_description
)
# Phase 5: Mark lots as recalled
await self._mark_lots_recalled(affected_lots)
# Phase 6: Send notifications to all operators
notify_start = datetime.utcnow()
await self._send_recall_notifications(affected_lots, recall_number, trigger_description)
notify_seconds = int((datetime.utcnow() - notify_start).total_seconds())
await self.db.execute(
text("""
UPDATE recall_events SET
scope_determined_at = NOW(),
notifications_sent_at = NOW(),
status = 'notifications_sent',
fda_report_json = :fda_report::jsonb,
traceback_seconds = :traceback_seconds,
lots_traced_pct = :lots_pct
WHERE id = :id
"""),
{
"id": str(recall_id),
"fda_report": json.dumps(fda_report),
"traceback_seconds": scope_seconds,
"lots_pct": 100.0
}
)
await self.db.commit()
total_seconds = int((datetime.utcnow() - start_time).total_seconds())
logger.info(
f"Recall {recall_number} scope determined in {scope_seconds}s, "
f"notifications in {notify_seconds}s, total {total_seconds}s"
)
return {
"recall_number": recall_number,
"recall_id": str(recall_id),
"affected_lots_count": len(affected_lots),
"scope_determination_seconds": scope_seconds,
"notification_seconds": notify_seconds,
"total_seconds": total_seconds,
"fda_report": fda_report
}
async def _determine_scope(self, trigger_lot_ids: List[UUID]) -> List[str]:
"""
Determines scope using graph traversal.
Uses NetworkX BFS on the genealogy graph.
"""
G = await self._build_traceability_graph()
affected = set()
for lot_id in trigger_lot_ids:
lot_str = str(lot_id)
if lot_str not in G:
continue
ancestors = nx.ancestors(G, lot_str)
affected.update(ancestors)
descendants = nx.descendants(G, lot_str)
affected.update(descendants)
affected.add(lot_str)
return list(affected)
async def _build_traceability_graph(self) -> nx.DiGraph:
"""
Builds a directed lot genealogy graph
using shipping/receiving/transformation CTE events.
"""
G = nx.DiGraph()
lots_result = await self.db.execute(
text("SELECT id, tlc, status FROM traceability_lots WHERE status != 'archived'")
)
for row in lots_result.fetchall():
G.add_node(str(row.id), tlc=row.tlc, status=row.status)
edges_result = await self.db.execute(
text("""
SELECT
s.lot_id AS from_lot,
r.lot_id AS to_lot
FROM cte_events s
JOIN cte_events r ON s.reference_document_number = r.reference_document_number
WHERE s.event_type = 'shipping'
AND r.event_type = 'receiving'
""")
)
for row in edges_result.fetchall():
G.add_edge(str(row.from_lot), str(row.to_lot))
transform_result = await self.db.execute(
text("""
SELECT lot_id, related_lot_ids
FROM cte_events
WHERE event_type = 'transformation'
""")
)
for row in transform_result.fetchall():
for parent_id in (row.related_lot_ids or []):
G.add_edge(str(parent_id), str(row.lot_id))
return G
async def _generate_fda_report(
self,
recall_number: str,
trigger_lots: List[UUID],
affected_lots: List[str],
reason: str,
description: str
) -> dict:
"""
Generates the recall notice in FDA format.
Structure based on FDA 21 CFR 7.46.
"""
lots_result = await self.db.execute(
text("""
SELECT tl.tlc, tl.initial_quantity, tl.unit_of_measure,
fi.name AS food_name, fi.ftl_category,
l.name AS origin_name, l.fda_facility_id,
tl.production_date, tl.best_before_date
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
JOIN locations l ON tl.origin_location_id = l.id
                WHERE tl.id = ANY(:affected::uuid[])
"""),
{"affected": affected_lots}
)
affected_details = [dict(r._mapping) for r in lots_result.fetchall()]
return {
"recall_number": recall_number,
"report_date": datetime.utcnow().isoformat(),
"regulation": "FSMA Section 204 - 21 CFR Part 1 Subpart S",
"recall_type": "Voluntary",
"reason": reason,
"description": description,
"affected_products": affected_details,
"total_affected_lots": len(affected_lots),
"fsma_compliance": {
"rule": "Food Traceability Final Rule",
"response_format": "Electronic Sortable",
"24h_compliance": True
}
}
def _generate_recall_number(self) -> str:
year = datetime.utcnow().year
unique = str(uuid4())[:8].upper()
return f"RECALL-{year}-{unique}"
async def _create_recall_record(
self, recall_number, trigger_lot_ids, reason,
description, initiated_by, recall_type
) -> UUID:
result = await self.db.execute(
text("""
INSERT INTO recall_events (
recall_number, recall_type, trigger_lot_ids,
trigger_reason, trigger_description, initiated_by
) VALUES (
:number, :type, :lots::uuid[],
:reason, :desc, :by
)
RETURNING id
"""),
{
"number": recall_number,
"type": recall_type,
"lots": [str(l) for l in trigger_lot_ids],
"reason": reason,
"desc": description,
"by": initiated_by
}
)
row = result.fetchone()
await self.db.commit()
return row.id
async def _mark_lots_recalled(self, affected_lot_ids: List[str]):
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'recalled', updated_at = NOW()
WHERE id = ANY(:ids::uuid[])
"""),
{"ids": affected_lot_ids}
)
await self.db.commit()
async def _update_recall_scope(self, recall_id, affected_lots, scope_seconds):
await self.db.execute(
text("""
UPDATE recall_events SET
affected_lot_ids = :lots::uuid[],
status = 'scope_determined',
traceback_seconds = :seconds
WHERE id = :id
"""),
{
"lots": affected_lots,
"seconds": scope_seconds,
"id": str(recall_id)
}
)
await self.db.commit()
async def _send_recall_notifications(
self,
affected_lot_ids: List[str],
recall_number: str,
description: str
):
"""Notify all operators that have handled the affected lots"""
contacts_result = await self.db.execute(
text("""
SELECT DISTINCT l.contact_email, l.contact_phone, l.name
FROM cte_events ce
JOIN locations l ON ce.location_id = l.id
WHERE ce.lot_id = ANY(:lots::uuid[])
AND l.contact_email IS NOT NULL
"""),
{"lots": affected_lot_ids}
)
for contact in contacts_result.fetchall():
alert_payload = {
"severity": "critical",
"title": f"RECALL NOTICE {recall_number}",
"description": (
f"Lots you received/shipped are involved in recall {recall_number}. "
f"Reason: {description}. "
"Immediately suspend distribution and contact your compliance officer."
),
"lot_id": "multiple"
}
rule_config = {
"channels": ["email"],
"recipients": {
"email": [contact.contact_email],
"phone": [contact.contact_phone] if contact.contact_phone else []
}
}
try:
await self.notifier.send_alert(alert_payload, rule_config)
except Exception as e:
logger.error(f"Failed to notify {contact.name}: {e}")
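The traceback/traceforward logic in `_determine_scope` can be sanity-checked on a toy genealogy. This is a minimal sketch with made-up lot IDs:

```python
import networkx as nx

# Toy genealogy: a farm lot is packed, then split to two distributors
G = nx.DiGraph()
G.add_edge("LOT-FARM-01", "LOT-PACK-01")   # shipping -> receiving edge
G.add_edge("LOT-PACK-01", "LOT-DIST-A")
G.add_edge("LOT-PACK-01", "LOT-DIST-B")

trigger = "LOT-PACK-01"
# Traceback (ancestors) + traceforward (descendants) + the trigger itself
affected = {trigger} | nx.ancestors(G, trigger) | nx.descendants(G, trigger)
print(sorted(affected))
# → ['LOT-DIST-A', 'LOT-DIST-B', 'LOT-FARM-01', 'LOT-PACK-01']
```

A contamination found at the packer correctly pulls in both the upstream farm lot and every downstream distribution lot — exactly the set a recall notice must cover.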
Mock Recall Drill: Periodic Performance Testing
The FDA strongly recommends mock recall drills — periodic simulations of the recall process to verify that the system meets the 24-hour requirement. A drill should be run at least every 6 months and results documented for audit purposes. Key metrics are: total completion time, percentage of lots traced, time to determine scope.
# fsma204/mock_recall_drill.py
# Mock recall drill with performance metrics
import asyncio
import random
import time
import json
import logging
from datetime import datetime
from typing import Dict, Any
logger = logging.getLogger(__name__)
class MockRecallDrill:
"""
Simulates a recall scenario to test system performance.
The drill uses MOCK lots (not production) with synthetic data.
"""
def __init__(self, db, recall_workflow, lot_count: int = 100):
self.db = db
self.workflow = recall_workflow
self.lot_count = lot_count
self.drill_id = f"DRILL-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
async def run_drill(self) -> Dict[str, Any]:
"""
Runs the complete drill and returns performance metrics.
"""
logger.info(f"Starting mock recall drill {self.drill_id}")
metrics = {
"drill_id": self.drill_id,
"started_at": datetime.utcnow().isoformat(),
"phases": {}
}
# Phase 1: Seed mock data
phase_start = time.monotonic()
mock_lots = await self._seed_mock_data()
metrics['phases']['data_seeding'] = {
"duration_seconds": round(time.monotonic() - phase_start, 2),
"lots_created": len(mock_lots)
}
        # Phase 2: Select a random trigger lot from mid-supply-chain
        # (fall back to the full list for small drills)
        mid_section = mock_lots[10:40] or mock_lots
        trigger_lot = random.choice(mid_section)
logger.info(f"Drill trigger lot: {trigger_lot['tlc']}")
# Phase 3: Execute recall
phase_start = time.monotonic()
recall_result = await self.workflow.initiate_recall(
trigger_lot_ids=[trigger_lot['id']],
trigger_reason="pathogen_contamination",
trigger_description=f"[DRILL] Mock Listeria contamination test - {self.drill_id}",
initiated_by="recall_drill_system",
recall_type="class_i"
)
recall_duration = time.monotonic() - phase_start
metrics['phases']['recall_execution'] = {
"duration_seconds": round(recall_duration, 2),
"scope_seconds": recall_result['scope_determination_seconds'],
"notification_seconds": recall_result['notification_seconds'],
"affected_lots": recall_result['affected_lots_count'],
"recall_number": recall_result['recall_number']
}
# Phase 4: Verify SLA compliance
sla_24h_seconds = 24 * 3600
compliance_ok = recall_duration < sla_24h_seconds
metrics['sla_compliance'] = {
"target_seconds": sla_24h_seconds,
"actual_seconds": round(recall_duration, 2),
"compliant": compliance_ok,
"margin_hours": round((sla_24h_seconds - recall_duration) / 3600, 1)
}
# Phase 5: Cleanup mock data
await self._cleanup_mock_data(mock_lots)
metrics['completed_at'] = datetime.utcnow().isoformat()
metrics['overall_pass'] = compliance_ok
self._log_drill_report(metrics)
return metrics
    async def _seed_mock_data(self) -> list:
        """
        Creates synthetic lot records for the drill.
        Placeholder: a complete drill would also insert these lots and
        their CTE events into the database so graph traversal has edges.
        """
mock_lots = []
for i in range(self.lot_count):
tlc = f"MOCK-{self.drill_id}-LOT-{i:04d}"
lot_data = {"id": None, "tlc": tlc, "is_mock": True}
mock_lots.append(lot_data)
logger.info(f"Created {len(mock_lots)} mock lots for drill {self.drill_id}")
return mock_lots
async def _cleanup_mock_data(self, mock_lots: list):
"""Removes mock data after the drill"""
from sqlalchemy import text
tlcs = [l['tlc'] for l in mock_lots]
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'archived'
WHERE tlc LIKE 'MOCK-%' AND tlc = ANY(:tlcs)
"""),
{"tlcs": tlcs}
)
await self.db.commit()
logger.info(f"Drill {self.drill_id} cleanup complete")
def _log_drill_report(self, metrics: dict):
report_lines = [
f"",
f"=== MOCK RECALL DRILL REPORT ===",
f"Drill ID: {metrics['drill_id']}",
f"Started: {metrics['started_at']}",
f"",
f"PHASES:",
]
for phase, data in metrics.get('phases', {}).items():
report_lines.append(f" {phase}: {json.dumps(data)}")
sla = metrics.get('sla_compliance', {})
status = "PASS" if sla.get('compliant') else "FAIL"
report_lines.extend([
f"",
f"SLA 24H COMPLIANCE: {status}",
f" Target: {sla.get('target_seconds')}s",
f" Actual: {sla.get('actual_seconds')}s",
f" Margin: {sla.get('margin_hours')}h",
f"",
f"OVERALL: {'PASS' if metrics.get('overall_pass') else 'FAIL'}",
f"================================",
])
logger.info('\n'.join(report_lines))
Regulatory Comparison: FSMA 204 vs EU Reg. 178/2002 vs UK Food Safety Act
For operators exporting to multiple markets, understanding the differences between regulatory frameworks is essential. While they share the principle of "one-step-up, one-step-down," they diverge significantly in granularity and penalties.
| Dimension | FSMA 204 (USA) | EU Reg. 178/2002 | UK Food Safety Act 1990 + Regs. |
|---|---|---|---|
| Scope | FTL foods only (specific list) | All food and feed | All food in the UK |
| Granularity | CTE + event-specific KDEs | Generic one-step-up / one-step-down | One-step-up / one-step-down |
| Response time to authority | 24 hours (electronic sortable list) | No specific deadline | No specific deadline |
| Record format | Electronic, sortable, interoperable | Any documented format | Any documented format |
| Retention | 24 months | Varies by category | Varies by category |
| Penalties | Up to $10,000/day per violation | Varies by member state | Up to GBP 20,000 + imprisonment |
| Applies to importers | Yes: applies to anyone importing into the US | Yes: for operators in the EU market | Yes: for operators in the UK market |
| Technology standards | None mandated (GS1 recommended) | None mandated | None mandated |
Practical Implication for Exporters
A company exporting fresh mozzarella to the US must comply with both EU Reg. 178/2002 and FSMA 204. Because FSMA 204 is more stringent (specific CTEs/KDEs, 24h response, electronic format), a system built for FSMA 204 will generally satisfy the EU Regulation's less granular one-step-up/one-step-down requirements as well. The optimal strategy is to implement FSMA 204 as the baseline and adapt it for EU/UK specifics.
Impact on Italian Exporters to the US Market
Italy is the third-largest exporter of food and beverage products to the US (after Canada and Mexico), with an estimated value of approximately 8 billion euros in 2024. The main categories covered by FSMA 204:
| Category | IT-USA Export 2024 (est.) | FSMA 204 coverage | Required action |
|---|---|---|---|
| Wine | ~$2.1 billion | No (not on FTL) | FDA registration only |
| Aged cheeses (Parmigiano, Grana, Pecorino) | ~$350 million | No (aged excluded) | FDA registration + Prior Notice only |
| Fresh cheeses (mozzarella, burrata, ricotta) | ~$120 million | YES — critical priority | Full CTE/KDE implementation |
| Olive oil | ~$706 million | No | FDA registration only |
| Preserved vegetables (tomatoes, passata) | ~$200 million | Partial (fresh tomatoes are on FTL) | Case-by-case: a validated kill step during processing can take the product out of FTL scope |
Estimated Compliance Costs for an Italian Food SME
| Component | Setup (one-time) | Annual operating cost |
|---|---|---|
| Traceability software (SaaS or custom) | €15,000 – €80,000 | €8,000 – €20,000/year |
| Existing ERP/MES integration | €10,000 – €40,000 | €2,000 – €5,000/year |
| Hardware (scanners, RFID readers, sensors) | €5,000 – €25,000 | €1,000 – €3,000/year (maintenance) |
| Staff training | €3,000 – €10,000 | €1,500 – €3,000/year |
| Legal / FDA compliance consulting | €5,000 – €20,000 | €3,000 – €8,000/year |
| Estimated total | €38,000 – €175,000 | €15,500 – €39,000/year |
Available Funding: PNRR Transizione 5.0
Italian food SMEs can access PNRR Transizione 5.0 funds (12.7 billion EUR allocated, tax credit up to 45%) for the implementation costs of digital traceability systems. FSMA 204 systems qualify as "digitization of production processes" when they integrate IoT sensors for cold chain monitoring. Consult your accountant to confirm eligibility.
Recommended Compliance Roadmap for Italian SMEs
- Months 1–2: Gap analysis — map existing flows, identify FTL products, assess current IT systems
- Months 3–4: Database design + API development (use the code in this article as a baseline)
- Months 5–6: Integration with existing ERP (SAP, Dynamics, Sage), operator training
- Months 7–8: Pilot on one product/line, verify KDE completeness, first mock drill
- Months 9–10: Full rollout, alert engine live, FDA documentation prepared
- Months 11–12: Second mock drill, fine-tuning, preparation for FDA audit
Best Practices and Anti-Patterns
Best Practices
- Unique, unambiguous TLCs: avoid visually similar characters (I/l/1, O/0). Use GS1-128 or SSCC as the standard for interoperability with US supply chain partners
- Immutability enforced at the DB level: use PostgreSQL triggers that prevent UPDATE/DELETE on cte_events — not just code conventions
- Event hash for audit trail: SHA-256 of the payload at creation time allows verification that records have not been altered
- Test the 24h SLA regularly: run mock drills every 6 months with synthetic data, document the results
- Synchronize with GS1: use GLN (Global Location Number) for location_id values — compatible with EDI and US partner systems
- KDE versioning: FSMA 204 may evolve; keep a kde_schema_version field in records to facilitate future migrations
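The event-hash practice above fits in a few lines. The canonical-JSON choice here is an assumption — any stable, deterministic serialization works, as long as it never changes after go-live:

```python
import hashlib
import json

def compute_event_hash(event: dict) -> str:
    # Canonical serialization: sorted keys, fixed separators, so the same
    # logical event always produces the same digest
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

event = {"event_type": "shipping", "tlc": "TLC-2026-0001", "quantity": 120}
stored_hash = compute_event_hash(event)

# Audit check: recompute and compare — any tampering changes the digest
assert compute_event_hash(event) == stored_hash
assert compute_event_hash({**event, "quantity": 121}) != stored_hash
```

Storing the digest in an append-only column (like `event_hash` in `cte_events`) lets an auditor verify record integrity without trusting the application layer.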
Anti-Patterns to Avoid
- Paper-only KDEs: many companies maintain KDEs in PDFs or Excel spreadsheets. It is impossible to meet the "electronic, sortable format" 24h requirement with these systems
- Non-standard TLCs: using internal lot numbers that are not shared with downstream partners makes cross-company traceback impossible
- Alerts without escalation: an alert system that sends emails but has no escalation protocol for criticals (no response within 2h triggers an automated call) does not meet the spirit of FSMA
- Internal-only traceability: FSMA 204 requires one-step-up AND one-step-down with partners. An internal-only system is not sufficient
- No UTC synchronization: CTE timestamps must be in UTC or with an explicit offset. Ambiguous timestamps make it impossible to reconstruct the timeline in an international recall
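The UTC anti-pattern above is cheap to avoid with timezone-aware timestamps; a minimal contrast:

```python
from datetime import datetime, timezone

# Good: timezone-aware UTC timestamp with an explicit offset
ts = datetime.now(timezone.utc)
recorded = ts.isoformat()           # e.g. '2026-01-20T10:30:00.123456+00:00'

# Bad: naive timestamp — no way to know which wall clock produced it
naive = datetime.now().isoformat()  # no offset suffix

assert recorded.endswith("+00:00")
assert datetime.fromisoformat(recorded).tzinfo is not None
```

Enforcing this at the API boundary (e.g. a Pydantic validator that rejects naive datetimes) is more reliable than trusting every client to behave.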
Conclusions and Next Steps
FSMA Rule 204 represents a paradigm shift: from traceability as a paper document to traceability as a real-time software infrastructure. The system built in this article — PostgreSQL for storage, FastAPI for the API layer, Kafka for event streaming, NetworkX for graph traversal — is capable of satisfying the most critical requirement: a complete sortable list to the FDA within 24 hours of any request.
The good news for companies planning ahead: the deadline extension to July 2028 provides sufficient time for a gradual implementation. The bad news: organizations that wait until 2027 to begin will find a market of consultants and software vendors with inflated prices and long wait times.
The 2024 data speaks clearly: 241 FDA food recalls, 1,392 people sickened, hospitalizations more than doubled compared to 2023. Granular traceability is not bureaucracy — it is prevention measurable in human lives.
FoodTech Series: Next Article
The next article in the series is 07 — Vertical Farming: Dashboard and Automated Control, where we will build a real-time control system for hydroponic farms: IoT sensors (light, CO2, pH, EC), a Streamlit dashboard, and optimization algorithms to maximize yield per square meter. Stay tuned on federicocalo.dev.