PyTorch と YOLO を使用した食品品質管理のためのコンピューター ビジョン
毎年、生産中に検出されなかった食品欠陥が世界の産業に損害を与えています およそ 70億ドル:製品の回収、市場でのリコール、損害 ブランドの評判、規制上の制裁、そして最も深刻な場合には消費者の健康に対するリスクも含まれます。 瓶に入った傷ついたトマト 一線を越えた金属異物 包装、カビが隠れた果物のバッチ: 手動検査システムが必要な状況 ラインが毎秒 10 ~ 20 個の速度で回転している場合、十分に安定してインターセプトすることはできません。
人間による目視検査は効果的ですが本質的に限界があります。健康な検査員、 十分に訓練され、休息がとれていれば、高速ラインで約 60 ~ 70% の精度を達成できます。 疲労、照明、主観に関連する大きな変動があります。のシステム ただし、ディープラーニングに基づくコンピュータービジョンは、 98%以上の精度、 1日24時間、夜勤でもパフォーマンスを落とすことなく、絶対的な一貫性のある評価を実現します。
この文脈では、Ultralytics の YOLO (You Only Look Once) ファミリが標準になりました。 リアルタイムの工業検査の事実上。 2024 年 9 月にリリースされた YOLO11 は、 卓越したパフォーマンス: ベンチマークの mAP が高く、YOLOv8 よりもパラメーターが 22% 少ない COCO、T4 GPU で 2 ミリ秒未満のレイテンシ、およびミリメートルサイズの欠陥を検出する機能 高速ベルトコンベア上で。食品品質管理のAI市場 食品の安全性を実現 2025年には27億ドル そして、2030 年までに 30.9% の CAGR で 137 億人に増加すると予想されます。
この記事は、産業用ビジョン システムのアーキテクチャから、 食品欠陥に関する特定のデータセットの構築と注釈、トレーニング パイプラインへ PyTorch と YOLO11 を使用し、産業用ハードウェア、PLC 統合による生産ラインへの導入まで そして自動仕分けシステム。各セクションには、動作しテストされた Python コードが含まれています。
この記事で学べること
- 食品業界に適用されるコンピューター ビジョンの基礎: 特有の課題と従来の産業用履歴書との違い
- YOLO の進化: YOLOv5 から YOLO11 へ、アーキテクチャ、ベンチマーク、食品検査の Faster R-CNN で YOLO が勝利する理由
- 食品欠陥のデータセットの構築: CVAT/Roboflow によるアノテーション、クラス、食品固有のデータ拡張
- PyTorch と YOLO11 を使用した完全なトレーニング パイプライン: Python コード、ハイパーパラメーター調整、検証
- 検出 vs 分類 vs セグメンテーション: 品質管理にどのアプローチをいつ使用するか
- 検査ラインのハードウェア アーキテクチャ: GigE Vision カメラ、構造化照明、トリガー、PLC
- 品質指標: mAP、精度、リコール、食品業界の許容しきい値
- 自動仕分けシステム: 空気圧アクチュエータとピックアンドプレイスロボットの統合
- 完全なケーススタディ: YOLO11 を使用した果物選別ライン、10 個/秒、精度 98.3%
- 規制: IFS Food、BRC、食品環境におけるビジョンシステム用の HACCP
フードテックシリーズ - すべての記事
| # | アイテム | レベル | Stato |
|---|---|---|---|
| 1 | Python と MQTT を使用した精密農業用の IoT パイプライン | 高度な | 利用可能 |
| 2 | 作物監視のための ML Edge: 圃場でのコンピュータ ビジョン | 高度な | 利用可能 |
| 3 | 衛星 API と植生インデックス: Python と Sentinel-2 を使用した NDVI | 中級 | 利用可能 |
| 4 | 食品におけるブロックチェーンのトレーサビリティ: 現場からスーパーマーケットまで | 中級 | 利用可能 |
| 5 | PyTorch を使用した品質管理のためのコンピューター ビジョン YOLO (ここにいます) | 高度な | 現在 |
| 6 | FSMA とデジタル コンプライアンス: 規制プロセスの自動化 | 中級 | 近日公開 |
| 7 | 垂直農法: IoT と ML による環境制御 | 高度な | 近日公開 |
| 8 | Prophet と LightGBM を使用した食品小売の需要予測 | 中級 | 近日公開 |
| 9 | ファーム インテリジェンス ダッシュボード: Grafana を使用したリアルタイム分析 | 中級 | 近日公開 |
| 10 | サプライチェーンの食品の最適化: 廃棄物削減のための ML | 中級 | 近日公開 |
食品業界におけるコンピュータービジョン: 課題と機会
食品品質管理のためのコンピュータビジョンは単なる「応用産業履歴書」ではありません 食べ物に」。ユニークな特徴があり、それがより複雑であると同時により興味深いものとなっています 機械部品やプリント基板の検査と比較して。これらの特性を理解する 堅牢で信頼性の高いシステムを構築するための第一歩です。
食品の自然変動
欠陥のあるボルトは、正確かつ不変の幾何学的基準によって適合ボルトと区別できます。 直径、ピッチ、長さ。しかし、「完璧な」トマトはほぼ無限の範囲に存在します。 形、色、質感、サイズは品種間で異なるだけでなく、 同じ作物内で。視覚システムは変動性を区別することを学習する必要があります 欠陥がなくても許容できる天然物(わずかに非対称だが完全に健康なトマト) 実際(日焼け、衝撃によるへこみ、真菌の攻撃)。
この課題には、代表的なサンプルを含む、非常に大規模でバランスのとれたトレーニング データセットが必要です。 製品の通常の変動をすべて考慮し、特に校正に注意を払います。 偽陰性 (検出されない欠陥) と i の両方を回避するための判定しきい値 誤検知 (準拠製品が廃棄され、運用コストに直接影響を与える)。
照明と環境の課題
食品生産ラインの環境は光学システムにとって過酷です: 水蒸気 洗浄プロセス、冷蔵環境でのレンズの結露、レンズの変動による影響 ベルトコンベアに沿った照明、光沢のある表面からの鏡面反射など ワックスまたは釉薬。構造化照明(リングライト、バックライト、同軸照明、 偏光)は基本であり、ビジョン システムと一緒に設計する必要があります。 後付けで追加されたものではありません。
透明・半透明の商品(ゼリー、ドリンク、PET容器)の場合、 バックライトが使用されており、内包物や気泡が見えるようになります。 果物や野菜などの不透明な製品には、拡散照明の組み合わせ 同軸光を45度で照射すると、火傷、へこみ、表面病変がよくわかります。 金属異物の検出にはビジョンシステムに対応 誘導式またはX線金属探知機。
回線速度とリアルタイム処理
果物の包装ラインは通常、チャネルごとに 1 秒あたり 8 ~ 15 個の速度で稼働します。 ビスケットの生産ラインは毎分 200 ~ 400 個に達します。 ビジョンシステムは画像を取得し、処理して、決定を伝達する必要があります。 製品がその距離を移動するのにかかる時間内にリジェクトアクチュエータまで 検査ステーションと選別ステーションの間: 通常 200 ~ 500 ミリ秒。
この時間制約により、モデルなどの計算量の多いアプローチは除外されます。 最適化を行わずに高解像度のセグメンテーションを実現し、アーキテクチャに報酬を与える GPU 上の単一の順方向パスで検出を実行する YOLO のようなワンショット。
食品検査のための YOLO の進化: YOLOv5 から YOLO11 へ
YOLO ファミリは、10 年近くにわたって産業用リアルタイム検出を支配してきました。 その進化をたどることは、YOLO11 が最適な選択肢である理由を理解するのに役立ちます 2025年の新しい食品検査制度に向けて。
YOLOv5 (2020) - 転換点
Ultralytics の YOLOv5 は、産業用検出のアクセシビリティに革命をもたらしました。 初のネイティブ モジュラー PyTorch 実装、ONNX へのエクスポートの簡素化、 TensorRT と CoreML、文書化された再現可能なトレーニング パイプライン。彼が作ったのは、 検出カスタムは、CV に関する深い専門知識を持たないチームでもアクセス可能であり、定着しています。 導入の簡素化により、グローバルな生産ラインを実現します。多くの施設 2021 年から 2023 年の間にインストールされたものは、引き続き YOLOv5 で実行されます。
YOLOv8 (2023) - 現代建築
YOLOv8 はアンカーフリー アーキテクチャを導入し、アンカーの必要性を排除します。 事前定義されたアンカー ボックスを定義し、食品データセットのトレーニングを簡素化します オブジェクトの寸法は(金型の点から)非常に変化しやすい場合 リンゴ丸ごとにミリメートル)。 C2f バックボーン (ボトルネックが 2 つあるクロスステージ部分) トレーニング中の勾配の流れを改善します。分離型検出ヘッド 分類と回帰のための (分離ヘッド) により、収束が向上します。 COCO の mAP@50: 2590 万パラメータの YOLOv8m で 50.2%。
YOLO11 (2024 年 9 月) - 最先端の技術
YOLO11 は、計算効率の点で最も大きな進歩を示しています。 YOLO11m モデルは mAP@50 に達します COCOで51.5% 一人で 2010万パラメータつまり、同じタスクの YOLOv8m より 22% 少ないことになります。 C3k2 アーキテクチャによる改良されたバックボーンにより、より豊富な機能抽出が可能になります。 ネック SPPF (空間ピラミッド プーリング - 高速) は、スケーリングされたオブジェクトをより適切に処理します。 これは、ミリメートル単位の欠陥と完全なオブジェクトの両方を検出するために重要です。 Nano モデルおよび 1.5ミリ秒、 600+ FPS での処理が可能になります。
YOLO と食品検査の代替アーキテクチャの比較
| 建築 | mAP@50ココ | レイテンシー (ミリ秒) | パラメータ | 食品QC適格 |
|---|---|---|---|---|
| YOLO11n | 39.5% | 1.5ミリ秒 | 2.6M | Excellent(エッジ展開) |
| ヨロ11m | 51.5% | 4.7ミリ秒 | 20.1M | 素晴らしい(予算) |
| YOLO11x | 54.7% | 11.3ミリ秒 | 5690万 | 良い(精度が高い) |
| YOLOv8m | 50.2% | 5.1ミリ秒 | 25.9M | 良好 (レガシー システム) |
| より高速な R-CNN | 55.0% | 120~200ミリ秒 | 41.8M | 悪い(遅すぎる) |
| モバイルネット SSD | 23.2% | 1.1ミリ秒 | 6.8M | 限界(低精度) |
| RT-DETR | 53.1% | 8.9ミリ秒 | 42.0M | 良好 (NMS なし) |
静的ベンチマークでの高い精度にもかかわらず、より高速な R-CNN ベルトコンベア上のリアルタイム検査には使用不可: 120 ~ 200 ミリ秒の遅延 これは、10 個/秒の速度では、システムは 12 ~ 20 個のうち 1 個しか認識しないことを意味します。 4.7 ms の YOLO11m は 200 FPS を容易に処理し、十分なヘッドルームを残します。 PLC および廃棄物アクチュエータとの通信パイプライン。
食品欠陥のデータセット: 構築とアノテーション
トレーニング データセットの品質がパフォーマンスを決定する最も重要な要素です ビジョンシステムの。貧弱なデータセットでトレーニングされた優れた YOLO11 モデル 平凡な結果が得られます。逆に、より単純なモデルをトレーニングした場合でも、 豊富でバランスが取れ、適切に注釈が付けられたデータセットは、大幅に優れた結果を生み出します。
食品業界でよくある欠陥の種類
データセットに含めるクラスは製品やプロセスによって異なりますが、存在します。 食品業界全般に共通するカテゴリ:
果物と野菜の視覚システムの欠陥クラス
| クラス | 説明 | 典型的な原因 | 重大度のしきい値 |
|---|---|---|---|
mold | 表面または隠れたカビ | 湿気、皮膚の傷 | 高 (強制拒否) |
bruise | 衝撃による凹み | 収集・運搬 | 中 (重大度に応じて) |
burn | 日焼けや低温やけど | 直射日光、霜 | 中~高 |
crack | 亀裂または亀裂 | 急速な成長、干ばつ | 高 (病原体媒介) |
foreign_object | 異物(木の葉、石、プラスチック) | 機械による収穫 | 批判 |
rot | 進行した腐敗 | 細菌、真菌 | 高 (強制拒否) |
insect_damage | 虫害 | 昆虫による攻撃 | 中~高 |
size_defect | 仕様外の口径 | 品種の多様性 | 低 (リダイレクト) |
color_defect | 異常な色(熟しすぎ/熟しすぎ) | 収集タイミング | 平均 |
ok | 適合製品 | - | - |
注釈ツール: CVAT、Label Studio、Roboflow
産業データセットのアノテーションには、3 つの主要なツールがあります。 特定の強みを持つ:
CVAT (コンピューター ビジョン アノテーション ツール) インテルとツールの データプライバシー要件を持つチームに最適な、堅牢な自己ホスト型オープンソース またはエアギャップ環境。境界ボックス、ポリゴン、ポリライン、ポイント注釈をサポート そしてビデオトラッキング。 Roboflow との統合により、事前トレーニングされたモデルを使用できるようになります 注釈アシスタントとして使用できるため、手動時間が 60 ~ 70% 削減されます。
ラベルスタジオ マルチモーダル データセット (画像、テキスト、オーディオ) にさらに柔軟に対応します。 MLOps パイプラインと簡単に統合できます。との共同注釈をサポート 複数のレビューとコンセンサス投票。複数のアノテーターが同じ画像に対して作業する場合に便利です。
ロボフロー アノテーション、前処理、 単一のクラウド ワークフローで拡張と YOLO 形式でエクスポートします。食品データセットの場合 パブリック、Roboflow Universe は数百のパブリック ドメイン データセット (フルーツ、 植物、表面欠陥など)を転移学習の出発点として使用できます。
データセットのサイジング
単一の製品に 8 ~ 10 クラスの欠陥がある検出システムの場合、 推奨される最小サイズと:
- トレーニングセット: クラスあたり最低 500 画像、理想的には 1000 以上
- 検証セット: トレーニングの 15 ~ 20%、クラスごとに階層化
- テストセット: 10~15%、完全に分離、実線条件で取得
- レアなクラス: のようなクラスの場合
foreign_object実稼働環境ではほとんど必要とされないため、オーバーサンプリングと積極的な拡張を使用します。
食品に特化したデータ拡張
食品の拡張では、実際の変化をシミュレートする必要があります。 システムは本番環境で対応します。定番テクニック(左右反転、回転、 作物)を食品固有の増強と統合する必要があります。
# augmentation_food.py
# Configurazione augmentation specifica per food quality inspection
import albumentations as A
import cv2
import numpy as np
def build_food_augmentation_pipeline(image_size: int = 640) -> A.Compose:
"""
Pipeline di augmentation per dataset di difetti alimentari.
Simula variazioni reali di illuminazione, orientamento e condizioni di linea.
"""
return A.Compose([
# Geometria: prodotti alimentari arrivano in orientamenti casuali
A.RandomRotate90(p=0.5),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.3),
A.ShiftScaleRotate(
shift_limit=0.1,
scale_limit=0.2,
rotate_limit=45,
border_mode=cv2.BORDER_REFLECT,
p=0.7
),
# Illuminazione: variazioni reali in linea (vapore, sporco sulle lenti)
A.OneOf([
A.RandomBrightnessContrast(
brightness_limit=0.3,
contrast_limit=0.3,
p=1.0
),
A.RandomGamma(gamma_limit=(70, 130), p=1.0),
A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=1.0),
], p=0.8),
# Colore: variazioni stagionali e di maturazione
A.HueSaturationValue(
hue_shift_limit=15, # Piccolo: non cambiare il colore del prodotto
sat_shift_limit=30,
val_shift_limit=20,
p=0.5
),
# Rumore: sensore camera industriale, interferenze EMI
A.OneOf([
A.GaussNoise(var_limit=(10.0, 50.0), p=1.0),
A.ISONoise(color_shift=(0.01, 0.05), intensity=(0.1, 0.5), p=1.0),
A.MultiplicativeNoise(multiplier=(0.9, 1.1), p=1.0),
], p=0.4),
# Blur: motion blur da velocità nastro, defocus per DOF limitata
A.OneOf([
A.MotionBlur(blur_limit=7, p=1.0), # Più realistico per nastro
A.GaussianBlur(blur_limit=(3, 7), p=1.0),
], p=0.3),
# Riflessioni: superfici cerate, glazze, film di acqua
A.RandomSunFlare(
flare_roi=(0.0, 0.0, 1.0, 0.5),
num_flare_circles_lower=1,
num_flare_circles_upper=3,
src_radius=100,
p=0.1
),
# Ritaglio e padding finale
A.PadIfNeeded(
min_height=image_size,
min_width=image_size,
border_mode=cv2.BORDER_REFLECT
),
A.Resize(image_size, image_size),
# Normalizzazione finale
A.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
),
], bbox_params=A.BboxParams(
format='yolo',
label_fields=['class_labels'],
min_area=100, # Ignora bbox troppo piccole dopo crop
min_visibility=0.3 # Ignora bbox con meno del 30% visibile
))
def augment_rare_class(
image: np.ndarray,
bboxes: list,
class_labels: list,
n_augmentations: int = 10
) -> list:
"""
Over-sampling per classi rare come foreign_object.
Genera n varianti augmentate di un singolo campione.
"""
pipeline = build_food_augmentation_pipeline()
augmented_samples = []
for _ in range(n_augmentations):
result = pipeline(
image=image,
bboxes=bboxes,
class_labels=class_labels
)
augmented_samples.append({
'image': result['image'],
'bboxes': result['bboxes'],
'class_labels': result['class_labels']
})
return augmented_samples
PyTorch と YOLO11 を使用してトレーニング パイプラインを完了する
食品品質管理用にカスタマイズされた YOLO11 モデルのトレーニング 構造化されたパイプラインに従います: 環境の準備、データセットの準備、 導入のための転移学習、検証、最適化を使用したトレーニング。
トレーニング環境のセットアップ
# Requisiti: Python 3.11+, CUDA 12.1+, NVIDIA GPU con 8GB+ VRAM
# Consigliato: RTX 3080/4080 per training locale, A100 per training veloce
# 1. Installazione dipendenze
# pip install ultralytics==8.3.0 albumentations roboflow torch torchvision
# 2. Struttura directory dataset (formato YOLO)
# dataset/
# ├── images/
# │ ├── train/ (70% campioni)
# │ ├── val/ (20% campioni)
# │ └── test/ (10% campioni)
# ├── labels/
# │ ├── train/ (file .txt corrispondenti)
# │ ├── val/
# │ └── test/
# └── data.yaml (configurazione dataset)
# data.yaml - Configurazione dataset
DATA_YAML_CONTENT = """
path: /data/food_quality_dataset
train: images/train
val: images/val
test: images/test
nc: 10 # Numero di classi
names:
0: ok
1: mold
2: bruise
3: burn
4: crack
5: foreign_object
6: rot
7: insect_damage
8: size_defect
9: color_defect
"""
import yaml
with open('/data/food_quality_dataset/data.yaml', 'w') as f:
f.write(DATA_YAML_CONTENT)
Roboflow を使用したデータセットのダウンロードと準備
# dataset_preparation.py
# Scarica dataset da Roboflow Universe o usa dataset locale
from roboflow import Roboflow
import os
def download_food_dataset(api_key: str, workspace: str, project: str, version: int) -> str:
"""
Scarica dataset da Roboflow e prepara per training YOLO11.
"""
rf = Roboflow(api_key=api_key)
proj = rf.workspace(workspace).project(project)
dataset = proj.version(version).download("yolov8") # Formato YOLO11 compatibile
dataset_path = dataset.location
print(f"Dataset scaricato in: {dataset_path}")
print(f"Training images: {len(os.listdir(os.path.join(dataset_path, 'train', 'images')))}")
print(f"Validation images: {len(os.listdir(os.path.join(dataset_path, 'valid', 'images')))}")
return dataset_path
def analyze_class_distribution(dataset_path: str) -> dict:
"""
Analizza la distribuzione delle classi nel dataset.
Identifica classi sbilanciate che richiedono over-sampling.
"""
import glob
from collections import Counter
class_counts = Counter()
label_files = glob.glob(os.path.join(dataset_path, 'train', 'labels', '*.txt'))
for label_file in label_files:
with open(label_file, 'r') as f:
for line in f:
class_id = int(line.split()[0])
class_counts[class_id] += 1
total = sum(class_counts.values())
distribution = {
class_id: {
'count': count,
'percentage': round(count / total * 100, 2),
'needs_oversampling': count < total / len(class_counts) * 0.3 # < 30% della media
}
for class_id, count in sorted(class_counts.items())
}
return distribution
# Analisi distribuzione per identificare classi rare
if __name__ == "__main__":
dataset_path = "/data/food_quality_dataset"
distribution = analyze_class_distribution(dataset_path)
print("\nDistribuzione classi nel dataset:")
class_names = ['ok', 'mold', 'bruise', 'burn', 'crack',
'foreign_object', 'rot', 'insect_damage', 'size_defect', 'color_defect']
for class_id, info in distribution.items():
name = class_names[class_id] if class_id < len(class_names) else f"class_{class_id}"
warning = " *** RICHIEDE OVERSAMPLING ***" if info['needs_oversampling'] else ""
print(f" {name}: {info['count']} campioni ({info['percentage']}%){warning}")
転移学習を使用した YOLO11 トレーニング
# train_yolo11_food.py
# Training pipeline completa per food quality inspection
from ultralytics import YOLO
import torch
import yaml
import os
from pathlib import Path
def train_food_quality_model(
dataset_yaml: str,
output_dir: str = "./runs/food_quality",
epochs: int = 150,
batch_size: int = 16,
image_size: int = 640,
model_variant: str = "yolo11m.pt" # n/s/m/l/x
) -> dict:
"""
Training YOLO11 per food quality inspection con ottimizzazioni specifiche.
Args:
dataset_yaml: Path al file data.yaml del dataset
output_dir: Directory per salvare i risultati
epochs: Numero di epoche (150 e un buon punto di partenza)
batch_size: Dimensione batch (16 per 8GB VRAM, 32 per 16GB+)
image_size: Dimensione immagine (640 standard, 1280 per difetti piccoli)
model_variant: Variante YOLO11 da usare come punto di partenza
Returns:
dict con metriche finali del training
"""
# Verifica GPU disponibile
device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device == 'cpu':
print("ATTENZIONE: Training su CPU, sarà molto lento. Usa una GPU.")
print(f"Device: {device}")
if device == 'cuda':
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
# Carica modello pre-addestrato su COCO (transfer learning)
model = YOLO(model_variant)
# Configurazione training
training_args = {
'data': dataset_yaml,
'epochs': epochs,
'batch': batch_size,
'imgsz': image_size,
'device': device,
'project': output_dir,
'name': 'food_quality_v1',
# Ottimizzatore: AdamW e ottimo per fine-tuning
'optimizer': 'AdamW',
'lr0': 0.001, # Learning rate iniziale
'lrf': 0.01, # Learning rate finale = lr0 * lrf
'momentum': 0.937,
'weight_decay': 0.0005,
# Learning rate scheduling: cosine annealing
'cos_lr': True,
'warmup_epochs': 5, # 5 epoche di warmup per stabilizzare
# Augmentation integrata (albumentations)
'hsv_h': 0.015, # Hue shift (piccolo per preservare colore prodotto)
'hsv_s': 0.7, # Saturation
'hsv_v': 0.4, # Value (luminosita)
'degrees': 30.0, # Rotazione
'translate': 0.1, # Traslazione
'scale': 0.5, # Scaling
'shear': 5.0,
'perspective': 0.0001,
'flipud': 0.3, # Flip verticale (utile per frutta)
'fliplr': 0.5, # Flip orizzontale
'mosaic': 1.0, # Mosaic augmentation (4 immagini)
'mixup': 0.1, # MixUp augmentation
'copy_paste': 0.1, # Copy-paste per classi rare
# Loss weights: aumenta peso classificazione per difetti rari
'cls': 1.5, # Classification loss weight (default 0.5)
'box': 7.5, # Box regression loss weight
'dfl': 1.5, # Distribution Focal Loss weight
# Valutazione e salvataggio
'val': True,
'save': True,
'save_period': 10, # Salva checkpoint ogni 10 epoche
'patience': 30, # Early stopping se no miglioramento per 30 epoche
# Performances
'workers': 8, # DataLoader workers
'cache': True, # Cache dataset in RAM per velocità
'amp': True, # Automatic Mixed Precision (FP16)
# Metriche
'plots': True, # Genera grafici di training
'verbose': True,
}
# Avvia training
print(f"\nAvvio training YOLO11 per food quality inspection")
print(f"Epoche: {epochs}, Batch: {batch_size}, Immagini: {image_size}x{image_size}")
results = model.train(**training_args)
# Estrai metriche finali
metrics = {
'mAP50': float(results.results_dict.get('metrics/mAP50(B)', 0)),
'mAP50_95': float(results.results_dict.get('metrics/mAP50-95(B)', 0)),
'precision': float(results.results_dict.get('metrics/precision(B)', 0)),
'recall': float(results.results_dict.get('metrics/recall(B)', 0)),
'best_model_path': str(Path(output_dir) / 'food_quality_v1' / 'weights' / 'best.pt')
}
print(f"\nTraining completato!")
print(f"mAP@50: {metrics['mAP50']:.4f}")
print(f"mAP@50-95: {metrics['mAP50_95']:.4f}")
print(f"Precision: {metrics['precision']:.4f}")
print(f"Recall: {metrics['recall']:.4f}")
print(f"Modello salvato in: {metrics['best_model_path']}")
return metrics
if __name__ == "__main__":
metrics = train_food_quality_model(
dataset_yaml="/data/food_quality_dataset/data.yaml",
output_dir="./runs/food_quality",
epochs=150,
batch_size=16,
image_size=640,
model_variant="yolo11m.pt"
)
Ray Tune によるハイパーパラメータ調整
# hyperparameter_tuning.py
# Ricerca automatica degli hyperparametri ottimali con Ray Tune
from ultralytics import YOLO
from ray import tune
from ray.tune.schedulers import ASHAScheduler
import torch
def objective(config: dict) -> dict:
"""Funzione obiettivo per Ray Tune."""
model = YOLO("yolo11m.pt")
results = model.train(
data="/data/food_quality_dataset/data.yaml",
epochs=50, # Epoche ridotte per tuning rapido
batch=config['batch'],
lr0=config['lr0'],
lrf=config['lrf'],
momentum=config['momentum'],
weight_decay=config['weight_decay'],
cls=config['cls'],
hsv_s=config['hsv_s'],
mixup=config['mixup'],
amp=True,
verbose=False,
plots=False,
)
mAP50 = results.results_dict.get('metrics/mAP50(B)', 0)
return {"mAP50": mAP50}
def run_hyperparameter_search(n_trials: int = 20) -> dict:
"""Esegue ricerca hyperparametri con ASHA scheduler."""
search_space = {
'batch': tune.choice([8, 16, 32]),
'lr0': tune.loguniform(1e-4, 1e-2),
'lrf': tune.uniform(0.001, 0.1),
'momentum': tune.uniform(0.85, 0.98),
'weight_decay': tune.loguniform(1e-5, 1e-3),
'cls': tune.uniform(0.5, 2.0),
'hsv_s': tune.uniform(0.4, 0.9),
'mixup': tune.uniform(0.0, 0.3),
}
scheduler = ASHAScheduler(
max_t=50,
grace_period=10,
reduction_factor=2
)
analysis = tune.run(
objective,
config=search_space,
num_samples=n_trials,
scheduler=scheduler,
metric="mAP50",
mode="max",
resources_per_trial={"cpu": 4, "gpu": 1},
)
best_config = analysis.best_config
print(f"Migliori hyperparametri trovati:")
for key, value in best_config.items():
print(f" {key}: {value}")
return best_config
食品 QC における検出 vs 分類 vs セグメンテーション
YOLO11 は、オブジェクト検出 (境界ボックス)、 分類 (画像クラス全体) とインスタンスのセグメンテーション (ピクセルレベルのマスク)。 どちらを選択するかは、欠陥の種類、必要な速度、および利用可能なハードウェアによって異なります。
いつどのアプローチを使用するか
| アプローチ | 出力 | 一般的なレイテンシー | 食品品質管理のユースケース | 長所/短所 |
|---|---|---|---|---|
| 分類 | 品格+自信 | 0.5~1.5ms | カテゴリー/品質による果物の分類。成熟 | 長所: 非常に速い。短所: 欠陥の位置特定ができない |
| 物体検出 | Bbox + クラス + パック | 1.5~5ミリ秒 | 同じ部品上の複数の欠陥の検出。異物 | 長所: 最適な速度と情報のバランス。短所: 正確な形状がない |
| セグメンテーション | ピクセルマスク + クラス | 3~15ミリ秒 | 欠陥領域を測定します。正確なサイズ制御。グレーディング | 長所: 詳細な情報。短所: 遅く、より複雑 |
| 姿勢推定 | キーポイント | 2~6ミリ秒 | ピックアンドプレイスロボットの部品の向き。カウント | 長所: 方向が正確です。短所: キーポイント データセットが必要 |
ほとんどの食品品質検査システムでは、物体検出 YOLO11 を使用すると、同じ製品の複数の欠陥を検出して位置を特定するのに最適な選択になります。 境界ボックスのサイズに基づいて重大度を定量化できます。 高速回線に対応したレイテンシーを維持します。セグメンテーションは正当化される グレーディングのために欠陥領域を正確に測定する必要がある場合 (例: 熱傷を表面 5% 未満の「軽度」、表面 15% を超える「重度」として分類します)。
品質指標: 食品業界向けの解釈
標準の CV 指標 (mAP、精度、リコール) には特定の意味があります 食品の文脈において、それを理解し、責任者に明確に伝える必要がある 本番稼働前の生産状況。
mAP (平均平均精度)
mAP は、すべてのクラスの適合率と再現率の曲線を要約します。 mAP@50 は 検出を考慮するための IoU (Intersection over Union) しきい値 0.5 正しいです。 mAP@50-95 の閾値の平均は 0.5 ~ 0.95 であり、より深刻です。 食品の QC では、精度が重要なため、通常、mAP@50 が主要な指標となります。 正確な位置特定 (IoU 0.95) の精度よりも重要ではありません。 欠陥の分類。
精度と再現率: 重要なトレードオフ
食品の品質管理では、精度とリコールには基本的なコストの非対称性があります。
- 低再現率 (偽陰性): を超える不良品 検査を受けて消費者に届きます。費用: 製品のリコール、製品への損害 ブランドの評判、健康被害の可能性。 受け入れられない 非常に重大な欠陥 (カビ、異物) の場合。
- 低精度 (誤検知): 準拠した製品が来る 捨てられた。コスト: 製品の損失、ラインのパフォーマンスの低下。 制限内で許容可能 製品の価値によります。
信頼閾値の調整は、信頼閾値を管理するための主要なメカニズムです
このトレードオフ: しきい値を下げると再現率が増加します (偽陰性が少なくなります)。
精度が低下します (誤検知が増加します)。異物などの重大な欠陥については、
より多くの誤検知を受け入れる非常に低いしきい値 (0.3 ~ 0.4) が設定されています。
などの不具合については、 size_defect、しきい値はさらに高くすることができます (0.6 ~ 0.7)。
# evaluate_model.py
# Valutazione completa del modello con metriche per food QC
from ultralytics import YOLO
import numpy as np
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
def evaluate_food_model(
model_path: str,
test_dataset_yaml: str,
class_names: list,
confidence_thresholds: dict # Soglie per classe
) -> dict:
"""
Valutazione completa con metriche specifiche per food quality inspection.
Args:
model_path: Path al modello addestrato (best.pt)
test_dataset_yaml: Path al data.yaml del test set
class_names: Nomi delle classi
confidence_thresholds: Soglie confidence per classe {"mold": 0.35, "ok": 0.6}
"""
model = YOLO(model_path)
# Validazione standard YOLO
results = model.val(
data=test_dataset_yaml,
split='test',
imgsz=640,
conf=0.001, # Basso per calcolare la curva completa
iou=0.6,
plots=True,
verbose=False,
)
# Metriche per classe
class_metrics = {}
for i, name in enumerate(class_names):
class_metrics[name] = {
'precision': float(results.box.p[i]) if i < len(results.box.p) else 0,
'recall': float(results.box.r[i]) if i < len(results.box.r) else 0,
'mAP50': float(results.box.ap50[i]) if i < len(results.box.ap50) else 0,
'mAP50_95': float(results.box.ap[i]) if i < len(results.box.ap) else 0,
}
# Soglie di accettabilita per l'industria alimentare
acceptance_thresholds = {
'mold': {'recall_min': 0.98, 'precision_min': 0.85},
'foreign_object': {'recall_min': 0.995, 'precision_min': 0.80},
'rot': {'recall_min': 0.97, 'precision_min': 0.87},
'bruise': {'recall_min': 0.90, 'precision_min': 0.85},
'burn': {'recall_min': 0.88, 'precision_min': 0.83},
'crack': {'recall_min': 0.93, 'precision_min': 0.85},
'insect_damage': {'recall_min': 0.92, 'precision_min': 0.84},
'size_defect': {'recall_min': 0.85, 'precision_min': 0.88},
'color_defect': {'recall_min': 0.85, 'precision_min': 0.88},
'ok': {'recall_min': 0.95, 'precision_min': 0.92},
}
# Verifica accettabilita
go_nogo_results = {}
for class_name, metrics in class_metrics.items():
thresholds = acceptance_thresholds.get(class_name, {})
recall_ok = metrics['recall'] >= thresholds.get('recall_min', 0.0)
precision_ok = metrics['precision'] >= thresholds.get('precision_min', 0.0)
go_nogo_results[class_name] = {
'go': recall_ok and precision_ok,
'recall_ok': recall_ok,
'precision_ok': precision_ok,
'recall': metrics['recall'],
'precision': metrics['precision'],
}
# Report finale
print("\n=== VALUTAZIONE FOOD QUALITY MODEL ===\n")
print(f"mAP@50 globale: {float(results.box.map50):.4f}")
print(f"mAP@50-95 globale: {float(results.box.map):.4f}")
print("\nRisultati per classe:")
print(f"{'Classe':-20} {'Recall':>8} {'Precision':>10} {'mAP50':>8} {'GO/NO-GO':>10}")
print("-" * 60)
for class_name, gng in go_nogo_results.items():
status = "GO ✓" if gng['go'] else "NO-GO ✗"
print(f"{class_name:-20} {gng['recall']:8.4f} {gng['precision']:10.4f} "
f"{class_metrics[class_name]['mAP50']:8.4f} {status:>10}")
overall_go = all(gng['go'] for gng in go_nogo_results.values())
print(f"\nVALUTAZIONE FINALE: {'SISTEMA APPROVATO PER DEPLOY' if overall_go else 'NON APPROVATO - RICHIEDE MIGLIORAMENTI'}")
return {
'class_metrics': class_metrics,
'go_nogo': go_nogo_results,
'overall_go': overall_go,
'global_mAP50': float(results.box.map50),
}
# Esecuzione valutazione
if __name__ == "__main__":
class_names = ['ok', 'mold', 'bruise', 'burn', 'crack',
'foreign_object', 'rot', 'insect_damage', 'size_defect', 'color_defect']
confidence_thresholds = {
'mold': 0.35,
'foreign_object': 0.30,
'rot': 0.40,
'bruise': 0.50,
'burn': 0.50,
'crack': 0.45,
'insect_damage': 0.45,
'size_defect': 0.60,
'color_defect': 0.60,
'ok': 0.60,
}
results = evaluate_food_model(
model_path="./runs/food_quality/food_quality_v1/weights/best.pt",
test_dataset_yaml="/data/food_quality_dataset/data.yaml",
class_names=class_names,
confidence_thresholds=confidence_thresholds
)
工業用検査ラインのハードウェア アーキテクチャ
食品品質検査用の産業用ビジョン システムは単なるソフトウェアではありません。 ハードウェアはモデルと同じくらい基本的なものです。オプティカルチェーン(レンズ、 センサー、照明)によって画質が決まり、 最高の AI モデルによる後処理で低品質を保存することはできません。
検査ラインのコンポーネント
完全な検査ラインには 4 つの異なるゾーンが含まれます。 画像、AI処理、判断、仕分け。これらの領域間の統合 ハードウェア信号 (エンコーダー、トリガー、PLC) および通信を介して発生します 産業用 (Ethernet/IP、PROFINET、OPC-UA)。
食品品質検査用ハードウェアの比較
| 成分 | エントリーレベル | プロ | 高性能 |
|---|---|---|---|
| 部屋 | USB3 ビジョン 5MP (Basler ace) | GigE ビジョン 12MP (Basler ace2) | CoaXPress 25MP (Allied Vision Goldeye) |
| フレームレート | 30~60FPS | 100~200FPS | 300-500FPS |
| 保護 | IP40(オフィス) | IP67(飛沫防水) | IP69K(高圧ジェット) |
| 点灯 | 汎用 LED リング | コントローラー付きLEDバー | プログラム可能な LED + IR ストロボ |
| GPU推論 | NVIDIA Jetson Orin NX (16GB) | Hailo-8 + 産業用 CPU | 産業用 PC の RTX 4080 |
| 推論レイテンシ | 8~15ミリ秒(ジェットソン) | 2~5ミリ秒(Hailo-8) | 1~3ms (RTX 4080) |
| システムコスト | 5,000~15,000ユーロ | 20,000~50,000ユーロ | 60,000~150,000ユーロ |
| 最大スループット | 3~5個/秒 | 10~20個/秒 | 30~60個/秒 |
GigE ビジョン: 産業用通信規格
GigE Vision (GenICam) はカメラ間通信の業界標準です ギガビットイーサネット経由の処理システム。 USB3 に対する利点: ケーブル長は延長器なしで最大 100 メートル、PoE (Power over Ethernet) サポート、 Precision Time Protocol (PTP) による確定的遅延、マルチカメラオン シングルスイッチ。構成にはジャンボ フレームを備えた専用の NIC が必要です 有効 (MTU 9000) と取得プロセスの CPU アフィニティ。
トリガーと同期システム
ベルトコンベアと画像取得と批評の同期 モーションブラーや製品画像の途中を避けるため。通常は A が使用されます ベルトに接続されたロータリーエンコーダーは、N mm 進むごとにパルスを生成します。 PLC (プログラマブル ロジック コントローラー) はパルスを受信し、信号を生成します。 GPIO 経由のカメラのハードウェア トリガー。ハードウェア トリガーは不可欠です。 ソフトウェア トリガーにより、高速テープでは 1 ~ 5 ミリ秒のジッターが発生します。 許容できない位置ずれが生じます。
# camera_gige_acquisition.py
# Acquisizione immagini da camera GigE Vision con trigger hardware
import pypylon.pylon as pylon
import numpy as np
import cv2
import threading
import queue
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class AcquiredFrame:
"""Frame acquisito dalla camera con metadati."""
image: np.ndarray
timestamp_ns: int
frame_id: int
trigger_counter: int
class GigEVisionCamera:
"""
Wrapper per camera GigE Vision via Basler pylibpylon.
Gestisce trigger hardware, acquisizione e buffer.
"""
def __init__(
self,
device_index: int = 0,
trigger_mode: str = "Line1", # Trigger hardware su Line1
exposure_us: int = 500, # 500 microsec: congela il moto a 2m/s
gain_db: float = 6.0,
buffer_count: int = 10,
) -> None:
self._device_index = device_index
self._trigger_mode = trigger_mode
self._exposure_us = exposure_us
self._gain_db = gain_db
self._buffer_count = buffer_count
self._camera: Optional[pylon.InstantCamera] = None
self._frame_queue: queue.Queue = queue.Queue(maxsize=100)
self._acquiring = False
self._frame_counter = 0
def connect(self) -> None:
"""Connette e configura la camera GigE Vision."""
transport_factory = pylon.TlFactory.GetInstance()
devices = transport_factory.EnumerateDevices()
if len(devices) == 0:
raise RuntimeError("Nessuna camera GigE Vision trovata")
if self._device_index >= len(devices):
raise RuntimeError(f"Camera index {self._device_index} non disponibile")
self._camera = pylon.InstantCamera(
transport_factory.CreateDevice(devices[self._device_index])
)
self._camera.Open()
# Configurazione trigger hardware
self._camera.TriggerMode.SetValue("On")
self._camera.TriggerSource.SetValue(self._trigger_mode)
self._camera.TriggerActivation.SetValue("RisingEdge")
# Configurazione esposizione
self._camera.ExposureTime.SetValue(self._exposure_us)
self._camera.Gain.SetValue(self._gain_db)
# Pixel format: Mono8 per velocità massima, BayerRG8 per colore
self._camera.PixelFormat.SetValue("BayerRG8")
# Buffer pool
self._camera.MaxNumBuffer.SetValue(self._buffer_count)
print(f"Camera connessa: {self._camera.DeviceModelName.GetValue()}")
print(f"Risoluzione: {self._camera.Width.GetValue()}x{self._camera.Height.GetValue()}")
print(f"Frame rate max: {self._camera.AcquisitionFrameRate.GetValue():.1f} FPS")
def start_acquisition(self) -> None:
"""Avvia acquisizione continua in thread separato."""
if self._camera is None:
raise RuntimeError("Camera non connessa")
self._acquiring = True
self._camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
acquisition_thread = threading.Thread(
target=self._acquisition_loop,
daemon=True
)
acquisition_thread.start()
print("Acquisizione avviata in modalità trigger hardware")
def _acquisition_loop(self) -> None:
"""Loop di acquisizione continua."""
while self._acquiring and self._camera.IsGrabbing():
try:
grab_result = self._camera.RetrieveResult(
5000, # Timeout 5 secondi
pylon.TimeoutHandling_ThrowException
)
if grab_result.GrabSucceeded():
# Conversione immagine
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converted = converter.Convert(grab_result)
image = converted.GetArray()
frame = AcquiredFrame(
image=image.copy(),
timestamp_ns=grab_result.TimeStamp,
frame_id=grab_result.ImageNumber,
trigger_counter=self._frame_counter
)
# Non-blocking: scarta se coda piena (priorità ai frame più recenti)
try:
self._frame_queue.put_nowait(frame)
except queue.Full:
# Scarta frame vecchio, inserisci nuovo
try:
self._frame_queue.get_nowait()
except queue.Empty:
pass
self._frame_queue.put_nowait(frame)
self._frame_counter += 1
grab_result.Release()
except pylon.TimeoutException:
pass # Nessun trigger ricevuto nel timeout, normale
except Exception as e:
print(f"Errore acquisizione: {e}")
def get_frame(self, timeout: float = 1.0) -> Optional[AcquiredFrame]:
"""Recupera il prossimo frame dalla coda."""
try:
return self._frame_queue.get(timeout=timeout)
except queue.Empty:
return None
def stop(self) -> None:
"""Ferma acquisizione e disconnette la camera."""
self._acquiring = False
if self._camera and self._camera.IsGrabbing():
self._camera.StopGrabbing()
if self._camera:
self._camera.Close()
print("Camera disconnessa")
自動仕分けシステム:PLCとアクチュエータとの統合
視覚システムの出力は物理的な動作、つまり排出に変換される必要があります。 ラインからの不良品の発生。これは空気圧アクチュエータによって発生します (圧縮空気ノズル) またはピック アンド プレース ロボット、上の PLC によって制御されます。 AI システムの決定の基礎。
仕分けシステムのタイミング
タイミングが重要です。システムは、次の瞬間から計算を開始する必要があります。 トリガー(画像取得)、製品が到着する正確な時間 排出ステーションに送信し、ミリ秒の精度でアクチュエータを制御します。 タイムチェーンには、取得時間、AI 推論時間、 PLCへの通信時間、空圧アクチュエータの応答時間 (通常、バルブ開く遅延を含めて 30 ~ 80 ミリ秒)。
# sorting_system.py
# Sistema di sorting integrato con PLC via OPC-UA
import asyncio
import asyncua
from ultralytics import YOLO
import numpy as np
import time
from dataclasses import dataclass
from typing import Optional
from enum import IntEnum
class SortingDecision(IntEnum):
"""Decisioni di sorting basate sulla criticita del difetto."""
ACCEPT = 0 # Prodotto conforme: procede
REJECT_DEFECT = 1 # Difetto standard: canale scarto generico
REJECT_CRITICAL = 2 # Difetto critico (corpo estraneo, muffa): canale separato
REINSPECT = 3 # Bassa confidence: re-ispezione manuale
@dataclass
class InspectionResult:
"""Risultato ispezione con decisione di sorting."""
frame_id: int
timestamp_ms: float
decision: SortingDecision
primary_defect: Optional[str]
confidence: float
defect_count: int
inference_time_ms: float
class FoodInspectionEngine:
"""
Engine principale di ispezione: integra vision AI e logica di sorting.
"""
# Classificazione criticita difetti
CRITICAL_DEFECTS = {'foreign_object', 'mold', 'rot'}
STANDARD_DEFECTS = {'bruise', 'burn', 'crack', 'insect_damage'}
QUALITY_DEFECTS = {'size_defect', 'color_defect'}
# Soglie confidence per classe
CLASS_THRESHOLDS = {
'foreign_object': 0.30,
'mold': 0.35,
'rot': 0.38,
'crack': 0.45,
'bruise': 0.50,
'insect_damage': 0.48,
'burn': 0.50,
'size_defect': 0.60,
'color_defect': 0.60,
'ok': 0.60,
}
def __init__(self, model_path: str) -> None:
self._model = YOLO(model_path)
self._class_names = self._model.names
self._inspection_count = 0
self._defect_count = 0
self._reject_count = 0
def inspect(self, image: np.ndarray, frame_id: int) -> InspectionResult:
"""
Esegue l'ispezione AI su un frame e restituisce la decisione di sorting.
"""
start_time = time.perf_counter()
# Inference YOLO11
results = self._model(
image,
conf=0.25, # Soglia bassa: filtraggio per classe dopo
iou=0.45,
verbose=False,
half=True, # FP16 per velocità
)
inference_time_ms = (time.perf_counter() - start_time) * 1000
# Analisi detections
detections = []
for result in results:
if result.boxes is None:
continue
for box in result.boxes:
class_id = int(box.cls[0])
class_name = self._class_names[class_id]
confidence = float(box.conf[0])
# Applica soglia per classe
class_threshold = self.CLASS_THRESHOLDS.get(class_name, 0.5)
if confidence >= class_threshold and class_name != 'ok':
detections.append({
'class': class_name,
'confidence': confidence,
'bbox': box.xyxy[0].tolist(),
})
# Logica di decisione sorting
decision, primary_defect, max_confidence = self._make_sorting_decision(detections)
self._inspection_count += 1
if decision != SortingDecision.ACCEPT:
self._reject_count += 1
return InspectionResult(
frame_id=frame_id,
timestamp_ms=time.time() * 1000,
decision=decision,
primary_defect=primary_defect,
confidence=max_confidence,
defect_count=len(detections),
inference_time_ms=inference_time_ms,
)
def _make_sorting_decision(
self,
detections: list
) -> tuple[SortingDecision, Optional[str], float]:
"""
Logica di decisione sorting basata sui difetti rilevati.
Priorità: REJECT_CRITICAL > REJECT_DEFECT > REINSPECT > ACCEPT
"""
if not detections:
return SortingDecision.ACCEPT, None, 0.0
# Verifica presenza difetti critici (priorità assoluta)
critical_detections = [
d for d in detections
if d['class'] in self.CRITICAL_DEFECTS
]
if critical_detections:
primary = max(critical_detections, key=lambda x: x['confidence'])
return SortingDecision.REJECT_CRITICAL, primary['class'], primary['confidence']
# Verifica difetti standard
standard_detections = [
d for d in detections
if d['class'] in self.STANDARD_DEFECTS
]
if standard_detections:
primary = max(standard_detections, key=lambda x: x['confidence'])
# Se confidence bassa (0.45-0.55) su difetto standard: reinspect
if primary['confidence'] < 0.55:
return SortingDecision.REINSPECT, primary['class'], primary['confidence']
return SortingDecision.REJECT_DEFECT, primary['class'], primary['confidence']
# Solo difetti qualità
quality_detections = [
d for d in detections
if d['class'] in self.QUALITY_DEFECTS
]
if quality_detections:
primary = max(quality_detections, key=lambda x: x['confidence'])
return SortingDecision.REJECT_DEFECT, primary['class'], primary['confidence']
return SortingDecision.ACCEPT, None, 0.0
def get_statistics(self) -> dict:
"""Statistiche di ispezione in tempo reale."""
reject_rate = self._reject_count / max(self._inspection_count, 1) * 100
return {
'total_inspected': self._inspection_count,
'total_rejected': self._reject_count,
'reject_rate_pct': round(reject_rate, 2),
'throughput': self._inspection_count, # Da normalizzare nel tempo
}
class PLCInterface:
"""
Interfaccia OPC-UA verso PLC Siemens/Beckhoff per controllo attuatori.
"""
# Indirizzi OPC-UA nodi PLC (configurati nel TIA Portal o TwinCAT)
SORT_COMMAND_NODE = "ns=2;s=FoodLine.Sort.Command"
SORT_DELAY_MS_NODE = "ns=2;s=FoodLine.Sort.DelayMs"
CONVEYOR_SPEED_NODE = "ns=2;s=FoodLine.Conveyor.SpeedMps"
INSPECTION_TO_EJECTOR_MM = 450.0 # Distanza fisica stazione ispezione -> espulsore
def __init__(self, plc_url: str = "opc.tcp://192.168.1.100:4840") -> None:
self._plc_url = plc_url
self._client: Optional[asyncua.Client] = None
async def connect(self) -> None:
"""Connette al PLC via OPC-UA."""
self._client = asyncua.Client(url=self._plc_url)
await self._client.connect()
print(f"Connesso al PLC: {self._plc_url}")
async def get_conveyor_speed(self) -> float:
"""Legge velocità nastro in m/s dal PLC."""
node = self._client.get_node(self.CONVEYOR_SPEED_NODE)
return await node.read_value()
async def send_sort_command(
self,
decision: SortingDecision,
conveyor_speed_mps: float
) -> None:
"""
Invia comando di sorting al PLC con delay calcolato.
Il delay compensa il ritardo di trasporto dalla stazione di ispezione
all'attuatore di espulsione.
"""
if decision == SortingDecision.ACCEPT:
return # Nessuna azione necessaria
# Calcola delay: distanza / velocità - anticipo attuatore
transport_delay_ms = (self.INSPECTION_TO_EJECTOR_MM / 1000) / conveyor_speed_mps * 1000
actuator_lead_ms = 60 # Il valvola pneumatica richiede ~60ms per aprirsi
total_delay_ms = max(0, int(transport_delay_ms - actuator_lead_ms))
# Codice comando per PLC
plc_command = 1 if decision in [SortingDecision.REJECT_DEFECT, SortingDecision.REJECT_CRITICAL] else 0
# Scrivi delay e comando
delay_node = self._client.get_node(self.SORT_DELAY_MS_NODE)
command_node = self._client.get_node(self.SORT_COMMAND_NODE)
await delay_node.write_value(total_delay_ms)
await command_node.write_value(plc_command)
async def disconnect(self) -> None:
"""Disconnette dal PLC."""
if self._client:
await self._client.disconnect()
エッジへの導入: 産業用ハードウェアの最適化
トレーニングされたモデルは、ターゲット ハードウェアへの展開用に最適化する必要があります。 YOLO11 は、レイテンシを削減できる複数のエクスポート形式をサポートしています ネイティブ PyTorch モデルと比較して 30 ~ 70% 減少します。
# deploy_optimization.py
# Export e ottimizzazione modello per deploy industriale
from ultralytics import YOLO
import torch
import time
import numpy as np
def export_optimized_model(
model_path: str,
target_hardware: str = "tensorrt", # tensorrt, openvino, onnx, hailo
image_size: int = 640,
batch_size: int = 1,
) -> str:
"""
Esporta il modello YOLO11 nel formato ottimizzato per l'hardware target.
Formati supportati per applicazioni industriali:
- TensorRT (NVIDIA GPU): massima velocità su CUDA hardware
- OpenVINO (Intel CPU/iGPU): ottimale per sistemi embedded Intel
- ONNX (universale): compatibile con ONNX Runtime su qualsiasi hardware
- Hailo: formato proprietario per chip Hailo-8/Hailo-15
"""
model = YOLO(model_path)
export_args = {
'format': target_hardware,
'imgsz': image_size,
'batch': batch_size,
'half': True, # FP16: dimezza memoria, +30% velocità
'int8': False, # INT8 richiede calibration dataset
'simplify': True, # Semplifica grafo ONNX
'dynamic': False, # Batch size fisso per latenza deterministica
'verbose': False,
}
if target_hardware == "tensorrt":
export_args.update({
'workspace': 4, # GB di workspace TensorRT
'device': '0', # GPU 0
})
exported_path = model.export(**export_args)
print(f"Modello esportato: {exported_path}")
return str(exported_path)
def benchmark_inference(model_path: str, n_iterations: int = 1000) -> dict:
"""
Benchmark di latenza per confronto tra formati export.
"""
model = YOLO(model_path)
# Immagine di test casuale (simula frame camera)
dummy_image = np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
# Warmup
for _ in range(50):
model(dummy_image, verbose=False)
# Benchmark
latencies = []
for _ in range(n_iterations):
start = time.perf_counter()
model(dummy_image, verbose=False, half=True)
latencies.append((time.perf_counter() - start) * 1000)
latencies = np.array(latencies)
results = {
'mean_ms': float(np.mean(latencies)),
'median_ms': float(np.median(latencies)),
'p95_ms': float(np.percentile(latencies, 95)),
'p99_ms': float(np.percentile(latencies, 99)),
'max_fps': round(1000 / np.mean(latencies), 1),
}
print(f"\nBenchmark {n_iterations} iterazioni:")
print(f" Latenza media: {results['mean_ms']:.2f} ms")
print(f" Latenza P95: {results['p95_ms']:.2f} ms")
print(f" Latenza P99: {results['p99_ms']:.2f} ms")
print(f" FPS massimo: {results['max_fps']:.1f} FPS")
return results
# Esempio: confronto latenze per hardware diversi
# Risultati tipici su linea produttiva reale:
# PyTorch FP32: 12.4 ms -> 80 FPS
# PyTorch FP16: 7.1 ms -> 140 FPS
# TensorRT FP16: 2.8 ms -> 357 FPS
# TensorRT INT8: 1.9 ms -> 526 FPS
# ONNX Runtime: 5.2 ms -> 192 FPS
# OpenVINO: 3.8 ms -> 263 FPS
ケーススタディ: YOLO11 を使用した果物選別ライン
協同組合向けのビジョンシステムの実際の導入について報告します。 南イタリアの果物農園、容量のあるリンゴとオレンジの包装ライン の チャンネルごとに 1 秒あたり 10 個、3 つの並列チャネル、動作可能 収穫期(10月~1月)は24時間営業。
システム仕様
- 製品: りんご(ふじ、ガーラ、ゴールデン)、みかん(タロッコ、ネーブル)
- ライン: 3 チャンネル各 10 個/秒 = 合計 30 個/秒
- 部屋: Basler ace2 GigE、12MP、200 FPS、IP67、同軸 LED 照明
- ハードウェア推論: NVIDIA Jetson AGX Orin 64GB (チャネルごとに 1 つ)
- アクチュエーター: チャンネルごとに 6 bar の 3 つの空気圧ノズル (クリティカル/欠陥/再検査)
- PLC: OPC-UA 通信を備えた Siemens S7-1500
- 検出されたクラス: OK、カビ、打撲傷、火傷、サイズ欠陥、異物オブジェクト
データセットとトレーニング
- データセット: 3 つの収集シーズン (2022 ~ 2024 年) で収集された合計 28,400 枚の画像
- アノテーション: 4 人のアノテーターによる CVAT、あいまいなクラスのコンセンサス投票
- 拡張: 気象条件シミュレーションを備えたカスタム アルバム パイプライン
- モデル: AWS p3.2xlarge (Tesla V100) で 120 エポックでトレーニングされた YOLO11m
- トレーニング時間: 4.7時間
生産実績
本番環境のシステムメトリクス (平均シーズン 2024 ~ 2025)
| メトリック | 客観的 | 結果 | 評価 |
|---|---|---|---|
| mAP@50 グローバル | > 90% | 93.7% | 素晴らしい |
| リコール金型 | > 98% | 98.4% | 承認された |
| 外部オブジェクトの呼び出し | > 99.5% | 99.6% | 承認された |
| あざを思い出す | > 90% | 91.8% | 承認された |
| 精度はまあまあ | > 92% | 94.2% | 素晴らしい |
| 推論レイテンシ | < 8 ミリ秒 | 6.3ミリ秒(ジェットソンAGX) | 承認された |
| 総システム遅延 | < 80 ミリ秒 | 71ミリ秒 | 承認された |
| チャネルごとのスループット | 10個/秒 | 10.0個/秒 | 到達しました |
| 誤検知拒否率 | < 3% | 2.1% | 素晴らしい |
| システム稼働時間 | > 99% | 99.7% | 素晴らしい |
ROIと経済効果
このシステムは、コストをかけて 6 人の手動検査員 (シフトごとに 2 人 x 3 シフト) を置き換えました。 のインストールの 145,000ユーロ (ハードウェア、ソフトウェア、統合、 トレーニング、試運転)。計算された経済的利益:
- 検査要員の削減: 180,000 ユーロ/年 (諸費用を含む総額)
- 不適合の見逃しの削減: 45,000 ユーロ/年 (リコール回避、制裁)
- 手動検査と比較して誤検知の削減: 28,000 ユーロ/年 (回収製品)
- ROI の回収: 8.5ヶ月
- 3 年間の ROI: 478%
食品環境におけるビジョンシステムの規制と認証
認定食品ラインに設置された産業用ビジョンシステム AI のパフォーマンスだけを超えた特定の規制要件に準拠する必要があります。 これらの要件を遵守しないと、プラントの認証が損なわれる可能性があります。
物理的要件: IP69K および食品グレードの材料
食品環境用の工業用チャンバーは評価される必要があります IP69K: 粉塵 (6) の侵入および噴流水に対する完全な保護 高圧 (9K) - 80 度、100 バールの高圧洗浄機による洗浄は、 多くの施設で標準となっています。食品と接触する材料 (カバー、マウント、サポート) が入っている必要があります AISI 316L 鋼 または FDA/EC 10/2011 認定ポリマー材料。
ケーブルと接続は食品環境向けに認定されている必要があります (TPU カバー) 洗剤の酸に対する耐性、M12 IP67+ コネクタ)。産業用PC 加工はラインから離れた IP65 錫メッキキャビネットに収容されます。 シールドされた GigE ケーブルを介してカメラにビデオ/信号を出力します。
IFS Food と BRC: 自動検査システムの要件
標準 IFSフード8 (国際的な注目の規格) e BRC 世界基準の食品安全問題 9 そのシステムを必要とする 自動検査の対象となるのは次のとおりです。
- 検出仕様 (クラス、しきい値、対象製品) が文書化されています。
- 既知の参照サンプルを使用して定期的に校正されます (チャレンジテスト)
- 文書化されたプロトコル (OQ/PQ) による初期検証の対象
- リスクに応じて CCP または OPRP として HACCP 計画に統合
- 故障警報システム搭載(AIシステムがオフラインの場合ライン停止)
- 文書化された予防メンテナンス(レンズのクリーニング、カメラのキャリブレーション)の対象となります。
HACCP: CCP としてのビジョン システム
異物(金属、プラスチック、ガラス、石)の検出に、 ビジョンシステムは次のように認定できます。 重要管理点 (CCP) HACCP 計画では、従来の金属探知機を置き換えるか、従来の金属探知機と併用します。 これには、システムの能力を実証する科学的な検証が必要です 最小サイズの異物の種類をクリティカルとして安定して検出する (通常、金属の場合は 2 ~ 3 mm、プラスチックの場合は 5 ~ 10 mm)。
注意: 異物視覚システムの制限事項
ビジョンシステムは異物のみを検出します 表面に見える。 製品内部に異物が混入している(例:トマト内の金属) それらは光学カメラには見えません。内部異物の検出には、 誘導金属探知機またはX線システムは引き続き必須であり、 視覚システムを補完します。
システム検証: OQ および PQ
# validation_protocol.py
# Protocollo di validazione OQ/PQ per sistema vision alimentare
import json
import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional
from ultralytics import YOLO
import numpy as np
@dataclass
class ChallengeTestResult:
"""Risultato di un singolo challenge test."""
defect_class: str
defect_severity: str # 'mild', 'moderate', 'severe'
n_samples: int
detected_correctly: int
false_positives_on_ok: int
recall: float
precision: float
pass_fail: str
@dataclass
class ValidationReport:
"""Report di validazione OQ/PQ del sistema vision."""
system_id: str
product_type: str
validation_date: str
model_version: str
hardware_config: dict
challenge_results: list[ChallengeTestResult] = field(default_factory=list)
overall_result: str = "PENDING"
validated_by: str = ""
notes: str = ""
def to_json(self, output_path: str) -> None:
"""Esporta il report in formato JSON per documentazione normativa."""
report_dict = asdict(self)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report_dict, f, indent=2, ensure_ascii=False)
print(f"Report salvato: {output_path}")
def run_challenge_test(
model: YOLO,
challenge_images_dir: str,
defect_class: str,
severity: str,
n_ok_images: int = 50,
acceptance_recall: float = 0.95,
acceptance_precision: float = 0.85,
) -> ChallengeTestResult:
"""
Esegue un challenge test su campioni noti.
I challenge samples sono immagini acquisite dalla linea reale,
etichettate da ispettori esperti, con difetti di gravita nota.
Sono conservati fisicamente (campioni di riferimento) e fotografati
in condizioni di linea controllate.
"""
import glob
import os
# Immagini con difetto della classe specificata
defect_images = glob.glob(
os.path.join(challenge_images_dir, defect_class, severity, "*.jpg")
)
# Immagini ok di riferimento
ok_images = glob.glob(
os.path.join(challenge_images_dir, "ok", "*.jpg")
)[:n_ok_images]
detected = 0
total_defect = len(defect_images)
for img_path in defect_images:
result = model(img_path, verbose=False)
detections = [
r for r in result[0].boxes
if model.names[int(r.cls[0])] == defect_class
and float(r.conf[0]) >= 0.35 # Soglia del sistema
]
if len(detections) > 0:
detected += 1
# Falsi positivi su immagini ok
fp_count = 0
for img_path in ok_images:
result = model(img_path, verbose=False)
fp_detections = [
r for r in result[0].boxes
if model.names[int(r.cls[0])] == defect_class
and float(r.conf[0]) >= 0.35
]
if len(fp_detections) > 0:
fp_count += 1
recall = detected / max(total_defect, 1)
precision = detected / max(detected + fp_count, 1)
passed = recall >= acceptance_recall and precision >= acceptance_precision
return ChallengeTestResult(
defect_class=defect_class,
defect_severity=severity,
n_samples=total_defect,
detected_correctly=detected,
false_positives_on_ok=fp_count,
recall=round(recall, 4),
precision=round(precision, 4),
pass_fail="PASS" if passed else "FAIL"
)
本番環境でのモニタリングとフィードバック ループ
本番環境の AI ビジョン システムは静的な成果物ではありません: 条件 ライン変化(商品の季節性、照明の擦れ、汚れ等) レンズ上)、モデルのパフォーマンスを継続的に監視する必要があります 品質に影響を与える前にモデルのドリフトを検出します。
# production_monitor.py
# Monitoring continuo del sistema vision in produzione
import sqlite3
import time
import json
from collections import deque
from typing import Optional
from dataclasses import dataclass
@dataclass
class ProductionStats:
"""Statistiche di produzione per monitoraggio."""
timestamp: str
window_minutes: int
total_inspected: int
reject_rate_pct: float
defect_distribution: dict
avg_inference_ms: float
p99_inference_ms: float
alert_triggered: bool
alert_reason: Optional[str]
class ProductionMonitor:
"""
Monitor continuo per sistema vision alimentare.
Rileva anomalie statistiche che indicano degradazione del modello.
"""
# Soglie di alert
MAX_REJECT_RATE_PCT = 15.0 # >15% scarto e anomalo
MIN_REJECT_RATE_PCT = 0.1 # <0.1% potrebbe indicare modello non funzionante
MAX_INFERENCE_P99_MS = 15.0 # Latenza P99 non deve superare 15ms
MAX_CONSECUTIVE_ACCEPTS = 500 # 500 ok di fila = modello probabilmente bloccato
def __init__(self, db_path: str = "/data/production_log.db") -> None:
self._db_path = db_path
self._inspection_buffer: deque = deque(maxlen=10000)
self._consecutive_accepts = 0
self._init_db()
def _init_db(self) -> None:
"""Inizializza database SQLite per log produzioni."""
with sqlite3.connect(self._db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS inspections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
frame_id INTEGER,
decision INTEGER,
primary_defect TEXT,
confidence REAL,
inference_ms REAL,
line_speed_mps REAL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
alert_type TEXT,
details TEXT,
acknowledged INTEGER DEFAULT 0
)
""")
def log_inspection(self, result: dict) -> Optional[str]:
"""
Logga un'ispezione e verifica anomalie.
Ritorna il motivo dell'alert se presente, None altrimenti.
"""
self._inspection_buffer.append(result)
# Tracking accept consecutivi
if result['decision'] == 0: # ACCEPT
self._consecutive_accepts += 1
else:
self._consecutive_accepts = 0
# Alert: troppi accept consecutivi (modello potrebbe essere bloccato)
if self._consecutive_accepts >= self.MAX_CONSECUTIVE_ACCEPTS:
alert_msg = (
f"Alert: {self._consecutive_accepts} accept consecutivi. "
f"Verificare funzionamento sistema vision."
)
self._log_alert("CONSECUTIVE_ACCEPTS", alert_msg)
self._consecutive_accepts = 0 # Reset dopo alert
return alert_msg
# Controlla reject rate su finestra recente (ultimi 100 pezzi)
if len(self._inspection_buffer) >= 100:
recent = list(self._inspection_buffer)[-100:]
reject_count = sum(1 for r in recent if r['decision'] != 0)
reject_rate = reject_count / len(recent) * 100
if reject_rate > self.MAX_REJECT_RATE_PCT:
alert_msg = f"Alert: tasso scarto {reject_rate:.1f}% (soglia: {self.MAX_REJECT_RATE_PCT}%)"
self._log_alert("HIGH_REJECT_RATE", alert_msg)
return alert_msg
# Controlla latenza
if result.get('inference_ms', 0) > self.MAX_INFERENCE_P99_MS:
alert_msg = f"Alert: latenza inference {result['inference_ms']:.1f}ms supera soglia"
self._log_alert("HIGH_LATENCY", alert_msg)
return alert_msg
return None
def _log_alert(self, alert_type: str, details: str) -> None:
"""Logga alert nel database."""
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"INSERT INTO alerts (timestamp, alert_type, details) VALUES (?, ?, ?)",
(time.time(), alert_type, details)
)
print(f"[ALERT] {alert_type}: {details}")
def get_hourly_report(self) -> dict:
"""Genera report orario delle prestazioni di produzione."""
recent = list(self._inspection_buffer)
if not recent:
return {}
total = len(recent)
rejects = sum(1 for r in recent if r['decision'] != 0)
latencies = [r.get('inference_ms', 0) for r in recent]
defect_dist = {}
for r in recent:
defect = r.get('primary_defect') or 'ok'
defect_dist[defect] = defect_dist.get(defect, 0) + 1
return {
'total_inspected': total,
'reject_rate_pct': round(rejects / total * 100, 2),
'defect_distribution': defect_dist,
'avg_inference_ms': round(sum(latencies) / len(latencies), 2),
'p99_inference_ms': round(sorted(latencies)[int(len(latencies) * 0.99)], 2),
}
ベストプラクティスとアンチパターン
フードビジョンシステムのベストプラクティス
- モデルの前の照明: 予算の 30% を投資 高品質の構造化された照明のハードウェア。で取得されたデータセット 安定した一貫した照明により、サンプル数が 40% 削減されます。 同じパフォーマンスを達成するために必要です。
- 豊富な陰性サンプル: データセットには次のものが含まれている必要があります 少なくとも 3 倍以上の「OK」画像が欠陥よりも多く、OK 画像がカバーする必要がある 製品のすべての自然な変動性(さまざまな年齢、サイズ、品種)。
- 毎月のチャレンジテスト: 正式なチャレンジテストを実施する 毎月、次のような原因によるパターンのドリフトを検出することが知られている物理サンプルを使用します。 季節ごとに商品が変わります。
- タイムアウトとフォールバック: AI システムに 2 倍以上の時間がかかる場合 公称レイテンシでは、画像が無効であるとみなされ、商品が送信されます。 手動再検査チャネルに転送します。
- 各フレームをログに記録します。 画像とそれぞれの結果を保存します 少なくとも72時間の検査。インシデント後のデバッグに不可欠です そして新しいトレーニングサンプルを収集するためです。
- ハードウェアの冗長性: 重要な回線については、1 つインストールしてください バックアップルームはプライマリルームと同様に構成され、自動切り替えが行われます 失敗した場合。
避けるべきアンチパターン
- スマートフォン画像を使ったトレーニング: 撮影した画像 研究室でスマートフォンを使用することは実際の状況を表すものではありません ラインの。データセットを収集するには、常に最終工業用チャンバーを使用してください。
- すべてのクラスに対する単一の信頼しきい値: しきい値 ユニフォームは最も代表的なクラスを優先します。クラスごとにしきい値を使用し、 欠陥の重大度に基づいて調整されます。
- シャドウ モード期間なしで展開: 確認する前に AI に移行するには、少なくとも手動検査と並行してシステムを実行します。 2 週間、意思決定を比較します。過剰な誤検知を修正する 稼働前に。
- モデルのドリフトを無視する: モデルのパフォーマンスが低下する 製品のばらつきによる経年変化、照明の消耗、汚れ レンズの上に。積極的な監視がなければ、劣化は静かに進行し、危険です。
- 災害復旧計画がない: AIシステムに障害が発生すると、 代替計画が必要です (手動検査、回線速度の低下)。 文書化され、定期的にテストされます。
結論と次のステップ
La computer vision con YOLO11 rappresenta oggi la tecnologia più matura e accessibile per il controllo qualità automatico nell'industria alimentare. Non e più una tecnologia da laboratorio: sistemi come quello descritto nel case study sono operativi in centinaia di stabilimenti globali, con risultati documentati di accuratezza superiore al 98%, ROI sotto i 12 mesi e uptime del 99%+.
Il mercato AI per la sicurezza e la qualità alimentare crescera da 2.7 a 13.7 miliardi di dollari entro il 2030 (CAGR 30.9%), trainato da requisiti normativi sempre più stringenti, carenza di manodopera specializzata e aumento dei costi dei recall. Le aziende alimentari che investono oggi in sistemi di vision AI si posizionano con un vantaggio competitivo strutturale difficile da colmare in seguito.
Il percorso tecnico descritto in questo articolo, dalla raccolta del dataset all'annotazione con CVAT/Roboflow, dal training YOLO11 alla validazione con challenge test, fino al deploy con integrazione PLC e monitoring continuo, e applicabile a qualsiasi linea produttiva alimentare con gli adattamenti necessari al prodotto specifico.
Checklist per Iniziare il Tuo Progetto di Food Vision
- Definisci le classi di difetti prioritari per il tuo prodotto (max 10 inizialmente)
- Acquisisci almeno 200 campioni per classe con la camera finale in condizioni di linea reali
- Scegli lo strumento di annotazione (CVAT self-hosted per privacy, Roboflow per velocità)
- Inizia con YOLO11m su dataset limitato per validare l'approccio (2-3 giorni di lavoro)
- Definisci le soglie di accettabilita (recall/precision) con il responsabile qualità
- Pianifica il periodo di shadow mode prima del go-live
- Documenta tutto per i requisiti IFS/BRC/HACCP sin dall'inizio
Prosegui nella Serie FoodTech
Questo articolo fa parte della serie FoodTech su federicocalo.dev. Il prossimo articolo approfondisce la compliance normativa digitale: FSMA e Compliance Digitale: Automazione dei Processi Normativi, dove vedremo come automatizzare i flussi documentali HACCP, gestire i piani di controllo con strumenti digitali e prepararsi agli audit FDA/IFS/BRC con sistemi di traceability integrati.
Articoli correlati da altre serie:
- Serie MLOps: Come mettere in produzione e monitorare i modelli YOLO con MLflow e Evidently
- Serie Computer Vision: CNN avanzate, segmentazione semantica e edge deployment con TensorRT
- Serie AI Engineering: Pipeline di inferenza scalabili con Triton Inference Server
- Serie Deep Learning Avanzato: Tecniche di transfer learning e domain adaptation per dataset piccoli







