Automazione FSMA 204: Tracking, Alert e Recall via Python
Il 20 gennaio 2026 avrebbe dovuto segnare uno spartiacque nella storia della food safety americana. La FSMA Rule 204 — Food Traceability Final Rule della FDA richiedeva a tutti gli operatori della filiera alimentare di implementare sistemi di tracciabilità granulare per decine di categorie di alimenti ad alto rischio. Un aggiornamento normativo senza precedenti dal Food Safety Modernization Act del 2011, con ripercussioni globali: chiunque esporti verso il mercato USA — incluse le aziende italiane di vino, formaggi e olio d'oliva — era nel mirino.
A marzo 2025, la FDA ha esteso la deadline di 30 mesi (al 20 luglio 2028), ma la direzione è inequivocabile: la tracciabilità granulare per gli alimenti ad alto rischio non è più opzionale. Le aziende che iniziano oggi l'implementazione avranno un vantaggio competitivo enorme quando la compliance diventerà obbligatoria. In questo articolo costruiamo un sistema completo in Python — dal database design all'API REST, dall'alert engine al recall management — che soddisfa i requisiti FSMA 204.
Cosa Imparerai
- La struttura completa di FSMA 204: Food Traceability List, CTE e KDE
- Database design PostgreSQL per tracciabilità multi-livello
- API REST FastAPI con Pydantic models per registrazione CTE e query one-up/one-down
- Event Sourcing immutabile con Apache Kafka per audit trail completo
- Alert engine per anomalie (cold chain, lotti sospetti) con notifiche multi-canale
- Recall management workflow: traceback/traceforward in meno di 24 ore (requisito FDA)
- Mock recall drill con metriche di performance
- Confronto FSMA 204 vs EU Reg. 178/2002 e impatto sugli esportatori italiani
La Serie FoodTech: Dove Siamo
これは、 federicolo.dev の FoodTech シリーズの 6 番目の記事です。完全な地図は次のとおりです。
| # | タイトル | レベル | Stato |
|---|---|---|---|
| 01 | 精密農業向けの IoT パイプライン | 中級 | 発行済み |
| 02 | 食品品質管理のためのコンピュータビジョン | 中級 | 発行済み |
| 03 | 作物予測のための ML とエッジ コンピューティング | 高度な | 発行済み |
| 04 | ブロックチェーンと透明な食品サプライチェーン | 高度な | 発行済み |
| 05 | Prophet と LSTM を使用した大規模小売取引の需要予測 | 高度な | 発行済み |
| 06 | FSMA 204 自動化: Python による追跡、アラート、リコール | 高度な | あなたはここにいる |
| 07 | 垂直農業: ダッシュボードと自動制御 | 中級 | すぐ |
| 08 | 作物モニタリング用のサテライト API | 高度な | すぐ |
| 09 | Streamlit を備えた農場からフォークまでのダッシュボード | 中級 | すぐ |
| 10 | サプライチェーンの回復力: OR ツールによる最適化 | 高度な | すぐ |
FSMA 規則 204: 2011 年以来最も重要な規制変更
2011 年の食品安全近代化法 (FSMA) は、食品安全に対する FDA のアプローチを事後対応型 (汚染への対応) から予防型 (汚染の防止) に転換しました。セクション 204 は、高リスク食品を特定し、追加のトレーサビリティ要件を課す権限を FDA に委任しました。結果は、 食品トレーサビリティ最終規則、2022年11月に出版されました。
目標は根本的なものです。発生または汚染が発生した場合、オペレーターは現場からフォークまでのロットの流通チェーン全体を特定し、記録を電子形式で FDA に提出できなければなりません。 リクエストから24時間。 FSMA 204 以前は、このプロセスには平均 7 ~ 10 日かかりました。汚染された食品による病気や死亡の減少への効果は、年間数百件の症例が回避されるとFDAは推定している。
食品トレーサビリティ リスト (FTL): 含まれる食品
FTL には、歴史的に重篤な感染症の発生に関連していた食品が含まれます。主なカテゴリは次のとおりです。
| カテゴリ | 具体例 | 主なリスク |
|---|---|---|
| フレッシュチーズ | ソフト/セミソフト生、フレッシュモッツァレラ、リコッタチーズ | リステリア・モノサイトゲネス |
| 殻付きの卵 | 殺菌されていない鶏の卵 | サルモネラ・エンテリティディス |
| シーフード — 魚 | マグロ、サーモン、タラ、カジキ(生/冷凍) | スコンブロイド、リステリア |
| 甲殻類 | エビ、カニ、ロブスター | ビブリオウイルス、ノロウイルス |
| 二枚貝 | 牡蠣、ハマグリ、ムール貝 | ノロウイルス、ビブリオウイルス |
| RTEサラダ | ポテトサラダ、卵、魚介類 | リステリア菌、サルモネラ菌 |
| 新鮮な果物と野菜 | キュウリ、ハーブ、葉物野菜、メロン、ピーマン、スプラウト、トマト、トロピカルフルーツ | 大腸菌O157:H7、サルモネラ菌 |
| ナッツバター | ピーナッツバター、アーモンド、カシューナッツ | サルモネラ |
イタリアの輸出業者に対する重要な注意事項
I フレッシュチーズ (モッツァレラ、ブッラータ、リコッタ、ストラッキーノ) は明示的に FTL に含まれています。これらの製品を米国に輸出するイタリア企業は、米国市場向けのすべてのバッチに対して FSMA 204 トレーサビリティを実装する必要があります。ハード熟成チーズ (パルミジャーノ レッジャーノ、グラナ パダーノ、ペコリーノ ロマーノ) は現在の FTL には含まれていません。
重大な追跡イベント (CTE)
CTE は、サプライ チェーンにおいてトレーサビリティを記録する必要がある瞬間です。 FSMA 204 では 6 つの主要なものが定義されており、それぞれに特定の KDE が含まれています。
| CTE | 説明 | 適用対象 |
|---|---|---|
| 成長する | 新鮮な食材を産地から直接集めます | 果物と野菜、ハーブ |
| 収穫 | 非養殖物(貝類、天然魚)の収穫 | 野生の魚介類 |
| 冷却 | 最初の収穫後の冷却作業 | 果物と野菜、魚介類 |
| 初期梱包 | 製品を初めて最終形態でパッケージ化するとき | すべてのFTLアイテム |
| 配送 | 所有権または保管権の譲渡 | すべてのFTLアイテム |
| 受信中 | 他のオペレーターから FTL アイテムを受信する | すべてのFTLアイテム |
| 変換 | FTLアイテムが新製品に組み込まれる場合 | トランスフォーマー/プロデューサー |
CTE の主要データ要素 (KDE)
各 CTE に対して、ルールは必須の KDE を定義します。例 配送 CTE:
- トレーサビリティ ロット コード (TLC) — 一意のバッチ識別子
- 出荷される数量と測定単位
- 発送日
- 出荷ポイントの場所の説明 (TLC ソース参照)
- 輸送書類の参照番号
- 受取人の名前と住所
- 食品の説明(FDA食品施設登録番号を含む)
のために 受信CTE:
- TLC を受け取りました (前回の出荷から)
- 受領数量
- 受領日
- 受付場所の場所
- 出荷書類への参照
- TLCソース(バッチ原点)
FSMA 204 コンプライアンス システムのアーキテクチャ
運用環境の FSMA 204 準拠システムは、次の 3 つの主要なフローを管理する必要があります。 リアルタイムでデータをキャプチャする 手術中に、 処理と保管 不変の方法でイベントを記録し、 素早い対応 FDA の要請またはリコールの場合。私たちが提案するアーキテクチャはイベント駆動型でクラウドネイティブです。
テクノロジースタック
- API レイヤー: FastAPI (Python 3.11+) — CTE 登録用の REST エンドポイント
- プライマリデータベース: PostgreSQL 16 — トレーサビリティのためのリレーショナル スキーマ
- イベントストリーミング: Apache Kafka — 不変メッセージとしての CTE イベント
- グラフの走査: NetworkX または Neo4j — サプライ チェーンのトレースバック/トレースフォワード
- アラートエンジン: Python + Celery + Redis — マルチチャネル非同期ルール
- リコール管理: FastAPI + PostgreSQL ストアド プロシージャ
- 報告: JSON/XMLでのFDAリコール通知の自動生成
エンドツーエンドの流れ:
- データキャプチャ: オペレーターはモバイル/ウェブ API または ERP 統合を通じて CTE を記録します
- イベント処理: 各 CTE は検証され、強化され、Kafka 上で公開されます
- トレーサビリティグラフ: ロットの家系図グラフはリアルタイムで更新されます
- アラート エンジン: イベント ルールは異常を検出し、通知を送信します
- リコール管理: 重大なアラートが発生した場合、自動的にリコールワークフローが開始されます。
- FDA の報告: 24時間以内に電子形式でソート可能なリストを生成
FSMA 204 トレーサビリティのための PostgreSQL データベース設計
このスキームは、次の 2 つの相反する要件を満たす必要があります。 日常業務のパフォーマンス (CTE 挿入、ビジネス クエリ) e リコールの場合のトレースバッククエリの超高速化。 CTE の KDE 変数には、リレーショナル テーブルと JSONB インデックスを組み合わせた PostgreSQL を使用します。
-- ============================================================
-- FSMA 204 Compliance Database Schema
-- PostgreSQL 16
-- ============================================================
-- Estensione per UUID generation
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "ltree"; -- Per path-based queries su genealogia
-- ============================================================
-- TABELLA: locations
-- Tutti i luoghi della filiera (aziende, magazzini, porti)
-- ============================================================
CREATE TABLE locations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
fda_facility_id VARCHAR(12) UNIQUE, -- FDA Food Facility Registration Number
name VARCHAR(255) NOT NULL,
address_line1 VARCHAR(255) NOT NULL,
city VARCHAR(100) NOT NULL,
state_province VARCHAR(100),
country_code CHAR(2) NOT NULL, -- ISO 3166-1 alpha-2
postal_code VARCHAR(20),
location_type VARCHAR(50) NOT NULL
CHECK (location_type IN (
'farm', 'packing_house', 'processor',
'distributor', 'retailer', 'port', 'cold_storage'
)),
contact_email VARCHAR(255),
contact_phone VARCHAR(50),
coordinates POINT, -- lat/lon per geo-queries
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: food_items
-- Catalogo alimenti FTL con classificazione FSMA
-- ============================================================
CREATE TABLE food_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
ftl_category VARCHAR(50) NOT NULL
CHECK (ftl_category IN (
'fresh_cheese', 'shell_eggs', 'finfish',
'crustaceans', 'bivalve_mollusks', 'nut_butter',
'rte_deli_salads', 'fresh_produce'
)),
name VARCHAR(255) NOT NULL,
description TEXT,
fda_product_code VARCHAR(7), -- FDA product code
unit_of_measure VARCHAR(20) NOT NULL DEFAULT 'kg',
storage_temp_min DECIMAL(5,2), -- Celsius
storage_temp_max DECIMAL(5,2),
shelf_life_days INTEGER,
is_ftl_covered BOOLEAN DEFAULT true,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: traceability_lots
-- Core: ogni lotto tracciabile con TLC (Traceability Lot Code)
-- ============================================================
CREATE TABLE traceability_lots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tlc VARCHAR(50) UNIQUE NOT NULL, -- Traceability Lot Code
food_item_id UUID NOT NULL REFERENCES food_items(id),
origin_location_id UUID NOT NULL REFERENCES locations(id),
-- Quantità e dimensione lotto
initial_quantity DECIMAL(12,4) NOT NULL,
remaining_quantity DECIMAL(12,4) NOT NULL,
unit_of_measure VARCHAR(20) NOT NULL,
-- Date critiche
production_date DATE,
harvest_date DATE,
best_before_date DATE,
expiry_date DATE,
-- Genealogia: da quali lotti parent e stato derivato
parent_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
genealogy_path ltree, -- Es: "LOT_A.LOT_B.LOT_C"
-- Stato del lotto
status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (status IN (
'active', 'recalled', 'quarantined',
'consumed', 'disposed', 'archived'
)),
-- Attributi FDA specifici
tlc_source_reference TEXT, -- Descrizione provenienza TLC
-- Metadati extra (variabili per tipo alimento)
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indice per query di genealogia rapida
CREATE INDEX idx_lots_genealogy ON traceability_lots USING GIST (genealogy_path);
CREATE INDEX idx_lots_status ON traceability_lots(status);
CREATE INDEX idx_lots_tlc ON traceability_lots(tlc);
CREATE INDEX idx_lots_food_item ON traceability_lots(food_item_id);
CREATE INDEX idx_lots_origin ON traceability_lots(origin_location_id);
CREATE INDEX idx_lots_attributes ON traceability_lots USING GIN (attributes);
-- ============================================================
-- TABELLA: cte_events
-- Registro immutabile di tutti i Critical Tracking Events
-- Questa tabella NON deve mai avere UPDATE o DELETE
-- ============================================================
CREATE TABLE cte_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'growing', 'harvesting', 'cooling',
'initial_packing', 'shipping', 'receiving',
'transformation'
)),
-- Lotti coinvolti
lot_id UUID NOT NULL REFERENCES traceability_lots(id),
related_lot_ids UUID[] DEFAULT ARRAY[]::UUID[], -- per transformation
-- Localizzazione evento
location_id UUID NOT NULL REFERENCES locations(id),
-- Timestamp evento (quando e accaduto, non quando e stato registrato)
event_timestamp TIMESTAMPTZ NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW(),
recorded_by VARCHAR(255) NOT NULL, -- user/system che ha registrato
-- KDE come JSONB (struttura varia per tipo CTE)
kde JSONB NOT NULL,
-- Per spedizioni: destinatario
destination_location_id UUID REFERENCES locations(id),
-- Documento di riferimento (bill of lading, PO, etc.)
reference_document_number VARCHAR(100),
reference_document_type VARCHAR(50),
-- Quantità movimentata
quantity DECIMAL(12,4),
unit_of_measure VARCHAR(20),
-- Hash per immutabilita (SHA-256 del payload)
event_hash CHAR(64) NOT NULL,
-- Firma digitale opzionale
digital_signature TEXT,
-- Metadati
notes TEXT,
metadata JSONB DEFAULT '{}'
-- NO updated_at: questo record e immutabile
);
-- Gli eventi NON si modificano: trigger di protezione
CREATE RULE no_update_cte AS ON UPDATE TO cte_events DO INSTEAD NOTHING;
CREATE RULE no_delete_cte AS ON DELETE TO cte_events DO INSTEAD NOTHING;
CREATE INDEX idx_cte_lot ON cte_events(lot_id);
CREATE INDEX idx_cte_type ON cte_events(event_type);
CREATE INDEX idx_cte_timestamp ON cte_events(event_timestamp DESC);
CREATE INDEX idx_cte_location ON cte_events(location_id);
CREATE INDEX idx_cte_destination ON cte_events(destination_location_id);
CREATE INDEX idx_cte_kde ON cte_events USING GIN (kde);
-- ============================================================
-- TABELLA: recall_events
-- Gestione recall con workflow completo
-- ============================================================
CREATE TABLE recall_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
recall_number VARCHAR(20) UNIQUE NOT NULL, -- Es: "RECALL-2026-001"
recall_type VARCHAR(20) NOT NULL
CHECK (recall_type IN ('class_i', 'class_ii', 'class_iii', 'market_withdrawal')),
-- Root cause
trigger_lot_ids UUID[] NOT NULL, -- Lotti che hanno scatenato il recall
trigger_reason VARCHAR(50) NOT NULL
CHECK (trigger_reason IN (
'pathogen_contamination', 'allergen_undeclared',
'foreign_material', 'chemical_contamination',
'mislabeling', 'temperature_abuse', 'other'
)),
trigger_description TEXT NOT NULL,
-- Scope calcolato automaticamente
affected_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
affected_quantity DECIMAL(12,4),
affected_unit VARCHAR(20),
-- Workflow stato
status VARCHAR(30) NOT NULL DEFAULT 'identified'
CHECK (status IN (
'identified', 'scope_determined', 'notifications_sent',
'removal_in_progress', 'effectiveness_check',
'closed', 'fda_reported'
)),
-- Timestamp workflow
identified_at TIMESTAMPTZ DEFAULT NOW(),
scope_determined_at TIMESTAMPTZ,
notifications_sent_at TIMESTAMPTZ,
fda_reported_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ,
-- FDA notification (entro 24h da richiesta)
fda_notified BOOLEAN DEFAULT false,
fda_report_json JSONB,
-- Responsabile
initiated_by VARCHAR(255) NOT NULL,
-- Metriche drill
traceback_seconds INTEGER, -- Tempo per completare traceback
traceforward_seconds INTEGER,
lots_traced_pct DECIMAL(5,2),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alert_rules
-- Configurazione regole per alert engine
-- ============================================================
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
description TEXT,
rule_type VARCHAR(30) NOT NULL
CHECK (rule_type IN (
'temperature_breach', 'lot_age', 'missing_cte',
'quantity_discrepancy', 'supplier_blacklist',
'pathogen_alert', 'custom'
)),
condition_json JSONB NOT NULL, -- Regola in formato JSON
severity VARCHAR(10) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
channels TEXT[] NOT NULL, -- ['email', 'sms', 'webhook', 'slack']
recipients JSONB NOT NULL, -- {"email": [...], "phone": [...]}
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alerts
-- Alert generati dall'engine
-- ============================================================
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
rule_id UUID REFERENCES alert_rules(id),
lot_id UUID REFERENCES traceability_lots(id),
cte_event_id UUID REFERENCES cte_events(id),
severity VARCHAR(10) NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NOT NULL,
data JSONB DEFAULT '{}',
status VARCHAR(20) NOT NULL DEFAULT 'open'
CHECK (status IN ('open', 'acknowledged', 'resolved', 'escalated')),
acknowledged_by VARCHAR(255),
acknowledged_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
auto_recall_triggered BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_alerts_lot ON alerts(lot_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_status ON alerts(status);
CREATE INDEX idx_alerts_created ON alerts(created_at DESC);
FastAPI を使用した REST API: Pydantic モデルと CTE エンドポイント
API は、サプライ チェーン内のすべての事業者にとってのエントリ ポイントです。当社では、速度、Pydantic による自動検証、FDA 監査に役立つ OpenAPI ドキュメントの自動生成のために FastAPI を使用しています。 Pydantic テンプレートは、ルールで必要な KDE を反映します。
# fsma204/models.py
# Pydantic v2 models per FSMA 204 API
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime, date
from uuid import UUID
from enum import Enum
class CTEEventType(str, Enum):
GROWING = "growing"
HARVESTING = "harvesting"
COOLING = "cooling"
INITIAL_PACKING = "initial_packing"
SHIPPING = "shipping"
RECEIVING = "receiving"
TRANSFORMATION = "transformation"
class FTLCategory(str, Enum):
FRESH_CHEESE = "fresh_cheese"
SHELL_EGGS = "shell_eggs"
FINFISH = "finfish"
CRUSTACEANS = "crustaceans"
BIVALVE_MOLLUSKS = "bivalve_mollusks"
NUT_BUTTER = "nut_butter"
RTE_DELI_SALADS = "rte_deli_salads"
FRESH_PRODUCE = "fresh_produce"
class LotStatus(str, Enum):
ACTIVE = "active"
RECALLED = "recalled"
QUARANTINED = "quarantined"
CONSUMED = "consumed"
DISPOSED = "disposed"
# ── KDE Models per CTE type ──────────────────────────────────
class ShippingKDE(BaseModel):
"""Key Data Elements per Shipping CTE - FSMA 204 §1.1330"""
tlc: str = Field(..., description="Traceability Lot Code")
quantity_shipped: float = Field(..., gt=0)
unit_of_measure: str
ship_date: date
tlc_source_reference: str = Field(..., description="Descrizione provenienza TLC")
transport_document_number: str
transport_document_type: str = Field(default="bill_of_lading")
receiver_fda_facility_id: str = Field(..., description="FDA registration del destinatario")
receiver_name: str
receiver_address: str
class ReceivingKDE(BaseModel):
"""Key Data Elements per Receiving CTE - FSMA 204 §1.1335"""
tlc_received: str = Field(..., description="TLC come apparso nel documento di spedizione")
quantity_received: float = Field(..., gt=0)
unit_of_measure: str
receive_date: date
location_description: str
reference_document_number: str
reference_document_type: str
lot_code_generator_location: Optional[str] = None
class GrowingKDE(BaseModel):
"""Key Data Elements per Growing CTE"""
tlc: str
harvest_date: date
field_id: str # Identificativo campo/parcella
grower_fda_facility_id: str
commodity: str
growing_method: str = "conventional" # conventional, organic, hydroponic
class InitialPackingKDE(BaseModel):
"""Key Data Elements per Initial Packing CTE"""
tlc: str
pack_date: date
quantity_packed: float
unit_of_measure: str
packaging_type: str
facility_fda_id: str
lot_code_generator_location: str
class TransformationKDE(BaseModel):
"""Key Data Elements per Transformation CTE"""
new_tlc: str # TLC del nuovo prodotto
input_tlcs: List[str] # TLC degli ingredienti usati
transformation_date: date
facility_fda_id: str
product_description: str
quantity_produced: float
unit_of_measure: str
# ── Request/Response Models ──────────────────────────────────
class CreateLotRequest(BaseModel):
food_item_id: UUID
origin_location_id: UUID
tlc: str = Field(..., min_length=3, max_length=50)
initial_quantity: float = Field(..., gt=0)
unit_of_measure: str = Field(default="kg")
production_date: Optional[date] = None
harvest_date: Optional[date] = None
best_before_date: Optional[date] = None
expiry_date: Optional[date] = None
parent_lot_ids: List[UUID] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
@field_validator('tlc')
@classmethod
def validate_tlc(cls, v: str) -> str:
"""TLC non deve contenere caratteri ambigui"""
forbidden = set('IO0l') # caratteri ambigui
if any(c in forbidden for c in v.upper()):
raise ValueError("TLC contiene caratteri ambigui (I, O, 0, l)")
return v.upper()
class RegisterCTERequest(BaseModel):
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_by: str = Field(..., min_length=3)
kde: Dict[str, Any] = Field(..., description="KDE specifici per il tipo di CTE")
destination_location_id: Optional[UUID] = None
reference_document_number: Optional[str] = None
reference_document_type: Optional[str] = None
quantity: Optional[float] = None
unit_of_measure: Optional[str] = None
notes: Optional[str] = None
class CTEEventResponse(BaseModel):
id: UUID
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_at: datetime
recorded_by: str
kde: Dict[str, Any]
event_hash: str
class TracebackRequest(BaseModel):
lot_id: UUID
depth: int = Field(default=10, ge=1, le=50)
class TracebackResult(BaseModel):
lot_id: UUID
tlc: str
ancestors: List[Dict[str, Any]] # lotti upstream
descendants: List[Dict[str, Any]] # lotti downstream
events: List[Dict[str, Any]]
total_ancestors: int
total_descendants: int
trace_depth: int
computation_ms: int
class SortableListResponse(BaseModel):
"""Formato sortable list richiesto da FDA (§1.1375)"""
generated_at: datetime
requesting_lots: List[str] # TLC richiesti
records: List[Dict[str, Any]] # Record ordinabili
total_records: int
format_version: str = "FSMA204_v1"
# fsma204/api.py
# FastAPI application per FSMA 204 compliance
import hashlib
import json
import time
from uuid import UUID
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .models import (
CreateLotRequest, RegisterCTERequest,
CTEEventResponse, TracebackRequest,
TracebackResult, SortableListResponse
)
from .database import get_db
from .kafka_producer import publish_cte_event
from .alert_engine import check_alert_rules
app = FastAPI(
title="FSMA 204 Traceability API",
description="Food Traceability Compliance System per FDA Rule 204",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def compute_event_hash(payload: dict) -> str:
"""SHA-256 hash del payload CTE per immutabilita"""
canonical = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
# ── Lot Management ──────────────────────────────────────────
@app.post("/api/v1/lots", status_code=201)
async def create_lot(
request: CreateLotRequest,
db: AsyncSession = Depends(get_db)
):
"""
Crea un nuovo lotto tracciabile.
Il TLC deve essere unico nel sistema.
"""
# Verifica unicita TLC
result = await db.execute(
text("SELECT id FROM traceability_lots WHERE tlc = :tlc"),
{"tlc": request.tlc}
)
if result.fetchone():
raise HTTPException(
status_code=409,
detail=f"TLC '{request.tlc}' già esistente nel sistema"
)
# Costruisce genealogy_path
genealogy_path = request.tlc
if request.parent_lot_ids:
# Recupera path del parent principale
parent_result = await db.execute(
text("SELECT genealogy_path, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.parent_lot_ids[0])}
)
parent = parent_result.fetchone()
if parent and parent.genealogy_path:
genealogy_path = f"{parent.genealogy_path}.{request.tlc}"
await db.execute(
text("""
INSERT INTO traceability_lots (
food_item_id, origin_location_id, tlc,
initial_quantity, remaining_quantity, unit_of_measure,
production_date, harvest_date, best_before_date, expiry_date,
parent_lot_ids, genealogy_path, attributes
) VALUES (
:food_item_id, :origin_location_id, :tlc,
:initial_quantity, :initial_quantity, :unit_of_measure,
:production_date, :harvest_date, :best_before_date, :expiry_date,
:parent_lot_ids::uuid[], :genealogy_path::ltree, :attributes::jsonb
)
"""),
{
"food_item_id": str(request.food_item_id),
"origin_location_id": str(request.origin_location_id),
"tlc": request.tlc,
"initial_quantity": request.initial_quantity,
"unit_of_measure": request.unit_of_measure,
"production_date": request.production_date,
"harvest_date": request.harvest_date,
"best_before_date": request.best_before_date,
"expiry_date": request.expiry_date,
"parent_lot_ids": [str(x) for x in request.parent_lot_ids],
"genealogy_path": genealogy_path,
"attributes": json.dumps(request.attributes)
}
)
await db.commit()
return {"message": "Lotto creato", "tlc": request.tlc}
# ── CTE Registration ────────────────────────────────────────
@app.post("/api/v1/cte-events", response_model=CTEEventResponse, status_code=201)
async def register_cte_event(
request: RegisterCTERequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
Registra un Critical Tracking Event.
Il record e IMMUTABILE una volta creato (no update/delete).
"""
# Verifica che il lotto esista e sia attivo
lot_result = await db.execute(
text("SELECT id, status, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.lot_id)}
)
lot = lot_result.fetchone()
if not lot:
raise HTTPException(status_code=404, detail="Lotto non trovato")
if lot.status in ('recalled', 'disposed'):
raise HTTPException(
status_code=422,
detail=f"Impossibile registrare CTE su lotto con stato '{lot.status}'"
)
# Calcola hash per immutabilita
payload = {
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp.isoformat(),
"kde": request.kde
}
event_hash = compute_event_hash(payload)
# Inserisce CTE (immutabile)
result = await db.execute(
text("""
INSERT INTO cte_events (
event_type, lot_id, location_id,
event_timestamp, recorded_by, kde,
destination_location_id, reference_document_number,
reference_document_type, quantity, unit_of_measure,
event_hash, notes
) VALUES (
:event_type, :lot_id, :location_id,
:event_timestamp, :recorded_by, :kde::jsonb,
:destination_location_id, :reference_document_number,
:reference_document_type, :quantity, :unit_of_measure,
:event_hash, :notes
)
RETURNING id, recorded_at
"""),
{
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp,
"recorded_by": request.recorded_by,
"kde": json.dumps(request.kde),
"destination_location_id": (
str(request.destination_location_id)
if request.destination_location_id else None
),
"reference_document_number": request.reference_document_number,
"reference_document_type": request.reference_document_type,
"quantity": request.quantity,
"unit_of_measure": request.unit_of_measure,
"event_hash": event_hash,
"notes": request.notes
}
)
row = result.fetchone()
await db.commit()
# Aggiorna remaining_quantity per spedizioni
if request.event_type == "shipping" and request.quantity:
await db.execute(
text("""
UPDATE traceability_lots
SET remaining_quantity = remaining_quantity - :qty
WHERE id = :id
"""),
{"qty": request.quantity, "id": str(request.lot_id)}
)
await db.commit()
# Background: pubblica su Kafka e verifica alert rules
event_data = {
"id": str(row.id),
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"tlc": lot.tlc,
"kde": request.kde,
"event_timestamp": request.event_timestamp.isoformat()
}
background_tasks.add_task(publish_cte_event, event_data)
background_tasks.add_task(check_alert_rules, event_data, str(request.lot_id))
return CTEEventResponse(
id=row.id,
event_type=request.event_type,
lot_id=request.lot_id,
location_id=request.location_id,
event_timestamp=request.event_timestamp,
recorded_at=row.recorded_at,
recorded_by=request.recorded_by,
kde=request.kde,
event_hash=event_hash
)
# ── Traceback / Traceforward ────────────────────────────────
@app.get("/api/v1/lots/{lot_id}/traceback")
async def traceback(lot_id: UUID, depth: int = 10, db: AsyncSession = Depends(get_db)):
"""
One-step-up AND full traceback: risale la filiera upstream.
Requisito FDA: completare entro 24h da richiesta.
"""
start_ms = time.monotonic()
# Recupera lotto root
root_result = await db.execute(
text("SELECT id, tlc, genealogy_path FROM traceability_lots WHERE id = :id"),
{"id": str(lot_id)}
)
root = root_result.fetchone()
if not root:
raise HTTPException(status_code=404, detail="Lotto non trovato")
# Usa ltree per trovare tutti gli antenati
ancestors_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id, initial_quantity
FROM traceability_lots
WHERE genealogy_path @> :path::ltree
AND id != :lot_id
ORDER BY nlevel(genealogy_path) ASC
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
ancestors = [dict(r._mapping) for r in ancestors_result.fetchall()]
# Trova tutti i discendenti (traceforward)
descendants_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id
FROM traceability_lots
WHERE genealogy_path <@ :path::ltree
AND id != :lot_id
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
descendants = [dict(r._mapping) for r in descendants_result.fetchall()]
# Recupera tutti gli eventi CTE per il lotto
events_result = await db.execute(
text("""
SELECT event_type, event_timestamp, recorded_by,
kde, location_id, event_hash
FROM cte_events
WHERE lot_id = :lot_id
ORDER BY event_timestamp ASC
"""),
{"lot_id": str(lot_id)}
)
events = [dict(r._mapping) for r in events_result.fetchall()]
elapsed_ms = int((time.monotonic() - start_ms) * 1000)
return TracebackResult(
lot_id=lot_id,
tlc=root.tlc,
ancestors=ancestors,
descendants=descendants,
events=events,
total_ancestors=len(ancestors),
total_descendants=len(descendants),
trace_depth=depth,
computation_ms=elapsed_ms
)
# ── FDA Sortable List ───────────────────────────────────────
@app.get("/api/v1/lots/sortable-list", response_model=SortableListResponse)
async def fda_sortable_list(
tlcs: str, # comma-separated TLC list
db: AsyncSession = Depends(get_db)
):
"""
Genera il sortable list in formato elettronico richiesto da FDA.
Deve essere producibile entro 24h da richiesta FDA (§1.1375).
"""
tlc_list = [t.strip() for t in tlcs.split(",")]
# Recupera tutti i record CTE per i TLC richiesti
records_result = await db.execute(
text("""
SELECT
tl.tlc,
ce.event_type,
ce.event_timestamp,
ce.recorded_by,
ce.kde,
ce.quantity,
ce.unit_of_measure,
l.name AS location_name,
l.fda_facility_id,
ce.reference_document_number,
ce.event_hash
FROM cte_events ce
JOIN traceability_lots tl ON ce.lot_id = tl.id
JOIN locations l ON ce.location_id = l.id
WHERE tl.tlc = ANY(:tlcs)
ORDER BY tl.tlc, ce.event_timestamp ASC
"""),
{"tlcs": tlc_list}
)
records = []
for row in records_result.fetchall():
r = dict(row._mapping)
r['event_timestamp'] = r['event_timestamp'].isoformat()
records.append(r)
return SortableListResponse(
generated_at=__import__('datetime').datetime.utcnow(),
requesting_lots=tlc_list,
records=records,
total_records=len(records)
)
Apache Kafka を使用した不変のイベント ソーシング
CTE の不変の性質は、イベント ソーシング パターンに完全に適合します。それぞれの出来事は、サプライチェーンで何が起こったかについての不変の事実です。 Kafka は、これらのイベントが次のとおりであることを保証します。
- 耐久性: 設定可能な保存期間 (例: FSMA 204 の場合は 7 年)
- 再生可能: 歴史上の任意の時点で国家を再構築する
- パーティションごとにソート: 一括発注保証(TLCによる分割)
- 配布: 1 日あたり数百万のイベントに拡張可能
# fsma204/kafka_producer.py
# Kafka producer per CTE events
import json
import logging
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from datetime import datetime
logger = logging.getLogger(__name__)
# Schema Avro per CTE event (per Schema Registry)
CTE_EVENT_SCHEMA = """
{
"type": "record",
"name": "CTEEvent",
"namespace": "com.fsma204.traceability",
"fields": [
{"name": "id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "lot_id", "type": "string"},
{"name": "tlc", "type": "string"},
{"name": "event_timestamp", "type": "string"},
{"name": "kde", "type": "string"},
{"name": "event_hash", "type": "string"}
]
}
"""
# Configurazione Kafka
kafka_config = {
'bootstrap.servers': 'kafka:9092',
'client.id': 'fsma204-producer',
'acks': 'all', # Garanzia durabilita
'retries': 5,
'retry.backoff.ms': 200,
'compression.type': 'snappy'
}
producer = Producer(kafka_config)
def delivery_callback(err, msg):
if err:
logger.error(f"Kafka delivery failed: {err}")
else:
logger.debug(
f"CTE event delivered to {msg.topic()} "
f"partition {msg.partition()} offset {msg.offset()}"
)
async def publish_cte_event(event_data: dict):
"""
Pubblica un CTE event su Kafka.
Usa il TLC come partition key per garantire ordine per lotto.
"""
try:
message = json.dumps(event_data, default=str).encode('utf-8')
# Partition key = TLC: tutti gli eventi dello stesso lotto
# vanno alla stessa partizione, garantendo ordinamento
partition_key = event_data.get('tlc', event_data['lot_id'])
producer.produce(
topic='fsma204.cte.events',
key=partition_key.encode('utf-8'),
value=message,
callback=delivery_callback
)
producer.poll(0) # Flush asincrono
except Exception as e:
logger.error(f"Failed to publish CTE event: {e}", exc_info=True)
# ── Consumer per Alert Engine ───────────────────────────────
# fsma204/kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import asyncio
async def start_cte_consumer():
"""
Consumer Kafka per il real-time alert processing.
Eseguito come background service separato dall'API.
"""
consumer_config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'fsma204-alert-engine',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Commit manuale per at-least-once
}
consumer = Consumer(consumer_config)
consumer.subscribe(['fsma204.cte.events'])
logger.info("CTE Consumer avviato, in ascolto su fsma204.cte.events")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Kafka consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
await process_cte_for_alerts(event)
consumer.commit(msg)
except Exception as e:
logger.error(f"Error processing CTE event: {e}", exc_info=True)
# Non committiamo: l'evento verrà riprocessato
finally:
consumer.close()
async def process_cte_for_alerts(event: dict):
"""Pipeline di processing per ogni CTE event"""
event_type = event['event_type']
kde = json.loads(event.get('kde', '{}'))
# 1. Controlla cold chain per eventi di spedizione/ricezione
if event_type in ('shipping', 'receiving'):
await check_cold_chain_continuity(event, kde)
# 2. Verifica completezza KDE per il tipo di evento
await validate_kde_completeness(event_type, kde, event['lot_id'])
# 3. Controlla se il fornitore e in blacklist
if 'supplier_fda_id' in kde:
await check_supplier_blacklist(kde['supplier_fda_id'], event['lot_id'])
# 4. Aggiorna grafo di tracciabilita in memoria (NetworkX)
await update_traceability_graph(event)
アラート エンジン: リアルタイムの異常検出
アラート エンジンは、CTE イベントの継続的なフローを監視し、構成可能なルールを適用します。通知は重大度に基づいて複数のチャネルで送信されます。アラート critical リコールワークフローを自動的にトリガーできます。
# fsma204/alert_engine.py
# Alert engine per FSMA 204 compliance monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
import httpx
import smtplib
from email.mime.text import MIMEText
from twilio.rest import Client as TwilioClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .database import get_db_session
logger = logging.getLogger(__name__)
class AlertSeverity:
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertRule:
"""Base class per le regole di alert"""
def __init__(self, rule_id: str, name: str, severity: str):
self.rule_id = rule_id
self.name = name
self.severity = severity
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
"""Ritorna un dict con i dettagli dell'alert, o None se ok"""
raise NotImplementedError
class ColdChainBreachRule(AlertRule):
"""
Rileva interruzioni nella catena del freddo.
Regola: se la temperatura registrata nei KDE supera la soglia
prevista per la categoria di alimento.
"""
TEMP_THRESHOLDS = {
'fresh_cheese': {'min': 1.0, 'max': 7.0},
'finfish': {'min': -2.0, 'max': 4.0},
'crustaceans': {'min': 0.0, 'max': 4.0},
'shell_eggs': {'min': 5.0, 'max': 7.2}, # FDA: < 45°F
'fresh_produce': {'min': 1.0, 'max': 10.0},
}
def __init__(self):
super().__init__("cold_chain_breach", "Cold Chain Breach", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
temp_recorded = kde.get('temperature_celsius')
if temp_recorded is None:
return None
# Recupera categoria alimento del lotto
result = await db.execute(
text("""
SELECT fi.ftl_category
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
WHERE tl.id = :lot_id
"""),
{"lot_id": event['lot_id']}
)
row = result.fetchone()
if not row:
return None
category = row.ftl_category
threshold = self.TEMP_THRESHOLDS.get(category)
if not threshold:
return None
breach = False
breach_detail = ""
if temp_recorded < threshold['min']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sotto minimo {threshold['min']}°C"
)
elif temp_recorded > threshold['max']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sopra massimo {threshold['max']}°C"
)
if breach:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"Cold Chain Breach - TLC {event.get('tlc', 'N/A')}",
"description": (
f"{breach_detail} per categoria {category}. "
f"CTE: {event['event_type']}, "
f"timestamp: {event['event_timestamp']}"
),
"lot_id": event['lot_id'],
"data": {
"temperature": temp_recorded,
"threshold": threshold,
"category": category,
"event_type": event['event_type']
}
}
return None
class MissingKDERule(AlertRule):
"""
Verifica che tutti i KDE obbligatori siano presenti per il tipo CTE.
Mancanza di KDE = non-compliance FSMA 204.
"""
REQUIRED_KDE = {
'shipping': ['tlc', 'quantity_shipped', 'ship_date', 'receiver_name'],
'receiving': ['tlc_received', 'quantity_received', 'receive_date'],
'initial_packing': ['tlc', 'pack_date', 'quantity_packed', 'facility_fda_id'],
'growing': ['tlc', 'harvest_date', 'field_id', 'grower_fda_facility_id'],
'transformation': ['new_tlc', 'input_tlcs', 'transformation_date'],
}
def __init__(self):
super().__init__("missing_kde", "Missing KDE Fields", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
event_type = event['event_type']
required = self.REQUIRED_KDE.get(event_type, [])
if not required:
return None
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
missing = [f for f in required if f not in kde or kde[f] is None]
if missing:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"KDE Mancanti - CTE {event_type}",
"description": (
f"CTE {event_type} per TLC {event.get('tlc')} "
f"manca dei KDE obbligatori: {', '.join(missing)}"
),
"lot_id": event['lot_id'],
"data": {"missing_fields": missing, "event_type": event_type}
}
return None
# ── Notification System ──────────────────────────────────────
class MultiChannelNotifier:
def __init__(self, config: dict):
self.config = config
self.twilio = TwilioClient(
config['twilio_account_sid'],
config['twilio_auth_token']
)
async def send_alert(self, alert_data: dict, rule_config: dict):
channels = rule_config.get('channels', ['email'])
severity = alert_data['severity']
tasks = []
if 'email' in channels:
tasks.append(self._send_email(alert_data, rule_config))
if 'sms' in channels and severity in ('high', 'critical'):
tasks.append(self._send_sms(alert_data, rule_config))
if 'webhook' in channels:
tasks.append(self._send_webhook(alert_data, rule_config))
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
logger.error(f"Notification failed: {r}")
async def _send_email(self, alert: dict, config: dict):
recipients = config.get('recipients', {}).get('email', [])
msg = MIMEText(
f"FSMA 204 Alert\n\n"
f"Severity: {alert['severity'].upper()}\n"
f"Title: {alert['title']}\n"
f"Description: {alert['description']}\n"
f"Lot ID: {alert['lot_id']}\n"
f"Timestamp: {datetime.utcnow().isoformat()}"
)
msg['Subject'] = f"[FSMA204-{alert['severity'].upper()}] {alert['title']}"
msg['From'] = self.config['smtp_from']
msg['To'] = ', '.join(recipients)
# In produzione usare asyncio-friendly SMTP
with smtplib.SMTP(self.config['smtp_host'], self.config['smtp_port']) as server:
server.starttls()
server.login(self.config['smtp_user'], self.config['smtp_password'])
server.sendmail(self.config['smtp_from'], recipients, msg.as_string())
async def _send_sms(self, alert: dict, config: dict):
phones = config.get('recipients', {}).get('phone', [])
message_body = (
f"FSMA204 ALERT {alert['severity'].upper()}: "
f"{alert['title']} - Lot: {alert['lot_id']}"
)
for phone in phones:
self.twilio.messages.create(
body=message_body,
from_=self.config['twilio_from'],
to=phone
)
async def _send_webhook(self, alert: dict, config: dict):
webhook_url = config.get('recipients', {}).get('webhook_url')
if not webhook_url:
return
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json={
"event": "fsma204_alert",
"severity": alert['severity'],
"data": alert
},
timeout=10.0
)
# ── Main check function ──────────────────────────────────────
ACTIVE_RULES = [
ColdChainBreachRule(),
MissingKDERule(),
# Aggiungere: SupplierBlacklistRule(), LotAgeRule(), etc.
]
async def check_alert_rules(event: dict, lot_id: str):
"""Chiamato come background task dopo ogni registrazione CTE"""
async with get_db_session() as db:
for rule in ACTIVE_RULES:
try:
alert_data = await rule.evaluate(event, db)
if alert_data:
# Salva alert su DB
await db.execute(
text("""
INSERT INTO alerts (rule_id, lot_id, severity, title, description, data)
SELECT ar.id, :lot_id, :severity, :title, :description, :data::jsonb
FROM alert_rules ar WHERE ar.name = :rule_name
"""),
{
"lot_id": lot_id,
"severity": alert_data['severity'],
"title": alert_data['title'],
"description": alert_data['description'],
"data": json.dumps(alert_data.get('data', {})),
"rule_name": rule.name
}
)
await db.commit()
# Auto-trigger recall se critical
if alert_data['severity'] == AlertSeverity.CRITICAL:
await auto_trigger_recall(lot_id, alert_data, db)
logger.warning(
f"Alert generato: {alert_data['severity']} - {alert_data['title']}"
)
except Exception as e:
logger.error(f"Error evaluating rule {rule.rule_id}: {e}", exc_info=True)
リコール管理: 24 時間以内にワークフローを完了
FSMA 204 の最も厳しい要件は、トレースバックとトレースフォワードを完了し、並べ替え可能なリストを FDA に提供できることです。 リクエストから24時間。自動化されたリコール ワークフローは、この SLA を大規模な運用で保証する唯一の方法です。 2024 年に FDA は 241 件の食品リコールを記録し、平均完了時間は 73 時間で、新規則の要件を大幅に超えています。
# fsma204/recall_manager.py
# Recall Management con workflow automatizzato
import asyncio
import json
import logging
from datetime import datetime
from uuid import UUID, uuid4
from typing import List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import networkx as nx
logger = logging.getLogger(__name__)
class RecallWorkflow:
"""
Gestisce il workflow completo di un recall FSMA 204.
Fasi:
1. IDENTIFIED - Identificazione del problema
2. SCOPE_DETERMINED - Calcolo scope (traceback + traceforward)
3. NOTIFICATIONS_SENT - Notifiche a tutti gli operatori coinvolti
4. REMOVAL_IN_PROGRESS - Rimozione dal mercato
5. EFFECTIVENESS_CHECK - Verifica efficacia
6. CLOSED / FDA_REPORTED
"""
def __init__(self, db: AsyncSession, notifier):
self.db = db
self.notifier = notifier
async def initiate_recall(
self,
trigger_lot_ids: List[UUID],
trigger_reason: str,
trigger_description: str,
initiated_by: str,
recall_type: str = "class_i"
) -> dict:
"""
Avvia un recall. Questo metodo deve completare
traceback + traceforward + scope in < 30 minuti
per rispettare il requisito 24h FDA complessivo.
"""
recall_number = self._generate_recall_number()
start_time = datetime.utcnow()
logger.info(
f"Recall {recall_number} avviato da {initiated_by} "
f"per {len(trigger_lot_ids)} lotti"
)
# Fase 1: Crea record recall
recall_id = await self._create_recall_record(
recall_number, trigger_lot_ids, trigger_reason,
trigger_description, initiated_by, recall_type
)
# Fase 2: Determina scope (traceback + traceforward)
scope_start = datetime.utcnow()
affected_lots = await self._determine_scope(trigger_lot_ids)
scope_seconds = int((datetime.utcnow() - scope_start).total_seconds())
# Fase 3: Aggiorna recall con scope e stati
await self._update_recall_scope(recall_id, affected_lots, scope_seconds)
# Fase 4: Genera FDA Recall Notice
fda_report = await self._generate_fda_report(
recall_number, trigger_lot_ids, affected_lots,
trigger_reason, trigger_description
)
# Fase 5: Marca lotti come recalled
await self._mark_lots_recalled(affected_lots)
# Fase 6: Invia notifiche a tutti gli operatori
notify_start = datetime.utcnow()
await self._send_recall_notifications(affected_lots, recall_number, trigger_description)
notify_seconds = int((datetime.utcnow() - notify_start).total_seconds())
# Aggiorna timestamp workflow
await self.db.execute(
text("""
UPDATE recall_events SET
scope_determined_at = NOW(),
notifications_sent_at = NOW(),
status = 'notifications_sent',
fda_report_json = :fda_report::jsonb,
traceback_seconds = :traceback_seconds,
lots_traced_pct = :lots_pct
WHERE id = :id
"""),
{
"id": str(recall_id),
"fda_report": json.dumps(fda_report),
"traceback_seconds": scope_seconds,
"lots_pct": 100.0 # aggiornare con verifica reale
}
)
await self.db.commit()
total_seconds = int((datetime.utcnow() - start_time).total_seconds())
logger.info(
f"Recall {recall_number} scope determinato in {scope_seconds}s, "
f"notifiche in {notify_seconds}s, totale {total_seconds}s"
)
return {
"recall_number": recall_number,
"recall_id": str(recall_id),
"affected_lots_count": len(affected_lots),
"scope_determination_seconds": scope_seconds,
"notification_seconds": notify_seconds,
"total_seconds": total_seconds,
"fda_report": fda_report
}
async def _determine_scope(self, trigger_lot_ids: List[UUID]) -> List[str]:
"""
Determina scope usando graph traversal.
Usa NetworkX per BFS su grafo di genealogia.
"""
# Costruisce grafo da DB
G = await self._build_traceability_graph()
affected = set()
for lot_id in trigger_lot_ids:
lot_str = str(lot_id)
if lot_str not in G:
continue
# Traceback: tutti gli antenati (upstream)
ancestors = nx.ancestors(G, lot_str)
affected.update(ancestors)
# Traceforward: tutti i discendenti (downstream)
descendants = nx.descendants(G, lot_str)
affected.update(descendants)
affected.add(lot_str)
return list(affected)
async def _build_traceability_graph(self) -> nx.DiGraph:
"""
Costruisce un grafo orientato della genealogia lotti
usando gli eventi CTE di tipo shipping/receiving/transformation.
"""
G = nx.DiGraph()
# Nodi: tutti i lotti attivi
lots_result = await self.db.execute(
text("SELECT id, tlc, status FROM traceability_lots WHERE status != 'archived'")
)
for row in lots_result.fetchall():
G.add_node(str(row.id), tlc=row.tlc, status=row.status)
# Archi: da shipping events
edges_result = await self.db.execute(
text("""
SELECT
s.lot_id AS from_lot,
r.lot_id AS to_lot
FROM cte_events s
JOIN cte_events r ON s.reference_document_number = r.reference_document_number
WHERE s.event_type = 'shipping'
AND r.event_type = 'receiving'
""")
)
for row in edges_result.fetchall():
G.add_edge(str(row.from_lot), str(row.to_lot))
# Archi: da transformation events
transform_result = await self.db.execute(
text("""
SELECT lot_id, related_lot_ids
FROM cte_events
WHERE event_type = 'transformation'
""")
)
for row in transform_result.fetchall():
for parent_id in (row.related_lot_ids or []):
G.add_edge(str(parent_id), str(row.lot_id))
return G
async def _generate_fda_report(
self,
recall_number: str,
trigger_lots: List[UUID],
affected_lots: List[str],
reason: str,
description: str
) -> dict:
"""
Genera il recall notice in formato FDA.
Struttura basata su FDA 21 CFR 7.46.
"""
# Recupera dettagli lotti coinvolti
lots_result = await self.db.execute(
text("""
SELECT tl.tlc, tl.initial_quantity, tl.unit_of_measure,
fi.name AS food_name, fi.ftl_category,
l.name AS origin_name, l.fda_facility_id,
tl.production_date, tl.best_before_date
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
JOIN locations l ON tl.origin_location_id = l.id
WHERE tl.id = ANY(:affected)
"""),
{"affected": affected_lots}
)
affected_details = [dict(r._mapping) for r in lots_result.fetchall()]
return {
"recall_number": recall_number,
"report_date": datetime.utcnow().isoformat(),
"regulation": "FSMA Section 204 - 21 CFR Part 1 Subpart S",
"recall_type": "Voluntary",
"reason": reason,
"description": description,
"affected_products": affected_details,
"total_affected_lots": len(affected_lots),
"fsma_compliance": {
"rule": "Food Traceability Final Rule",
"response_format": "Electronic Sortable",
"24h_compliance": True
}
}
def _generate_recall_number(self) -> str:
year = datetime.utcnow().year
unique = str(uuid4())[:8].upper()
return f"RECALL-{year}-{unique}"
async def _create_recall_record(
self, recall_number, trigger_lot_ids, reason,
description, initiated_by, recall_type
) -> UUID:
result = await self.db.execute(
text("""
INSERT INTO recall_events (
recall_number, recall_type, trigger_lot_ids,
trigger_reason, trigger_description, initiated_by
) VALUES (
:number, :type, :lots::uuid[],
:reason, :desc, :by
)
RETURNING id
"""),
{
"number": recall_number,
"type": recall_type,
"lots": [str(l) for l in trigger_lot_ids],
"reason": reason,
"desc": description,
"by": initiated_by
}
)
row = result.fetchone()
await self.db.commit()
return row.id
async def _mark_lots_recalled(self, affected_lot_ids: List[str]):
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'recalled', updated_at = NOW()
WHERE id = ANY(:ids::uuid[])
"""),
{"ids": affected_lot_ids}
)
await self.db.commit()
async def _update_recall_scope(self, recall_id, affected_lots, scope_seconds):
await self.db.execute(
text("""
UPDATE recall_events SET
affected_lot_ids = :lots::uuid[],
status = 'scope_determined',
traceback_seconds = :seconds
WHERE id = :id
"""),
{
"lots": affected_lots,
"seconds": scope_seconds,
"id": str(recall_id)
}
)
await self.db.commit()
async def _send_recall_notifications(
self,
affected_lot_ids: List[str],
recall_number: str,
description: str
):
"""Notifica tutti gli operatori che hanno movimentato i lotti coinvolti"""
# Recupera contatti di tutti i location coinvolti
contacts_result = await self.db.execute(
text("""
SELECT DISTINCT l.contact_email, l.contact_phone, l.name
FROM cte_events ce
JOIN locations l ON ce.location_id = l.id
WHERE ce.lot_id = ANY(:lots::uuid[])
AND l.contact_email IS NOT NULL
"""),
{"lots": affected_lot_ids}
)
for contact in contacts_result.fetchall():
alert_payload = {
"severity": "critical",
"title": f"RECALL NOTICE {recall_number}",
"description": (
f"I lotti che avete ricevuto/spedito sono coinvolti nel recall {recall_number}. "
f"Motivo: {description}. "
"Sospendere immediatamente la distribuzione e contattare il responsabile."
),
"lot_id": "multiple"
}
rule_config = {
"channels": ["email"],
"recipients": {
"email": [contact.contact_email],
"phone": [contact.contact_phone] if contact.contact_phone else []
}
}
try:
await self.notifier.send_alert(alert_payload, rule_config)
except Exception as e:
logger.error(f"Failed to notify {contact.name}: {e}")
模擬リコール訓練: 定期的なパフォーマンステスト
FDA は次のことを強く推奨しています。 模擬リコール訓練 — システムが 24 時間要件を満たしていることを確認するための、リコール プロセスの定期的なシミュレーション。訓練は少なくとも 6 か月ごとに実行し、その結果を監査のために文書化する必要があります。主な指標は、完了までの合計時間、追跡されたバッチの割合、範囲を決定するまでの時間です。
# fsma204/mock_recall_drill.py
# Mock recall drill con metriche performance
import asyncio
import random
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
logger = logging.getLogger(__name__)
class MockRecallDrill:
"""
Simula un recall scenario per testare le performance del sistema.
Il drill usa lotti MOCK (non production) con dati sintetici.
"""
def __init__(self, db, recall_workflow, lot_count: int = 100):
self.db = db
self.workflow = recall_workflow
self.lot_count = lot_count
self.drill_id = f"DRILL-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
async def run_drill(self) -> Dict[str, Any]:
"""
Esegue il drill completo e ritorna le metriche.
"""
logger.info(f"Avvio mock recall drill {self.drill_id}")
metrics = {
"drill_id": self.drill_id,
"started_at": datetime.utcnow().isoformat(),
"phases": {}
}
# Fase 1: Seed dati mock
phase_start = time.monotonic()
mock_lots = await self._seed_mock_data()
metrics['phases']['data_seeding'] = {
"duration_seconds": round(time.monotonic() - phase_start, 2),
"lots_created": len(mock_lots)
}
# Fase 2: Seleziona lotto trigger casuale
trigger_lot = random.choice(mock_lots[10:40]) # Dal mezzo della supply chain
logger.info(f"Drill trigger lot: {trigger_lot['tlc']}")
# Fase 3: Esegue recall
phase_start = time.monotonic()
recall_result = await self.workflow.initiate_recall(
trigger_lot_ids=[trigger_lot['id']],
trigger_reason="pathogen_contamination",
trigger_description=f"[DRILL] Mock Listeria contamination test - {self.drill_id}",
initiated_by="recall_drill_system",
recall_type="class_i"
)
recall_duration = time.monotonic() - phase_start
metrics['phases']['recall_execution'] = {
"duration_seconds": round(recall_duration, 2),
"scope_seconds": recall_result['scope_determination_seconds'],
"notification_seconds": recall_result['notification_seconds'],
"affected_lots": recall_result['affected_lots_count'],
"recall_number": recall_result['recall_number']
}
# Fase 4: Verifica compliance SLA
sla_24h_seconds = 24 * 3600
compliance_ok = recall_duration < sla_24h_seconds
metrics['sla_compliance'] = {
"target_seconds": sla_24h_seconds,
"actual_seconds": round(recall_duration, 2),
"compliant": compliance_ok,
"margin_hours": round((sla_24h_seconds - recall_duration) / 3600, 1)
}
# Fase 5: Cleanup dati mock
await self._cleanup_mock_data(mock_lots)
metrics['completed_at'] = datetime.utcnow().isoformat()
metrics['overall_pass'] = compliance_ok
self._log_drill_report(metrics)
return metrics
async def _seed_mock_data(self) -> list:
"""Crea lotti sintetici con CTE events per il drill"""
from sqlalchemy import text
mock_lots = []
for i in range(self.lot_count):
tlc = f"MOCK-{self.drill_id}-LOT-{i:04d}"
lot_data = {
"id": None,
"tlc": tlc,
"is_mock": True
}
# In produzione: inserire tramite lot creation API
# Per semplicità usiamo insert diretto nel drill
mock_lots.append(lot_data)
logger.info(f"Creati {len(mock_lots)} lotti mock per drill {self.drill_id}")
return mock_lots
async def _cleanup_mock_data(self, mock_lots: list):
"""Rimuove dati mock dopo il drill"""
from sqlalchemy import text
tlcs = [l['tlc'] for l in mock_lots]
# I mock TLC iniziano con "MOCK-" per identificazione sicura
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'archived'
WHERE tlc LIKE 'MOCK-%' AND tlc = ANY(:tlcs)
"""),
{"tlcs": tlcs}
)
await self.db.commit()
logger.info(f"Cleanup drill {self.drill_id} completato")
def _log_drill_report(self, metrics: dict):
report_lines = [
f"",
f"=== MOCK RECALL DRILL REPORT ===",
f"Drill ID: {metrics['drill_id']}",
f"Started: {metrics['started_at']}",
f"",
f"FASI:",
]
for phase, data in metrics.get('phases', {}).items():
report_lines.append(f" {phase}: {json.dumps(data)}")
sla = metrics.get('sla_compliance', {})
status = "PASS ✓" if sla.get('compliant') else "FAIL ✗"
report_lines.extend([
f"",
f"SLA 24H COMPLIANCE: {status}",
f" Target: {sla.get('target_seconds')}s",
f" Actual: {sla.get('actual_seconds')}s",
f" Margin: {sla.get('margin_hours')}h",
f"",
f"OVERALL: {'PASS' if metrics.get('overall_pass') else 'FAIL'}",
f"================================",
])
logger.info('\n'.join(report_lines))
規制の比較: FSMA 204 と EU 規制178/2002 対英国食品安全法
複数の市場に輸出する事業者にとって、規制枠組み間の違いを理解することは重要です。 「1 ステップアップ、1 ステップダウン」の原則は共通していますが、粒度と制裁の点で大きく異なります。
| サイズ | FSMA 204 (米国) | EU 登録178/2002 | 英国食品安全法 1990 + Reg. |
|---|---|---|---|
| ほうき | FTL食品のみ(特定のリスト) | すべての食品と飼料 | イギリスのすべての食品 |
| 粒度 | イベント固有の CTE + KDE | 一般的な 1 ステップアップ / 1 ステップダウン | ワンステップアップ / ワンステップダウン |
| FDA/当局の応答時間 | 24時間(電子ソート可能なリスト) | 特別な条件はありません | 特別な条件はありません |
| レコードフォーマット | 電子的、注文可能、相互運用可能 | 文書化されたあらゆる形式 | 文書化されたあらゆる形式 |
| 保持 | 24ヶ月 | カテゴリごとの変数 | カテゴリごとの変数 |
| 制裁 | 違反ごとに 1 日あたり最大 10,000 ドル | 加盟国ごとに異なります | 最高20,000ポンド+懲役刑 |
| 輸入業者への申請 | はい: 米国に輸入するすべての人に適用されます | はい: EU市場の事業者向け | はい: 英国市場の通信事業者向け |
| 技術基準 | 義務なし(ただしGS1を推奨) | 令状なし | 令状なし |
輸出業者にとっての実際的な意味
フレッシュモッツァレラを米国に輸出するイタリア企業は遵守する必要があります sia EU 登録FSMA 204 はより厳格であるため (特定の CTE/KDE、24 時間応答、電子形式)、FSMA 204 に準拠したシステムは自動的に EU 規制にも準拠します。最適な戦略は、FSMA 204 をベースラインとして実装し、それを EU/英国の要件に適合させることです。
イタリアの対米国輸出業者への影響
イタリアは米国への食品および飲料製品の第 3 位の輸出国 (カナダ、メキシコに次ぐ) であり、その輸出額は約 2024年に80億ユーロ。 FSMA 204 の影響を受ける主なカテゴリ:
| カテゴリ | IT-USA 2024 年輸出 (予定) | FSMA 204 の適用範囲 | アクションが必要です |
|---|---|---|---|
| ワイン | ~21億ドル | いいえ (FTL にはありません) | FDA登録のみ |
| 熟成チーズ(パルメザンチーズ、グラナチーズ、ペコリーノチーズ) | ~3億5000万ドル | いいえ(経験者は除く) | FDA登録+事前通知のみ |
| フレッシュチーズ(モッツァレラ、ブッラータ、リコッタチーズ) | ~1億2000万ドル | YES — 重要な優先度 | 完全な CTE/KDE 実装 |
| オリーブ油 | ~7億600万ドル | No | FDA登録のみ |
| 野菜ジャム(トマト、ピューレ) | ~2億ドル | 部分的(FTLのフレッシュトマト) | 食材が新鮮かどうかを確認する |
イタリア食品中小企業のコンプライアンスコストの見積り
| 成分 | セットアップ(一回限り) | 年間運営コスト |
|---|---|---|
| ソフトウェアのトレーサビリティ (SaaS またはカスタム) | 15,000 ユーロ – 80,000 ユーロ | 8,000~20,000ユーロ/年 |
| 既存のERP/MES統合 | 10,000 ユーロ – 40,000 ユーロ | 2,000 – 5,000ユーロ/年 |
| ハードウェア (スキャナー、RFID リーダー、センサー) | 5,000ユーロ – 25,000ユーロ | 年間 1,000 ~ 3,000 ユーロ (メンテナンス) |
| パーソナルトレーニング | 3,000 ユーロ – 10,000 ユーロ | 1,500 ~ 3,000 ユーロ/年 |
| 法的アドバイス/FDA コンプライアンス | 5,000ユーロ – 20,000ユーロ | 3,000 – 8,000ユーロ/年 |
| 推定合計額 | 38,000ユーロ – 175,000ユーロ | 15,500 ユーロ~39,000 ユーロ/年 |
利用可能な資金: PNRR Transition 5.0
イタリア食品の中小企業は、デジタルトレーサビリティシステムの導入費用として、PNRR Transition 5.0基金(127億ユーロ割り当て、最大45%の税額控除)を利用できる。 FSMA 204 システムは、コールド チェーン監視用の IoT センサーを統合している場合、「生産プロセスのデジタル化」として認定されます。該当するかどうかを会計士に確認してください。
イタリアの中小企業に推奨されるコンプライアンス ロードマップ
- 1~2ヶ月目: ギャップ分析 — 既存のフローをマッピングし、FTL 製品を特定し、現在の IT システムを評価します
- 3~4ヶ月目: データベース設計 + API 開発 (この記事のコードをベースラインとして使用します)
- 5~6ヶ月目: 既存ERP(SAP、Dynamics、Sage)との統合、オペレータートレーニング
- 7~8ヶ月目: 製品/ラインのパイロット、KDE の完全性の検証、最初の模擬訓練
- 月 9 ~ 10: 完全な展開、アクティブなアラート エンジン、FDA ドキュメント
- 11~12月: 2回目の模擬訓練、微調整、FDA監査の準備
ベストプラクティスとアンチパターン
ベストプラクティス
- ユニークで明確な TLC: 視覚的に似た文字 (I/l/1、O/0) は避けてください。米国のサプライチェーンとの相互運用性の標準として GS1-128 または SSCC を使用する
- DB レベルで不変性が保証される: コード規約だけでなく、cte_events での UPDATE/DELETE を防ぐ PostgreSQL トリガーを使用する
- 監査証跡のイベント ハッシュ: 作成時のペイロードの SHA-256 - レコードが変更されていないことを確認できます
- 24時間ALS検査を定期的に実施: 合成データを使用して 6 か月ごとに模擬訓練を実行し、結果を文書化する
- GS1との同期: location_id には GLN (グローバル ロケーション番号) を使用します - EDI および米国パートナー システムと互換性があります
- KDE のバージョン管理: FSMA 204 は進化する可能性があります。移行を容易にするために、記録に kde_schema_version フィールドを保持します。
避けるべきアンチパターン
- KDE は紙面のみ: 多くの企業は KDE を PDF または Excel シートに保存しています。これらのシステムでは、「電子的でソート可能な形式」の要件を 24 時間以内に満たすことはできません。
- 非標準TLC: 下流パートナーに伝達されない内部ロット番号を使用すると、企業間の追跡が不可能になります
- エスカレーションなしのアラート: 電子メールは送信するが、重大な問題に対するエスカレーション プロトコルがないアラート システム (2 時間以内に応答がない → 自動電話) は、FSMA の精神を尊重していません。
- 内部トレーサビリティのみ: FSMA 204 では、パートナーに対して 1 ステップアップと 1 ステップダウンを要求します。社内体制だけでは不十分
- UTC 同期の失敗: CTE タイムスタンプは UTC であるか、明示的にオフセットされている必要があります。タイムスタンプが曖昧なため、国際的なリコールが発生した場合にタイムラインを再構築することが不可能になる
結論と次のステップ
FSMA 規則 204 は、紙文書としてのトレーサビリティから、 リアルタイム ソフトウェア インフラストラクチャとして追跡可能。この記事で構築したシステム (ストレージには PostgreSQL、API レイヤーには FastAPI、イベント ストリーミングには Kafka、グラフ トラバーサルには NetworkX) は、リクエストから 24 時間以内に完全な並べ替え可能なリストを FDA に提出するという最も重要な要件を満たすことができます。
イタリア企業にとって朗報です。期限が 2028 年 7 月まで延長されたことで、段階的に実施するのに十分な時間が与えられました。悪いニュース: 2027 年まで事業を開始するまで待った企業は、コンサルタントやソフトウェア ベンダーの市場が高騰した価格で長い行列ができることになります。
2024 年のデータは明確に物語っています。FDA の食品リコールは 241 件、病人は 1,392 人、入院者数は 2023 年と比べて 2 倍以上に増加しました。きめ細かなトレーサビリティは官僚的なものではなく、人の命の中で測定できる予防です。
フードテック シリーズ: 次の記事
シリーズの次の記事は、 07 — 垂直農業: ダッシュボードと自動制御ここでは、IoT センサー (光、CO2、pH、EC)、Streamlit ダッシュボード、平方メートルあたりの収量を最大化する最適化アルゴリズムなど、水耕栽培農場向けのリアルタイム制御システムを構築します。 federicolo.dev にご期待ください。
シリーズ間: 関連する洞察
- MLOps (シリーズ 12): MLflow を使用してアラート エンジンの異常検出モデルを本番環境にデプロイする方法
- AI エンジニアリング (シリーズ 13): FDA 文書の RAG — 正しい規制テキストを引用することでコンプライアンスの質問に答える LLM
- PostgreSQL AI (シリーズ 22): 汚染プロファイルの類似性検索のための pgvector — 過去の発生に関与したものと類似した特徴を持つロットを検索します
- DevOps フロントエンド (シリーズ 9): Kubernetes 上で FSMA 204 API を継続的にデプロイするための CI/CD パイプライン







