エッジ上のコンピューター ビジョン: モバイルおよび組み込みデバイスの最適化
コンピューター ビジョン モデルをエッジ デバイスに展開 - Raspberry Pi、NVIDIA Jetson、スマートフォン、 ARM マイクロコントローラー - クラウド展開やクラウド展開とはまったく異なるエンジニアリング上の課題 GPUサーバー。リソースは限られています: 消費電力は数ワット、RAM は数十ではなくギガバイト、 専用の GPU やエントリーレベルの GPU はありません。しかし、何百万ものアプリケーションが推論を必要としています ローカル: オフライン監視、ロボット工学、ポータブル医療機器、産業オートメーション 接続できない環境では。
この記事では、エッジ導入のための最適化テクニック、つまり量子化、 枝刈り、知識の抽出、最適化されたフォーマット (ONNX、TFLite、NCNN)、および実際のベンチマーク Raspberry Pi 5 と NVIDIA Jetson Orin で。
何を学ぶか
- エッジハードウェアの概要: Raspberry Pi、Jetson Nano/Orin、Coral TPU、Hailo
- 量子化: INT8、FP16 - 理論と実際の実装
- パラメータを削減するための構造化および非構造化枝刈り
- 知識の蒸留: 大規模なモデルから小規模なモデルをトレーニングする
- TFLite と NCNN: ARM デバイスへの展開
- TensorRT: NVIDIA GPU (Jetson) の最大速度
- CPU と NPU を最適化した ONNX ランタイム
- Raspberry Pi 5 の YOLO26: ベンチマークと完全な構成
- Jetson Orin Nano のリアルタイム ビデオ パイプライン
1. コンピュータビジョン用のエッジハードウェア
2026 年のエッジ ハードウェア比較
| デバイス | CPU | GPU/NPU | ラム | TDP | YOLOv8n FPS |
|---|---|---|---|---|---|
| ラズベリーパイ5 | ARM Cortex-A76 4コア | ビデオコア VII | 8GB | 15W | ~5 FPS |
| ジェットソンナノ(2GB) | ARM A57 4コア | 128 CUDA コア | 2GB | 10W | ~20FPS |
| ジェットソン オリン ナノ | ARM Cortex-A78AE 6コア | 1024 CUDA + DLA | 8GB | 25W | ~80FPS |
| Jetson AGX オリン | ARM Cortex-A78AE 12コア | 2048 CUDA + DLA | 64GB | 60W | ~200FPS |
| Google コーラル TPU | ARM Cortex-A53 4コア | 4 TOPS エッジ TPU | 1GB | 4W | ~30 FPS (TFLite) |
| ハイロ-8 | - (PCIe アクセラレータ) | 26 TOPS ニューラル エンジン | - | 5W | ~120FPS |
2. 量子化: FP32 ~ INT8
La 量子化 重みとアクティベーションの数値精度が低下します。 テンプレートの値: float32 (32 ビット) から float16 (16 ビット) または int8 (8 ビット) まで。実際の効果: INT8 を搭載した 4 倍小型のモデル、2 ~ 4 倍高速な推論、より低い消費電力。 最新の技術による精度の損失は通常 1% 未満です。
2.1 ポストトレーニング量子化 (PTQ)
import torch
import torch.quantization as quant
from torch.ao.quantization import get_default_qconfig, prepare, convert
from torchvision import models
import copy
def quantize_model_ptq(
model: torch.nn.Module,
calibration_loader,
backend: str = 'x86' # 'x86' per CPU Intel, 'qnnpack' per ARM
) -> torch.nn.Module:
"""
Post-Training Quantization (PTQ): quantizza il modello senza retraining.
Richiede solo un piccolo calibration dataset (~100-1000 immagini).
Flusso:
1. Fuse operazioni (Conv+BN+ReLU -> singola op)
2. Insert observer per calibrazione
3. Esegui calibrazione (forward pass sul dataset di calibrazione)
4. Converti in modello quantizzato
"""
torch.backends.quantized.engine = backend
model_to_quantize = copy.deepcopy(model)
model_to_quantize.eval()
# Step 1: Fuse layer comuni per efficienza
# Esempio per ResNet: (Conv, BN, ReLU) -> singola operazione fused
model_to_quantize = torch.quantization.fuse_modules(
model_to_quantize,
[['conv1', 'bn1', 'relu']], # adatta ai nomi del tuo modello
inplace=True
)
# Step 2: Set qconfig e prepara per calibrazione
qconfig = get_default_qconfig(backend)
model_to_quantize.qconfig = qconfig
prepared_model = prepare(model_to_quantize, inplace=False)
# Step 3: Calibrazione con dati reali
print("Calibrazione quantizzazione...")
prepared_model.eval()
with torch.no_grad():
for i, (images, _) in enumerate(calibration_loader):
prepared_model(images)
if i >= 99: # 100 batch di calibrazione sufficienti
break
if i % 10 == 0:
print(f" Batch {i+1}/100")
# Step 4: Conversione al modello quantizzato
quantized_model = convert(prepared_model, inplace=False)
# Verifica dimensioni
def model_size_mb(m: torch.nn.Module) -> float:
param_size = sum(p.nelement() * p.element_size() for p in m.parameters())
buffer_size = sum(b.nelement() * b.element_size() for b in m.buffers())
return (param_size + buffer_size) / (1024 ** 2)
original_size = model_size_mb(model)
quantized_size = model_size_mb(quantized_model)
print(f"Dimensione originale: {original_size:.1f} MB")
print(f"Dimensione quantizzata: {quantized_size:.1f} MB")
print(f"Riduzione: {original_size / quantized_size:.1f}x")
return quantized_model
def compare_inference_speed(original_model, quantized_model,
input_tensor: torch.Tensor, n_runs: int = 100) -> dict:
"""Confronta velocità tra modello originale e quantizzato."""
import time
results = {}
for name, model in [('FP32', original_model), ('INT8', quantized_model)]:
model.eval()
# Warmup
with torch.no_grad():
for _ in range(10):
model(input_tensor)
# Benchmark
start = time.perf_counter()
with torch.no_grad():
for _ in range(n_runs):
model(input_tensor)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / n_runs) * 1000
results[name] = avg_ms
print(f"{name}: {avg_ms:.2f}ms / inference")
speedup = results['FP32'] / results['INT8']
print(f"Speedup INT8: {speedup:.2f}x")
return results
2.2 YOLO (Ultralytics) による定量
from ultralytics import YOLO
model = YOLO('yolo26n.pt') # nano per edge
# ---- TFLite INT8 per Raspberry Pi / Coral TPU ----
model.export(
format='tflite',
imgsz=320, # risoluzione ridotta per edge
int8=True, # quantizzazione INT8
data='coco.yaml' # dataset per calibrazione PTQ
)
# Output: yolo26n_int8.tflite
# ---- NCNN per CPU ARM (Raspberry Pi, Android) ----
model.export(
format='ncnn',
imgsz=320,
half=False # NCNN usa FP32 o INT8 nativo
)
# Output: yolo26n_ncnn_model/
# ---- TensorRT FP16 per Jetson ----
model.export(
format='engine',
imgsz=640,
half=True, # FP16
workspace=2, # GB workspace (ridotto per Jetson Nano)
device=0
)
# Output: yolo26n.engine
# ---- ONNX + ONNX Runtime per CPU/NPU ----
model.export(
format='onnx',
imgsz=320,
opset=17,
simplify=True,
dynamic=False # batch size fisso per deployment edge
)
print("Export completati per tutti i target edge")
3. Raspberry Pi 5 の YOLO
Il ラズベリーパイ5 8 GB の RAM と ARM Cortex-A76 プロセッサを搭載 エッジ AI の最もアクセスしやすいエントリー ポイントです。適切な最適化 (NCNN、解像度) 低減、トラッキングによる推論頻度低減)検出システムを実現可能 リアルタイムで機能します。
# ============================================
# SETUP RASPBERRY PI 5 per Computer Vision
# ============================================
# 1. Installazione dipendenze base
# sudo apt update && sudo apt install -y python3-pip libopencv-dev
# pip install ultralytics ncnn onnxruntime
# 2. Ottimizzazioni sistema per AI
# In /boot/firmware/config.txt:
# gpu_mem=256 # Aumenta memoria GPU (VideoCore VII)
# over_voltage=6 # Overclock lieve
# arm_freq=2800 # Frequenza CPU max (stock 2.4GHz)
# ============================================
# INFERENCE con NCNN su Raspberry Pi
# ============================================
import ncnn
import cv2
import numpy as np
import time
class YOLOncnn:
"""
YOLO inference con NCNN - ottimizzato per CPU ARM.
NCNN e sviluppato da Tencent ed e il runtime più veloce per ARM CPU.
"""
def __init__(self, param_path: str, bin_path: str,
num_threads: int = 4, input_size: int = 320):
self.net = ncnn.Net()
self.net.opt.num_threads = num_threads # usa tutti i core
self.net.opt.use_vulkan_compute = False # no GPU su RPi
self.net.load_param(param_path)
self.net.load_model(bin_path)
self.input_size = input_size
def predict(self, img_bgr: np.ndarray, conf_thresh: float = 0.4) -> list[dict]:
"""Inference NCNN su CPU ARM."""
h, w = img_bgr.shape[:2]
# Resize + normalizzazione per NCNN
img_resized = cv2.resize(img_bgr, (self.input_size, self.input_size))
img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
mat_in = ncnn.Mat.from_pixels(
img_rgb, ncnn.Mat.PixelType.PIXEL_RGB, self.input_size, self.input_size
)
mean_vals = [0.485 * 255, 0.456 * 255, 0.406 * 255]
norm_vals = [1/0.229/255, 1/0.224/255, 1/0.225/255]
mat_in.substract_mean_normalize(mean_vals, norm_vals)
ex = self.net.create_extractor()
ex.input("images", mat_in)
_, mat_out = ex.extract("output0")
return self._parse_output(mat_out, conf_thresh, w, h)
def _parse_output(self, mat_out, conf_thresh, orig_w, orig_h) -> list[dict]:
"""Parsing dell'output NCNN in formato detection."""
detections = []
for i in range(mat_out.h):
row = np.array(mat_out.row(i))
confidence = row[4]
if confidence < conf_thresh:
continue
class_scores = row[5:]
class_id = int(np.argmax(class_scores))
class_conf = confidence * class_scores[class_id]
if class_conf >= conf_thresh:
# Coordinate normalizzate -> pixel
cx, cy, bw, bh = row[:4]
x1 = int((cx - bw/2) * orig_w / self.input_size)
y1 = int((cy - bh/2) * orig_h / self.input_size)
x2 = int((cx + bw/2) * orig_w / self.input_size)
y2 = int((cy + bh/2) * orig_h / self.input_size)
detections.append({
'class_id': class_id,
'confidence': float(class_conf),
'bbox': (x1, y1, x2, y2)
})
return detections
def run_rpi_detection_loop(model_param: str, model_bin: str,
camera_id: int = 0) -> None:
"""Loop di detection real-time ottimizzato per Raspberry Pi."""
detector = YOLOncnn(model_param, model_bin, num_threads=4, input_size=320)
cap = cv2.VideoCapture(camera_id)
# Ottimizza acquisizione per RPi
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
frame_skip = 2 # Processa 1 frame su 3 per risparmiare CPU
frame_count = 0
cached_dets = []
fps_history = []
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
if frame_count % frame_skip == 0:
cached_dets = detector.predict(frame, conf_thresh=0.4)
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0
fps_history.append(fps)
# Visualizzazione
for det in cached_dets:
x1, y1, x2, y2 = det['bbox']
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, f"{det['confidence']:.2f}",
(x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 2)
avg_fps = sum(fps_history[-30:]) / min(len(fps_history), 30)
cv2.putText(frame, f"FPS: {avg_fps:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('RPi Detection', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
print(f"FPS medio: {sum(fps_history)/len(fps_history):.1f}")
4. NVIDIA Jetson Orin: TensorRT と DLA
Il ジェットソン オリン ナノ (25W) は 1024 の CUDA コアと DLA (ディープラーニング) を提供します アクセラレータ)専用。 TensorRT FP16 と YOLO26n モデルを使用すると、これらは簡単に克服できます 640x640 ビデオで 100 FPS。
from ultralytics import YOLO
import cv2
import time
def setup_jetson_pipeline(model_path: str = 'yolo26n.pt') -> YOLO:
"""
Setup ottimale per Jetson Orin:
1. Esporta in TensorRT FP16
2. Configura jetson_clocks per prestazioni massime
3. Imposta modalità performance per la GPU
"""
import subprocess
# Massimizza performance Jetson (esegui una sola volta)
# subprocess.run(['sudo', 'jetson_clocks'], check=True)
# subprocess.run(['sudo', 'nvpmodel', '-m', '0'], check=True) # MAXN mode
model = YOLO(model_path)
print("Esportazione TensorRT FP16...")
model.export(
format='engine',
imgsz=640,
half=True, # FP16 - quasi la stessa accuratezza di FP32 ma 2x più veloce
workspace=2, # GB workspace GPU (Jetson Orin Nano ha 8GB shared)
device=0,
batch=1,
simplify=True
)
# Carica il modello TensorRT
trt_model = YOLO('yolo26n.engine')
print("Modello TensorRT pronto")
return trt_model
def run_jetson_pipeline(model: YOLO, source=0) -> None:
"""Pipeline real-time ottimizzata per Jetson con statistiche."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
fps_list = []
frame_count = 0
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
results = model.predict(
frame, conf=0.35, iou=0.45,
verbose=False, half=True # FP16 inference
)
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed
fps_list.append(fps)
# Annotazione con informazioni performance
annotated = results[0].plot()
avg_fps = sum(fps_list[-30:]) / min(len(fps_list), 30)
info_text = [
f"FPS: {fps:.0f} (avg: {avg_fps:.0f})",
f"Detections: {len(results[0].boxes)}",
f"Inference: {elapsed*1000:.1f}ms"
]
for i, text in enumerate(info_text):
cv2.putText(annotated, text, (10, 30 + i * 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
cv2.imshow('Jetson Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
finally:
cap.release()
cv2.destroyAllWindows()
if fps_list:
print(f"\n=== Stats Jetson ===")
print(f"Frame: {frame_count}")
print(f"FPS medio: {sum(fps_list)/len(fps_list):.1f}")
print(f"FPS massimo: {max(fps_list):.1f}")
print(f"Latenza minima: {1000/max(fps_list):.1f}ms")
5. 剪定と知識の蒸留
5.1 構造化された枝刈り
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune
def apply_structured_pruning(model: nn.Module,
amount: float = 0.3,
n: int = 2) -> nn.Module:
"""
Structured L2-norm pruning: rimuove interi filtri/neuroni.
Produce modelli più veloci in inferenza (a differenza del pruning non strutturato
che produce solo modelli più piccoli ma non necessariamente più veloci).
amount: percentuale di filtri da rimuovere (0.3 = 30%)
n: norma L_n usata per il ranking dei filtri
"""
for name, module in model.named_modules():
if isinstance(module, nn.Conv2d):
# Prune i filtri convoluzionali meno importanti
prune.ln_structured(
module,
name='weight',
amount=amount,
n=n,
dim=0 # dim=0 = prune filtri in output
)
elif isinstance(module, nn.Linear):
prune.ln_structured(
module,
name='weight',
amount=amount,
n=n,
dim=0
)
return model
def remove_pruning_masks(model: nn.Module) -> nn.Module:
"""
Rende permanente il pruning: rimuove le maschere e i parametri "orig",
lasciando solo i pesi pruned. Necessario prima dell'export.
"""
for name, module in model.named_modules():
if isinstance(module, (nn.Conv2d, nn.Linear)):
try:
prune.remove(module, 'weight')
except ValueError:
pass
return model
def prune_and_finetune(model: nn.Module, train_loader, val_loader,
prune_amount: float = 0.2, finetune_epochs: int = 5) -> nn.Module:
"""
Pipeline completa:
1. Prune il modello (rimuove il prune_amount% dei filtri)
2. Fine-tunes per recuperare l'accuratezza persa
3. Rimuove le maschere e finalizza
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(f"Applying {prune_amount*100:.0f}% structured pruning...")
model = apply_structured_pruning(model, amount=prune_amount)
# Fine-tuning rapido per recupero accuratezza
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
for epoch in range(finetune_epochs):
model.train()
total_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
loss = criterion(model(images), labels)
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
total_loss += loss.item()
model.eval()
correct = total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
preds = model(images).argmax(1)
correct += preds.eq(labels).sum().item()
total += labels.size(0)
print(f" FT Epoch {epoch+1}/{finetune_epochs} | "
f"Loss: {total_loss/len(train_loader):.4f} | "
f"Acc: {100.*correct/total:.2f}%")
# Finalizza pruning
model = remove_pruning_masks(model)
print("Pruning completato e finalizzato")
return model
6. エッジモデルの知識の蒸留
Il 知識の蒸留 (KD, Hinton et al., 2015) 「知識」を伝達する 大きなモデル(教師)を小さなモデル(生徒)に変換します。学生はただ学ぶだけではありません データセットのハードラベルですが、 ソフトな予測 教師の: の分布 データ空間の構造に関する情報を含む確率 (例: 「猫」は「車」というより「虎」に似ています)。
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationLoss(nn.Module):
"""
Loss combinata per Knowledge Distillation.
L_total = alpha * L_hard + (1 - alpha) * L_soft
L_hard = CrossEntropyLoss(student_logits, true_labels)
L_soft = KLDivLoss(softmax(student/T), softmax(teacher/T)) * T^2
T (temperature): valori alti -> distribuzioni più soft -> più informazione strutturale
alpha: peso relativo tra label reali e distillazione dal teacher
"""
def __init__(self, temperature: float = 4.0, alpha: float = 0.7):
super().__init__()
self.T = temperature
self.alpha = alpha
self.hard_loss = nn.CrossEntropyLoss()
self.soft_loss = nn.KLDivLoss(reduction='batchmean')
def forward(self,
student_logits: torch.Tensor,
teacher_logits: torch.Tensor,
labels: torch.Tensor) -> torch.Tensor:
# Loss su label reali (hard labels)
hard = self.hard_loss(student_logits, labels)
# Loss su soft predictions del teacher (KL divergence)
student_soft = F.log_softmax(student_logits / self.T, dim=1)
teacher_soft = F.softmax(teacher_logits / self.T, dim=1)
soft = self.soft_loss(student_soft, teacher_soft) * (self.T ** 2)
return self.alpha * hard + (1 - self.alpha) * soft
def train_with_distillation(
teacher: nn.Module, # modello grande, già addestrato
student: nn.Module, # modello piccolo da addestrare
train_loader,
val_loader,
n_epochs: int = 50,
temperature: float = 4.0,
alpha: float = 0.7,
lr: float = 1e-3
) -> nn.Module:
"""
Training del modello student con KD.
Il teacher rimane frozen durante tutto il training.
Tipico risultato:
- MobileNetV3 senza KD su ImageNet: ~67% Top-1
- MobileNetV3 con KD da ResNet-50: ~72% Top-1
- ResNet-50 (teacher): ~76% Top-1
- Delta: +5% con 5x meno parametri!
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
teacher.eval() # Teacher sempre in eval mode
student.to(device)
teacher.to(device)
criterion = DistillationLoss(temperature=temperature, alpha=alpha)
optimizer = torch.optim.AdamW(student.parameters(), lr=lr, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=n_epochs)
best_val_acc = 0.0
best_state = None
for epoch in range(n_epochs):
student.train()
total_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
# Forward pass
student_logits = student(images)
with torch.no_grad(): # Teacher: nessun gradiente
teacher_logits = teacher(images)
# Loss combinata
loss = criterion(student_logits, teacher_logits, labels)
optimizer.zero_grad(set_to_none=True)
loss.backward()
torch.nn.utils.clip_grad_norm_(student.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
scheduler.step()
# Validation
student.eval()
correct = total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
preds = student(images).argmax(1)
correct += preds.eq(labels).sum().item()
total += labels.size(0)
val_acc = 100.0 * correct / total
if val_acc > best_val_acc:
best_val_acc = val_acc
best_state = {k: v.cpu().clone() for k, v in student.state_dict().items()}
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{n_epochs} | "
f"Loss: {total_loss/len(train_loader):.4f} | "
f"Val Acc: {val_acc:.2f}% | "
f"Best: {best_val_acc:.2f}%")
student.load_state_dict(best_state)
print(f"\nBest validation accuracy: {best_val_acc:.2f}%")
return student
エッジの圧縮戦略の比較
| 技術 | パラメータの削減 | 高速化 | 準拠損失 | 再トレーニングが必要 |
|---|---|---|---|---|
| 量子化INT8 | 4x | 2~4倍 | <1% | いいえ (PTQ) / はい (QAT) |
| 構造化された剪定 30% | 1.4倍 | 1.3~1.6倍 | 1-3% | はい (微調整) |
| 知識の蒸留 | 5~10倍(モデル交換) | 5~10倍 | 3~8% | はい (完全なトレーニング) |
| FP16 (TensorRT) | 2x | 1.5~2倍 | <0.5% | No |
| Q + 剪定 + KD | 10~20倍 | 8~15倍 | 2~5% | Si |
7. ONNX ランタイム: ハードウェア間の移植性
ONNX (オープン ニューラル ネットワーク エクスチェンジ) 持ち運びに便利な標準サイズ 深層学習モデルの。 ONNX にエクスポートすると、同じモデルを CPU、NVIDIA GPU、ARM NPU、Intel OpenVINO、Apple Neural Engine 上の ONNX ランタイムで実行 推論コードを変更する必要はありません。
import torch
import onnx
import onnxruntime as ort
import numpy as np
import time
def export_to_onnx(model: torch.nn.Module,
input_shape: tuple = (1, 3, 640, 640),
output_path: str = 'model.onnx',
opset: int = 17) -> str:
"""
Esporta modello PyTorch in formato ONNX ottimizzato.
opset=17: versione del opset ONNX (più alta = più operatori supportati)
dynamic_axes: permette batch size variabile (utile per server, non per edge)
"""
model.eval()
dummy_input = torch.zeros(input_shape)
# Export con ottimizzazioni
torch.onnx.export(
model,
dummy_input,
output_path,
opset_version=opset,
input_names=['images'],
output_names=['output'],
dynamic_axes={
'images': {0: 'batch'},
'output': {0: 'batch'}
},
do_constant_folding=True, # ottimizza operazioni costanti
verbose=False
)
# Verifica il modello esportato
onnx_model = onnx.load(output_path)
onnx.checker.check_model(onnx_model)
print(f"Modello ONNX valido: {output_path}")
return output_path
class ONNXRuntimeInference:
"""
Inference ottimizzata con ONNX Runtime.
Supporta CPU, GPU CUDA, ARM (QNN), Intel OpenVINO come backend.
"""
def __init__(self, model_path: str, device: str = 'cpu'):
providers = self._get_providers(device)
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = (
ort.GraphOptimizationLevel.ORT_ENABLE_ALL
)
# Numero di thread per CPU inference
sess_options.intra_op_num_threads = 4
sess_options.inter_op_num_threads = 2
self.session = ort.InferenceSession(
model_path, sess_options, providers=providers
)
# Cache nomi input/output
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
print(f"ONNX Runtime caricato su: {providers[0]}")
def _get_providers(self, device: str) -> list:
if device == 'cuda':
return ['CUDAExecutionProvider', 'CPUExecutionProvider']
elif device == 'openvino':
return ['OpenVINOExecutionProvider', 'CPUExecutionProvider']
else:
return ['CPUExecutionProvider']
def predict(self, image: np.ndarray) -> np.ndarray:
"""Inference su immagine numpy preprocessata."""
# Assicura formato float32 [B, C, H, W]
if image.ndim == 3:
image = image[np.newaxis, ...]
image = image.astype(np.float32)
return self.session.run(
[self.output_name], {self.input_name: image}
)[0]
def benchmark(self, input_shape: tuple = (1, 3, 640, 640),
n_runs: int = 100) -> dict:
"""Misura latenza e throughput."""
dummy = np.random.rand(*input_shape).astype(np.float32)
# Warmup
for _ in range(10):
self.predict(dummy)
# Benchmark
start = time.perf_counter()
for _ in range(n_runs):
self.predict(dummy)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / n_runs) * 1000
fps = 1000.0 / avg_ms
print(f"ONNX Runtime: {avg_ms:.2f}ms ({fps:.1f} FPS)")
return {'avg_ms': avg_ms, 'fps': fps}
8. エッジ展開のベストプラクティス
実稼働対応のエッジ展開のチェックリスト
- 要件を満たす最小のモデルを選択してください。 RPi の場合は YOLOv8n または YOLO26n、Jetson Orin の場合は YOLOv8m。エッジでは Large または XLarge モデルを使用しないでください。常にターゲット ハードウェアで測定してください。
- 入力解像度を下げる: 640x640 の代わりに 320x320 を使用すると、精度は中程度低下しますが、推論時間が 75% 短縮されます。大きなアイテムの場合は 320 で十分です。
- インテリジェントなフレームスキップ: オブジェクトの動きが遅い場合は、3 ~ 5 フレームに 1 フレームを処理します。トラッカー (CSRT、ByteTrack) を使用して、スキップされたフレーム内の位置を補間します。
- 獲得パイプラインを最適化します。 待ち時間を最小限に抑えるには、CAP_PROP_BUFFERSIZE=1 に設定します。 Linux 上で V4L2 を直接使用すると、OpenCV よりもオーバーヘッドが少なくなります。
- Jetson 上の TensorRT: いつも。 PyTorch と TensorRT FP16 の違いは 5 ~ 8 倍です。 Jetson での推論生成に PyTorch を使用する理由はありません。
- サーマルスロットリング: RPi と Jetson では、過熱によりスロットルが発生します。ヒートシンクを追加し、温度を制御します
vcgencmd measure_temp(RPi) またはtegrastats(ジェットソン)。 - 速度だけでなくエネルギーも測定します。 FPS/ワットは、バッテリー デバイスにとって重要な指標です。速度は 2 倍ですが、エネルギー効率は 4 倍高く、多くの場合推奨されるモデルです。
- ウォッチドッグとグレースフル リスタート: 本番エッジデバイスでは、クラッシュまたはフリーズが発生した場合に推論プロセスを再起動するウォッチドッグを常に実装してください。
- エッジフレンドリーなロギング: RPi では、リモート データベースの代わりに SQLite を使用してイベントをローカルに保存します。接続が利用可能な場合は、バッチでクラウドに同期します。
import subprocess
import threading
import time
import logging
class ThermalMonitor:
"""
Monitor termico per Raspberry Pi/Jetson.
Riduce automaticamente il carico di lavoro se la temperatura e troppo alta.
"""
TEMP_WARNING = 75.0 # Celsius: riduce frame rate
TEMP_CRITICAL = 85.0 # Celsius: ferma il processing
def __init__(self, platform: str = 'rpi',
check_interval: float = 5.0):
self.platform = platform
self.check_interval = check_interval
self.current_temp = 0.0
self.throttle_factor = 1.0 # 1.0 = nessun throttling
self._stop = threading.Event()
def get_temperature(self) -> float:
"""Legge la temperatura del SoC."""
try:
if self.platform == 'rpi':
result = subprocess.run(
['vcgencmd', 'measure_temp'],
capture_output=True, text=True
)
# Output: "temp=62.1'C"
temp_str = result.stdout.strip()
return float(temp_str.split('=')[1].replace("'C", ''))
elif self.platform == 'jetson':
# Legge da sysfs
with open('/sys/class/thermal/thermal_zone0/temp') as f:
return float(f.read().strip()) / 1000.0
except Exception as e:
logging.warning(f"Impossibile leggere temperatura: {e}")
return 0.0
def get_throttle_factor(self) -> float:
"""Restituisce il fattore di throttling (0.0-1.0)."""
temp = self.current_temp
if temp < self.TEMP_WARNING:
return 1.0
elif temp < self.TEMP_CRITICAL:
# Throttling lineare tra 75 e 85 gradi
factor = 1.0 - (temp - self.TEMP_WARNING) / (
self.TEMP_CRITICAL - self.TEMP_WARNING
)
return max(0.2, factor) # mai sotto il 20%
else:
return 0.0 # ferma il processing
def monitor_loop(self) -> None:
"""Thread di monitoraggio termico."""
while not self._stop.is_set():
self.current_temp = self.get_temperature()
self.throttle_factor = self.get_throttle_factor()
if self.current_temp >= self.TEMP_CRITICAL:
logging.critical(f"TEMP CRITICA: {self.current_temp:.1f}C - "
f"Processing fermato!")
elif self.current_temp >= self.TEMP_WARNING:
logging.warning(f"TEMP ALTA: {self.current_temp:.1f}C - "
f"Throttle: {self.throttle_factor:.2f}")
time.sleep(self.check_interval)
def start(self) -> None:
t = threading.Thread(target=self.monitor_loop, daemon=True)
t.start()
def stop(self) -> None:
self._stop.set()
結論
コンピューター ビジョン モデルをエッジ デバイスに展開するには、総合的なアプローチが必要です これは、ハードウェアの選択、モデルの最適化、パイプライン エンジニアリングを組み合わせたものです。それは存在しません 独自のソリューション: 最適な組み合わせは主要な制約 (待ち時間、エネルギー、 精度、コスト)。この記事では、完全なツールキットを構築しました。
- エッジ ハードウェア: 予算シナリオ向けの Raspberry Pi 5、リアルタイム パフォーマンス向けの Jetson Orin、超低電力向けの Coral TPU および Hailo-8
- INT8 量子化: PTQ によるサイズの 4 倍の縮小、2 ~ 4 倍のスピードアップ、<1% の精度損失
- ARM CPU 用の NCNN、NVIDIA GPU 用の TensorRT、超低電力用の TFLite + Coral TPU
- 構造化プルーニング + 微調整: 精度の損失を最小限に抑えながらフィルターの 20 ~ 30% を削除します。
- 知識の蒸留: 大規模モデルから組み込みモデルに知識を転送します。
- ONNX ランタイム: 異なるハードウェア プラットフォーム間でのモデルの移植性
- 温度監視とウォッチドッグ: 24 時間年中無休のエッジ生産のための堅牢なシステム
- フレームスキップ + トラッキング: 動きの少ないシーンでの計算量を 70 ~ 80% 削減します。
シリーズナビゲーション
シリーズ間のリソース
- MLOps: 本番環境で提供されるモデル - Kubernetes と Triton を使用したクラウド展開
- 高度な深層学習: 量子化と圧縮







