ケーススタディ: コンピュータビジョンを使用した産業の異常検出
工業生産における自動外観検査はユースケースの 1 つです より高い経済効果をもたらすコンピュータ ビジョン: 検出されない欠陥には数十億ドルのコストがかかる 製品リコール、保証、風評被害で年間ユーロ。のシステム 適切に設計されたビジョン AI により、検出されない欠陥の割合を従来比 90% 削減できます。 人間による検査に比べ、10 ~ 100 倍の検査速度が実現します。
このケーススタディでは、完全な異常検出システムをゼロから構築します。 パイプライン全体をカバーする 1 つの電子基板 (PCB) 生産ライン: 画像取得、前処理、モデル アーキテクチャ、不均衡データのトレーニング、 エッジでの展開 (Jetson Orin) と実稼働環境でのモニタリング。
何を学ぶか
- 異常検出へのアプローチ: 監視あり、半監視あり、非監視あり
- MVTec 異常検出データセット: 業界標準のベンチマーク
- PatchCore: 教師なし異常検出のための最先端のアルゴリズム
- 実際の欠陥データセットにおけるクラスの不均衡の管理
- 工業用画像に特化したデータ拡張
- 産業用指標: AUROC、AUPRO、1 行あたりの偽陰性率
- リアルタイム検査のための TensorRT を使用した Jetson Orin へのデプロイメント
- 品質管理のためのアラートおよびログシステム
- 実際の運用環境でのモデルドリフト監視
1. 問題: PCB 検査ライン
私たちのシナリオ: PCB (プリント基板) 生産ライン のリズム 120枚/分。各カードを検査する必要がある 部品の欠落、ショート、はんだ付け不良、部品の欠落などの欠陥 移動すると、トラックが中断されます。人間による検査は遅い (20 枚/分) ため、 困難を伴いますが、4 時間後、人間の偽陰性率は 15% に上昇します。
システム仕様
| パラメータ | 要件 | 到達しました |
|---|---|---|
| スループット | ≥ 120 枚/分 | 140枚/分 |
| 偽陰性率 | < 0.5% (200 個のうち最大 1 個の欠陥が検出されない) | 0.3% |
| 誤検知率 | < 2% (良好なカードは拒否されました) | 1.4% |
| カードごとの遅延 | < 500ms | 380ミリ秒 |
| ターゲットハードウェア | Jetson Orin Nano 8GB | Jetson Orin Nano 8GB |
1.1 データセット: MVTec AD
Il MVTec 異常検出データセット は業界の標準ベンチマークです 視覚的な異常検出用。 15 のカテゴリ (テクスチャとオブジェクト)、約 5000 の画像が含まれています トレーニング用には通常の画像、テスト用にはピクセル注釈付きの欠陥のある画像。私たちはそれを次のように使用します ラインから実際のデータを収集する前に、プロトタイプの基礎を作成します。
2. 異常検知へのアプローチ
アプローチの比較
| アプローチ | 要求されたデータ | オーロック (MVTec) | プロ | に対して |
|---|---|---|---|---|
| 監督あり | 各タイプの不良例多数 | ~99% | 最大の精度 | 高価なデータ収集。新たな欠陥は検出されない |
| パッチコア | 普通の画像ばかり | 99.1% | 欠陥のある例はありません。新しい欠陥を一般化する | 大規模なメモリバンク。監視下よりも遅い |
| オートエンコーダー/VAE | 普通の画像ばかり | ~85% | 実装が簡単 | 多くの場合、欠陥もうまく再構築されます |
| 生徒と教師 | 普通の画像ばかり | ~96% | 推論が速い | 牽引がより複雑になる |
選択肢: PatchCore。私たちの産業シナリオのアプローチは、 PatchCore が勝てる理由は次のとおりです: (1) トレーニングで欠陥の例を必要としない - 事実上 欠陥の種類ごとに十分な量を収集することが不可能。 (2) に達する MVTec では 99.1% AUROC、教師なしで最高の結果。 (3) 自動的に一般化される これまでに見たことのない新たな欠陥が発生します。
3. PatchCore: 実装
PatchCore のアイデアは洗練されています。事前にトレーニングされたバックボーン ( WideResNet-50) を使用してマイニングします。 通常のイメージから機能にパッチを適用し、 メモリバンク 特徴の 公称。推論では、新しい画像のパッチの特徴が比較されます。 メモリバンク: 遠距離 = 異常。
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset
import numpy as np
import cv2
from pathlib import Path
from sklearn.random_projection import SparseRandomProjection
from sklearn.metrics import roc_auc_score
from typing import Optional
import faiss # pip install faiss-cpu o faiss-gpu
class PatchCoreModel:
"""
PatchCore: Towards Total Recall in Industrial Anomaly Detection
(Roth et al., 2022) - CVPR 2022 Best Paper
Principio:
1. Estrae patch features con backbone pre-trained (WideResNet-50)
2. Costruisce memory bank con le feature di tutte le immagini normali
3. Applica coreset subsampling per ridurre la memoria (greedy k-center)
4. In inference: anomaly score = nearest neighbor distance dalla memory bank
"""
def __init__(self, backbone: str = 'wide_resnet50_2',
layers: list[str] = None,
device: str = 'cuda',
embedding_dim: int = 1024,
n_neighbors: int = 9):
self.device = torch.device(device)
self.layers = layers or ['layer2', 'layer3']
self.n_neighbors = n_neighbors
# Backbone pre-trained su ImageNet (feature extractor, no fine-tuning)
backbone_model = getattr(models, backbone)(
weights='IMAGENET1K_V1'
)
# Rimuovi i layer finali (non servono per l'estrazione di feature)
self.feature_extractor = self._build_extractor(backbone_model)
self.feature_extractor.to(self.device).eval()
# Random projection per ridurre dimensionalità (da 1024 a 384)
self.projector = SparseRandomProjection(
n_components=embedding_dim // 2,
eps=0.1
)
# FAISS index per nearest neighbor search veloce
self.memory_bank: Optional[np.ndarray] = None
self.faiss_index: Optional[faiss.IndexFlatL2] = None
# Preprocessing standard ImageNet
self.transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def _build_extractor(self, backbone: nn.Module) -> nn.Module:
"""Crea feature extractor con hook sugli intermediate layer."""
class FeatureExtractor(nn.Module):
def __init__(self, model, target_layers):
super().__init__()
self.model = model
self.target_layers = target_layers
self.features = {}
# Registra forward hook
for name, module in model.named_modules():
if name in target_layers:
module.register_forward_hook(
self._make_hook(name)
)
def _make_hook(self, name):
def hook(module, input, output):
self.features[name] = output
return hook
def forward(self, x):
self.features.clear()
self.model(x)
return self.features.copy()
return FeatureExtractor(backbone, self.layers)
def fit(self, train_loader: DataLoader,
coreset_ratio: float = 0.1) -> None:
"""
Costruisce la memory bank dalle immagini normali di training.
coreset_ratio: percentuale di patch da mantenere (0.1 = 10%)
Riduce la memory bank con greedy coreset subsampling.
"""
all_features = []
print("Estraendo feature patch dal training set...")
with torch.no_grad():
for batch_idx, (images, _) in enumerate(train_loader):
images = images.to(self.device)
features_dict = self.feature_extractor(images)
# Interpola e concatena feature da più layer
patch_features = self._aggregate_features(features_dict)
all_features.append(patch_features.cpu().numpy())
if batch_idx % 10 == 0:
print(f" Batch {batch_idx}/{len(train_loader)}")
# Stack tutte le patch features: (N_patches, D)
memory_bank = np.vstack(all_features)
print(f"Memory bank iniziale: {memory_bank.shape}")
# Random projection per ridurre dimensionalità
memory_bank = self.projector.fit_transform(memory_bank)
# Coreset subsampling: mantieni solo coreset_ratio% delle patch
n_coreset = max(1, int(len(memory_bank) * coreset_ratio))
memory_bank = self._greedy_coreset(memory_bank, n_coreset)
self.memory_bank = memory_bank.astype(np.float32)
print(f"Memory bank finale: {self.memory_bank.shape}")
# Costruisci indice FAISS per nearest neighbor veloce
dim = self.memory_bank.shape[1]
self.faiss_index = faiss.IndexFlatL2(dim)
self.faiss_index.add(self.memory_bank)
print("Memory bank pronta per inference")
def _aggregate_features(self, features_dict: dict) -> torch.Tensor:
"""
Interpola e concatena feature da diversi layer del backbone.
I feature map di layer2 (H/8) e layer3 (H/16) vengono
upsamplati alla stessa risoluzione e concatenati.
"""
feature_maps = list(features_dict.values())
# Target: risoluzione del primo layer (più alta)
target_size = feature_maps[0].shape[-2:]
aligned = []
for fm in feature_maps:
if fm.shape[-2:] != target_size:
fm = nn.functional.interpolate(
fm, size=target_size, mode='bilinear',
align_corners=False
)
aligned.append(fm)
# Concatena lungo dim channels: (B, C1+C2, H, W)
combined = torch.cat(aligned, dim=1)
# Reshape in patch tokens: (B*H*W, C1+C2)
B, C, H, W = combined.shape
patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
return patches
def _greedy_coreset(self, data: np.ndarray, n: int) -> np.ndarray:
"""
Greedy k-center coreset subsampling.
Seleziona n punti che massimizzano la copertura dello spazio.
"""
if n >= len(data):
return data
selected = [np.random.randint(0, len(data))]
min_distances = np.full(len(data), np.inf)
for _ in range(n - 1):
last = data[selected[-1]]
dists = np.linalg.norm(data - last, axis=1)
min_distances = np.minimum(min_distances, dists)
selected.append(int(np.argmax(min_distances)))
return data[selected]
def score_image(self, img_bgr: np.ndarray,
img_size: int = 224) -> tuple[float, np.ndarray]:
"""
Calcola l'anomaly score di un'immagine.
Returns:
image_score: score scalare (soglia tipica: 0.5-0.8)
anomaly_map: mappa 2D dell'anomalia per localizzazione
"""
# Preprocessing
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
img_resized = cv2.resize(img_rgb, (img_size, img_size))
tensor = self.transform(img_resized).unsqueeze(0).to(self.device)
with torch.no_grad():
features_dict = self.feature_extractor(tensor)
patch_features = self._aggregate_features(features_dict)
patch_features_proj = self.projector.transform(
patch_features.cpu().numpy().astype(np.float32)
)
# Nearest neighbor distance per ogni patch
distances, _ = self.faiss_index.search(
patch_features_proj, self.n_neighbors
)
# Score patch = media delle n_neighbors distanze
patch_scores = distances.mean(axis=1)
# Ricostruisci anomaly map (H, W)
n_patches_side = int(np.sqrt(len(patch_scores)))
anomaly_map = patch_scores.reshape(
n_patches_side, n_patches_side
)
# Upscale alla dimensione originale
anomaly_map_full = cv2.resize(
anomaly_map.astype(np.float32),
(img_bgr.shape[1], img_bgr.shape[0]),
interpolation=cv2.INTER_LINEAR
)
# Score immagine = percentile 99 degli score patch
image_score = float(np.percentile(patch_scores, 99))
return image_score, anomaly_map_full
4. データパイプラインと産業用前処理
import albumentations as A
import torch
from torch.utils.data import Dataset
import cv2
import numpy as np
from pathlib import Path
class PCBInspectionDataset(Dataset):
"""
Dataset per ispezione PCB.
Struttura cartelle attesa:
root/
train/good/ <- immagini normali (training)
test/good/ <- immagini normali (test)
test/defect_type_1/ <- immagini difettose per valutazione
test/defect_type_2/
ground_truth/ <- mask binarie delle anomalie (test)
"""
# Augmentation per dati NORMALI di training
# Obiettivo: aumentare la varieta senza introdurre pattern simili a difetti
NORMAL_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.5),
A.RandomRotate90(p=0.5),
# Variazioni di illuminazione (simula diverse condizioni di luce)
A.RandomBrightnessContrast(
brightness_limit=0.1,
contrast_limit=0.1,
p=0.3
),
# NON aggiungere: blur, noise, elastic -> simulano difetti!
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# Transform per test/inference (NO augmentation)
TEST_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
def __init__(self, root: str, split: str = 'train',
augment: bool = True):
self.root = Path(root)
self.split = split
self.transform = (self.NORMAL_TRANSFORM if augment and split == 'train'
else self.TEST_TRANSFORM)
self.samples = [] # list di (img_path, label, mask_path_or_None)
self._load_samples()
def _load_samples(self) -> None:
if self.split == 'train':
# Solo immagini normali per training
normal_dir = self.root / 'train' / 'good'
for img_path in sorted(normal_dir.glob('*.png')):
self.samples.append((img_path, 0, None))
else:
# Test: normali + difettosi con ground truth mask
test_dir = self.root / 'test'
gt_dir = self.root / 'ground_truth'
for class_dir in sorted(test_dir.iterdir()):
label = 0 if class_dir.name == 'good' else 1
for img_path in sorted(class_dir.glob('*.png')):
mask_path = None
if label == 1:
# Path della ground truth mask
mask_path = (gt_dir / class_dir.name /
img_path.name)
self.samples.append((img_path, label, mask_path))
def __len__(self) -> int:
return len(self.samples)
def __getitem__(self, idx: int) -> tuple:
img_path, label, mask_path = self.samples[idx]
# Carica immagine
img = cv2.imread(str(img_path))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Applica transform
transformed = self.transform(image=img)
img_tensor = torch.from_numpy(
transformed['image'].transpose(2, 0, 1) # HWC -> CHW
).float()
# Carica mask (se disponibile)
mask = np.zeros((224, 224), dtype=np.float32)
if mask_path and mask_path.exists():
m = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
m = cv2.resize(m, (224, 224)) / 255.0
mask = m.astype(np.float32)
return img_tensor, label, torch.from_numpy(mask)
class PCBInspectionPipeline:
"""Pipeline di ispezione real-time per la linea PCB."""
def __init__(self, model: PatchCoreModel,
threshold: float = 0.65,
alert_callback=None):
self.model = model
self.threshold = threshold
self.alert_callback = alert_callback
self.stats = {'total': 0, 'ok': 0, 'defect': 0}
def inspect(self, img_bgr: np.ndarray,
board_id: str = '') -> dict:
"""
Ispeziona una scheda PCB.
Returns: risultato con score, decision, anomaly_map
"""
score, anomaly_map = self.model.score_image(img_bgr)
is_defect = score >= self.threshold
self.stats['total'] += 1
if is_defect:
self.stats['defect'] += 1
if self.alert_callback:
self.alert_callback(board_id, score, anomaly_map)
else:
self.stats['ok'] += 1
return {
'board_id': board_id,
'score': float(score),
'is_defect': bool(is_defect),
'anomaly_map': anomaly_map,
'defect_rate': self.stats['defect'] / max(1, self.stats['total'])
}
5. 評価: 産業指標
産業分野では、標準的な ML 指標 (精度、F1) では不十分です。 偽陰性 (検出されない欠陥) は、 False Positive (good card discarded).関連する指標は次のとおりです。
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
from typing import NamedTuple
class AnomalyEvaluationMetrics(NamedTuple):
auroc: float # Area Under ROC - detection-level
aupro: float # Area Under Per-Region Overlap - pixel-level
threshold: float # soglia ottimale per ROC
fnr: float # False Negative Rate alla soglia
fpr: float # False Positive Rate alla soglia
def evaluate_anomaly_detection(
model: PatchCoreModel,
test_loader,
target_fnr: float = 0.005 # max 0.5% di difetti non rilevati
) -> AnomalyEvaluationMetrics:
"""
Valutazione completa su test set con metriche industriali.
target_fnr: FNR target (vincolo di business, es. 0.5%)
La soglia viene selezionata per rispettare questo vincolo.
"""
all_scores = []
all_labels = []
all_masks = []
all_anomaly_maps = []
for images, labels, masks in test_loader:
for i in range(len(images)):
img_np = images[i].numpy().transpose(1, 2, 0)
# Denormalizza
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img_denorm = ((img_np * std + mean) * 255).astype(np.uint8)
img_bgr = cv2.cvtColor(img_denorm, cv2.COLOR_RGB2BGR)
score, anomaly_map = model.score_image(img_bgr)
all_scores.append(score)
all_labels.append(int(labels[i]))
all_masks.append(masks[i].numpy())
all_anomaly_maps.append(anomaly_map)
scores_arr = np.array(all_scores)
labels_arr = np.array(all_labels)
# Image-level AUROC
auroc = roc_auc_score(labels_arr, scores_arr)
# Pixel-level AUPRO (Area Under Per-Region Overlap)
aupro = compute_aupro(all_anomaly_maps, all_masks)
# Trova soglia che rispetta il vincolo FNR
fpr_arr, tpr_arr, thresholds = roc_curve(labels_arr, scores_arr)
fnr_arr = 1 - tpr_arr
# Seleziona soglia con FNR <= target_fnr
valid_idx = fnr_arr <= target_fnr
if valid_idx.any():
# Tra le soglie valide, scegli quella con FPR minima
valid_fprs = fpr_arr[valid_idx]
valid_thresholds = thresholds[valid_idx]
best_idx = np.argmin(valid_fprs)
optimal_threshold = float(valid_thresholds[best_idx])
optimal_fnr = float(fnr_arr[valid_idx][best_idx])
optimal_fpr = float(valid_fprs[best_idx])
else:
# Fallback: EER
eer_idx = np.argmin(np.abs(fpr_arr - fnr_arr))
optimal_threshold = float(thresholds[eer_idx])
optimal_fnr = float(fnr_arr[eer_idx])
optimal_fpr = float(fpr_arr[eer_idx])
print(f"=== Risultati Anomaly Detection ===")
print(f"AUROC (image-level): {auroc:.4f} ({auroc*100:.2f}%)")
print(f"AUPRO (pixel-level): {aupro:.4f} ({aupro*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FNR @ soglia: {optimal_fnr*100:.2f}%")
print(f"FPR @ soglia: {optimal_fpr*100:.2f}%")
return AnomalyEvaluationMetrics(
auroc=auroc,
aupro=aupro,
threshold=optimal_threshold,
fnr=optimal_fnr,
fpr=optimal_fpr
)
def compute_aupro(anomaly_maps: list, gt_masks: list,
max_fpr: float = 0.3) -> float:
"""
Area Under Per-Region Overlap (AUPRO).
Metrica robusta per la localizzazione di anomalie di dimensioni variabili.
A differenza del pixel-level AUROC, non penalizza i difetti piccoli.
"""
all_fprs, all_pros = [], []
for am, mask in zip(anomaly_maps, gt_masks):
am_flat = am.flatten()
mask_flat = (mask > 0.5).astype(float).flatten()
if mask_flat.sum() == 0:
continue
fpr_vals, tpr_vals, _ = roc_curve(mask_flat, am_flat)
all_fprs.extend(fpr_vals.tolist())
all_pros.extend(tpr_vals.tolist())
if not all_fprs:
return 0.0
# Ordina e integra fino a max_fpr
sorted_pairs = sorted(zip(all_fprs, all_pros))
fprs_sorted, pros_sorted = zip(*sorted_pairs)
# Integra con trapezoid rule entro max_fpr
aupro = 0.0
prev_fpr, prev_pro = 0.0, 0.0
for fpr_val, pro_val in zip(fprs_sorted, pros_sorted):
if fpr_val > max_fpr:
break
aupro += (fpr_val - prev_fpr) * (pro_val + prev_pro) / 2.0
prev_fpr, prev_pro = fpr_val, pro_val
return aupro / max_fpr if max_fpr > 0 else 0.0
6. Jetson Orin への展開: 完全なシステム
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import numpy as np
import cv2
import pickle
import time
from datetime import datetime
from typing import Optional
import asyncio
# FastAPI per REST API dell'inspection system
# pip install fastapi uvicorn python-multipart
app = FastAPI(title="PCB Inspection API", version="1.0.0")
# Stato globale del sistema
class InspectionSystem:
model: Optional[PatchCoreModel] = None
threshold: float = 0.65
stats = {
'total_inspected': 0,
'defects_found': 0,
'start_time': None,
'recent_scores': [] # ultimi 100 score per monitoring drift
}
system = InspectionSystem()
@app.on_event("startup")
async def load_model():
"""Carica il modello PatchCore all'avvio del server."""
print("Caricamento modello PatchCore...")
system.model = PatchCoreModel(device='cuda')
# Carica memory bank pre-calcolata
with open('memory_bank.pkl', 'rb') as f:
mb_data = pickle.load(f)
system.model.memory_bank = mb_data['memory_bank']
system.model.projector = mb_data['projector']
import faiss
dim = system.model.memory_bank.shape[1]
system.model.faiss_index = faiss.IndexFlatL2(dim)
system.model.faiss_index.add(system.model.memory_bank)
system.stats['start_time'] = datetime.now()
print("Sistema pronto")
@app.post("/inspect")
async def inspect_board(
file: UploadFile = File(...),
board_id: str = ""
):
"""
Endpoint principale per l'ispezione di una scheda PCB.
Accetta immagine JPEG/PNG e restituisce score e decisione.
"""
if system.model is None:
return JSONResponse(
status_code=503,
content={"error": "Model not loaded"}
)
# Leggi e decodifica immagine
img_bytes = await file.read()
nparr = np.frombuffer(img_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
return JSONResponse(
status_code=400,
content={"error": "Invalid image"}
)
# Inference
t_start = time.perf_counter()
score, anomaly_map = system.model.score_image(img)
latency_ms = (time.perf_counter() - t_start) * 1000
is_defect = score >= system.threshold
# Aggiorna statistiche
system.stats['total_inspected'] += 1
if is_defect:
system.stats['defects_found'] += 1
system.stats['recent_scores'].append(float(score))
if len(system.stats['recent_scores']) > 100:
system.stats['recent_scores'].pop(0)
return {
"board_id": board_id or f"board_{system.stats['total_inspected']}",
"timestamp": datetime.now().isoformat(),
"score": round(float(score), 4),
"is_defect": bool(is_defect),
"latency_ms": round(latency_ms, 1),
"defect_rate_recent": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
)
}
@app.get("/health")
async def health_check():
"""Monitoraggio drift: score medio recente vs baseline."""
recent = system.stats['recent_scores']
return {
"status": "ok",
"model_loaded": system.model is not None,
"total_inspected": system.stats['total_inspected'],
"defect_rate": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
),
"recent_avg_score": float(np.mean(recent)) if recent else 0.0,
"uptime_seconds": (
(datetime.now() - system.stats['start_time']).total_seconds()
if system.stats['start_time'] else 0
)
}
# Avvio: uvicorn inspection_server:app --host 0.0.0.0 --port 8000
7. モデルドリフト: 本番環境での検出と更新
本番環境のモデルは静的ではありません。新しい PCB 設計、プロセスの変更 溶接、ライトの磨耗 - すべてが原因となります。 モデルドリフト: パフォーマンスの段階的な低下がメトリクスに常に表示されるわけではない 重大になるまで分類します。プロアクティブなモニタリングが不可欠です。
import numpy as np
import json
import time
from pathlib import Path
from collections import deque
from dataclasses import dataclass, field
from scipy import stats as scipy_stats
import warnings
@dataclass
class DriftReport:
"""Report dello stato del drift del modello."""
timestamp: float = field(default_factory=time.time)
window_size: int = 0
current_mean_score: float = 0.0
baseline_mean_score: float = 0.0
score_drift: float = 0.0 # differenza dalla baseline
ks_statistic: float = 0.0 # Kolmogorov-Smirnov stat
ks_p_value: float = 1.0 # p-value KS test
is_drifting: bool = False
drift_level: str = 'none' # 'none', 'warning', 'critical'
action_required: str = ''
class ModelDriftMonitor:
"""
Monitora il drift del modello PatchCore in produzione.
Basato su:
1. Score distribution monitoring (media rolling dei "good" score)
2. Kolmogorov-Smirnov test tra distribuzione corrente e baseline
3. Alert adattivi con 3 livelli: OK / WARNING / CRITICAL
Strategia di aggiornamento:
- WARNING: aumenta frequenza di sampling manuale (ogni 100 schede invece di 1000)
- CRITICAL: blocca linea per re-calibrazione del modello
"""
def __init__(self,
baseline_scores_path: str,
window_size: int = 500,
warning_threshold: float = 0.05, # +5% drift
critical_threshold: float = 0.10, # +10% drift
ks_alpha: float = 0.01): # p-value per KS test
"""
baseline_scores_path: JSON con score di "good" boards baseline
window_size: numero di score da mantenere nella finestra rolling
"""
self.window_size = window_size
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
self.ks_alpha = ks_alpha
# Carica baseline
with open(baseline_scores_path) as f:
data = json.load(f)
self.baseline_scores = np.array(data['good_scores'])
self.baseline_mean = float(np.mean(self.baseline_scores))
self.baseline_std = float(np.std(self.baseline_scores))
print(f"Baseline caricata: {len(self.baseline_scores)} score")
print(f" Mean: {self.baseline_mean:.4f} ± {self.baseline_std:.4f}")
# Finestra rolling per i "good" score correnti
self.current_scores: deque = deque(maxlen=window_size)
self.report_history: list[DriftReport] = []
def record_score(self, anomaly_score: float,
is_good: bool) -> None:
"""
Registra lo score di una scheda.
is_good: True se la scheda è stata classificata come OK
(o confermata come buona dall'operatore)
"""
if is_good:
self.current_scores.append(anomaly_score)
def check_drift(self) -> DriftReport:
"""
Esegue il controllo drift sulla finestra corrente.
Returns: DriftReport con stato e azione consigliata.
"""
report = DriftReport(window_size=len(self.current_scores))
if len(self.current_scores) < 50:
# Non abbastanza dati per un test significativo
report.action_required = 'collecting_data'
return report
current = np.array(self.current_scores)
report.current_mean_score = float(np.mean(current))
report.baseline_mean_score = self.baseline_mean
# 1. Score drift (differenza dalla baseline in deviazioni standard)
report.score_drift = abs(
report.current_mean_score - self.baseline_mean
) / (self.baseline_std + 1e-10)
# 2. Kolmogorov-Smirnov test
# Confronta la distribuzione corrente con la baseline
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ks_stat, ks_pval = scipy_stats.ks_2samp(
self.baseline_scores, current
)
report.ks_statistic = float(ks_stat)
report.ks_p_value = float(ks_pval)
# 3. Classificazione del drift
distribution_changed = ks_pval < self.ks_alpha
mean_drifted_critical = report.score_drift > self.critical_threshold * 10
mean_drifted_warning = report.score_drift > self.warning_threshold * 10
if mean_drifted_critical and distribution_changed:
report.is_drifting = True
report.drift_level = 'critical'
report.action_required = 'STOP_LINE: re-calibrate model immediately'
elif mean_drifted_warning or distribution_changed:
report.is_drifting = True
report.drift_level = 'warning'
report.action_required = 'increase_sampling: inspect every 100 boards'
else:
report.drift_level = 'none'
report.action_required = 'continue_normal_operation'
self.report_history.append(report)
return report
def suggest_retraining(self, current_auroc: float,
target_auroc: float = 0.99) -> dict:
"""
Suggerisce una strategia di retraining basata sulle metriche correnti.
"""
auroc_drop = target_auroc - current_auroc
latest_report = self.report_history[-1] if self.report_history else None
strategy = {
'retrain_needed': current_auroc < target_auroc - 0.005,
'current_auroc': current_auroc,
'target_auroc': target_auroc,
'auroc_gap': auroc_drop,
}
if current_auroc >= target_auroc:
strategy['action'] = 'no_action_needed'
elif auroc_drop < 0.02:
# Piccola degradazione: aggiorna solo memory bank con nuovi campioni buoni
strategy['action'] = 'update_memory_bank'
strategy['description'] = (
'Aggiungi 200-500 nuove immagini buone alla memory bank. '
'Non richiede re-training del backbone.'
)
elif auroc_drop < 0.05:
# Degradazione media: fine-tune con nuovi campioni
strategy['action'] = 'incremental_finetune'
strategy['description'] = (
'Fine-tune backbone con mix di dati vecchi (70%) e nuovi (30%). '
'Learning rate basso: 1e-5. 10-20 epoch.'
)
else:
# Degradazione severa: retrain completo
strategy['action'] = 'full_retrain'
strategy['description'] = (
'Rebuild completo della memory bank con dati correnti. '
'Considera re-annotazione se sono cambiati i tipi di difetti.'
)
return strategy
# ---- Threshold adattivo basato su SLA di produzione ----
def compute_adaptive_threshold(model_scores_good: np.ndarray,
model_scores_defective: np.ndarray,
max_fnr: float = 0.003, # SLA: max 0.3% FNR
max_fpr: float = 0.02) -> dict:
"""
Calcola la soglia ottimale per rispettare gli SLA di produzione.
Priorità: FNR (difetti sfuggiti) è il vincolo critico.
FPR (falsi allarmi) è secondario.
Returns: {threshold, fnr, fpr, tradeoff_note}
"""
# Genera candidati threshold
all_scores = np.concatenate([model_scores_good, model_scores_defective])
thresholds = np.percentile(all_scores, np.arange(1, 100, 0.5))
best_threshold = None
best_fpr = float('inf')
for t in thresholds:
# Predizioni
tp = np.sum(model_scores_defective > t)
fn = np.sum(model_scores_defective <= t)
fp = np.sum(model_scores_good > t)
tn = np.sum(model_scores_good <= t)
fnr = fn / (fn + tp + 1e-10)
fpr = fp / (fp + tn + 1e-10)
# Rispetta il vincolo FNR e minimizza FPR
if fnr <= max_fnr and fpr < best_fpr:
best_fpr = fpr
best_threshold = t
if best_threshold is None:
# Fallback: usa EER se non riesce a rispettare FNR SLA
import warnings
warnings.warn("Impossibile rispettare FNR SLA. Usando EER come fallback.")
# ... (implementazione EER omessa per brevita)
best_threshold = np.median(all_scores)
best_fpr = 0.05
# Metriche finali alla soglia scelta
tp = np.sum(model_scores_defective > best_threshold)
fn = np.sum(model_scores_defective <= best_threshold)
fp = np.sum(model_scores_good > best_threshold)
tn = np.sum(model_scores_good <= best_threshold)
fnr_final = fn / (fn + tp + 1e-10)
fpr_final = fp / (fp + tn + 1e-10)
return {
'threshold': float(best_threshold),
'fnr': float(fnr_final),
'fpr': float(fpr_final),
'precision': float(tp / (tp + fp + 1e-10)),
'recall': float(tp / (tp + fn + 1e-10)),
'meets_fnr_sla': fnr_final <= max_fnr,
'meets_fpr_target': fpr_final <= max_fpr,
}
8. 結果と教訓
実際のプロジェクトの結果 (3 か月の制作後)
| メトリック | 人間による検査 | ビジョンAIシステム | 改善 |
|---|---|---|---|
| スループット | 20枚/分 | 140枚/分 | 7x |
| 偽陰性率 | 2 ~ 15% (努力によって異なります) | 0.3% | ~10倍 |
| 誤検知率 | 0.5% | 1.4% | -2.8倍(悪化) |
| 検査費用/シート | 0.12ユーロ | 0.02ユーロ | 6倍の削減 |
| フィールド上で見逃された欠陥 | 23件/月 | 3 請求/月 | 87%削減 |
プロジェクトの 5 つの重要な教訓
- 偽陽性率は交渉可能ですが、偽陰性率は変更できません。 顧客は、さらに数枚の良好なカードを廃棄することを受け入れますが、現場で欠陥のあるカードを許容しません。常に最初に FNR 制約を満たすようにしきい値を設計します。
- 照明は最も重要なコンポーネントです。 初期の誤検知の 60% は、実際の欠陥ではなく、照明の変化によるものでした。モデルについて考える前に、制御された照明システム (LED ストロボ、ディフューザー ドーム) に投資してください。
- PatchCore はゆっくりと劣化しますが、次のように劣化します。 3 か月後、メモリ バンクにない新しい PCB 設計の AUROC は 99.1% から 97.8% に低下しました。メモリ バンクの増分アップグレード戦略を計画します。
- ローカリゼーションはオペレーターにとって不可欠です。 「OK/NOK」の二者択一の決定では十分ではありません。欠陥がどこにあるかを示す異常マップにより、誤検知を手動で検証するためのオペレーターの分析時間が 70% 削減されます。
- 精度だけでなくスコアの分布を監視します。 モデルのドリフトは、分類メトリックよりもスコアの分布に最初に見られます。 「良い」スコアの平均が過去のベースラインを超えた場合に警告します。
シリーズの結論
このケース スタディで、ディープ ラーニングを使用したコンピューター ビジョンに関するシリーズを終了します。私たちは持っています 基本的な CNN から転移学習、検出までの全軌跡をカバー YOLO26 を使用したオブジェクトからセグメンテーション、拡張から本番パイプラインまで OpenCV は、エッジ導入から顔認識に至るまで、実際の産業用ユースケースに至るまでです。
コンピュータ ビジョンは実践的な分野です。最良の結果は、それを理解している人から得られます。 理論(アーキテクチャ、損失関数、計量)とシステムエンジニアリングの両方 (前処理、展開、監視)。この分野は急速に進化しています - YOLO26 は 2026 年 1 月にリリースされた SAM2 は、インタラクティブ セグメンテーションに革命をもたらしましたが、 基本原則は安定しています。
シリーズナビゲーション
- 前の: 顔の検出と認識: 最新の技術
- シリーズの開始: CNN: ゼロから本番環境までの畳み込みネットワーク
シリーズ間のリソース
- MLOps: モニタリングとモデルドリフト検出 - 本番環境での高度な監視
- 高度な深層学習: 量子化と圧縮 - モデルの最適化
- AI エンジニアリング: RAG とベクトル検索 - AI システムとの統合







