OpenCV と PyTorch: 完全なコンピューター ビジョン パイプライン
OpenCV e パイトーチ それらはコンピュータ エコシステムの 2 本の柱です 現代のビジョン。 OpenCV は、画像の取得、前処理、後処理に優れています。 従来の操作、Web カメラとビデオの管理、形態学的変換、古典的なフィルター。 PyTorch は、ニューラル ネットワーク、GPU コンピューティング、エンドツーエンドのトレーニングなどの深層学習をもたらします。一緒に、彼らは形成します 生のピクセルの取得からインテリジェントな予測に至るまでの完全な CV パイプライン。
この記事では、ビデオ キャプチャから完全なコンピューター ビジョン パイプラインを構築します。 OpenCVによる前処理、PyTorch/YOLOモデルによる推論、可視化 そして結果のログ記録。直接導入できる実稼働対応システム。
何を学ぶか
- 基本的な OpenCV: 画像/ビデオの読み取り、色空間、形態学的操作
- OpenCV-PyTorch 統合: テンソル変換、最適化されたパイプライン
- バッファ管理によるリアルタイムビデオ取得
- 前処理パイプライン: サイズ変更、正規化、バッチ準備
- 最適化された推論: バッチ処理、非同期推論
- 後処理: NMS、座標変換、フレームアノテーション
- マルチカメラパイプラインとRTSPストリーム処理
- 検出されたイベントのログおよびアラート システム
- パフォーマンスの最適化: スレッディング、GPU ストリーム、プロファイリング
1. CV パイプラインの OpenCV の基礎
1.1 色空間と変換
見落とされがちな重要な点: OpenCV は形式を使用します BGR デフォルトでは、 PyTorch (および PIL) が使用している間 RGB。この 2 つを混同すると、悲惨な結果が生じます。 RGB 画像で事前トレーニングされたモデルは、反転されたチャネルを受け取ります。常に変換してください!
import cv2
import numpy as np
import torch
# ---- Lettura e conversioni ----
def load_image_rgb(path: str) -> np.ndarray:
"""Carica immagine in formato RGB (corretto per PyTorch)."""
img_bgr = cv2.imread(path)
if img_bgr is None:
raise FileNotFoundError(f"Immagine non trovata: {path}")
return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
def bgr_to_torch(img_bgr: np.ndarray) -> torch.Tensor:
"""
Converte immagine OpenCV BGR in tensor PyTorch normalizzato.
BGR [H, W, C] uint8 -> RGB [C, H, W] float32 normalizzato [0,1]
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(img_rgb).permute(2, 0, 1).float() / 255.0
return tensor
def torch_to_bgr(tensor: torch.Tensor) -> np.ndarray:
"""
Converte tensor PyTorch in immagine OpenCV BGR.
RGB [C, H, W] float32 -> BGR [H, W, C] uint8
"""
img_rgb = (tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
# ---- Spazi colore utili ----
def analyze_color_spaces(img_bgr: np.ndarray) -> dict:
"""Analizza l'immagine in diversi spazi colore."""
return {
'bgr': img_bgr,
'rgb': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB),
'gray': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),
'hsv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV),
'lab': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB),
'yuv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YUV),
}
# ---- Operazioni morfologiche ----
def apply_morphology(gray: np.ndarray, operation: str = 'opening',
kernel_size: int = 5) -> np.ndarray:
"""
Operazioni morfologiche per pre/post-processing.
opening = erosione + dilatazione (rimuove rumore piccolo)
closing = dilatazione + erosione (chiude piccoli buchi)
"""
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
ops = {
'erode': cv2.erode(gray, kernel),
'dilate': cv2.dilate(gray, kernel),
'opening': cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel),
'closing': cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel),
'gradient': cv2.morphologyEx(gray, cv2.MORPH_GRADIENT, kernel),
'tophat': cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel)
}
return ops.get(operation, gray)
# ---- Rilevamento contorni e feature classiche ----
def detect_edges_and_contours(img_bgr: np.ndarray) -> tuple:
"""Pipeline classica: blur -> Canny -> contorni."""
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blurred, threshold1=50, threshold2=150)
contours, hierarchy = cv2.findContours(
edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Filtra contorni per area minima
significant_contours = [c for c in contours if cv2.contourArea(c) > 500]
return edges, significant_contours
# ---- Preprocessing per modelli DL ----
def preprocess_for_model(img_bgr: np.ndarray,
target_size: tuple[int,int] = (640, 640),
mean: list = [0.485, 0.456, 0.406],
std: list = [0.229, 0.224, 0.225]) -> torch.Tensor:
"""
Pipeline completa: BGR immagine -> tensor normalizzato pronto per inference.
"""
# Letterbox resize (mantiene aspect ratio)
h, w = img_bgr.shape[:2]
target_h, target_w = target_size
scale = min(target_h / h, target_w / w)
new_h, new_w = int(h * scale), int(w * scale)
resized = cv2.resize(img_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
# Padding per raggiungere target_size
pad_h = target_h - new_h
pad_w = target_w - new_w
padded = cv2.copyMakeBorder(
resized,
pad_h // 2, pad_h - pad_h // 2,
pad_w // 2, pad_w - pad_w // 2,
cv2.BORDER_CONSTANT, value=(114, 114, 114)
)
# BGR -> RGB -> float -> normalize -> tensor
rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1) # HWC -> CHW
mean_t = torch.tensor(mean).view(3, 1, 1)
std_t = torch.tensor(std).view(3, 1, 1)
tensor = (tensor - mean_t) / std_t
return tensor.unsqueeze(0), scale, (pad_w // 2, pad_h // 2) # batch dim + metadata
2. ビデオとウェブカメラの取得
2.1 ビデオキャプチャとフレームバッファ
import cv2
import threading
import queue
import time
from dataclasses import dataclass
@dataclass
class Frame:
"""Wrapper per un frame con metadata."""
data: np.ndarray
timestamp: float
frame_id: int
class ThreadedVideoCapture:
"""
VideoCapture con lettura in thread separato.
Previene il drop di frame dovuto al processing lento:
la GPU che fa inference non blocca la lettura della camera.
"""
def __init__(self, source, max_buffer_size: int = 5):
self.cap = cv2.VideoCapture(source)
if not self.cap.isOpened():
raise RuntimeError(f"Impossibile aprire: {source}")
# Ottimizza per latenza bassa
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.buffer = queue.Queue(maxsize=max_buffer_size)
self.stopped = False
self.frame_id = 0
# Avvia thread di lettura
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
def _read_frames(self) -> None:
"""Thread worker: legge frame continuamente."""
while not self.stopped:
ret, frame = self.cap.read()
if not ret:
self.stopped = True
break
frame_obj = Frame(
data=frame,
timestamp=time.time(),
frame_id=self.frame_id
)
self.frame_id += 1
# Scarta frame vecchi se il buffer e pieno
if self.buffer.full():
try:
self.buffer.get_nowait()
except queue.Empty:
pass
self.buffer.put(frame_obj)
def read(self) -> Frame | None:
"""Leggi prossimo frame disponibile."""
try:
return self.buffer.get(timeout=1.0)
except queue.Empty:
return None
def get_fps(self) -> float:
return self.cap.get(cv2.CAP_PROP_FPS)
def get_resolution(self) -> tuple[int, int]:
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return w, h
def release(self) -> None:
self.stopped = True
self.cap.release()
def __enter__(self):
return self
def __exit__(self, *args):
self.release()
3. 完全な CV パイプライン: OpenCV + YOLO
import cv2
import torch
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from ultralytics import YOLO
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Detection:
"""Singola detection con tutti i metadata."""
class_name: str
class_id: int
confidence: float
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
timestamp: float = field(default_factory=time.time)
frame_id: int = 0
def to_dict(self) -> dict:
return {
'class': self.class_name,
'confidence': round(self.confidence, 3),
'bbox': list(self.bbox),
'timestamp': self.timestamp,
'frame_id': self.frame_id
}
class CVPipeline:
"""
Pipeline completa Computer Vision:
Acquisizione -> Preprocessing -> Inference -> Post-processing -> Output
"""
def __init__(
self,
model_path: str,
source, # int (webcam), str (file/RTSP)
conf_threshold: float = 0.4,
iou_threshold: float = 0.45,
target_classes: list[str] | None = None, # None = tutte le classi
output_dir: str = 'output',
save_video: bool = False,
alert_classes: list[str] | None = None # classi che triggerano alert
):
self.model = YOLO(model_path)
self.conf = conf_threshold
self.iou = iou_threshold
self.target_classes = target_classes
self.alert_classes = alert_classes or []
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.source = source
self.save_video = save_video
# Stats
self.frame_count = 0
self.total_detections = 0
self.start_time = None
self.fps_history = []
def run(self) -> None:
"""Avvia la pipeline di processing."""
logger.info(f"Avvio pipeline: source={self.source} modello={self.model.model_name}")
cap = cv2.VideoCapture(self.source)
if not cap.isOpened():
raise RuntimeError(f"Impossibile aprire source: {self.source}")
# Setup video writer se richiesto
writer = None
if self.save_video:
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_path = str(self.output_dir / f"output_{timestamp}.mp4")
writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
logger.info(f"Salvataggio video: {out_path}")
self.start_time = time.time()
detection_log = []
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
# Inference
detections = self._run_inference(frame, self.frame_count)
# Visualizzazione
annotated = self._annotate_frame(frame, detections)
# Stats overlay
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0.0
self.fps_history.append(fps)
annotated = self._add_stats_overlay(annotated, fps, len(detections))
# Salvataggio
if writer:
writer.write(annotated)
cv2.imshow('CV Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Logging detections
for det in detections:
detection_log.append(det.to_dict())
if det.class_name in self.alert_classes:
self._trigger_alert(det)
self.frame_count += 1
self.total_detections += len(detections)
finally:
cap.release()
if writer:
writer.release()
cv2.destroyAllWindows()
# Salva log
log_path = self.output_dir / 'detection_log.json'
with open(log_path, 'w') as f:
json.dump(detection_log, f, indent=2)
self._print_stats()
def _run_inference(self, frame: np.ndarray, frame_id: int) -> list[Detection]:
"""Esegue YOLO inference su un singolo frame."""
results = self.model.predict(
frame, conf=self.conf, iou=self.iou, verbose=False
)
detections = []
for box in results[0].boxes:
class_name = self.model.names[int(box.cls[0])]
# Filtra per classi target se specificato
if self.target_classes and class_name not in self.target_classes:
continue
x1, y1, x2, y2 = [int(c) for c in box.xyxy[0]]
detections.append(Detection(
class_name=class_name,
class_id=int(box.cls[0]),
confidence=float(box.conf[0]),
bbox=(x1, y1, x2, y2),
frame_id=frame_id
))
return detections
def _annotate_frame(self, frame: np.ndarray, detections: list[Detection]) -> np.ndarray:
"""Annota il frame con bounding boxes e label."""
annotated = frame.copy()
np.random.seed(42)
colors = {name: tuple(np.random.randint(50, 255, 3).tolist())
for name in self.model.names.values()}
for det in detections:
x1, y1, x2, y2 = det.bbox
color = colors.get(det.class_name, (0, 255, 0))
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = f"{det.class_name} {det.confidence:.2f}"
label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
cv2.rectangle(annotated,
(x1, y1 - label_size[1] - 10),
(x1 + label_size[0] + 5, y1), color, -1)
cv2.putText(annotated, label, (x1 + 2, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
return annotated
def _add_stats_overlay(self, frame: np.ndarray, fps: float,
n_detections: int) -> np.ndarray:
"""Aggiunge overlay con statistiche real-time."""
h = frame.shape[0]
stats = [
f"FPS: {fps:.1f}",
f"Detections: {n_detections}",
f"Frame: {self.frame_count}",
f"Avg FPS: {np.mean(self.fps_history[-30:]):.1f}" if self.fps_history else "Avg FPS: -"
]
for i, text in enumerate(stats):
cv2.putText(frame, text, (10, h - 100 + i * 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
return frame
def _trigger_alert(self, det: Detection) -> None:
"""Gestisce un evento di alert per classi critiche."""
logger.warning(f"ALERT: {det.class_name} rilevato con confidenza {det.confidence:.2f} "
f"(frame {det.frame_id})")
# In produzione: invia notifica (email, Slack, webhook)
def _print_stats(self) -> None:
"""Stampa statistiche finali della sessione."""
elapsed = time.time() - self.start_time
avg_fps = self.frame_count / elapsed if elapsed > 0 else 0
logger.info(f"\n=== Statistiche Pipeline ===")
logger.info(f"Frame processati: {self.frame_count}")
logger.info(f"Detections totali: {self.total_detections}")
logger.info(f"Tempo totale: {elapsed:.1f}s")
logger.info(f"FPS medio: {avg_fps:.1f}")
# Utilizzo
if __name__ == '__main__':
pipeline = CVPipeline(
model_path='yolo26m.pt',
source=0, # webcam (o 'video.mp4', 'rtsp://...')
conf_threshold=0.4,
target_classes=['person', 'car', 'truck'],
alert_classes=['person'],
output_dir='output',
save_video=True
)
pipeline.run()
4. 後処理のための OpenCV 操作
import cv2
import numpy as np
# ---- Background Subtraction (MOG2) ----
def setup_background_subtraction():
"""
MOG2: Mixture of Gaussians per rilevamento movimento.
Utile come primo filtro prima dell'inference DL.
"""
subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,
varThreshold=50,
detectShadows=True
)
return subtractor
def detect_motion(frame_bgr: np.ndarray, subtractor,
min_contour_area: int = 1000) -> list[tuple]:
"""Rileva regioni di movimento nel frame."""
fg_mask = subtractor.apply(frame_bgr)
# Cleanup maschera
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
contours, _ = cv2.findContours(
fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
motion_regions = []
for c in contours:
if cv2.contourArea(c) >= min_contour_area:
x, y, w, h = cv2.boundingRect(c)
motion_regions.append((x, y, x+w, y+h))
return motion_regions
# ---- Optical Flow (Lucas-Kanade) ----
def compute_sparse_optical_flow(prev_gray: np.ndarray,
curr_gray: np.ndarray,
prev_points: np.ndarray) -> tuple:
"""
Lucas-Kanade optical flow per tracking di punti specifici.
Utile per tracking di keypoints rilevati nel frame precedente.
"""
lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)
curr_points, status, error = cv2.calcOpticalFlowPyrLK(
prev_gray, curr_gray, prev_points, None, **lk_params
)
good_prev = prev_points[status.flatten() == 1]
good_curr = curr_points[status.flatten() == 1]
# Calcola velocità media del movimento
if len(good_prev) > 0:
flow_vectors = good_curr - good_prev
avg_speed = np.mean(np.linalg.norm(flow_vectors, axis=1))
else:
avg_speed = 0.0
return good_curr, good_prev, avg_speed
# ---- CSRT Tracker (più preciso di MOSSE) ----
class ObjectTracker:
"""
Tracker multi-oggetto per mantenere l'identità degli oggetti
tra frame consecutivi senza rieseguire l'inference a ogni frame.
"""
def __init__(self, tracker_type: str = 'CSRT'):
self.tracker_type = tracker_type
self.trackers = [] # lista di (tracker, class_name, id)
self.next_id = 0
def add_tracker(self, frame: np.ndarray, bbox: tuple, class_name: str) -> int:
"""Aggiunge un tracker per un nuovo oggetto."""
x1, y1, x2, y2 = bbox
cv2_bbox = (x1, y1, x2 - x1, y2 - y1) # formato OpenCV: x, y, w, h
tracker = cv2.TrackerCSRT_create()
tracker.init(frame, cv2_bbox)
obj_id = self.next_id
self.trackers.append((tracker, class_name, obj_id))
self.next_id += 1
return obj_id
def update(self, frame: np.ndarray) -> list[dict]:
"""Aggiorna tutti i tracker con il nuovo frame."""
active_objects = []
failed_trackers = []
for i, (tracker, class_name, obj_id) in enumerate(self.trackers):
success, cv2_bbox = tracker.update(frame)
if success:
x, y, w, h = [int(v) for v in cv2_bbox]
active_objects.append({
'id': obj_id,
'class_name': class_name,
'bbox': (x, y, x + w, y + h)
})
else:
failed_trackers.append(i)
# Rimuovi tracker falliti (indietro per evitare shift indici)
for i in reversed(failed_trackers):
self.trackers.pop(i)
return active_objects
def clear(self) -> None:
self.trackers = []
5. マルチカメラパイプラインとRTSPストリーム
import cv2
import threading
import queue
import torch
from ultralytics import YOLO
from dataclasses import dataclass
@dataclass
class CameraFrame:
camera_id: int
frame: np.ndarray
timestamp: float
class MultiCameraPipeline:
"""
Pipeline multi-camera con:
- Thread separato per ogni camera
- Coda condivisa per batch inference
- GPU condivisa tra tutte le camere
"""
def __init__(self, sources: list, model_path: str, batch_size: int = 4):
self.sources = sources
self.model = YOLO(model_path)
self.batch_size = batch_size
self.frame_queue = queue.Queue(maxsize=batch_size * 2)
self.stopped = False
def _camera_reader(self, camera_id: int, source) -> None:
"""Thread worker per singola camera."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if isinstance(source, str) and 'rtsp' in source:
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
while not self.stopped:
ret, frame = cap.read()
if not ret:
logger.warning(f"Camera {camera_id} disconnessa")
time.sleep(1.0)
cap = cv2.VideoCapture(source) # Reconnect
continue
frame_obj = CameraFrame(camera_id=camera_id, frame=frame,
timestamp=time.time())
try:
self.frame_queue.put(frame_obj, timeout=0.1)
except queue.Full:
pass # Drop frame se la coda e piena
cap.release()
def run(self) -> None:
"""Avvia tutti i reader thread e processa batch dalla coda."""
threads = []
for i, source in enumerate(self.sources):
t = threading.Thread(
target=self._camera_reader, args=(i, source), daemon=True
)
t.start()
threads.append(t)
logger.info(f"Pipeline avviata: {len(self.sources)} camere")
batch = []
while not self.stopped:
try:
frame_obj = self.frame_queue.get(timeout=0.5)
batch.append(frame_obj)
# Processa batch quando pieno o timeout
if len(batch) >= self.batch_size:
self._process_batch(batch)
batch = []
except queue.Empty:
if batch:
self._process_batch(batch)
batch = []
def _process_batch(self, batch: list[CameraFrame]) -> None:
"""Inference batch su GPU - più efficiente di inference singola."""
frames = [f.frame for f in batch]
# Batch inference con YOLO
results = self.model.predict(
frames, conf=0.4, iou=0.45, verbose=False
)
for frame_obj, result in zip(batch, results):
n_det = len(result.boxes)
if n_det > 0:
logger.info(f"Camera {frame_obj.camera_id}: {n_det} oggetti rilevati")
def stop(self) -> None:
self.stopped = True
# Configurazione multi-camera
pipeline = MultiCameraPipeline(
sources=[
0, # Webcam locale
'rtsp://192.168.1.100/stream1', # Camera IP RTSP
'rtsp://192.168.1.101/stream1', # Camera IP RTSP 2
'videos/recording.mp4' # File video
],
model_path='yolo26m.pt',
batch_size=4
)
# pipeline.run()
6. プロファイリングと OpenCV CUDA Accelerated
6.1 パイプライン CV 用の PyTorch プロファイラー
最適化する前に、測定してください。の PyTorch プロファイラー そしてツール パイプラインのボトルネックを特定するのにさらに強力: 正確に理解します 時間のかかる場所 (前処理、推論、後処理、CPU/GPU データ転送)。
import torch
import torch.profiler as profiler
import cv2
import numpy as np
from ultralytics import YOLO
def profile_cv_pipeline(model_path: str,
video_source,
n_frames: int = 100,
output_dir: str = './profiler_logs') -> None:
"""
Profila la pipeline CV con PyTorch Profiler.
Genera log compatibili con TensorBoard per analisi visuale.
Output: profiler_logs/ (aprire con: tensorboard --logdir profiler_logs)
"""
model = YOLO(model_path)
cap = cv2.VideoCapture(video_source)
frame_count = 0
with profiler.profile(
activities=[
profiler.ProfilerActivity.CPU,
profiler.ProfilerActivity.CUDA,
],
schedule=profiler.schedule(
wait=5, # Skip first 5 iterations (warmup variability)
warmup=5, # Warmup 5 iterations (profiler overhead)
active=20, # Profile 20 iterations
repeat=2 # Repeat the cycle 2 times
),
on_trace_ready=profiler.tensorboard_trace_handler(output_dir),
record_shapes=True, # Record tensor shapes
profile_memory=True, # Track memory allocations
with_stack=True # Include call stack
) as prof:
while frame_count < n_frames:
ret, frame = cap.read()
if not ret:
break
with torch.profiler.record_function("preprocessing"):
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1).unsqueeze(0)
if torch.cuda.is_available():
tensor = tensor.cuda()
with torch.profiler.record_function("inference"):
with torch.no_grad():
results = model.predict(frame, verbose=False)
with torch.profiler.record_function("postprocessing"):
boxes = results[0].boxes
n_det = len(boxes)
prof.step()
frame_count += 1
cap.release()
print(f"Profiling completato. Risultati in: {output_dir}")
print(f"Avvia: tensorboard --logdir {output_dir}")
# Stampa tabella top operazioni (utile senza TensorBoard)
print(prof.key_averages().table(
sort_by="cuda_time_total", row_limit=15
))
# Profiling semplice con timer manuale per produzioni senza TensorBoard
class PipelineProfiler:
"""Profiler leggero per monitoraggio continuo in produzione."""
def __init__(self, window_size: int = 100):
import collections
self.window_size = window_size
self.times: dict = {
'preprocess': collections.deque(maxlen=window_size),
'inference': collections.deque(maxlen=window_size),
'postprocess': collections.deque(maxlen=window_size),
}
def record(self, stage: str, duration_ms: float) -> None:
if stage in self.times:
self.times[stage].append(duration_ms)
def report(self) -> dict:
"""Restituisce statistiche rolling degli ultimi N frame."""
stats = {}
for stage, times in self.times.items():
if times:
arr = np.array(times)
stats[stage] = {
'mean_ms': float(np.mean(arr)),
'p50_ms': float(np.percentile(arr, 50)),
'p95_ms': float(np.percentile(arr, 95)),
'p99_ms': float(np.percentile(arr, 99)),
}
return stats
def print_report(self) -> None:
stats = self.report()
print("\n=== Pipeline Performance (last {self.window_size} frames) ===")
for stage, s in stats.items():
print(f" {stage:12s}: mean={s['mean_ms']:.1f}ms "
f"p95={s['p95_ms']:.1f}ms p99={s['p99_ms']:.1f}ms")
6.2 CUDA アクセラレーションを備えた OpenCV
OpenCV は 1 つのモジュールをサポートします クダ (CUDA でコンパイルされた opencv-contrib-python) これにより、NVIDIA GPU での前処理操作が最大 10 倍高速化されます。特に 高周波パイプラインでのサイズ変更、ガウスぼかし、カラー変換に役立ちます。
import cv2
import numpy as np
import torch
def check_opencv_cuda() -> dict:
"""Verifica disponibilità e configurazione di OpenCV CUDA."""
info = {
'opencv_version': cv2.__version__,
'cuda_enabled': cv2.cuda.getCudaEnabledDeviceCount() > 0,
'gpu_count': cv2.cuda.getCudaEnabledDeviceCount(),
}
if info['cuda_enabled']:
info['gpu_name'] = cv2.cuda.printShortCudaDeviceInfo(0)
return info
class CUDAPreprocessor:
"""
Preprocessor OpenCV accelerato su GPU.
Richiede: pip install opencv-contrib-python
e compilazione OpenCV con CUDA support.
Speedup tipico vs CPU: 5-10x per resize+cvtColor
"""
def __init__(self, target_size: tuple = (640, 640)):
self.target_size = target_size
self.has_cuda = cv2.cuda.getCudaEnabledDeviceCount() > 0
if self.has_cuda:
# Pre-alloca stream CUDA per pipeline asincrona
self.stream = cv2.cuda_Stream()
# GPU matrix per evitare ri-allocazioni
self.gpu_frame = cv2.cuda_GpuMat()
self.gpu_rgb = cv2.cuda_GpuMat()
self.gpu_resized = cv2.cuda_GpuMat()
def preprocess_cuda(self, img_bgr: np.ndarray) -> np.ndarray:
"""
Preprocessing su GPU: BGR->RGB + Resize + (opzionale) Gaussian blur.
Returns: numpy array RGB normalizzato [0,1] float32
"""
if not self.has_cuda:
return self._preprocess_cpu(img_bgr)
# Upload a GPU
self.gpu_frame.upload(img_bgr, self.stream)
# BGR -> RGB su GPU
cv2.cuda.cvtColor(self.gpu_frame, cv2.COLOR_BGR2RGB,
self.gpu_rgb, stream=self.stream)
# Resize su GPU (INTER_LINEAR e supportato in cuda)
cv2.cuda.resize(self.gpu_rgb, self.target_size,
self.gpu_resized,
interpolation=cv2.INTER_LINEAR,
stream=self.stream)
# Download da GPU (sincrono - necessario prima dell'inference PyTorch)
result = self.gpu_resized.download(stream=self.stream)
self.stream.waitForCompletion()
return result.astype(np.float32) / 255.0
def _preprocess_cpu(self, img_bgr: np.ndarray) -> np.ndarray:
"""Fallback CPU se CUDA non e disponibile."""
rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, self.target_size, interpolation=cv2.INTER_LINEAR)
return resized.astype(np.float32) / 255.0
def preprocess_batch_cuda(self, frames: list[np.ndarray]) -> torch.Tensor:
"""
Preprocessing batch su GPU.
Converte lista di frame in un batch tensor pronto per inference.
Massimizza throughput per pipeline multi-camera.
"""
preprocessed = [self.preprocess_cuda(f) for f in frames]
# Stack in batch tensor [B, C, H, W]
batch = np.stack(preprocessed) # [B, H, W, C]
batch_tensor = torch.from_numpy(batch).permute(0, 3, 1, 2)
if torch.cuda.is_available():
batch_tensor = batch_tensor.cuda(non_blocking=True)
return batch_tensor
# Benchmark: CPU vs CUDA preprocessing
def benchmark_preprocessing(n_frames: int = 500,
img_size: tuple = (1920, 1080)) -> None:
"""Confronta velocità preprocessing CPU vs CUDA."""
import time
# Genera frame sintetici
frames = [np.random.randint(0, 255, (*img_size, 3), dtype=np.uint8)
for _ in range(10)]
preprocessor = CUDAPreprocessor(target_size=(640, 640))
# Benchmark CPU
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor._preprocess_cpu(f)
cpu_time = time.perf_counter() - t0
# Benchmark CUDA (se disponibile)
if preprocessor.has_cuda:
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor.preprocess_cuda(f)
cuda_time = time.perf_counter() - t0
print(f"Preprocessing {n_frames} frame ({img_size[1]}x{img_size[0]} -> 640x640):")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
print(f" CUDA: {cuda_time:.2f}s ({n_frames/cuda_time:.0f} FPS)")
print(f" Speedup: {cpu_time/cuda_time:.1f}x")
else:
print("CUDA non disponibile, solo benchmark CPU")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
7. 画質評価とフレームメトリクス
フレームを推論に送信する前に、その品質を評価する価値があります。 揺れたり、露出オーバーしたり、鮮明さが不十分なフレームは偽陰性を引き起こし、画質を低下させます。 システムの信頼性。 PSNR および SSIM メトリクス。通常は品質のために使用されます。 圧縮の観点から、フレーム フィルタリングにもよく適用されます。
import cv2
import numpy as np
from dataclasses import dataclass
@dataclass
class FrameQuality:
"""Metriche di qualità per un singolo frame."""
blur_score: float # Varianza Laplaciano (alto = nitido)
brightness: float # Luminosita media [0, 255]
contrast: float # Deviazione standard luminosita
is_valid: bool # Frame accettabile per inference?
reject_reason: str # Motivo del rifiuto ('', 'blur', 'dark', ecc.)
class FrameQualityFilter:
"""
Filtra frame di bassa qualità prima dell'inference.
Riduce falsi negativi in sistemi di sorveglianza.
"""
def __init__(self,
blur_threshold: float = 100.0, # Laplacian variance
brightness_min: float = 30.0,
brightness_max: float = 220.0,
contrast_min: float = 15.0):
self.blur_threshold = blur_threshold
self.brightness_min = brightness_min
self.brightness_max = brightness_max
self.contrast_min = contrast_min
def assess(self, frame_bgr: np.ndarray) -> FrameQuality:
"""Valuta qualità del frame."""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
# Blur detection: varianza del Laplaciano
# Un frame nitido ha bordi marcati -> alta varianza
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
# Luminosita e contrasto
brightness = float(np.mean(gray))
contrast = float(np.std(gray))
# Valutazione
reject_reason = ''
if laplacian_var < self.blur_threshold:
reject_reason = f'blur (var={laplacian_var:.1f})'
elif brightness < self.brightness_min:
reject_reason = f'too_dark (mean={brightness:.1f})'
elif brightness > self.brightness_max:
reject_reason = f'overexposed (mean={brightness:.1f})'
elif contrast < self.contrast_min:
reject_reason = f'low_contrast (std={contrast:.1f})'
return FrameQuality(
blur_score=laplacian_var,
brightness=brightness,
contrast=contrast,
is_valid=(reject_reason == ''),
reject_reason=reject_reason
)
def filter_batch(self, frames: list[np.ndarray]) -> tuple[list, list]:
"""
Filtra una batch di frame.
Returns: (valid_frames, rejected_frames_with_reason)
"""
valid = []
rejected = []
for frame in frames:
quality = self.assess(frame)
if quality.is_valid:
valid.append(frame)
else:
rejected.append((frame, quality.reject_reason))
return valid, rejected
def compute_psnr(original: np.ndarray, compressed: np.ndarray) -> float:
"""
Peak Signal-to-Noise Ratio tra immagine originale e compressa.
PSNR > 40 dB = ottima qualità; < 20 dB = bassa qualità percettibile.
"""
mse = np.mean((original.astype(float) - compressed.astype(float)) ** 2)
if mse == 0:
return float('inf')
max_pixel = 255.0
return 20 * np.log10(max_pixel / np.sqrt(mse))
def compute_ssim(img1: np.ndarray, img2: np.ndarray) -> float:
"""
Structural Similarity Index (SSIM) tra due immagini grayscale.
Range: [-1, 1], dove 1 = identiche, 0 = non correlate.
Più fedele alla percezione umana rispetto a MSE/PSNR.
"""
gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(float)
gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(float)
C1 = (0.01 * 255) ** 2 # costante stabilità
C2 = (0.03 * 255) ** 2
mu1, mu2 = gray1.mean(), gray2.mean()
sigma1 = gray1.std()
sigma2 = gray2.std()
sigma12 = np.mean((gray1 - mu1) * (gray2 - mu2))
numerator = (2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)
denominator = (mu1**2 + mu2**2 + C1) * (sigma1**2 + sigma2**2 + C2)
return float(numerator / denominator)
# Utilizzo in pipeline con motion gating intelligente
class SmartMotionGate:
"""
Combina background subtraction + SSIM per gating intelligente.
Evita sia di inferire su frame identici (statica) sia su frame corrotti.
"""
def __init__(self, ssim_change_threshold: float = 0.95):
self.subtractor = cv2.createBackgroundSubtractorMOG2(
history=200, varThreshold=40, detectShadows=False
)
self.quality_filter = FrameQualityFilter()
self.ssim_threshold = ssim_change_threshold
self.prev_frame = None
def should_infer(self, frame: np.ndarray) -> tuple[bool, str]:
"""
Decide se inferire su questo frame.
Returns: (should_infer, reason)
"""
# 1. Quality check
quality = self.quality_filter.assess(frame)
if not quality.is_valid:
return False, f"low_quality: {quality.reject_reason}"
# 2. Motion detection (veloce)
fg_mask = self.subtractor.apply(frame)
motion_ratio = np.sum(fg_mask > 0) / fg_mask.size
if motion_ratio < 0.005: # meno dello 0.5% dei pixel in movimento
return False, "no_motion"
# 3. SSIM check (rileva cambiamenti strutturali, non solo rumore)
if self.prev_frame is not None:
ssim = compute_ssim(frame, self.prev_frame)
if ssim > self.ssim_threshold:
return False, f"scene_static (ssim={ssim:.3f})"
self.prev_frame = frame.copy()
return True, "ok"
8. ベストプラクティスとパフォーマンス
一般的なパフォーマンス: CUDA 前処理を使用した OpenCV パイプラインでの YOLO26m
| ハードウェア | FPS (640x640) | レイテンシ | 最適化 |
|---|---|---|---|
| NVIDIA A100 | ~220FPS | ~4.5ms | TensorRT FP16 |
| NVIDIA RTX 4090 | ~180FPS | ~5.6ms | TensorRT FP16 |
| NVIDIA RTX 3080 | ~120FPS | ~8.3ms | ONNX ランタイム |
| インテル i9 CPU (ONNX) | ~18 FPS | ~55ms | OpenVINO |
| ラズベリーパイ5 | ~3 FPS | ~330ms | YOLOv8n INT8 |
パフォーマンスの最適化
- 別のスレッドで読んでください: カメラ I/O による推論ループをブロックしないでください。 ThreadedVideoCapture を使用して GPU の使用率を最大化します。
- BGR->RGB 1 回のみ: 通話のたびに変換しないでください。パイプライン全体で RGB を読み取り、維持するように変換します。
- torch.no_grad() は常に推論されます。 推論中の autograd グラフ計算を無効にします: メモリ -30%、速度 +10%。
- バッチ推論: 複数のビデオ ストリームがある場合は、フレームを蓄積してバッチで推論します。 GPU のスループットは、バッチ サイズに応じてほぼ直線的に増加します。
- 実稼働用 TensorRT: NVIDIA ハードウェアでは、最大速度を得るために常に TensorRT にエクスポートしてください。 YOLOv8m: TensorRT FP16 で 50 ~ 180 FPS。
- ゲートとしてのバックグラウンド減算: 動きの少ないシーンでは、MOG2 が動きを検出したフレームでのみ DL 推論を実行します。コンピューティングの 60 ~ 70% を節約します。
- フレーム品質フィルター: 推論の前に、揺れている (ブラー スコア < 100) フレームや露出不足のフレームを破棄します。偽陰性を減らし、システムの精度を向上させます。
- 静的シーン用の SSIM: 2 つの連続するフレームの SSIM > 0.95 がある場合、シーンは変更されません。推論を再実行せず、以前の結果を再利用します。
- 前処理用の OpenCV CUDA: NVIDIA GPU と CUDA でコンパイルされた opencv-contrib を使用している場合、前処理 (サイズ変更、cvtColor) は GPU 上で 5 ~ 10 倍高速になります。
避けるべきよくある間違い
- モデルのウォームアップを実行しないでください。 最初の推論はますます遅くなります (JIT コンパイル、CUDA 初期化)。パフォーマンスを測定する前に、3 ~ 5 個の推論ダミーを実行します。
- model.eval() のことは忘れてください。 トレイン モードでは、Dropout レイヤーと BatchNorm レイヤーの動作が異なります。常に電話する
model.eval()推理の前に。 - アノテーションにframe.copy()がありません: 元のフレームを推論にも使用している場合は、元のフレームに直接注釈を付けないでください。常に使用する
frame.copy(). - VideoCapture をリリースしないでください: すべて順調です
cv2.VideoCapture開いた状態で解放する必要がありますcap.release()。コンテキストマネージャーまたはtry/finallyを使用してください。 - 再接続ロジックのない RTSP: RTSP 接続が切断されます。各カメラ リーダーには、指数バックオフを備えた再試行ロジックが必要です。
結論
OpenCV と PyTorch/YOLO を統合する完全な CV パイプラインを構築しました。 本番環境に対応したシステムの各層:
- OpenCV による取得、前処理、後処理、視覚化
- GPU 使用率を最大化するノンブロッキング ビデオ キャプチャ スレッド
- 構造化されたロギング、アラート、ビデオ保存を備えた本番環境に対応したパイプライン
- 監視シナリオ向けのバッチ推論と RTSP 自動再接続を備えたマルチカメラ
- バックグラウンド減算 (MOG2) とスマート モーション ゲートにより、コンピューティングを最大 70% 削減
- ボトルネックを正確に特定するための PyTorch Profiler
- GPU 用の OpenCV CUDA による前処理の高速化 (CPU と比較して 5 ~ 10 倍)
- 推論前に破損したフレームをフィルタリングするためのフレーム品質評価
- 実際のハードウェアでのパフォーマンス ベンチマーク: A100 (220 FPS) から Raspberry Pi (3 FPS)







