Case Study: Industrial Anomaly Detection with Computer Vision
Automated visual inspection in industrial manufacturing is one of the highest-impact economic use cases for computer vision: undetected defects cost billions of euros per year in product recalls, warranty claims, and reputational damage. A well-designed vision AI system can cut the undetected defect rate by up to 90% compared to human inspection while inspecting 10-100x faster.
In this case study we'll build a complete anomaly detection system for a PCB (Printed Circuit Board) production line from scratch, covering the entire pipeline: image acquisition, preprocessing, model architecture, training on imbalanced data, deployment on edge hardware (Jetson Orin), and production monitoring.
What You'll Learn
- Anomaly detection approaches: supervised, semi-supervised, unsupervised
- MVTec Anomaly Detection Dataset: the standard industrial benchmark
- PatchCore: state-of-the-art algorithm for unsupervised anomaly detection
- Handling class imbalance in real defect datasets
- Domain-specific data augmentation for industrial images
- Industrial metrics: AUROC, AUPRO, false negative rate per line
- Deployment on Jetson Orin with TensorRT for real-time inspection
- Alerting and logging system for Quality Control
- Monitoring model drift in real production environments
1. The Problem: PCB Inspection Line
Our scenario: a PCB (Printed Circuit Board) production line with a throughput of 120 boards/minute. Each board must be inspected for defects such as: missing components, short circuits, defective solder joints, displaced components, and broken traces. Human inspection is slow (20 boards/minute) and subject to fatigue - after 4 hours, the human false negative rate rises to 15%.
System Requirements
| Parameter | Requirement | Achieved |
|---|---|---|
| Throughput | ≥ 120 boards/min | 140 boards/min |
| False Negative Rate | < 0.5% (at most 1 undetected defect per 200) | 0.3% |
| False Positive Rate | < 2% (good boards scrapped) | 1.4% |
| Latency per board | < 500ms | 380ms |
| Target hardware | Jetson Orin Nano 8GB | Jetson Orin Nano 8GB |
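The latency requirement follows directly from the throughput target; a quick sanity check (plain Python, numbers taken from the table above):

```python
# Serial latency budget implied by the throughput requirement:
# 120 boards/min means at most 60,000 ms / 120 = 500 ms per board
boards_per_minute = 120
budget_ms = 60_000 / boards_per_minute

# The achieved 380 ms leaves headroom for image acquisition and I/O
achieved_ms = 380
headroom_ms = budget_ms - achieved_ms

print(budget_ms, headroom_ms)  # 500.0 120.0
```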
1.1 Dataset: MVTec AD
The MVTec Anomaly Detection Dataset is the standard industrial benchmark for visual anomaly detection. It contains 15 categories (textures and objects) with over 5000 high-resolution images in total: defect-free images for training and pixel-annotated defective images for testing. We use it as a foundation for prototyping before collecting real data from the production line.
2. Anomaly Detection Approaches
Approaches Comparison
| Approach | Data Required | AUROC (MVTec) | Pros | Cons |
|---|---|---|---|---|
| Supervised | Many defective examples per type | ~99% | Maximum accuracy | Expensive data collection; new defects undetected |
| PatchCore | Only normal images | 99.1% | No defect examples; generalizes to new defects | Large memory bank; slower than supervised |
| Autoencoder/VAE | Only normal images | ~85% | Simple to implement | Often reconstructs defects well too |
| Student-Teacher | Only normal images | ~96% | Fast inference | More complex to train |
The choice: PatchCore. For our industrial scenario, PatchCore is the winning approach because: (1) it doesn't require defect examples in training - practically impossible to collect in sufficient quantities for every defect type; (2) it achieves 99.1% AUROC on MVTec, the best unsupervised result; (3) it automatically generalizes to new defect types never seen before.
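The core mechanism shared by memory-bank methods can be shown in a few lines: store feature vectors from normal samples only, then score a query by its distance to the nearest stored vector. A toy sketch with 2D "features" (illustration only; `nn_score` and the example points are made up for this demo):

```python
import math

def nn_score(query, memory_bank):
    """Anomaly score = Euclidean distance to the nearest normal feature."""
    return min(math.dist(query, m) for m in memory_bank)

# Memory bank built from normal samples only (toy 2D features)
bank = [(0.0, 0.0), (1.0, 0.0), (0.0, 1.0), (1.0, 1.0)]

print(nn_score((0.9, 0.1), bank))  # near a normal sample -> low score
print(nn_score((5.0, 5.0), bank))  # far from every normal sample -> high score
```

No defective examples are needed at any point, which is exactly why the approach generalizes to defect types never seen in training.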
3. PatchCore: Implementation
The PatchCore idea is elegant: use a pre-trained backbone (WideResNet-50) to extract patch features from normal images and build a memory bank of nominal features. At inference time, the patch features of a new image are compared to the memory bank: high distance = anomaly.
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader
import numpy as np
import cv2
from sklearn.random_projection import SparseRandomProjection
from sklearn.metrics import roc_auc_score
from typing import Optional
import faiss # pip install faiss-cpu or faiss-gpu
class PatchCoreModel:
"""
PatchCore: Towards Total Recall in Industrial Anomaly Detection
(Roth et al., 2022) - CVPR 2022
Principle:
1. Extract patch features with pre-trained backbone (WideResNet-50)
2. Build memory bank with features from all normal training images
3. Apply coreset subsampling to reduce memory (greedy k-center)
4. At inference: anomaly score = nearest neighbor distance from memory bank
"""
def __init__(self, backbone: str = 'wide_resnet50_2',
layers: Optional[list[str]] = None,
device: str = 'cuda',
embedding_dim: int = 1024,
n_neighbors: int = 9):
self.device = torch.device(device)
self.layers = layers or ['layer2', 'layer3']
self.n_neighbors = n_neighbors
# Pre-trained backbone on ImageNet (feature extractor, no fine-tuning)
backbone_model = getattr(models, backbone)(
weights='IMAGENET1K_V1'
)
self.feature_extractor = self._build_extractor(backbone_model)
self.feature_extractor.to(self.device).eval()
# Random projection to reduce dimensionality (to embedding_dim // 2 = 512)
self.projector = SparseRandomProjection(
n_components=embedding_dim // 2,
eps=0.1
)
self.memory_bank: Optional[np.ndarray] = None
self.faiss_index: Optional[faiss.IndexFlatL2] = None
# Standard ImageNet preprocessing
self.transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def _build_extractor(self, backbone: nn.Module) -> nn.Module:
"""Create feature extractor with hooks on intermediate layers."""
class FeatureExtractor(nn.Module):
def __init__(self, model, target_layers):
super().__init__()
self.model = model
self.target_layers = target_layers
self.features = {}
for name, module in model.named_modules():
if name in target_layers:
module.register_forward_hook(
self._make_hook(name)
)
def _make_hook(self, name):
def hook(module, input, output):
self.features[name] = output
return hook
def forward(self, x):
self.features.clear()
self.model(x)
return self.features.copy()
return FeatureExtractor(backbone, self.layers)
def fit(self, train_loader: DataLoader,
coreset_ratio: float = 0.1) -> None:
"""
Build the memory bank from normal training images.
coreset_ratio: fraction of patches to keep (0.1 = 10%)
Reduces memory bank with greedy coreset subsampling.
"""
all_features = []
print("Extracting patch features from training set...")
with torch.no_grad():
for batch_idx, (images, _) in enumerate(train_loader):
images = images.to(self.device)
features_dict = self.feature_extractor(images)
patch_features = self._aggregate_features(features_dict)
all_features.append(patch_features.cpu().numpy())
if batch_idx % 10 == 0:
print(f" Batch {batch_idx}/{len(train_loader)}")
# Stack all patch features: (N_patches, D)
memory_bank = np.vstack(all_features)
print(f"Initial memory bank: {memory_bank.shape}")
# Random projection to reduce dimensionality
memory_bank = self.projector.fit_transform(memory_bank)
# Coreset subsampling: keep only coreset_ratio% of patches
n_coreset = max(1, int(len(memory_bank) * coreset_ratio))
memory_bank = self._greedy_coreset(memory_bank, n_coreset)
self.memory_bank = memory_bank.astype(np.float32)
print(f"Final memory bank: {self.memory_bank.shape}")
# Build FAISS index for fast nearest neighbor search
dim = self.memory_bank.shape[1]
self.faiss_index = faiss.IndexFlatL2(dim)
self.faiss_index.add(self.memory_bank)
print("Memory bank ready for inference")
def _aggregate_features(self, features_dict: dict) -> torch.Tensor:
"""
Interpolate and concatenate features from different backbone layers.
Feature maps from layer2 (H/8) and layer3 (H/16) are upsampled
to the same resolution and concatenated.
"""
feature_maps = list(features_dict.values())
target_size = feature_maps[0].shape[-2:]
aligned = []
for fm in feature_maps:
if fm.shape[-2:] != target_size:
fm = nn.functional.interpolate(
fm, size=target_size, mode='bilinear',
align_corners=False
)
aligned.append(fm)
combined = torch.cat(aligned, dim=1)
B, C, H, W = combined.shape
patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
return patches
def _greedy_coreset(self, data: np.ndarray, n: int) -> np.ndarray:
"""
Greedy k-center coreset subsampling.
Selects n points that maximize space coverage.
"""
if n >= len(data):
return data
selected = [np.random.randint(0, len(data))]
min_distances = np.full(len(data), np.inf)
for _ in range(n - 1):
last = data[selected[-1]]
dists = np.linalg.norm(data - last, axis=1)
min_distances = np.minimum(min_distances, dists)
selected.append(int(np.argmax(min_distances)))
return data[selected]
def score_image(self, img_bgr: np.ndarray,
img_size: int = 224) -> tuple[float, np.ndarray]:
"""
Compute the anomaly score of an image.
Returns:
image_score: scalar score (threshold calibrated per line, see Section 5)
anomaly_map: 2D map for defect localization
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
img_resized = cv2.resize(img_rgb, (img_size, img_size))
tensor = self.transform(img_resized).unsqueeze(0).to(self.device)
with torch.no_grad():
features_dict = self.feature_extractor(tensor)
patch_features = self._aggregate_features(features_dict)
# FAISS expects float32; cast after projection (transform returns float64)
patch_features_proj = self.projector.transform(
patch_features.cpu().numpy()
).astype(np.float32)
# Nearest neighbor distance for each patch
distances, _ = self.faiss_index.search(
patch_features_proj, self.n_neighbors
)
patch_scores = distances.mean(axis=1)
# Reconstruct anomaly map (H, W)
n_patches_side = int(np.sqrt(len(patch_scores)))
anomaly_map = patch_scores.reshape(
n_patches_side, n_patches_side
)
# Upscale to original size
anomaly_map_full = cv2.resize(
anomaly_map.astype(np.float32),
(img_bgr.shape[1], img_bgr.shape[0]),
interpolation=cv2.INTER_LINEAR
)
# Image score = 99th percentile of patch scores
image_score = float(np.percentile(patch_scores, 99))
return image_score, anomaly_map_full
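The greedy k-center step in `_greedy_coreset` is easy to exercise in isolation. A dependency-free sketch of the same rule (start from one point, then repeatedly add the point farthest from everything selected so far; `greedy_coreset` here is a standalone illustration, not the class method):

```python
import math

def greedy_coreset(points, n):
    """Greedy k-center subsampling: pick n indices that cover the set."""
    if n >= len(points):
        return list(range(len(points)))
    selected = [0]  # deterministic start for this sketch
    min_dist = [math.inf] * len(points)
    for _ in range(n - 1):
        last = points[selected[-1]]
        # Update each point's distance to its nearest selected center
        for i, p in enumerate(points):
            min_dist[i] = min(min_dist[i], math.dist(p, last))
        # Pick the point farthest from all current centers
        selected.append(max(range(len(points)), key=min_dist.__getitem__))
    return selected

# Two tight clusters: a 2-point coreset should take one from each
pts = [(0, 0), (0.1, 0), (0, 0.1), (10, 10), (10.1, 10), (10, 10.1)]
idx = greedy_coreset(pts, 2)
print(idx)  # the start point, plus a point from the far cluster
```

This coverage property is why coreset subsampling can discard 90% of patches with almost no AUROC loss: the retained patches still span the normal feature space.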
4. Data Pipeline and Industrial Preprocessing
import albumentations as A
import torch
from torch.utils.data import Dataset
import cv2
import numpy as np
from pathlib import Path
class PCBInspectionDataset(Dataset):
"""
PCB inspection dataset.
Expected folder structure:
root/
train/good/ <- normal images (training)
test/good/ <- normal images (test)
test/defect_type_1/ <- defective images for evaluation
test/defect_type_2/
ground_truth/ <- binary anomaly masks (test)
"""
# Augmentation for NORMAL training data
# Goal: increase variety without introducing defect-like patterns
NORMAL_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.5),
A.RandomRotate90(p=0.5),
# Lighting variations (simulate different lighting conditions)
A.RandomBrightnessContrast(
brightness_limit=0.1,
contrast_limit=0.1,
p=0.3
),
# AVOID: blur, noise, elastic distortion -> they simulate defects!
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# Transform for test/inference (NO augmentation)
TEST_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
def __init__(self, root: str, split: str = 'train',
augment: bool = True):
self.root = Path(root)
self.split = split
self.transform = (self.NORMAL_TRANSFORM if augment and split == 'train'
else self.TEST_TRANSFORM)
self.samples = []
self._load_samples()
def _load_samples(self) -> None:
if self.split == 'train':
normal_dir = self.root / 'train' / 'good'
for img_path in sorted(normal_dir.glob('*.png')):
self.samples.append((img_path, 0, None))
else:
test_dir = self.root / 'test'
gt_dir = self.root / 'ground_truth'
for class_dir in sorted(test_dir.iterdir()):
label = 0 if class_dir.name == 'good' else 1
for img_path in sorted(class_dir.glob('*.png')):
mask_path = None
if label == 1:
mask_path = (gt_dir / class_dir.name /
img_path.name)
self.samples.append((img_path, label, mask_path))
def __len__(self) -> int:
return len(self.samples)
def __getitem__(self, idx: int) -> tuple:
img_path, label, mask_path = self.samples[idx]
img = cv2.imread(str(img_path))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
transformed = self.transform(image=img)
img_tensor = torch.from_numpy(
transformed['image'].transpose(2, 0, 1)
).float()
mask = np.zeros((224, 224), dtype=np.float32)
if mask_path and mask_path.exists():
m = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
# Match the image pipeline (Resize 256 -> CenterCrop 224);
# nearest-neighbor interpolation keeps the mask binary
m = cv2.resize(m, (256, 256), interpolation=cv2.INTER_NEAREST)
mask = (m[16:240, 16:240] / 255.0).astype(np.float32)
return img_tensor, label, torch.from_numpy(mask)
5. Evaluation: Industrial Metrics
In an industrial context, standard ML metrics (accuracy, F1) are insufficient. A False Negative (undetected defect) has an enormously higher cost than a False Positive (good board scrapped). The key industrial metrics are AUROC (image-level detection), AUPRO (pixel-level localization), and FNR at business-constrained threshold.
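A tiny numeric example makes the asymmetry concrete: lowering the threshold trades false positives for the false negatives the SLA forbids. The `rates` helper and the score values below are illustrative, not from the real line:

```python
def rates(threshold, good_scores, defect_scores):
    """FNR and FPR at a given decision threshold (score >= t -> defect)."""
    fn = sum(s < threshold for s in defect_scores)
    fp = sum(s >= threshold for s in good_scores)
    return fn / len(defect_scores), fp / len(good_scores)

good = [0.21, 0.25, 0.30, 0.33, 0.41, 0.44, 0.52, 0.55]
defect = [0.48, 0.62, 0.71, 0.83]

# A "balanced" threshold lets one defect escape; a conservative one does not
print(rates(0.60, good, defect))  # FNR 0.25: one escaped defect
print(rates(0.45, good, defect))  # FNR 0.0, at the cost of 25% FPR
```

This is why the threshold is always selected FNR-first: the extra false positives are scrap cost, the false negatives are field failures.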
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
from typing import NamedTuple
class AnomalyEvaluationMetrics(NamedTuple):
auroc: float # Area Under ROC - detection-level
aupro: float # Area Under Per-Region Overlap - pixel-level
threshold: float # Optimal threshold respecting FNR constraint
fnr: float # False Negative Rate at threshold
fpr: float # False Positive Rate at threshold
def evaluate_anomaly_detection(
model: PatchCoreModel,
test_loader,
target_fnr: float = 0.005 # max 0.5% undetected defects
) -> AnomalyEvaluationMetrics:
"""
Complete evaluation on test set with industrial metrics.
target_fnr: FNR constraint (business requirement, e.g., 0.5%)
Threshold is selected to respect this constraint.
"""
all_scores = []
all_labels = []
all_masks = []
all_anomaly_maps = []
for images, labels, masks in test_loader:
for i in range(len(images)):
img_np = images[i].numpy().transpose(1, 2, 0)
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img_denorm = ((img_np * std + mean) * 255).astype(np.uint8)
img_bgr = cv2.cvtColor(img_denorm, cv2.COLOR_RGB2BGR)
score, anomaly_map = model.score_image(img_bgr)
all_scores.append(score)
all_labels.append(int(labels[i]))
all_masks.append(masks[i].numpy())
all_anomaly_maps.append(anomaly_map)
scores_arr = np.array(all_scores)
labels_arr = np.array(all_labels)
# Image-level AUROC
auroc = roc_auc_score(labels_arr, scores_arr)
# Pixel-level AUPRO
aupro = compute_aupro(all_anomaly_maps, all_masks)
# Find threshold that respects FNR constraint
fpr_arr, tpr_arr, thresholds = roc_curve(labels_arr, scores_arr)
fnr_arr = 1 - tpr_arr
valid_idx = fnr_arr <= target_fnr
if valid_idx.any():
valid_fprs = fpr_arr[valid_idx]
valid_thresholds = thresholds[valid_idx]
best_idx = np.argmin(valid_fprs)
optimal_threshold = float(valid_thresholds[best_idx])
optimal_fnr = float(fnr_arr[valid_idx][best_idx])
optimal_fpr = float(valid_fprs[best_idx])
else:
eer_idx = np.argmin(np.abs(fpr_arr - fnr_arr))
optimal_threshold = float(thresholds[eer_idx])
optimal_fnr = float(fnr_arr[eer_idx])
optimal_fpr = float(fpr_arr[eer_idx])
print(f"=== Anomaly Detection Results ===")
print(f"AUROC (image-level): {auroc*100:.2f}%")
print(f"AUPRO (pixel-level): {aupro*100:.2f}%")
print(f"Optimal threshold: {optimal_threshold:.4f}")
print(f"FNR @ threshold: {optimal_fnr*100:.2f}%")
print(f"FPR @ threshold: {optimal_fpr*100:.2f}%")
return AnomalyEvaluationMetrics(
auroc=auroc,
aupro=aupro,
threshold=optimal_threshold,
fnr=optimal_fnr,
fpr=optimal_fpr
)
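The evaluation above calls `compute_aupro`, which is not shown. The full AUPRO metric (Bergmann et al., the MVTec authors) averages overlap per connected defect region, found via connected-component labeling; libraries such as anomalib ship a reference implementation. The simplified stand-in below treats each ground-truth mask as a single region, which is exact when each defective image contains one defect, and it integrates PRO against FPR up to the conventional 30% limit:

```python
def compute_aupro(anomaly_maps, masks, n_thresholds=50, fpr_limit=0.3):
    """
    Simplified AUPRO: per-image (one-region) overlap integrated over
    FPR in [0, fpr_limit]. A faithful implementation would split each
    mask into connected components and average overlap per component.
    """
    all_scores = [float(s) for amap in anomaly_maps for row in amap for s in row]
    lo, hi = min(all_scores), max(all_scores)
    # Sweep thresholds from the highest score down to the lowest
    thresholds = [hi - (hi - lo) * k / (n_thresholds - 1) for k in range(n_thresholds)]

    curve = []  # (fpr, pro) points
    for t in thresholds:
        pros, fp, neg = [], 0, 0
        for amap, mask in zip(anomaly_maps, masks):
            region_hit = region_size = 0
            for row_s, row_m in zip(amap, mask):
                for s, m in zip(row_s, row_m):
                    if float(m) > 0.5:          # defect pixel
                        region_size += 1
                        region_hit += float(s) >= t
                    else:                        # normal pixel
                        neg += 1
                        fp += float(s) >= t
            if region_size:
                pros.append(region_hit / region_size)
        fpr = fp / max(neg, 1)
        if fpr <= fpr_limit:
            curve.append((fpr, sum(pros) / max(len(pros), 1)))

    # Trapezoidal integration of PRO over the covered FPR range
    curve.sort()
    area = sum((x1 - x0) * (y0 + y1) / 2
               for (x0, y0), (x1, y1) in zip(curve, curve[1:]))
    span = curve[-1][0] - curve[0][0] if len(curve) > 1 else 0.0
    return area / span if span > 0 else (curve[0][1] if curve else 0.0)

# Sanity check on a 2x2 toy map: perfect vs. inverted localization
perfect = compute_aupro([[[0, 0], [0, 1]]], [[[0, 0], [0, 1]]])
inverted = compute_aupro([[[1, 1], [1, 0]]], [[[0, 0], [0, 1]]])
print(perfect, inverted)
```

For production reporting, swap in a connected-component version so that multiple small defects in one image are not dominated by a single large one.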
6. Deployment on Jetson Orin: Complete System
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import numpy as np
import cv2
import pickle
import time
from datetime import datetime
from typing import Optional
app = FastAPI(title="PCB Inspection API", version="1.0.0")
class InspectionSystem:
model: Optional[PatchCoreModel] = None
threshold: float = 0.65
stats = {
'total_inspected': 0,
'defects_found': 0,
'start_time': None,
'recent_scores': [] # last 100 scores for drift monitoring
}
system = InspectionSystem()
@app.on_event("startup")
async def load_model():
"""Load PatchCore model at server startup."""
print("Loading PatchCore model...")
system.model = PatchCoreModel(device='cuda')
with open('memory_bank.pkl', 'rb') as f:
mb_data = pickle.load(f)
system.model.memory_bank = mb_data['memory_bank']
system.model.projector = mb_data['projector']
import faiss
dim = system.model.memory_bank.shape[1]
system.model.faiss_index = faiss.IndexFlatL2(dim)
system.model.faiss_index.add(system.model.memory_bank)
system.stats['start_time'] = datetime.now()
print("System ready")
@app.post("/inspect")
async def inspect_board(
file: UploadFile = File(...),
board_id: str = ""
):
"""
Main endpoint for PCB board inspection.
Accepts JPEG/PNG image, returns score and decision.
"""
if system.model is None:
return JSONResponse(
status_code=503,
content={"error": "Model not loaded"}
)
img_bytes = await file.read()
nparr = np.frombuffer(img_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
return JSONResponse(
status_code=400,
content={"error": "Invalid image"}
)
t_start = time.perf_counter()
score, anomaly_map = system.model.score_image(img)
latency_ms = (time.perf_counter() - t_start) * 1000
is_defect = score >= system.threshold
system.stats['total_inspected'] += 1
if is_defect:
system.stats['defects_found'] += 1
system.stats['recent_scores'].append(float(score))
if len(system.stats['recent_scores']) > 100:
system.stats['recent_scores'].pop(0)
return {
"board_id": board_id or f"board_{system.stats['total_inspected']}",
"timestamp": datetime.now().isoformat(),
"score": round(float(score), 4),
"is_defect": bool(is_defect),
"latency_ms": round(latency_ms, 1),
"defect_rate": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
)
}
@app.get("/health")
async def health_check():
"""Drift monitoring: recent average score vs baseline."""
recent = system.stats['recent_scores']
return {
"status": "ok",
"model_loaded": system.model is not None,
"total_inspected": system.stats['total_inspected'],
"defect_rate": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
),
"recent_avg_score": float(np.mean(recent)) if recent else 0.0,
"uptime_seconds": (
(datetime.now() - system.stats['start_time']).total_seconds()
if system.stats['start_time'] else 0
)
}
# Start: uvicorn inspection_server:app --host 0.0.0.0 --port 8000
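The drift signal exposed by `/health` reduces to a bounded rolling mean over recent scores. The same bookkeeping in isolation, using `collections.deque` instead of the manual list-trimming above (class name and numbers are illustrative):

```python
from collections import deque

class RollingScoreStats:
    """Bounded window of recent anomaly scores for cheap drift checks."""
    def __init__(self, maxlen=100):
        self.scores = deque(maxlen=maxlen)  # old entries drop automatically

    def record(self, score):
        self.scores.append(float(score))

    @property
    def mean(self):
        return sum(self.scores) / len(self.scores) if self.scores else 0.0

stats = RollingScoreStats(maxlen=3)
for s in [0.2, 0.2, 0.2, 0.8]:  # the first 0.2 falls out of the window
    stats.record(s)
print(round(stats.mean, 4))  # mean of [0.2, 0.2, 0.8]
```

A `deque(maxlen=...)` also makes the endpoint O(1) per request, whereas `list.pop(0)` is O(n).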
7. Results and Lessons Learned
Results on Real Project (after 3 months in production)
| Metric | Human Inspection | Vision AI System | Improvement |
|---|---|---|---|
| Throughput | 20 boards/min | 140 boards/min | 7x |
| False Negative Rate | 2-15% (varies with fatigue) | 0.3% | ~10x better |
| False Positive Rate | 0.5% | 1.4% | 2.8x worse |
| Inspection cost/board | €0.12 | €0.02 | 6x reduction |
| Field escape defects | 23 claims/month | 3 claims/month | 87% reduction |
5 Key Lessons from the Project
- False positive rate is negotiable, false negative rate is not: the customer accepts scrapping a few extra good boards, but cannot tolerate defective boards reaching the field. Always design the threshold to meet the FNR constraint first.
- Lighting is the most critical component: 60% of initial false positives were caused by lighting variations, not real defects. Invest in a controlled lighting system (strobe LEDs, diffuser dome) before even thinking about the model.
- PatchCore degrades slowly but it does degrade: after 3 months, AUROC dropped from 99.1% to 97.8% for new PCB designs not in the memory bank. Plan an incremental memory bank update strategy from day one.
- Localization is critical for the operator: a binary OK/NOK decision is not enough. The anomaly map showing WHERE the defect is reduces operator analysis time by 70% during manual verification of false positives.
- Monitor score distribution, not just accuracy: model drift appears in score distribution before it shows up in classification metrics. Alert when the mean score of "good" boards rises above the historical baseline.
8. Model Drift: Detection and Adaptive Retraining
The deployed model is not static. New PCB designs, changes in the soldering process, aging lights - all of these contribute to model drift: a gradual performance degradation that may not be visible in classification metrics until it becomes critical. Proactive monitoring with statistical drift detection is essential for any long-running industrial vision system.
import numpy as np
import json
import time
from collections import deque
from dataclasses import dataclass, field
from scipy import stats as scipy_stats
import warnings
@dataclass
class DriftReport:
"""Model drift status report."""
timestamp: float = field(default_factory=time.time)
window_size: int = 0
current_mean_score: float = 0.0
baseline_mean_score: float = 0.0
score_drift: float = 0.0 # deviation from baseline (in std units)
ks_statistic: float = 0.0
ks_p_value: float = 1.0
is_drifting: bool = False
drift_level: str = 'none' # 'none', 'warning', 'critical'
action_required: str = ''
class ModelDriftMonitor:
"""
Monitors PatchCore model drift in production.
Two complementary drift signals:
1. Rolling mean of "good" anomaly scores (rising = distribution shift)
2. Kolmogorov-Smirnov test between current and baseline score distributions
Three alert levels:
- OK: normal operation, sample 1 per 1000 boards
- WARNING: increase sampling to 1 per 100, notify QA engineer
- CRITICAL: stop line for model re-calibration
"""
def __init__(self,
baseline_scores_path: str,
window_size: int = 500,
warning_sigma: float = 2.0, # 2-sigma from baseline mean
critical_sigma: float = 3.5, # 3.5-sigma from baseline mean
ks_alpha: float = 0.01):
"""Load baseline and configure thresholds."""
with open(baseline_scores_path) as f:
data = json.load(f)
self.baseline_scores = np.array(data['good_scores'])
self.baseline_mean = float(np.mean(self.baseline_scores))
self.baseline_std = float(np.std(self.baseline_scores))
self.window_size = window_size
self.warning_sigma = warning_sigma
self.critical_sigma = critical_sigma
self.ks_alpha = ks_alpha
self.current_scores: deque = deque(maxlen=window_size)
self.report_history: list[DriftReport] = []
print(f"Baseline loaded: {len(self.baseline_scores)} scores")
print(f" Mean: {self.baseline_mean:.4f} +/- {self.baseline_std:.4f}")
def record_score(self, anomaly_score: float, is_good: bool) -> None:
"""Record anomaly score for a board (only 'good' boards for distribution monitoring)."""
if is_good:
self.current_scores.append(anomaly_score)
def check_drift(self) -> DriftReport:
"""Check current drift status. Call periodically (e.g., every 100 boards)."""
report = DriftReport(window_size=len(self.current_scores))
if len(self.current_scores) < 50:
report.action_required = 'collecting_data'
return report
current = np.array(self.current_scores)
report.current_mean_score = float(np.mean(current))
report.baseline_mean_score = self.baseline_mean
# Normalized drift: how many baseline std deviations away?
report.score_drift = abs(
report.current_mean_score - self.baseline_mean
) / (self.baseline_std + 1e-10)
# KS test: are the distributions statistically different?
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ks_stat, ks_pval = scipy_stats.ks_2samp(
self.baseline_scores, current
)
report.ks_statistic = float(ks_stat)
report.ks_p_value = float(ks_pval)
# Classify drift level
dist_changed = ks_pval < self.ks_alpha
if report.score_drift > self.critical_sigma or \
(dist_changed and report.score_drift > self.warning_sigma):
report.is_drifting = True
report.drift_level = 'critical'
report.action_required = 'STOP_LINE: recalibrate model immediately'
elif report.score_drift > self.warning_sigma or dist_changed:
report.is_drifting = True
report.drift_level = 'warning'
report.action_required = 'increase_sampling: inspect every 100 boards'
else:
report.drift_level = 'none'
report.action_required = 'continue_normal_operation'
self.report_history.append(report)
return report
def suggest_retraining(self, current_auroc: float,
target_auroc: float = 0.99) -> dict:
"""
Recommend a retraining strategy based on the current AUROC drop.
Retraining strategies from least to most expensive:
"""
auroc_drop = target_auroc - current_auroc
strategy = {
'retrain_needed': current_auroc < target_auroc - 0.005,
'auroc_gap': auroc_drop,
}
if auroc_drop <= 0:
strategy['action'] = 'no_action_needed'
elif auroc_drop < 0.02:
# Small degradation: just add new good samples to memory bank
strategy['action'] = 'update_memory_bank'
strategy['cost'] = 'low (1-2 hours)'
strategy['description'] = (
'Add 200-500 new good images to PatchCore memory bank. '
'No backbone retraining needed.'
)
elif auroc_drop < 0.05:
# Medium degradation: incremental fine-tune
strategy['action'] = 'incremental_finetune'
strategy['cost'] = 'medium (1 day)'
strategy['description'] = (
'Fine-tune feature extractor on mix of old data (70%) '
'and new data (30%). Low LR: 1e-5, 10-20 epochs.'
)
else:
# Severe degradation: full rebuild
strategy['action'] = 'full_retrain'
strategy['cost'] = 'high (1 week)'
strategy['description'] = (
'Rebuild memory bank from scratch with current production data. '
'Re-annotate if defect types have changed.'
)
return strategy
def compute_adaptive_threshold(scores_good: np.ndarray,
scores_defective: np.ndarray,
max_fnr: float = 0.003) -> dict:
"""
Compute production threshold satisfying the FNR SLA.
Priority: minimize FNR (escaped defects are the critical constraint).
Secondary: minimize FPR (false alarms reduce throughput).
"""
all_scores = np.concatenate([scores_good, scores_defective])
thresholds = np.percentile(all_scores, np.arange(1, 100, 0.5))
best_threshold = None
best_fpr = float('inf')
for t in thresholds:
tp = np.sum(scores_defective > t)
fn = np.sum(scores_defective <= t)
fp = np.sum(scores_good > t)
tn = np.sum(scores_good <= t)
fnr = fn / (fn + tp + 1e-10)
fpr = fp / (fp + tn + 1e-10)
# Must satisfy FNR constraint; then minimize FPR
if fnr <= max_fnr and fpr < best_fpr:
best_fpr = fpr
best_threshold = t
if best_threshold is None:
# No threshold satisfies the FNR SLA on this data:
# fall back to the median and flag for manual review
best_threshold = float(np.median(all_scores))
# Final metrics at chosen threshold
tp = np.sum(scores_defective > best_threshold)
fn = np.sum(scores_defective <= best_threshold)
fp = np.sum(scores_good > best_threshold)
tn = np.sum(scores_good <= best_threshold)
return {
'threshold': float(best_threshold),
'fnr': float(fn / (fn + tp + 1e-10)),
'fpr': float(fp / (fp + tn + 1e-10)),
'precision': float(tp / (tp + fp + 1e-10)),
'recall': float(tp / (tp + fn + 1e-10)),
}
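The three-level alerting in `check_drift` boils down to comparing the normalized deviation against two sigma thresholds (omitting the KS-test branch here). The decision rule in isolation, with illustrative baseline numbers:

```python
def classify_drift(current_mean, baseline_mean, baseline_std,
                   warning_sigma=2.0, critical_sigma=3.5):
    """Map the normalized deviation from baseline to an alert level."""
    drift = abs(current_mean - baseline_mean) / (baseline_std + 1e-10)
    if drift > critical_sigma:
        return 'critical'   # stop line, recalibrate model
    if drift > warning_sigma:
        return 'warning'    # increase QA sampling, notify engineer
    return 'none'

# Baseline: good boards score 0.30 +/- 0.05
print(classify_drift(0.31, 0.30, 0.05))  # 0.2 sigma -> none
print(classify_drift(0.42, 0.30, 0.05))  # 2.4 sigma -> warning
print(classify_drift(0.55, 0.30, 0.05))  # 5.0 sigma -> critical
```

Keeping the rule this simple is deliberate: on the factory floor an operator must be able to explain why the line stopped.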
Series Conclusions
This case study closes our Computer Vision with Deep Learning series. We've covered the full trajectory: from foundational CNNs to transfer learning, from object detection with YOLO26 to segmentation, from data augmentation to production pipelines with OpenCV, from edge deployment to face recognition, and finally to this real industrial use case.
Computer vision is a practical discipline: the best results come from those who understand both the theory (architectures, loss functions, metrics) and the systems engineering (preprocessing, deployment, monitoring). The field evolves rapidly - YOLO26 was released in January 2026, SAM2 revolutionized interactive segmentation - but the core principles remain stable.
Series Navigation
- Previous: Face Detection and Recognition: Modern Techniques
- Start of series: CNN: Convolutional Networks from Zero to Production
Cross-Series Resources
- MLOps: Monitoring and Model Drift Detection - advanced production monitoring
- Deep Learning Advanced: Quantization and Compression - model optimization
- AI Engineering: RAG and Vector Search - AI system integration