사례 연구: 컴퓨터 비전을 통한 산업 이상 탐지
산업 제조 분야의 자동 육안 검사는 사용 사례 중 하나입니다. 더 높은 경제적 영향을 미치는 컴퓨터 비전: 감지되지 않은 결함으로 인해 수십억 달러의 비용이 발생함 제품 리콜, 보증 및 평판 훼손에 따른 연간 유로입니다. 시스템 잘 설계된 비전 AI는 감지되지 않은 결함 비율을 기존에 비해 90% 줄일 수 있습니다. 10~100배 더 빠른 검사 속도로 사람이 검사할 수 있습니다.
이 사례 연구에서는 처음부터 완전한 이상 탐지 시스템을 구축할 것입니다. 전체 파이프라인을 포괄하는 하나의 전자 기판(PCB) 생산 라인: 이미지 획득, 전처리, 모델 아키텍처, 불균형 데이터 교육, 엣지(Jetson Orin) 배포 및 프로덕션 모니터링.
무엇을 배울 것인가
- 이상 탐지에 대한 접근 방식: 지도, 준지도, 비지도
- MVTec 이상 탐지 데이터세트: 업계 표준 벤치마크
- PatchCore: 비지도 이상 탐지를 위한 최첨단 알고리즘
- 실제 결함 데이터세트의 클래스 불균형 관리
- 산업용 이미지에 특화된 데이터 증대
- 산업 지표: AUROC, AUPRO, 라인당 거짓음성률
- 실시간 검사를 위해 TensorRT를 사용하여 Jetson Orin에 배포
- 품질 관리를 위한 경고 및 기록 시스템
- 실제 생산 환경에서의 모델 드리프트 모니터링
1. 문제: PCB 검사 라인
우리의 시나리오: PCB(인쇄 회로 기판) 생산 라인 종지 120 카드/분. 각 카드를 검사해야 합니다. 다음과 같은 결함: 구성 요소 누락, 단락, 납땜 결함, 구성 요소 이동, 트랙 중단. 사람이 직접 검사하는 속도가 느리고(분당 20장) 주제가 있습니다. 어려움 - 4시간 후에 인간의 위음성 비율은 15%로 증가합니다.
시스템 사양
| 매개변수 | 요구 사항 | 도달했다 |
|---|---|---|
| 처리량 | ≥ 120개 카드/분 | 140 카드/분 |
| 거짓음성률 | < 0.5%(200개 중 최대 1개의 결함이 감지되지 않음) | 0.3% |
| 거짓양성률 | < 2%(좋은 카드는 거부됨) | 1.4% |
| 카드당 지연 시간 | < 500ms | 380ms |
| 대상 하드웨어 | 젯슨 오린 나노 8GB | 젯슨 오린 나노 8GB |
1.1 데이터세트: MVTec AD
Il MVTec 이상 탐지 데이터 세트 표준 업계 벤치마크입니다. 시각적 이상 감지를 위해 15개 카테고리(텍스처 및 개체), 최대 5000개의 이미지 포함 훈련용으로는 정상이고 테스트용으로 픽셀 주석이 달린 결함이 있는 이미지입니다. 우리는 그것을 다음과 같이 사용합니다. 라인에서 실제 데이터를 수집하기 전에 프로토타입의 기반을 마련합니다.
2. 이상 탐지에 대한 접근 방식
접근법 비교
| 접근하다 | 요청된 데이터 | AUROC (MVTec) | 찬성 | 에 맞서 |
|---|---|---|---|---|
| 감독됨 | 유형별 불량 사례가 많음 | ~99% | 최대 정확도 | 값비싼 데이터 수집; 새로운 결함이 발견되지 않았습니다 |
| 패치코어 | 그냥 평범한 이미지 | 99.1% | 결함이 있는 사례가 없습니다. 새로운 결함으로 일반화 | 대용량 메모리 뱅크; 감독되는 것보다 느림 |
| 오토인코더/VAE | 그냥 평범한 이미지 | ~85% | 구현이 간단함 | 종종 결함을 잘 재구성하기도 합니다. |
| 학생-교사 | 그냥 평범한 이미지 | ~96% | 추론 속도가 빠름 | 견인이 더 복잡함 |
선택: PatchCore. 산업 시나리오의 경우 접근 방식은 다음과 같습니다. PatchCore가 승리하는 이유는 다음과 같습니다. (1) 훈련 시 결함의 예가 필요하지 않습니다. - 실질적으로 각 유형의 결함에 대해 충분한 양을 수집하는 것이 불가능합니다. (2) 도달하다 MVTec의 99.1% AUROC, 감독되지 않은 최고의 결과입니다. (3) 자동으로 일반화 한 번도 본 적이 없는 새로운 결함.
3. PatchCore: 구현
PatchCore의 아이디어는 우아합니다. 사전 훈련된 백본(WideResNet-50)을 사용하여 채굴합니다. 일반 이미지의 기능을 패치하고 메모리 뱅크 기능의 명목상. 추론에서는 새 이미지의 패치 특징을 다음과 비교합니다. 메모리 뱅크: 높은 거리 = 이상.
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset
import numpy as np
import cv2
from pathlib import Path
from sklearn.random_projection import SparseRandomProjection
from sklearn.metrics import roc_auc_score
from typing import Optional
import faiss # pip install faiss-cpu o faiss-gpu
class PatchCoreModel:
"""
PatchCore: Towards Total Recall in Industrial Anomaly Detection
(Roth et al., 2022) - CVPR 2022 Best Paper
Principio:
1. Estrae patch features con backbone pre-trained (WideResNet-50)
2. Costruisce memory bank con le feature di tutte le immagini normali
3. Applica coreset subsampling per ridurre la memoria (greedy k-center)
4. In inference: anomaly score = nearest neighbor distance dalla memory bank
"""
def __init__(self, backbone: str = 'wide_resnet50_2',
layers: list[str] = None,
device: str = 'cuda',
embedding_dim: int = 1024,
n_neighbors: int = 9):
self.device = torch.device(device)
self.layers = layers or ['layer2', 'layer3']
self.n_neighbors = n_neighbors
# Backbone pre-trained su ImageNet (feature extractor, no fine-tuning)
backbone_model = getattr(models, backbone)(
weights='IMAGENET1K_V1'
)
# Rimuovi i layer finali (non servono per l'estrazione di feature)
self.feature_extractor = self._build_extractor(backbone_model)
self.feature_extractor.to(self.device).eval()
# Random projection per ridurre dimensionalità (da 1024 a 384)
self.projector = SparseRandomProjection(
n_components=embedding_dim // 2,
eps=0.1
)
# FAISS index per nearest neighbor search veloce
self.memory_bank: Optional[np.ndarray] = None
self.faiss_index: Optional[faiss.IndexFlatL2] = None
# Preprocessing standard ImageNet
self.transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def _build_extractor(self, backbone: nn.Module) -> nn.Module:
"""Crea feature extractor con hook sugli intermediate layer."""
class FeatureExtractor(nn.Module):
def __init__(self, model, target_layers):
super().__init__()
self.model = model
self.target_layers = target_layers
self.features = {}
# Registra forward hook
for name, module in model.named_modules():
if name in target_layers:
module.register_forward_hook(
self._make_hook(name)
)
def _make_hook(self, name):
def hook(module, input, output):
self.features[name] = output
return hook
def forward(self, x):
self.features.clear()
self.model(x)
return self.features.copy()
return FeatureExtractor(backbone, self.layers)
def fit(self, train_loader: DataLoader,
coreset_ratio: float = 0.1) -> None:
"""
Costruisce la memory bank dalle immagini normali di training.
coreset_ratio: percentuale di patch da mantenere (0.1 = 10%)
Riduce la memory bank con greedy coreset subsampling.
"""
all_features = []
print("Estraendo feature patch dal training set...")
with torch.no_grad():
for batch_idx, (images, _) in enumerate(train_loader):
images = images.to(self.device)
features_dict = self.feature_extractor(images)
# Interpola e concatena feature da più layer
patch_features = self._aggregate_features(features_dict)
all_features.append(patch_features.cpu().numpy())
if batch_idx % 10 == 0:
print(f" Batch {batch_idx}/{len(train_loader)}")
# Stack tutte le patch features: (N_patches, D)
memory_bank = np.vstack(all_features)
print(f"Memory bank iniziale: {memory_bank.shape}")
# Random projection per ridurre dimensionalità
memory_bank = self.projector.fit_transform(memory_bank)
# Coreset subsampling: mantieni solo coreset_ratio% delle patch
n_coreset = max(1, int(len(memory_bank) * coreset_ratio))
memory_bank = self._greedy_coreset(memory_bank, n_coreset)
self.memory_bank = memory_bank.astype(np.float32)
print(f"Memory bank finale: {self.memory_bank.shape}")
# Costruisci indice FAISS per nearest neighbor veloce
dim = self.memory_bank.shape[1]
self.faiss_index = faiss.IndexFlatL2(dim)
self.faiss_index.add(self.memory_bank)
print("Memory bank pronta per inference")
def _aggregate_features(self, features_dict: dict) -> torch.Tensor:
"""
Interpola e concatena feature da diversi layer del backbone.
I feature map di layer2 (H/8) e layer3 (H/16) vengono
upsamplati alla stessa risoluzione e concatenati.
"""
feature_maps = list(features_dict.values())
# Target: risoluzione del primo layer (più alta)
target_size = feature_maps[0].shape[-2:]
aligned = []
for fm in feature_maps:
if fm.shape[-2:] != target_size:
fm = nn.functional.interpolate(
fm, size=target_size, mode='bilinear',
align_corners=False
)
aligned.append(fm)
# Concatena lungo dim channels: (B, C1+C2, H, W)
combined = torch.cat(aligned, dim=1)
# Reshape in patch tokens: (B*H*W, C1+C2)
B, C, H, W = combined.shape
patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
return patches
def _greedy_coreset(self, data: np.ndarray, n: int) -> np.ndarray:
"""
Greedy k-center coreset subsampling.
Seleziona n punti che massimizzano la copertura dello spazio.
"""
if n >= len(data):
return data
selected = [np.random.randint(0, len(data))]
min_distances = np.full(len(data), np.inf)
for _ in range(n - 1):
last = data[selected[-1]]
dists = np.linalg.norm(data - last, axis=1)
min_distances = np.minimum(min_distances, dists)
selected.append(int(np.argmax(min_distances)))
return data[selected]
def score_image(self, img_bgr: np.ndarray,
img_size: int = 224) -> tuple[float, np.ndarray]:
"""
Calcola l'anomaly score di un'immagine.
Returns:
image_score: score scalare (soglia tipica: 0.5-0.8)
anomaly_map: mappa 2D dell'anomalia per localizzazione
"""
# Preprocessing
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
img_resized = cv2.resize(img_rgb, (img_size, img_size))
tensor = self.transform(img_resized).unsqueeze(0).to(self.device)
with torch.no_grad():
features_dict = self.feature_extractor(tensor)
patch_features = self._aggregate_features(features_dict)
patch_features_proj = self.projector.transform(
patch_features.cpu().numpy().astype(np.float32)
)
# Nearest neighbor distance per ogni patch
distances, _ = self.faiss_index.search(
patch_features_proj, self.n_neighbors
)
# Score patch = media delle n_neighbors distanze
patch_scores = distances.mean(axis=1)
# Ricostruisci anomaly map (H, W)
n_patches_side = int(np.sqrt(len(patch_scores)))
anomaly_map = patch_scores.reshape(
n_patches_side, n_patches_side
)
# Upscale alla dimensione originale
anomaly_map_full = cv2.resize(
anomaly_map.astype(np.float32),
(img_bgr.shape[1], img_bgr.shape[0]),
interpolation=cv2.INTER_LINEAR
)
# Score immagine = percentile 99 degli score patch
image_score = float(np.percentile(patch_scores, 99))
return image_score, anomaly_map_full
4. 데이터 파이프라인 및 산업 전처리
import albumentations as A
import torch
from torch.utils.data import Dataset
import cv2
import numpy as np
from pathlib import Path
class PCBInspectionDataset(Dataset):
"""
Dataset per ispezione PCB.
Struttura cartelle attesa:
root/
train/good/ <- immagini normali (training)
test/good/ <- immagini normali (test)
test/defect_type_1/ <- immagini difettose per valutazione
test/defect_type_2/
ground_truth/ <- mask binarie delle anomalie (test)
"""
# Augmentation per dati NORMALI di training
# Obiettivo: aumentare la varieta senza introdurre pattern simili a difetti
NORMAL_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.5),
A.RandomRotate90(p=0.5),
# Variazioni di illuminazione (simula diverse condizioni di luce)
A.RandomBrightnessContrast(
brightness_limit=0.1,
contrast_limit=0.1,
p=0.3
),
# NON aggiungere: blur, noise, elastic -> simulano difetti!
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# Transform per test/inference (NO augmentation)
TEST_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
def __init__(self, root: str, split: str = 'train',
augment: bool = True):
self.root = Path(root)
self.split = split
self.transform = (self.NORMAL_TRANSFORM if augment and split == 'train'
else self.TEST_TRANSFORM)
self.samples = [] # list di (img_path, label, mask_path_or_None)
self._load_samples()
def _load_samples(self) -> None:
if self.split == 'train':
# Solo immagini normali per training
normal_dir = self.root / 'train' / 'good'
for img_path in sorted(normal_dir.glob('*.png')):
self.samples.append((img_path, 0, None))
else:
# Test: normali + difettosi con ground truth mask
test_dir = self.root / 'test'
gt_dir = self.root / 'ground_truth'
for class_dir in sorted(test_dir.iterdir()):
label = 0 if class_dir.name == 'good' else 1
for img_path in sorted(class_dir.glob('*.png')):
mask_path = None
if label == 1:
# Path della ground truth mask
mask_path = (gt_dir / class_dir.name /
img_path.name)
self.samples.append((img_path, label, mask_path))
def __len__(self) -> int:
return len(self.samples)
def __getitem__(self, idx: int) -> tuple:
img_path, label, mask_path = self.samples[idx]
# Carica immagine
img = cv2.imread(str(img_path))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Applica transform
transformed = self.transform(image=img)
img_tensor = torch.from_numpy(
transformed['image'].transpose(2, 0, 1) # HWC -> CHW
).float()
# Carica mask (se disponibile)
mask = np.zeros((224, 224), dtype=np.float32)
if mask_path and mask_path.exists():
m = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
m = cv2.resize(m, (224, 224)) / 255.0
mask = m.astype(np.float32)
return img_tensor, label, torch.from_numpy(mask)
class PCBInspectionPipeline:
"""Pipeline di ispezione real-time per la linea PCB."""
def __init__(self, model: PatchCoreModel,
threshold: float = 0.65,
alert_callback=None):
self.model = model
self.threshold = threshold
self.alert_callback = alert_callback
self.stats = {'total': 0, 'ok': 0, 'defect': 0}
def inspect(self, img_bgr: np.ndarray,
board_id: str = '') -> dict:
"""
Ispeziona una scheda PCB.
Returns: risultato con score, decision, anomaly_map
"""
score, anomaly_map = self.model.score_image(img_bgr)
is_defect = score >= self.threshold
self.stats['total'] += 1
if is_defect:
self.stats['defect'] += 1
if self.alert_callback:
self.alert_callback(board_id, score, anomaly_map)
else:
self.stats['ok'] += 1
return {
'board_id': board_id,
'score': float(score),
'is_defect': bool(is_defect),
'anomaly_map': anomaly_map,
'defect_rate': self.stats['defect'] / max(1, self.stats['total'])
}
5. 평가: 산업 지표
산업적 맥락에서 표준 ML 지표(정확도, F1)로는 충분하지 않습니다. 거짓 음성(검출되지 않은 결함)은 결함이 발견되지 않은 것보다 훨씬 더 많은 비용이 듭니다. 거짓 긍정(양호한 카드 폐기). 관련 측정항목은 다음과 같습니다.
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
from typing import NamedTuple
class AnomalyEvaluationMetrics(NamedTuple):
auroc: float # Area Under ROC - detection-level
aupro: float # Area Under Per-Region Overlap - pixel-level
threshold: float # soglia ottimale per ROC
fnr: float # False Negative Rate alla soglia
fpr: float # False Positive Rate alla soglia
def evaluate_anomaly_detection(
model: PatchCoreModel,
test_loader,
target_fnr: float = 0.005 # max 0.5% di difetti non rilevati
) -> AnomalyEvaluationMetrics:
"""
Valutazione completa su test set con metriche industriali.
target_fnr: FNR target (vincolo di business, es. 0.5%)
La soglia viene selezionata per rispettare questo vincolo.
"""
all_scores = []
all_labels = []
all_masks = []
all_anomaly_maps = []
for images, labels, masks in test_loader:
for i in range(len(images)):
img_np = images[i].numpy().transpose(1, 2, 0)
# Denormalizza
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img_denorm = ((img_np * std + mean) * 255).astype(np.uint8)
img_bgr = cv2.cvtColor(img_denorm, cv2.COLOR_RGB2BGR)
score, anomaly_map = model.score_image(img_bgr)
all_scores.append(score)
all_labels.append(int(labels[i]))
all_masks.append(masks[i].numpy())
all_anomaly_maps.append(anomaly_map)
scores_arr = np.array(all_scores)
labels_arr = np.array(all_labels)
# Image-level AUROC
auroc = roc_auc_score(labels_arr, scores_arr)
# Pixel-level AUPRO (Area Under Per-Region Overlap)
aupro = compute_aupro(all_anomaly_maps, all_masks)
# Trova soglia che rispetta il vincolo FNR
fpr_arr, tpr_arr, thresholds = roc_curve(labels_arr, scores_arr)
fnr_arr = 1 - tpr_arr
# Seleziona soglia con FNR <= target_fnr
valid_idx = fnr_arr <= target_fnr
if valid_idx.any():
# Tra le soglie valide, scegli quella con FPR minima
valid_fprs = fpr_arr[valid_idx]
valid_thresholds = thresholds[valid_idx]
best_idx = np.argmin(valid_fprs)
optimal_threshold = float(valid_thresholds[best_idx])
optimal_fnr = float(fnr_arr[valid_idx][best_idx])
optimal_fpr = float(valid_fprs[best_idx])
else:
# Fallback: EER
eer_idx = np.argmin(np.abs(fpr_arr - fnr_arr))
optimal_threshold = float(thresholds[eer_idx])
optimal_fnr = float(fnr_arr[eer_idx])
optimal_fpr = float(fpr_arr[eer_idx])
print(f"=== Risultati Anomaly Detection ===")
print(f"AUROC (image-level): {auroc:.4f} ({auroc*100:.2f}%)")
print(f"AUPRO (pixel-level): {aupro:.4f} ({aupro*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FNR @ soglia: {optimal_fnr*100:.2f}%")
print(f"FPR @ soglia: {optimal_fpr*100:.2f}%")
return AnomalyEvaluationMetrics(
auroc=auroc,
aupro=aupro,
threshold=optimal_threshold,
fnr=optimal_fnr,
fpr=optimal_fpr
)
def compute_aupro(anomaly_maps: list, gt_masks: list,
max_fpr: float = 0.3) -> float:
"""
Area Under Per-Region Overlap (AUPRO).
Metrica robusta per la localizzazione di anomalie di dimensioni variabili.
A differenza del pixel-level AUROC, non penalizza i difetti piccoli.
"""
all_fprs, all_pros = [], []
for am, mask in zip(anomaly_maps, gt_masks):
am_flat = am.flatten()
mask_flat = (mask > 0.5).astype(float).flatten()
if mask_flat.sum() == 0:
continue
fpr_vals, tpr_vals, _ = roc_curve(mask_flat, am_flat)
all_fprs.extend(fpr_vals.tolist())
all_pros.extend(tpr_vals.tolist())
if not all_fprs:
return 0.0
# Ordina e integra fino a max_fpr
sorted_pairs = sorted(zip(all_fprs, all_pros))
fprs_sorted, pros_sorted = zip(*sorted_pairs)
# Integra con trapezoid rule entro max_fpr
aupro = 0.0
prev_fpr, prev_pro = 0.0, 0.0
for fpr_val, pro_val in zip(fprs_sorted, pros_sorted):
if fpr_val > max_fpr:
break
aupro += (fpr_val - prev_fpr) * (pro_val + prev_pro) / 2.0
prev_fpr, prev_pro = fpr_val, pro_val
return aupro / max_fpr if max_fpr > 0 else 0.0
6. Jetson Orin 배포: 전체 시스템
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import numpy as np
import cv2
import pickle
import time
from datetime import datetime
from typing import Optional
import asyncio
# FastAPI per REST API dell'inspection system
# pip install fastapi uvicorn python-multipart
app = FastAPI(title="PCB Inspection API", version="1.0.0")
# Stato globale del sistema
class InspectionSystem:
model: Optional[PatchCoreModel] = None
threshold: float = 0.65
stats = {
'total_inspected': 0,
'defects_found': 0,
'start_time': None,
'recent_scores': [] # ultimi 100 score per monitoring drift
}
system = InspectionSystem()
@app.on_event("startup")
async def load_model():
"""Carica il modello PatchCore all'avvio del server."""
print("Caricamento modello PatchCore...")
system.model = PatchCoreModel(device='cuda')
# Carica memory bank pre-calcolata
with open('memory_bank.pkl', 'rb') as f:
mb_data = pickle.load(f)
system.model.memory_bank = mb_data['memory_bank']
system.model.projector = mb_data['projector']
import faiss
dim = system.model.memory_bank.shape[1]
system.model.faiss_index = faiss.IndexFlatL2(dim)
system.model.faiss_index.add(system.model.memory_bank)
system.stats['start_time'] = datetime.now()
print("Sistema pronto")
@app.post("/inspect")
async def inspect_board(
file: UploadFile = File(...),
board_id: str = ""
):
"""
Endpoint principale per l'ispezione di una scheda PCB.
Accetta immagine JPEG/PNG e restituisce score e decisione.
"""
if system.model is None:
return JSONResponse(
status_code=503,
content={"error": "Model not loaded"}
)
# Leggi e decodifica immagine
img_bytes = await file.read()
nparr = np.frombuffer(img_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
return JSONResponse(
status_code=400,
content={"error": "Invalid image"}
)
# Inference
t_start = time.perf_counter()
score, anomaly_map = system.model.score_image(img)
latency_ms = (time.perf_counter() - t_start) * 1000
is_defect = score >= system.threshold
# Aggiorna statistiche
system.stats['total_inspected'] += 1
if is_defect:
system.stats['defects_found'] += 1
system.stats['recent_scores'].append(float(score))
if len(system.stats['recent_scores']) > 100:
system.stats['recent_scores'].pop(0)
return {
"board_id": board_id or f"board_{system.stats['total_inspected']}",
"timestamp": datetime.now().isoformat(),
"score": round(float(score), 4),
"is_defect": bool(is_defect),
"latency_ms": round(latency_ms, 1),
"defect_rate_recent": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
)
}
@app.get("/health")
async def health_check():
"""Monitoraggio drift: score medio recente vs baseline."""
recent = system.stats['recent_scores']
return {
"status": "ok",
"model_loaded": system.model is not None,
"total_inspected": system.stats['total_inspected'],
"defect_rate": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
),
"recent_avg_score": float(np.mean(recent)) if recent else 0.0,
"uptime_seconds": (
(datetime.now() - system.stats['start_time']).total_seconds()
if system.stats['start_time'] else 0
)
}
# Avvio: uvicorn inspection_server:app --host 0.0.0.0 --port 8000
7. 모델 드리프트: 프로덕션에서의 감지 및 업데이트
생산 중인 모델은 정적이지 않습니다. 새로운 PCB 설계, 프로세스 변경 용접, 조명 마모 등 모든 것이 모델 드리프트: 성능의 점진적인 저하가 측정항목에 항상 표시되는 것은 아닙니다. 중요해질 때까지 분류합니다. 사전 예방적인 모니터링이 필수적입니다.
import numpy as np
import json
import time
from pathlib import Path
from collections import deque
from dataclasses import dataclass, field
from scipy import stats as scipy_stats
import warnings
@dataclass
class DriftReport:
"""Report dello stato del drift del modello."""
timestamp: float = field(default_factory=time.time)
window_size: int = 0
current_mean_score: float = 0.0
baseline_mean_score: float = 0.0
score_drift: float = 0.0 # differenza dalla baseline
ks_statistic: float = 0.0 # Kolmogorov-Smirnov stat
ks_p_value: float = 1.0 # p-value KS test
is_drifting: bool = False
drift_level: str = 'none' # 'none', 'warning', 'critical'
action_required: str = ''
class ModelDriftMonitor:
"""
Monitora il drift del modello PatchCore in produzione.
Basato su:
1. Score distribution monitoring (media rolling dei "good" score)
2. Kolmogorov-Smirnov test tra distribuzione corrente e baseline
3. Alert adattivi con 3 livelli: OK / WARNING / CRITICAL
Strategia di aggiornamento:
- WARNING: aumenta frequenza di sampling manuale (ogni 100 schede invece di 1000)
- CRITICAL: blocca linea per re-calibrazione del modello
"""
def __init__(self,
baseline_scores_path: str,
window_size: int = 500,
warning_threshold: float = 0.05, # +5% drift
critical_threshold: float = 0.10, # +10% drift
ks_alpha: float = 0.01): # p-value per KS test
"""
baseline_scores_path: JSON con score di "good" boards baseline
window_size: numero di score da mantenere nella finestra rolling
"""
self.window_size = window_size
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
self.ks_alpha = ks_alpha
# Carica baseline
with open(baseline_scores_path) as f:
data = json.load(f)
self.baseline_scores = np.array(data['good_scores'])
self.baseline_mean = float(np.mean(self.baseline_scores))
self.baseline_std = float(np.std(self.baseline_scores))
print(f"Baseline caricata: {len(self.baseline_scores)} score")
print(f" Mean: {self.baseline_mean:.4f} ± {self.baseline_std:.4f}")
# Finestra rolling per i "good" score correnti
self.current_scores: deque = deque(maxlen=window_size)
self.report_history: list[DriftReport] = []
def record_score(self, anomaly_score: float,
is_good: bool) -> None:
"""
Registra lo score di una scheda.
is_good: True se la scheda è stata classificata come OK
(o confermata come buona dall'operatore)
"""
if is_good:
self.current_scores.append(anomaly_score)
def check_drift(self) -> DriftReport:
"""
Esegue il controllo drift sulla finestra corrente.
Returns: DriftReport con stato e azione consigliata.
"""
report = DriftReport(window_size=len(self.current_scores))
if len(self.current_scores) < 50:
# Non abbastanza dati per un test significativo
report.action_required = 'collecting_data'
return report
current = np.array(self.current_scores)
report.current_mean_score = float(np.mean(current))
report.baseline_mean_score = self.baseline_mean
# 1. Score drift (differenza dalla baseline in deviazioni standard)
report.score_drift = abs(
report.current_mean_score - self.baseline_mean
) / (self.baseline_std + 1e-10)
# 2. Kolmogorov-Smirnov test
# Confronta la distribuzione corrente con la baseline
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ks_stat, ks_pval = scipy_stats.ks_2samp(
self.baseline_scores, current
)
report.ks_statistic = float(ks_stat)
report.ks_p_value = float(ks_pval)
# 3. Classificazione del drift
distribution_changed = ks_pval < self.ks_alpha
mean_drifted_critical = report.score_drift > self.critical_threshold * 10
mean_drifted_warning = report.score_drift > self.warning_threshold * 10
if mean_drifted_critical and distribution_changed:
report.is_drifting = True
report.drift_level = 'critical'
report.action_required = 'STOP_LINE: re-calibrate model immediately'
elif mean_drifted_warning or distribution_changed:
report.is_drifting = True
report.drift_level = 'warning'
report.action_required = 'increase_sampling: inspect every 100 boards'
else:
report.drift_level = 'none'
report.action_required = 'continue_normal_operation'
self.report_history.append(report)
return report
def suggest_retraining(self, current_auroc: float,
target_auroc: float = 0.99) -> dict:
"""
Suggerisce una strategia di retraining basata sulle metriche correnti.
"""
auroc_drop = target_auroc - current_auroc
latest_report = self.report_history[-1] if self.report_history else None
strategy = {
'retrain_needed': current_auroc < target_auroc - 0.005,
'current_auroc': current_auroc,
'target_auroc': target_auroc,
'auroc_gap': auroc_drop,
}
if current_auroc >= target_auroc:
strategy['action'] = 'no_action_needed'
elif auroc_drop < 0.02:
# Piccola degradazione: aggiorna solo memory bank con nuovi campioni buoni
strategy['action'] = 'update_memory_bank'
strategy['description'] = (
'Aggiungi 200-500 nuove immagini buone alla memory bank. '
'Non richiede re-training del backbone.'
)
elif auroc_drop < 0.05:
# Degradazione media: fine-tune con nuovi campioni
strategy['action'] = 'incremental_finetune'
strategy['description'] = (
'Fine-tune backbone con mix di dati vecchi (70%) e nuovi (30%). '
'Learning rate basso: 1e-5. 10-20 epoch.'
)
else:
# Degradazione severa: retrain completo
strategy['action'] = 'full_retrain'
strategy['description'] = (
'Rebuild completo della memory bank con dati correnti. '
'Considera re-annotazione se sono cambiati i tipi di difetti.'
)
return strategy
# ---- Threshold adattivo basato su SLA di produzione ----
def compute_adaptive_threshold(model_scores_good: np.ndarray,
model_scores_defective: np.ndarray,
max_fnr: float = 0.003, # SLA: max 0.3% FNR
max_fpr: float = 0.02) -> dict:
"""
Calcola la soglia ottimale per rispettare gli SLA di produzione.
Priorità: FNR (difetti sfuggiti) è il vincolo critico.
FPR (falsi allarmi) è secondario.
Returns: {threshold, fnr, fpr, tradeoff_note}
"""
# Genera candidati threshold
all_scores = np.concatenate([model_scores_good, model_scores_defective])
thresholds = np.percentile(all_scores, np.arange(1, 100, 0.5))
best_threshold = None
best_fpr = float('inf')
for t in thresholds:
# Predizioni
tp = np.sum(model_scores_defective > t)
fn = np.sum(model_scores_defective <= t)
fp = np.sum(model_scores_good > t)
tn = np.sum(model_scores_good <= t)
fnr = fn / (fn + tp + 1e-10)
fpr = fp / (fp + tn + 1e-10)
# Rispetta il vincolo FNR e minimizza FPR
if fnr <= max_fnr and fpr < best_fpr:
best_fpr = fpr
best_threshold = t
if best_threshold is None:
# Fallback: usa EER se non riesce a rispettare FNR SLA
import warnings
warnings.warn("Impossibile rispettare FNR SLA. Usando EER come fallback.")
# ... (implementazione EER omessa per brevita)
best_threshold = np.median(all_scores)
best_fpr = 0.05
# Metriche finali alla soglia scelta
tp = np.sum(model_scores_defective > best_threshold)
fn = np.sum(model_scores_defective <= best_threshold)
fp = np.sum(model_scores_good > best_threshold)
tn = np.sum(model_scores_good <= best_threshold)
fnr_final = fn / (fn + tp + 1e-10)
fpr_final = fp / (fp + tn + 1e-10)
return {
'threshold': float(best_threshold),
'fnr': float(fnr_final),
'fpr': float(fpr_final),
'precision': float(tp / (tp + fp + 1e-10)),
'recall': float(tp / (tp + fn + 1e-10)),
'meets_fnr_sla': fnr_final <= max_fnr,
'meets_fpr_target': fpr_final <= max_fpr,
}
8. 결과와 교훈
실제 프로젝트 결과(제작 3개월 후)
| 미터법 | 인간 검사 | 비전 AI 시스템 | 개선 |
|---|---|---|---|
| 처리량 | 20 카드/분 | 140 카드/분 | 7x |
| 거짓음성률 | 2-15% (노력에 따라 다름) | 0.3% | ~10배 |
| 거짓양성률 | 0.5% | 1.4% | -2.8x (악화) |
| 검사/시트 비용 | €0.12 | €0.02 | 6배 감소 |
| 현장에서 놓친 결함 | 23개 청구/월 | 3개의 청구/월 | 87% 감소 |
프로젝트의 5가지 핵심 교훈
- 위양성률은 협상 가능하지만 위음성률은 다음과 같습니다. 고객은 좋은 카드 몇 장을 더 폐기하는 데 동의하지만 현장에서는 결함이 있는 카드를 용납하지 않습니다. 항상 FNR 제약 조건을 먼저 충족하도록 임계값을 설계하십시오.
- 조명은 가장 중요한 구성 요소입니다. 초기 오탐지의 60%는 실제 결함이 아닌 조명 변화로 인한 것이었습니다. 모델을 생각하기 전에 조명 제어 시스템(LED 스트로브, 디퓨저 돔)에 투자하세요.
- PatchCore는 천천히 저하되지만 저하됩니다. 3개월 후 AUROC는 메모리 뱅크에 없는 새로운 PCB 설계의 경우 99.1%에서 97.8%로 떨어졌습니다. 증분 메모리 뱅크 업그레이드 전략을 계획합니다.
- 현지화는 운영자에게 필수적입니다. 바이너리 "OK/NOK" 결정만으로는 충분하지 않습니다. 결함이 있는 위치를 보여주는 이상 맵은 오탐지 수동 검증을 위한 운영자의 분석 시간을 70% 단축합니다.
- 정확성뿐만 아니라 점수 분포도 모니터링하세요. 모델 드리프트는 분류 측정항목보다 점수 분포에서 먼저 나타납니다. "양호" 점수의 평균이 과거 기준보다 높아지면 경고합니다.
시리즈의 결론
이 사례 연구는 딥러닝을 사용한 컴퓨터 비전 시리즈를 마무리합니다. 우리는 기본 CNN부터 전이 학습, 탐지까지 전체 궤적을 다루었습니다. YOLO26을 사용하여 세분화하고, 증강부터 생산 파이프라인까지 객체를 분석합니다. OpenCV는 엣지 배포부터 얼굴 인식, 실제 산업 사용 사례에 이르기까지 다양합니다.
컴퓨터 비전은 실용적인 분야입니다. 최상의 결과는 이해하는 사람에게서 나옵니다. 이론(아키텍처, 손실 함수, 메트릭)과 시스템 엔지니어링 모두 (전처리, 배포, 모니터링) 분야가 빠르게 발전하고 있습니다 - YOLO26은 2026년 1월에 출시된 SAM2는 대화형 세분화에 혁명을 일으켰습니다. 기본 원칙은 안정적으로 유지됩니다.
시리즈 탐색
- 이전의: 얼굴 감지 및 인식: 최신 기술
- 시리즈 시작: CNN: 제로에서 프로덕션까지 컨볼루셔널 네트워크
시리즈 간 리소스
- MLOps: 모니터링 및 모델 드리프트 감지 - 생산 중 고급 모니터링
- 고급 딥러닝: 양자화 및 압축 - 모델 최적화
- AI 엔지니어링: RAG 및 벡터 검색 - AI 시스템과의 통합







