OpenCV 및 PyTorch: 완벽한 컴퓨터 비전 파이프라인
OpenCV e 파이토치 그들은 컴퓨터 생태계의 두 기둥입니다 현대적인 비전. OpenCV는 이미지 획득, 전처리 및 후처리에 탁월합니다. 기존 작업, 웹캠 및 비디오 관리, 형태학적 변환, 클래식 필터. PyTorch는 신경망, GPU 컴퓨팅, 엔드투엔드 훈련 등 딥 러닝을 제공합니다. 함께, 그들은 형성한다 원시 픽셀 획득부터 지능형 예측에 이르는 완전한 CV 파이프라인.
이 기사에서는 비디오 캡처부터 완전한 컴퓨터 비전 파이프라인을 구축할 것입니다. OpenCV 사용, 전처리, PyTorch/YOLO 모델 추론, 시각화 결과를 기록합니다. 직접 배포할 수 있는 프로덕션 지원 시스템입니다.
무엇을 배울 것인가
- 기본 OpenCV: 이미지/비디오 읽기, 색상 공간, 형태학적 연산
- OpenCV-PyTorch 통합: 텐서 변환, 최적화된 파이프라인
- 버퍼 관리를 통한 실시간 비디오 획득
- 사전 처리 파이프라인: 크기 조정, 정규화, 일괄 준비
- 최적화된 추론: 일괄 처리, 비동기 추론
- 후처리: NMS, 좌표 변환, 프레임 주석
- 다중 카메라 파이프라인 및 RTSP 스트림 처리
- 감지된 이벤트에 대한 로깅 및 경고 시스템
- 성능 최적화: 스레딩, GPU 스트림, 프로파일링
1. CV 파이프라인을 위한 OpenCV 기초
1.1 색 공간 및 변환
자주 간과되는 중요한 점: OpenCV는 다음 형식을 사용합니다. BGR 기본적으로, PyTorch(및 PIL)가 사용하는 동안 RGB. 두 가지를 혼동하면 비참한 결과가 나옵니다. RGB 이미지로 사전 훈련된 모델은 반전된 채널을 수신합니다. 항상 전환하세요!
import cv2
import numpy as np
import torch
# ---- Lettura e conversioni ----
def load_image_rgb(path: str) -> np.ndarray:
"""Carica immagine in formato RGB (corretto per PyTorch)."""
img_bgr = cv2.imread(path)
if img_bgr is None:
raise FileNotFoundError(f"Immagine non trovata: {path}")
return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
def bgr_to_torch(img_bgr: np.ndarray) -> torch.Tensor:
"""
Converte immagine OpenCV BGR in tensor PyTorch normalizzato.
BGR [H, W, C] uint8 -> RGB [C, H, W] float32 normalizzato [0,1]
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(img_rgb).permute(2, 0, 1).float() / 255.0
return tensor
def torch_to_bgr(tensor: torch.Tensor) -> np.ndarray:
"""
Converte tensor PyTorch in immagine OpenCV BGR.
RGB [C, H, W] float32 -> BGR [H, W, C] uint8
"""
img_rgb = (tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
# ---- Spazi colore utili ----
def analyze_color_spaces(img_bgr: np.ndarray) -> dict:
"""Analizza l'immagine in diversi spazi colore."""
return {
'bgr': img_bgr,
'rgb': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB),
'gray': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),
'hsv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV),
'lab': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB),
'yuv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YUV),
}
# ---- Operazioni morfologiche ----
def apply_morphology(gray: np.ndarray, operation: str = 'opening',
kernel_size: int = 5) -> np.ndarray:
"""
Operazioni morfologiche per pre/post-processing.
opening = erosione + dilatazione (rimuove rumore piccolo)
closing = dilatazione + erosione (chiude piccoli buchi)
"""
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
ops = {
'erode': cv2.erode(gray, kernel),
'dilate': cv2.dilate(gray, kernel),
'opening': cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel),
'closing': cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel),
'gradient': cv2.morphologyEx(gray, cv2.MORPH_GRADIENT, kernel),
'tophat': cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel)
}
return ops.get(operation, gray)
# ---- Rilevamento contorni e feature classiche ----
def detect_edges_and_contours(img_bgr: np.ndarray) -> tuple:
"""Pipeline classica: blur -> Canny -> contorni."""
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blurred, threshold1=50, threshold2=150)
contours, hierarchy = cv2.findContours(
edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Filtra contorni per area minima
significant_contours = [c for c in contours if cv2.contourArea(c) > 500]
return edges, significant_contours
# ---- Preprocessing per modelli DL ----
def preprocess_for_model(img_bgr: np.ndarray,
target_size: tuple[int,int] = (640, 640),
mean: list = [0.485, 0.456, 0.406],
std: list = [0.229, 0.224, 0.225]) -> torch.Tensor:
"""
Pipeline completa: BGR immagine -> tensor normalizzato pronto per inference.
"""
# Letterbox resize (mantiene aspect ratio)
h, w = img_bgr.shape[:2]
target_h, target_w = target_size
scale = min(target_h / h, target_w / w)
new_h, new_w = int(h * scale), int(w * scale)
resized = cv2.resize(img_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
# Padding per raggiungere target_size
pad_h = target_h - new_h
pad_w = target_w - new_w
padded = cv2.copyMakeBorder(
resized,
pad_h // 2, pad_h - pad_h // 2,
pad_w // 2, pad_w - pad_w // 2,
cv2.BORDER_CONSTANT, value=(114, 114, 114)
)
# BGR -> RGB -> float -> normalize -> tensor
rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1) # HWC -> CHW
mean_t = torch.tensor(mean).view(3, 1, 1)
std_t = torch.tensor(std).view(3, 1, 1)
tensor = (tensor - mean_t) / std_t
return tensor.unsqueeze(0), scale, (pad_w // 2, pad_h // 2) # batch dim + metadata
2. 비디오 및 웹캠 획득
2.1 VideoCapture 및 프레임 버퍼
import cv2
import threading
import queue
import time
from dataclasses import dataclass
@dataclass
class Frame:
"""Wrapper per un frame con metadata."""
data: np.ndarray
timestamp: float
frame_id: int
class ThreadedVideoCapture:
"""
VideoCapture con lettura in thread separato.
Previene il drop di frame dovuto al processing lento:
la GPU che fa inference non blocca la lettura della camera.
"""
def __init__(self, source, max_buffer_size: int = 5):
self.cap = cv2.VideoCapture(source)
if not self.cap.isOpened():
raise RuntimeError(f"Impossibile aprire: {source}")
# Ottimizza per latenza bassa
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.buffer = queue.Queue(maxsize=max_buffer_size)
self.stopped = False
self.frame_id = 0
# Avvia thread di lettura
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
def _read_frames(self) -> None:
"""Thread worker: legge frame continuamente."""
while not self.stopped:
ret, frame = self.cap.read()
if not ret:
self.stopped = True
break
frame_obj = Frame(
data=frame,
timestamp=time.time(),
frame_id=self.frame_id
)
self.frame_id += 1
# Scarta frame vecchi se il buffer e pieno
if self.buffer.full():
try:
self.buffer.get_nowait()
except queue.Empty:
pass
self.buffer.put(frame_obj)
def read(self) -> Frame | None:
"""Leggi prossimo frame disponibile."""
try:
return self.buffer.get(timeout=1.0)
except queue.Empty:
return None
def get_fps(self) -> float:
return self.cap.get(cv2.CAP_PROP_FPS)
def get_resolution(self) -> tuple[int, int]:
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return w, h
def release(self) -> None:
self.stopped = True
self.cap.release()
def __enter__(self):
return self
def __exit__(self, *args):
self.release()
3. 완전한 CV 파이프라인: OpenCV + YOLO
import cv2
import torch
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from ultralytics import YOLO
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Detection:
"""Singola detection con tutti i metadata."""
class_name: str
class_id: int
confidence: float
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
timestamp: float = field(default_factory=time.time)
frame_id: int = 0
def to_dict(self) -> dict:
return {
'class': self.class_name,
'confidence': round(self.confidence, 3),
'bbox': list(self.bbox),
'timestamp': self.timestamp,
'frame_id': self.frame_id
}
class CVPipeline:
"""
Pipeline completa Computer Vision:
Acquisizione -> Preprocessing -> Inference -> Post-processing -> Output
"""
def __init__(
self,
model_path: str,
source, # int (webcam), str (file/RTSP)
conf_threshold: float = 0.4,
iou_threshold: float = 0.45,
target_classes: list[str] | None = None, # None = tutte le classi
output_dir: str = 'output',
save_video: bool = False,
alert_classes: list[str] | None = None # classi che triggerano alert
):
self.model = YOLO(model_path)
self.conf = conf_threshold
self.iou = iou_threshold
self.target_classes = target_classes
self.alert_classes = alert_classes or []
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.source = source
self.save_video = save_video
# Stats
self.frame_count = 0
self.total_detections = 0
self.start_time = None
self.fps_history = []
def run(self) -> None:
"""Avvia la pipeline di processing."""
logger.info(f"Avvio pipeline: source={self.source} modello={self.model.model_name}")
cap = cv2.VideoCapture(self.source)
if not cap.isOpened():
raise RuntimeError(f"Impossibile aprire source: {self.source}")
# Setup video writer se richiesto
writer = None
if self.save_video:
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_path = str(self.output_dir / f"output_{timestamp}.mp4")
writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
logger.info(f"Salvataggio video: {out_path}")
self.start_time = time.time()
detection_log = []
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
# Inference
detections = self._run_inference(frame, self.frame_count)
# Visualizzazione
annotated = self._annotate_frame(frame, detections)
# Stats overlay
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0.0
self.fps_history.append(fps)
annotated = self._add_stats_overlay(annotated, fps, len(detections))
# Salvataggio
if writer:
writer.write(annotated)
cv2.imshow('CV Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Logging detections
for det in detections:
detection_log.append(det.to_dict())
if det.class_name in self.alert_classes:
self._trigger_alert(det)
self.frame_count += 1
self.total_detections += len(detections)
finally:
cap.release()
if writer:
writer.release()
cv2.destroyAllWindows()
# Salva log
log_path = self.output_dir / 'detection_log.json'
with open(log_path, 'w') as f:
json.dump(detection_log, f, indent=2)
self._print_stats()
def _run_inference(self, frame: np.ndarray, frame_id: int) -> list[Detection]:
"""Esegue YOLO inference su un singolo frame."""
results = self.model.predict(
frame, conf=self.conf, iou=self.iou, verbose=False
)
detections = []
for box in results[0].boxes:
class_name = self.model.names[int(box.cls[0])]
# Filtra per classi target se specificato
if self.target_classes and class_name not in self.target_classes:
continue
x1, y1, x2, y2 = [int(c) for c in box.xyxy[0]]
detections.append(Detection(
class_name=class_name,
class_id=int(box.cls[0]),
confidence=float(box.conf[0]),
bbox=(x1, y1, x2, y2),
frame_id=frame_id
))
return detections
def _annotate_frame(self, frame: np.ndarray, detections: list[Detection]) -> np.ndarray:
"""Annota il frame con bounding boxes e label."""
annotated = frame.copy()
np.random.seed(42)
colors = {name: tuple(np.random.randint(50, 255, 3).tolist())
for name in self.model.names.values()}
for det in detections:
x1, y1, x2, y2 = det.bbox
color = colors.get(det.class_name, (0, 255, 0))
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = f"{det.class_name} {det.confidence:.2f}"
label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
cv2.rectangle(annotated,
(x1, y1 - label_size[1] - 10),
(x1 + label_size[0] + 5, y1), color, -1)
cv2.putText(annotated, label, (x1 + 2, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
return annotated
def _add_stats_overlay(self, frame: np.ndarray, fps: float,
n_detections: int) -> np.ndarray:
"""Aggiunge overlay con statistiche real-time."""
h = frame.shape[0]
stats = [
f"FPS: {fps:.1f}",
f"Detections: {n_detections}",
f"Frame: {self.frame_count}",
f"Avg FPS: {np.mean(self.fps_history[-30:]):.1f}" if self.fps_history else "Avg FPS: -"
]
for i, text in enumerate(stats):
cv2.putText(frame, text, (10, h - 100 + i * 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
return frame
def _trigger_alert(self, det: Detection) -> None:
"""Gestisce un evento di alert per classi critiche."""
logger.warning(f"ALERT: {det.class_name} rilevato con confidenza {det.confidence:.2f} "
f"(frame {det.frame_id})")
# In produzione: invia notifica (email, Slack, webhook)
def _print_stats(self) -> None:
"""Stampa statistiche finali della sessione."""
elapsed = time.time() - self.start_time
avg_fps = self.frame_count / elapsed if elapsed > 0 else 0
logger.info(f"\n=== Statistiche Pipeline ===")
logger.info(f"Frame processati: {self.frame_count}")
logger.info(f"Detections totali: {self.total_detections}")
logger.info(f"Tempo totale: {elapsed:.1f}s")
logger.info(f"FPS medio: {avg_fps:.1f}")
# Utilizzo
if __name__ == '__main__':
pipeline = CVPipeline(
model_path='yolo26m.pt',
source=0, # webcam (o 'video.mp4', 'rtsp://...')
conf_threshold=0.4,
target_classes=['person', 'car', 'truck'],
alert_classes=['person'],
output_dir='output',
save_video=True
)
pipeline.run()
4. 후처리를 위한 OpenCV 작업
import cv2
import numpy as np
# ---- Background Subtraction (MOG2) ----
def setup_background_subtraction():
"""
MOG2: Mixture of Gaussians per rilevamento movimento.
Utile come primo filtro prima dell'inference DL.
"""
subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,
varThreshold=50,
detectShadows=True
)
return subtractor
def detect_motion(frame_bgr: np.ndarray, subtractor,
min_contour_area: int = 1000) -> list[tuple]:
"""Rileva regioni di movimento nel frame."""
fg_mask = subtractor.apply(frame_bgr)
# Cleanup maschera
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
contours, _ = cv2.findContours(
fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
motion_regions = []
for c in contours:
if cv2.contourArea(c) >= min_contour_area:
x, y, w, h = cv2.boundingRect(c)
motion_regions.append((x, y, x+w, y+h))
return motion_regions
# ---- Optical Flow (Lucas-Kanade) ----
def compute_sparse_optical_flow(prev_gray: np.ndarray,
curr_gray: np.ndarray,
prev_points: np.ndarray) -> tuple:
"""
Lucas-Kanade optical flow per tracking di punti specifici.
Utile per tracking di keypoints rilevati nel frame precedente.
"""
lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)
curr_points, status, error = cv2.calcOpticalFlowPyrLK(
prev_gray, curr_gray, prev_points, None, **lk_params
)
good_prev = prev_points[status.flatten() == 1]
good_curr = curr_points[status.flatten() == 1]
# Calcola velocità media del movimento
if len(good_prev) > 0:
flow_vectors = good_curr - good_prev
avg_speed = np.mean(np.linalg.norm(flow_vectors, axis=1))
else:
avg_speed = 0.0
return good_curr, good_prev, avg_speed
# ---- CSRT Tracker (più preciso di MOSSE) ----
class ObjectTracker:
"""
Tracker multi-oggetto per mantenere l'identità degli oggetti
tra frame consecutivi senza rieseguire l'inference a ogni frame.
"""
def __init__(self, tracker_type: str = 'CSRT'):
self.tracker_type = tracker_type
self.trackers = [] # lista di (tracker, class_name, id)
self.next_id = 0
def add_tracker(self, frame: np.ndarray, bbox: tuple, class_name: str) -> int:
"""Aggiunge un tracker per un nuovo oggetto."""
x1, y1, x2, y2 = bbox
cv2_bbox = (x1, y1, x2 - x1, y2 - y1) # formato OpenCV: x, y, w, h
tracker = cv2.TrackerCSRT_create()
tracker.init(frame, cv2_bbox)
obj_id = self.next_id
self.trackers.append((tracker, class_name, obj_id))
self.next_id += 1
return obj_id
def update(self, frame: np.ndarray) -> list[dict]:
"""Aggiorna tutti i tracker con il nuovo frame."""
active_objects = []
failed_trackers = []
for i, (tracker, class_name, obj_id) in enumerate(self.trackers):
success, cv2_bbox = tracker.update(frame)
if success:
x, y, w, h = [int(v) for v in cv2_bbox]
active_objects.append({
'id': obj_id,
'class_name': class_name,
'bbox': (x, y, x + w, y + h)
})
else:
failed_trackers.append(i)
# Rimuovi tracker falliti (indietro per evitare shift indici)
for i in reversed(failed_trackers):
self.trackers.pop(i)
return active_objects
def clear(self) -> None:
self.trackers = []
5. 다중 카메라 파이프라인 및 RTSP 스트림
import cv2
import threading
import queue
import torch
from ultralytics import YOLO
from dataclasses import dataclass
@dataclass
class CameraFrame:
camera_id: int
frame: np.ndarray
timestamp: float
class MultiCameraPipeline:
"""
Pipeline multi-camera con:
- Thread separato per ogni camera
- Coda condivisa per batch inference
- GPU condivisa tra tutte le camere
"""
def __init__(self, sources: list, model_path: str, batch_size: int = 4):
self.sources = sources
self.model = YOLO(model_path)
self.batch_size = batch_size
self.frame_queue = queue.Queue(maxsize=batch_size * 2)
self.stopped = False
def _camera_reader(self, camera_id: int, source) -> None:
"""Thread worker per singola camera."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if isinstance(source, str) and 'rtsp' in source:
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
while not self.stopped:
ret, frame = cap.read()
if not ret:
logger.warning(f"Camera {camera_id} disconnessa")
time.sleep(1.0)
cap = cv2.VideoCapture(source) # Reconnect
continue
frame_obj = CameraFrame(camera_id=camera_id, frame=frame,
timestamp=time.time())
try:
self.frame_queue.put(frame_obj, timeout=0.1)
except queue.Full:
pass # Drop frame se la coda e piena
cap.release()
def run(self) -> None:
"""Avvia tutti i reader thread e processa batch dalla coda."""
threads = []
for i, source in enumerate(self.sources):
t = threading.Thread(
target=self._camera_reader, args=(i, source), daemon=True
)
t.start()
threads.append(t)
logger.info(f"Pipeline avviata: {len(self.sources)} camere")
batch = []
while not self.stopped:
try:
frame_obj = self.frame_queue.get(timeout=0.5)
batch.append(frame_obj)
# Processa batch quando pieno o timeout
if len(batch) >= self.batch_size:
self._process_batch(batch)
batch = []
except queue.Empty:
if batch:
self._process_batch(batch)
batch = []
def _process_batch(self, batch: list[CameraFrame]) -> None:
"""Inference batch su GPU - più efficiente di inference singola."""
frames = [f.frame for f in batch]
# Batch inference con YOLO
results = self.model.predict(
frames, conf=0.4, iou=0.45, verbose=False
)
for frame_obj, result in zip(batch, results):
n_det = len(result.boxes)
if n_det > 0:
logger.info(f"Camera {frame_obj.camera_id}: {n_det} oggetti rilevati")
def stop(self) -> None:
self.stopped = True
# Configurazione multi-camera
pipeline = MultiCameraPipeline(
sources=[
0, # Webcam locale
'rtsp://192.168.1.100/stream1', # Camera IP RTSP
'rtsp://192.168.1.101/stream1', # Camera IP RTSP 2
'videos/recording.mp4' # File video
],
model_path='yolo26m.pt',
batch_size=4
)
# pipeline.run()
6. 프로파일링 및 OpenCV CUDA 가속화
6.1 파이프라인 CV용 PyTorch 프로파일러
최적화하기 전에 측정하세요. 그만큼 PyTorch 프로파일러 그리고 도구 파이프라인의 병목 현상을 식별하는 데 더욱 강력해졌습니다. 정확하게 이해합니다. 시간이 가는 곳(전처리, 추론, 후처리, CPU/GPU 데이터 전송).
import torch
import torch.profiler as profiler
import cv2
import numpy as np
from ultralytics import YOLO
def profile_cv_pipeline(model_path: str,
video_source,
n_frames: int = 100,
output_dir: str = './profiler_logs') -> None:
"""
Profila la pipeline CV con PyTorch Profiler.
Genera log compatibili con TensorBoard per analisi visuale.
Output: profiler_logs/ (aprire con: tensorboard --logdir profiler_logs)
"""
model = YOLO(model_path)
cap = cv2.VideoCapture(video_source)
frame_count = 0
with profiler.profile(
activities=[
profiler.ProfilerActivity.CPU,
profiler.ProfilerActivity.CUDA,
],
schedule=profiler.schedule(
wait=5, # Skip first 5 iterations (warmup variability)
warmup=5, # Warmup 5 iterations (profiler overhead)
active=20, # Profile 20 iterations
repeat=2 # Repeat the cycle 2 times
),
on_trace_ready=profiler.tensorboard_trace_handler(output_dir),
record_shapes=True, # Record tensor shapes
profile_memory=True, # Track memory allocations
with_stack=True # Include call stack
) as prof:
while frame_count < n_frames:
ret, frame = cap.read()
if not ret:
break
with torch.profiler.record_function("preprocessing"):
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1).unsqueeze(0)
if torch.cuda.is_available():
tensor = tensor.cuda()
with torch.profiler.record_function("inference"):
with torch.no_grad():
results = model.predict(frame, verbose=False)
with torch.profiler.record_function("postprocessing"):
boxes = results[0].boxes
n_det = len(boxes)
prof.step()
frame_count += 1
cap.release()
print(f"Profiling completato. Risultati in: {output_dir}")
print(f"Avvia: tensorboard --logdir {output_dir}")
# Stampa tabella top operazioni (utile senza TensorBoard)
print(prof.key_averages().table(
sort_by="cuda_time_total", row_limit=15
))
# Profiling semplice con timer manuale per produzioni senza TensorBoard
class PipelineProfiler:
"""Profiler leggero per monitoraggio continuo in produzione."""
def __init__(self, window_size: int = 100):
import collections
self.window_size = window_size
self.times: dict = {
'preprocess': collections.deque(maxlen=window_size),
'inference': collections.deque(maxlen=window_size),
'postprocess': collections.deque(maxlen=window_size),
}
def record(self, stage: str, duration_ms: float) -> None:
if stage in self.times:
self.times[stage].append(duration_ms)
def report(self) -> dict:
"""Restituisce statistiche rolling degli ultimi N frame."""
stats = {}
for stage, times in self.times.items():
if times:
arr = np.array(times)
stats[stage] = {
'mean_ms': float(np.mean(arr)),
'p50_ms': float(np.percentile(arr, 50)),
'p95_ms': float(np.percentile(arr, 95)),
'p99_ms': float(np.percentile(arr, 99)),
}
return stats
def print_report(self) -> None:
stats = self.report()
print("\n=== Pipeline Performance (last {self.window_size} frames) ===")
for stage, s in stats.items():
print(f" {stage:12s}: mean={s['mean_ms']:.1f}ms "
f"p95={s['p95_ms']:.1f}ms p99={s['p99_ms']:.1f}ms")
6.2 CUDA 가속 기능을 갖춘 OpenCV
OpenCV는 하나의 모듈을 지원합니다. 쿠다 (CUDA로 컴파일된 opencv-contrib-python) NVIDIA GPU에서 전처리 작업을 최대 10배까지 가속화합니다. 특히 고주파수 파이프라인에서 크기 조정, 가우시안 블러 및 색상 변환에 유용합니다.
import cv2
import numpy as np
import torch
def check_opencv_cuda() -> dict:
"""Verifica disponibilità e configurazione di OpenCV CUDA."""
info = {
'opencv_version': cv2.__version__,
'cuda_enabled': cv2.cuda.getCudaEnabledDeviceCount() > 0,
'gpu_count': cv2.cuda.getCudaEnabledDeviceCount(),
}
if info['cuda_enabled']:
info['gpu_name'] = cv2.cuda.printShortCudaDeviceInfo(0)
return info
class CUDAPreprocessor:
"""
Preprocessor OpenCV accelerato su GPU.
Richiede: pip install opencv-contrib-python
e compilazione OpenCV con CUDA support.
Speedup tipico vs CPU: 5-10x per resize+cvtColor
"""
def __init__(self, target_size: tuple = (640, 640)):
self.target_size = target_size
self.has_cuda = cv2.cuda.getCudaEnabledDeviceCount() > 0
if self.has_cuda:
# Pre-alloca stream CUDA per pipeline asincrona
self.stream = cv2.cuda_Stream()
# GPU matrix per evitare ri-allocazioni
self.gpu_frame = cv2.cuda_GpuMat()
self.gpu_rgb = cv2.cuda_GpuMat()
self.gpu_resized = cv2.cuda_GpuMat()
def preprocess_cuda(self, img_bgr: np.ndarray) -> np.ndarray:
"""
Preprocessing su GPU: BGR->RGB + Resize + (opzionale) Gaussian blur.
Returns: numpy array RGB normalizzato [0,1] float32
"""
if not self.has_cuda:
return self._preprocess_cpu(img_bgr)
# Upload a GPU
self.gpu_frame.upload(img_bgr, self.stream)
# BGR -> RGB su GPU
cv2.cuda.cvtColor(self.gpu_frame, cv2.COLOR_BGR2RGB,
self.gpu_rgb, stream=self.stream)
# Resize su GPU (INTER_LINEAR e supportato in cuda)
cv2.cuda.resize(self.gpu_rgb, self.target_size,
self.gpu_resized,
interpolation=cv2.INTER_LINEAR,
stream=self.stream)
# Download da GPU (sincrono - necessario prima dell'inference PyTorch)
result = self.gpu_resized.download(stream=self.stream)
self.stream.waitForCompletion()
return result.astype(np.float32) / 255.0
def _preprocess_cpu(self, img_bgr: np.ndarray) -> np.ndarray:
"""Fallback CPU se CUDA non e disponibile."""
rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, self.target_size, interpolation=cv2.INTER_LINEAR)
return resized.astype(np.float32) / 255.0
def preprocess_batch_cuda(self, frames: list[np.ndarray]) -> torch.Tensor:
"""
Preprocessing batch su GPU.
Converte lista di frame in un batch tensor pronto per inference.
Massimizza throughput per pipeline multi-camera.
"""
preprocessed = [self.preprocess_cuda(f) for f in frames]
# Stack in batch tensor [B, C, H, W]
batch = np.stack(preprocessed) # [B, H, W, C]
batch_tensor = torch.from_numpy(batch).permute(0, 3, 1, 2)
if torch.cuda.is_available():
batch_tensor = batch_tensor.cuda(non_blocking=True)
return batch_tensor
# Benchmark: CPU vs CUDA preprocessing
def benchmark_preprocessing(n_frames: int = 500,
img_size: tuple = (1920, 1080)) -> None:
"""Confronta velocità preprocessing CPU vs CUDA."""
import time
# Genera frame sintetici
frames = [np.random.randint(0, 255, (*img_size, 3), dtype=np.uint8)
for _ in range(10)]
preprocessor = CUDAPreprocessor(target_size=(640, 640))
# Benchmark CPU
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor._preprocess_cpu(f)
cpu_time = time.perf_counter() - t0
# Benchmark CUDA (se disponibile)
if preprocessor.has_cuda:
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor.preprocess_cuda(f)
cuda_time = time.perf_counter() - t0
print(f"Preprocessing {n_frames} frame ({img_size[1]}x{img_size[0]} -> 640x640):")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
print(f" CUDA: {cuda_time:.2f}s ({n_frames/cuda_time:.0f} FPS)")
print(f" Speedup: {cpu_time/cuda_time:.1f}x")
else:
print("CUDA non disponibile, solo benchmark CPU")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
7. 이미지 품질 평가 및 프레임 측정항목
추론을 위해 프레임을 보내기 전에 품질을 평가하는 것이 좋습니다. 흔들리거나 과다 노출되거나 선명도가 떨어지는 프레임은 잘못된 부정을 생성하고 감소시킵니다. 시스템의 신뢰성. 일반적으로 품질 측정에 사용되는 PSNR 및 SSIM 측정항목 압축의 경우 프레임 필터링에도 잘 적용됩니다.
import cv2
import numpy as np
from dataclasses import dataclass
@dataclass
class FrameQuality:
"""Metriche di qualità per un singolo frame."""
blur_score: float # Varianza Laplaciano (alto = nitido)
brightness: float # Luminosita media [0, 255]
contrast: float # Deviazione standard luminosita
is_valid: bool # Frame accettabile per inference?
reject_reason: str # Motivo del rifiuto ('', 'blur', 'dark', ecc.)
class FrameQualityFilter:
"""
Filtra frame di bassa qualità prima dell'inference.
Riduce falsi negativi in sistemi di sorveglianza.
"""
def __init__(self,
blur_threshold: float = 100.0, # Laplacian variance
brightness_min: float = 30.0,
brightness_max: float = 220.0,
contrast_min: float = 15.0):
self.blur_threshold = blur_threshold
self.brightness_min = brightness_min
self.brightness_max = brightness_max
self.contrast_min = contrast_min
def assess(self, frame_bgr: np.ndarray) -> FrameQuality:
"""Valuta qualità del frame."""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
# Blur detection: varianza del Laplaciano
# Un frame nitido ha bordi marcati -> alta varianza
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
# Luminosita e contrasto
brightness = float(np.mean(gray))
contrast = float(np.std(gray))
# Valutazione
reject_reason = ''
if laplacian_var < self.blur_threshold:
reject_reason = f'blur (var={laplacian_var:.1f})'
elif brightness < self.brightness_min:
reject_reason = f'too_dark (mean={brightness:.1f})'
elif brightness > self.brightness_max:
reject_reason = f'overexposed (mean={brightness:.1f})'
elif contrast < self.contrast_min:
reject_reason = f'low_contrast (std={contrast:.1f})'
return FrameQuality(
blur_score=laplacian_var,
brightness=brightness,
contrast=contrast,
is_valid=(reject_reason == ''),
reject_reason=reject_reason
)
def filter_batch(self, frames: list[np.ndarray]) -> tuple[list, list]:
"""
Filtra una batch di frame.
Returns: (valid_frames, rejected_frames_with_reason)
"""
valid = []
rejected = []
for frame in frames:
quality = self.assess(frame)
if quality.is_valid:
valid.append(frame)
else:
rejected.append((frame, quality.reject_reason))
return valid, rejected
def compute_psnr(original: np.ndarray, compressed: np.ndarray) -> float:
"""
Peak Signal-to-Noise Ratio tra immagine originale e compressa.
PSNR > 40 dB = ottima qualità; < 20 dB = bassa qualità percettibile.
"""
mse = np.mean((original.astype(float) - compressed.astype(float)) ** 2)
if mse == 0:
return float('inf')
max_pixel = 255.0
return 20 * np.log10(max_pixel / np.sqrt(mse))
def compute_ssim(img1: np.ndarray, img2: np.ndarray) -> float:
"""
Structural Similarity Index (SSIM) tra due immagini grayscale.
Range: [-1, 1], dove 1 = identiche, 0 = non correlate.
Più fedele alla percezione umana rispetto a MSE/PSNR.
"""
gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(float)
gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(float)
C1 = (0.01 * 255) ** 2 # costante stabilità
C2 = (0.03 * 255) ** 2
mu1, mu2 = gray1.mean(), gray2.mean()
sigma1 = gray1.std()
sigma2 = gray2.std()
sigma12 = np.mean((gray1 - mu1) * (gray2 - mu2))
numerator = (2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)
denominator = (mu1**2 + mu2**2 + C1) * (sigma1**2 + sigma2**2 + C2)
return float(numerator / denominator)
# Utilizzo in pipeline con motion gating intelligente
class SmartMotionGate:
"""
Combina background subtraction + SSIM per gating intelligente.
Evita sia di inferire su frame identici (statica) sia su frame corrotti.
"""
def __init__(self, ssim_change_threshold: float = 0.95):
self.subtractor = cv2.createBackgroundSubtractorMOG2(
history=200, varThreshold=40, detectShadows=False
)
self.quality_filter = FrameQualityFilter()
self.ssim_threshold = ssim_change_threshold
self.prev_frame = None
def should_infer(self, frame: np.ndarray) -> tuple[bool, str]:
"""
Decide se inferire su questo frame.
Returns: (should_infer, reason)
"""
# 1. Quality check
quality = self.quality_filter.assess(frame)
if not quality.is_valid:
return False, f"low_quality: {quality.reject_reason}"
# 2. Motion detection (veloce)
fg_mask = self.subtractor.apply(frame)
motion_ratio = np.sum(fg_mask > 0) / fg_mask.size
if motion_ratio < 0.005: # meno dello 0.5% dei pixel in movimento
return False, "no_motion"
# 3. SSIM check (rileva cambiamenti strutturali, non solo rumore)
if self.prev_frame is not None:
ssim = compute_ssim(frame, self.prev_frame)
if ssim > self.ssim_threshold:
return False, f"scene_static (ssim={ssim:.3f})"
self.prev_frame = frame.copy()
return True, "ok"
8. 모범 사례 및 성능
일반적인 성능: CUDA 전처리를 사용하는 OpenCV 파이프라인의 YOLO26m
| 하드웨어 | FPS(640x640) | 숨어 있음 | 최적화 |
|---|---|---|---|
| 엔비디아 A100 | ~220FPS | ~4.5ms | 텐서RT FP16 |
| 엔비디아 RTX 4090 | ~180FPS | ~5.6ms | 텐서RT FP16 |
| 엔비디아 RTX 3080 | ~120FPS | ~8.3ms | ONNX 런타임 |
| 인텔 i9 CPU(ONNX) | ~18FPS | ~55ms | 오픈비노 |
| 라즈베리 파이 5 | ~3FPS | ~330ms | YOLOv8n INT8 |
성능 최적화
- 별도의 스레드에서 읽기: 카메라 I/O로 추론 루프를 차단하지 마십시오. GPU 사용량을 최대화하려면 ThreadedVideoCapture를 사용하세요.
- BGR->RGB는 한 번만: 통화할 때마다 전환하지 마세요. 파이프라인 전체에서 RGB를 읽고 유지하도록 변환합니다.
- torch.no_grad()는 항상 추론합니다: 추론 중 자동 그래프 계산 비활성화: 메모리 -30%, 속도 +10%.
- 일괄 추론: 비디오 스트림이 여러 개인 경우 프레임을 누적하고 일괄적으로 추론합니다. GPU 처리량은 배치 크기에 따라 거의 선형적으로 확장됩니다.
- 프로덕션용 TensorRT: NVIDIA 하드웨어에서는 최대 속도를 위해 항상 TensorRT로 내보내십시오. YOLOv8m: TensorRT FP16 사용 시 50~180FPS.
- 게이팅으로 배경 빼기: 움직임이 거의 없는 장면에서는 MOG2가 움직임을 감지하는 프레임에서만 DL 추론을 실행합니다. 컴퓨팅 비용을 60-70% 절약하세요.
- 프레임 품질 필터: 추론하기 전에 흔들리고(흐림 점수 < 100) 노출이 부족한 프레임을 폐기합니다. 거짓 부정을 줄이고 시스템 정밀도를 향상시킵니다.
- 정적 장면을 위한 SSIM: 두 개의 연속 프레임이 SSIM > 0.95이면 장면이 변경되지 않습니다. 추론을 다시 실행하지 말고 이전 결과를 재사용하세요.
- 전처리를 위한 OpenCV CUDA: NVIDIA GPU와 CUDA로 컴파일된 opencv-contrib가 있는 경우 GPU에서 전처리(크기 조정, cvtColor)가 5-10배 더 빠릅니다.
피해야 할 일반적인 실수
- 모델 준비를 수행하지 마십시오. 첫 번째 추론은 점점 느려집니다(JIT 컴파일, CUDA 초기화). 성능을 측정하기 전에 3~5개의 추론 더미를 실행하세요.
- model.eval()은 잊어버리세요: 학습 모드에서는 Dropout 및 BatchNorm 레이어가 다르게 동작합니다. 항상 전화해
model.eval()추론 전. - 주석에 frame.copy()가 없습니다: 추론에도 사용하는 경우 원본 프레임에 직접 주석을 달지 마세요. 항상 사용
frame.copy(). - VideoCapture를 해제하지 마십시오: 모든 것이 괜찮습니다
cv2.VideoCapture개봉된 제품은 다음과 같이 출시되어야 합니다.cap.release(). 컨텍스트 관리자를 사용하거나 try/finally를 사용하세요. - 재연결 로직이 없는 RTSP: RTSP 연결이 끊어집니다. 각 카메라 리더에는 지수 백오프를 사용하는 재시도 논리가 있어야 합니다.
결론
우리는 OpenCV와 PyTorch/YOLO를 통합하는 완전한 CV 파이프라인을 구축했습니다. 생산 준비 시스템의 각 계층:
- 획득, 전처리, 후처리 및 시각화를 위한 OpenCV
- GPU 활용도를 극대화하는 비차단 비디오 캡처 스레딩
- 구조화된 로깅, 경고 및 비디오 저장 기능을 갖춘 프로덕션 준비 파이프라인
- 감시 시나리오를 위한 일괄 추론 및 RTSP 자동 재연결 기능을 갖춘 다중 카메라
- 배경 빼기(MOG2) 및 Smart Motion Gate를 통해 컴퓨팅 성능을 최대 70%까지 줄일 수 있습니다.
- 병목 현상을 정확하게 식별하는 PyTorch 프로파일러
- GPU 가속 전처리용 OpenCV CUDA(CPU 대비 5~10배)
- 추론 전에 손상된 프레임을 필터링하는 프레임 품질 평가
- 실제 하드웨어 성능 벤치마크: A100(220FPS)부터 Raspberry Pi(3FPS)까지







