얼굴 감지 및 인식: MediaPipe, MTCNN 및 FaceNet
얼굴 인식은 가장 성숙한 컴퓨터 비전 애플리케이션 중 하나이며 광범위한 분야: 보안 시스템부터 스마트폰까지, 접근 제어부터 분석까지 소매업의 인구통계. 하지만 올바르게 구현하세요. 신중하게 정확성, 속도 및 무엇보다도 윤리를 위해서는 이해가 필요합니다. 관련된 기술의 깊이.
이 기사에서는 전체 스택인 얼굴 감지(얼굴 찾기)를 살펴보겠습니다. 이미지), 얼굴 정렬(기하학적 정규화), 얼굴 임베딩(표현) 벡터) 및 얼굴 확인/식별. 실시간으로 MediaPipe를 사용하겠습니다. 정확성을 위한 MTCNN, 인식을 위한 FaceNet/ArcFace.
무엇을 배울 것인가
- 얼굴 감지와 얼굴 인식 파이프라인: 차이점 및 사용 사례
- MediaPipe 얼굴 감지: 빠르고 가벼운 크로스 플랫폼
- MediaPipe 페이스 메시: 실시간 468개 얼굴 랜드마크
- MTCNN: 정확한 감지를 위한 다중 작업 계단식 CNN
- 얼굴 정렬: 랜드마크를 사용한 기하학적 정규화
- 얼굴 임베딩: 간결한 표현을 위한 FaceNet 및 ArcFace
- 얼굴인증(1:1) 및 본인확인(1:N)
- 얼굴 데이터베이스를 사용해 처음부터 인식 시스템 구축
- 윤리적 및 법적 고려사항: GDPR, 편견, 동의
1. 얼굴 인식과 얼굴 인식: 완전한 파이프라인
"얼굴 인식"이라는 용어는 요구 사항이 있는 두 가지 별개의 작업을 함께 그룹화하는 경우가 많습니다. 매우 다른 기술자:
얼굴 파이프라인의 구성 요소
| 단계 | 작업 | 출력 | 대표적인 모델 |
|---|---|---|---|
| 발각 | 얼굴 위치 찾기 | 경계 상자 | 미디어파이프, MTCNN, 레티나페이스 |
| 조정 | 기하학 정규화 | 정규화된 112x112 이미지 | 랜드마크와 유사한 유사성 |
| 임베딩 | 기능 설명자 추출 | 캐리어 128-512D | FaceNet, ArcFace, AdaFace |
| 확인 | 같은 사람? (1:1) | 유사성 점수, 부울 | 임베딩 간 코사인 거리 |
| 신분증 | 누구입니까? (1:N) | 정체성 + 자신감 | 내장 데이터베이스의 KNN |
2. MediaPipe: 얼굴 감지 및 얼굴 메시
미디어파이프 Google의 얼굴 인식을 위한 가장 실용적인 프레임워크 CPU에서 실시간으로. BlazeFace 모델은 특히 속도에 최적화되어 있습니다. 모바일 및 임베디드 장치에서 최신 노트북에서는 200FPS 이상에 도달합니다.
2.1 MediaPipe를 이용한 얼굴 검출
import mediapipe as mp
import cv2
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class FaceDetection:
"""Risultato di detection per un singolo volto."""
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
confidence: float
keypoints: dict[str, tuple[int, int]] # nome -> (x, y) in pixel
class MediaPipeFaceDetector:
"""
Face detector basato su MediaPipe BlazeFace.
Velocissimo su CPU: 200+ FPS su immagini 640x480.
Ottimo per real-time, non per immagini ad alta densita di volti.
"""
KEYPOINT_NAMES = [
'right_eye', 'left_eye', 'nose_tip',
'mouth_center', 'right_ear_tragion', 'left_ear_tragion'
]
def __init__(self, min_confidence: float = 0.5,
model_selection: int = 0):
"""
model_selection:
0 = short range (entro 2m, più veloce)
1 = full range (fino a 5m, più accurato)
"""
self.mp_face = mp.solutions.face_detection
self.detector = self.mp_face.FaceDetection(
model_selection=model_selection,
min_detection_confidence=min_confidence
)
self.mp_draw = mp.solutions.drawing_utils
def detect(self, img_bgr: np.ndarray) -> list[FaceDetection]:
"""Rileva volti in un'immagine BGR."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.detector.process(img_rgb)
faces = []
if not results.detections:
return faces
for detection in results.detections:
score = detection.score[0]
bbox_rel = detection.location_data.relative_bounding_box
# Coordinate relative -> pixel
x1 = max(0, int(bbox_rel.xmin * w))
y1 = max(0, int(bbox_rel.ymin * h))
x2 = min(w, int((bbox_rel.xmin + bbox_rel.width) * w))
y2 = min(h, int((bbox_rel.ymin + bbox_rel.height) * h))
# Keypoints (occhi, naso, bocca, orecchie)
keypoints = {}
for idx, name in enumerate(self.KEYPOINT_NAMES):
kp = detection.location_data.relative_keypoints[idx]
keypoints[name] = (int(kp.x * w), int(kp.y * h))
faces.append(FaceDetection(
bbox=(x1, y1, x2, y2),
confidence=float(score),
keypoints=keypoints
))
return faces
def draw(self, img_bgr: np.ndarray,
faces: list[FaceDetection]) -> np.ndarray:
"""Annota immagine con i risultati della detection."""
annotated = img_bgr.copy()
for face in faces:
x1, y1, x2, y2 = face.bbox
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(annotated, f"{face.confidence:.2f}",
(x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (0, 255, 0), 2)
# Disegna keypoints
for name, (kx, ky) in face.keypoints.items():
color = (0, 0, 255) if 'eye' in name else (255, 0, 0)
cv2.circle(annotated, (kx, ky), 4, color, -1)
return annotated
# Utilizzo: detection su webcam in real-time
def run_face_detection_webcam() -> None:
detector = MediaPipeFaceDetector(min_confidence=0.5)
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
break
faces = detector.detect(frame)
annotated = detector.draw(frame, faces)
cv2.putText(annotated, f"Faces: {len(faces)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('MediaPipe Face Detection', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
2.2 페이스 메시: 실시간 468개 랜드마크
모델 페이스메시 MediaPipe는 468개의 3D 랜드마크(x, y, z)를 추출합니다. 얼굴의. 얼굴 정렬, 감정 추정, AR 필터, 시선 및 졸음 감지(눈 종횡비).
import mediapipe as mp
import cv2
import numpy as np
class FaceMeshAnalyzer:
"""
MediaPipe Face Mesh: 468 landmark 3D in real-time.
Utilita incluse: eye aspect ratio (sonnolenza), head pose, ecc.
"""
# Indici dei landmark MediaPipe per occhi
LEFT_EYE_IDX = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_IDX = [33, 160, 158, 133, 153, 144]
def __init__(self, max_faces: int = 1,
refine_landmarks: bool = True):
"""
refine_landmarks=True: aggiunge landmark attorno agli occhi
e alle iridi (468 -> 478 punti totali).
"""
self.mp_mesh = mp.solutions.face_mesh
self.face_mesh = self.mp_mesh.FaceMesh(
max_num_faces=max_faces,
refine_landmarks=refine_landmarks,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.mp_draw = mp.solutions.drawing_utils
self.mp_styles = mp.solutions.drawing_styles
def process(self, img_bgr: np.ndarray) -> Optional[list]:
"""Processa immagine e restituisce lista di landmark per ogni volto."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(img_rgb)
if not results.multi_face_landmarks:
return None
all_faces_lm = []
for face_landmarks in results.multi_face_landmarks:
# Converti da coordinate normalizzate a pixel
lm_pixels = []
for lm in face_landmarks.landmark:
lm_pixels.append((int(lm.x * w), int(lm.y * h), lm.z))
all_faces_lm.append(lm_pixels)
return all_faces_lm
def eye_aspect_ratio(self, landmarks: list,
eye_indices: list) -> float:
"""
Eye Aspect Ratio (EAR) - indicatore di sonnolenza.
EAR < 0.2 per 20+ frame consecutivi = occhio chiuso.
Formula: EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|)
"""
pts = [np.array(landmarks[i][:2]) for i in eye_indices]
# Distanze verticali
A = np.linalg.norm(pts[1] - pts[5])
B = np.linalg.norm(pts[2] - pts[4])
# Distanza orizzontale
C = np.linalg.norm(pts[0] - pts[3])
return (A + B) / (2.0 * C) if C > 0 else 0.0
def draw_mesh(self, img_bgr: np.ndarray,
results_raw) -> np.ndarray:
"""Disegna la mesh completa con stili MediaPipe predefiniti."""
annotated = img_bgr.copy()
if results_raw and results_raw.multi_face_landmarks:
for face_lm in results_raw.multi_face_landmarks:
self.mp_draw.draw_landmarks(
image=annotated,
landmark_list=face_lm,
connections=self.mp_mesh.FACEMESH_TESSELATION,
landmark_drawing_spec=None,
connection_drawing_spec=self.mp_styles
.get_default_face_mesh_tesselation_style()
)
return annotated
# Rilevamento sonnolenza con Eye Aspect Ratio
def drowsiness_detector(threshold: float = 0.22,
consec_frames: int = 20) -> None:
"""Sistema di alert sonnolenza basato su EAR."""
analyzer = FaceMeshAnalyzer(max_faces=1)
cap = cv2.VideoCapture(0)
ear_counter = 0
alert_active = False
while True:
ret, frame = cap.read()
if not ret:
break
landmarks_list = analyzer.process(frame)
if landmarks_list:
lms = landmarks_list[0] # primo volto
ear_l = analyzer.eye_aspect_ratio(lms, analyzer.LEFT_EYE_IDX)
ear_r = analyzer.eye_aspect_ratio(lms, analyzer.RIGHT_EYE_IDX)
avg_ear = (ear_l + ear_r) / 2.0
if avg_ear < threshold:
ear_counter += 1
if ear_counter >= consec_frames:
alert_active = True
cv2.putText(frame, "ALERT: SONNOLENZA!",
(50, 200), cv2.FONT_HERSHEY_SIMPLEX,
1.5, (0, 0, 255), 3)
else:
ear_counter = 0
alert_active = False
cv2.putText(frame, f"EAR: {avg_ear:.3f}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.8, (0, 255, 0), 2)
cv2.imshow('Drowsiness Detector', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
3. MTCNN: 다중 작업 계단식 CNN
MTCNN 그리고 균형을 잡아주는 3단 검출기(P-Net, R-Net, O-Net) 속도와 정확성. 이는 시스템의 정확한 감지를 위한 최적의 표준입니다. 인식: 5가지 필수 랜드마크(눈, 코, 입가)로 얼굴을 식별합니다. 얼굴 정렬을 위해 MediaPipe보다 느리지만 열악한 조건에서는 더 강력합니다.
from mtcnn import MTCNN
import cv2
import numpy as np
from PIL import Image
class MTCNNFaceProcessor:
"""
MTCNN per detection precisa + face alignment.
Produce immagini 112x112 normalizzate, ottimali per FaceNet/ArcFace.
"""
def __init__(self, min_face_size: int = 40,
thresholds: list = None,
scale_factor: float = 0.709):
self.detector = MTCNN(
min_face_size=min_face_size,
thresholds=thresholds or [0.6, 0.7, 0.7],
scale_factor=scale_factor
)
def detect_and_align(self, img_bgr: np.ndarray,
output_size: int = 112) -> list[np.ndarray]:
"""
Rileva volti e li restituisce allineati (112x112 default).
L'allineamento usa una trasformazione affine sui 5 landmark
per portare gli occhi in posizione canonica.
Returns: lista di immagini volto allineate (BGR, float32 [0,1])
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
aligned_faces = []
for det in detections:
if det['confidence'] < 0.90:
continue
keypoints = det['keypoints']
src_pts = np.array([
keypoints['left_eye'],
keypoints['right_eye'],
keypoints['nose'],
keypoints['mouth_left'],
keypoints['mouth_right']
], dtype=np.float32)
# Punti di destinazione canonici per 112x112
dst_pts = np.array([
[38.2946, 51.6963],
[73.5318, 51.6963],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.3655]
], dtype=np.float32)
# Scale per output_size diversi da 112
scale = output_size / 112.0
dst_pts *= scale
# Trasformazione affine -> immagine allineata
M = cv2.estimateAffinePartial2D(src_pts, dst_pts)[0]
aligned = cv2.warpAffine(img_bgr, M, (output_size, output_size))
aligned_faces.append(aligned.astype(np.float32) / 255.0)
return aligned_faces
def detect_with_info(self, img_bgr: np.ndarray) -> list[dict]:
"""Rileva volti con tutte le informazioni MTCNN."""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
results = []
h, w = img_bgr.shape[:2]
for det in detections:
x, y, bw, bh = det['box']
x1 = max(0, x)
y1 = max(0, y)
x2 = min(w, x + bw)
y2 = min(h, y + bh)
results.append({
'bbox': (x1, y1, x2, y2),
'confidence': det['confidence'],
'keypoints': det['keypoints']
})
return results
4. 얼굴 인식: FaceNet 및 ArcFace
감지 및 정렬 후 인식 시스템의 핵심은 얼굴 임베딩 모델: 이미지를 변환하는 신경망 128-512 차원의 벡터에서 112x112. 같은 사람의 얼굴이 나온다 공간에서 가까운 벡터; 다른 얼굴은 멀리 있습니다.
얼굴 임베딩 모델 비교
| 모델 | 희미한 임베딩 | 손실 | LFW Acc. | 크기 |
|---|---|---|---|---|
| 페이스넷(구글) | 128 | 삼중 손실 | 99.63% | 90MB |
| ArcFace(인사이트페이스) | 512 | ArcFaceLoss | 99.83% | 249MB |
| 에이다페이스 | 512 | Ada페이스 손실 | 99.82% | 249MB |
| MobileFaceNet(에지) | 128 | ArcFaceLoss | 99.55% | 4MB |
import insightface
from insightface.app import FaceAnalysis
import numpy as np
import cv2
import pickle
from pathlib import Path
from sklearn.preprocessing import normalize
from sklearn.neighbors import KNeighborsClassifier
class FaceRecognitionSystem:
"""
Sistema completo di face recognition basato su InsightFace (ArcFace).
Supporta registrazione di nuove identità e riconoscimento real-time.
Installazione: pip install insightface onnxruntime scikit-learn
"""
def __init__(self, db_path: str = 'face_db.pkl',
recognition_threshold: float = 0.5):
"""
recognition_threshold: soglia coseno per considerare un match
(0.5 e un buon default per ArcFace 512D)
"""
# Inizializza FaceAnalysis (detection + embedding in un'unica API)
self.app = FaceAnalysis(
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
self.app.prepare(ctx_id=0, det_size=(640, 640))
self.db_path = Path(db_path)
self.threshold = recognition_threshold
self.database: dict[str, list[np.ndarray]] = {}
self.knn: Optional[KNeighborsClassifier] = None
if self.db_path.exists():
self._load_database()
def register_person(self, name: str,
images: list[np.ndarray],
max_faces_per_image: int = 1) -> int:
"""
Registra una nuova persona nel database.
name: identificatore della persona
images: lista di immagini BGR (almeno 5 per robustezza)
Returns: numero di embedding registrati con successo
"""
embeddings = []
for img in images:
faces = self.app.get(img)
if not faces:
continue
# Prendi il volto più grande (per immagini con una persona)
face = max(faces, key=lambda f: (f.bbox[2]-f.bbox[0]) *
(f.bbox[3]-f.bbox[1]))
emb = normalize(face.embedding.reshape(1, -1))[0]
embeddings.append(emb)
if len(embeddings) >= max_faces_per_image * len(images):
break
if not embeddings:
print(f"[WARN] Nessun volto rilevato per {name}")
return 0
if name not in self.database:
self.database[name] = []
self.database[name].extend(embeddings)
self._rebuild_knn()
self._save_database()
print(f"Registrato {name}: {len(embeddings)} embedding")
return len(embeddings)
def recognize(self, img_bgr: np.ndarray) -> list[dict]:
"""
Riconosce tutti i volti in un'immagine.
Returns: lista di dict con bbox, identity, confidence per ogni volto
"""
faces = self.app.get(img_bgr)
results = []
for face in faces:
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = self._match_embedding(emb)
x1, y1, x2, y2 = face.bbox.astype(int)
results.append({
'bbox': (x1, y1, x2, y2),
'identity': identity,
'confidence': confidence,
'is_known': confidence >= self.threshold
})
return results
def _match_embedding(self, emb: np.ndarray) -> tuple[str, float]:
"""Trova la corrispondenza migliore nel database."""
if not self.database or self.knn is None:
return ('unknown', 0.0)
# Usa KNN con metrica coseno (1 - cosine_similarity = cosine_distance)
dist, idx = self.knn.kneighbors([emb], n_neighbors=1)
labels = [name for name, embs in self.database.items()
for _ in embs]
best_name = labels[idx[0][0]]
similarity = 1.0 - dist[0][0] # da distanza coseno a similarità
return (best_name, float(similarity))
def _rebuild_knn(self) -> None:
"""Ricostruisce il classificatore KNN dopo aggiornamenti al DB."""
all_embs = []
all_labels = []
for name, embs in self.database.items():
all_embs.extend(embs)
all_labels.extend([name] * len(embs))
if len(all_embs) < 2:
return
self.knn = KNeighborsClassifier(
n_neighbors=min(3, len(all_embs)),
metric='cosine',
algorithm='brute'
)
self.knn.fit(np.array(all_embs), all_labels)
def _save_database(self) -> None:
with open(self.db_path, 'wb') as f:
pickle.dump(self.database, f)
def _load_database(self) -> None:
with open(self.db_path, 'rb') as f:
self.database = pickle.load(f)
self._rebuild_knn()
print(f"Database caricato: {len(self.database)} identità")
def annotate(self, img_bgr: np.ndarray,
results: list[dict]) -> np.ndarray:
"""Annota l'immagine con i risultati del riconoscimento."""
annotated = img_bgr.copy()
for r in results:
x1, y1, x2, y2 = r['bbox']
color = (0, 255, 0) if r['is_known'] else (0, 0, 255)
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = (f"{r['identity']} ({r['confidence']:.2f})"
if r['is_known'] else "Unknown")
cv2.putText(annotated, label, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return annotated
5. 얼굴 검증: 임계값 및 ROC 곡선
La 얼굴 확인 질문에 대답합니다: "이 두 사진은 같은 사람?" 식별(1:N)과는 다른 1:1 매칭 문제입니다. 핵심은 ROC 곡선 분석을 통해 올바른 유사성 임계값을 선택하는 것입니다.
import numpy as np
from sklearn.metrics import roc_curve, auc
import matplotlib
matplotlib.use('Agg') # Per ambienti senza display
import matplotlib.pyplot as plt
def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""Similarità coseno tra due embedding normalizzati."""
emb1_n = emb1 / (np.linalg.norm(emb1) + 1e-10)
emb2_n = emb2 / (np.linalg.norm(emb2) + 1e-10)
return float(np.dot(emb1_n, emb2_n))
def find_optimal_threshold(same_person_pairs: list[tuple],
diff_person_pairs: list[tuple]) -> dict:
"""
Trova la soglia ottimale analizzando la ROC curve.
same_person_pairs: lista di coppie (emb1, emb2) della stessa persona
diff_person_pairs: lista di coppie (emb1, emb2) di persone diverse
Returns: {threshold, eer, auc, far, frr}
"""
scores = []
labels = []
for emb1, emb2 in same_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(1) # stessa persona
for emb1, emb2 in diff_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(0) # persone diverse
scores_arr = np.array(scores)
labels_arr = np.array(labels)
# ROC curve
fpr, tpr, thresholds = roc_curve(labels_arr, scores_arr)
roc_auc = auc(fpr, tpr)
# Equal Error Rate (EER): punto dove FAR = FRR
fnr = 1 - tpr
eer_idx = np.argmin(np.abs(fpr - fnr))
eer = (fpr[eer_idx] + fnr[eer_idx]) / 2.0
optimal_threshold = thresholds[eer_idx]
# Metriche a soglia ottimale
predictions = (scores_arr >= optimal_threshold).astype(int)
tp = np.sum((predictions == 1) & (labels_arr == 1))
fp = np.sum((predictions == 1) & (labels_arr == 0))
fn = np.sum((predictions == 0) & (labels_arr == 1))
tn = np.sum((predictions == 0) & (labels_arr == 0))
far = fp / (fp + tn) if (fp + tn) > 0 else 0 # False Accept Rate
frr = fn / (fn + tp) if (fn + tp) > 0 else 0 # False Reject Rate
print(f"=== Face Verification Metrics ===")
print(f"AUC-ROC: {roc_auc:.4f}")
print(f"EER: {eer:.4f} ({eer*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FAR @ EER: {far:.4f} ({far*100:.2f}%)")
print(f"FRR @ EER: {frr:.4f} ({frr*100:.2f}%)")
return {
'threshold': float(optimal_threshold),
'eer': float(eer),
'auc': float(roc_auc),
'far': float(far),
'frr': float(frr)
}
6. 스푸핑 방지 및 생체 감지
없는 얼굴인식 시스템 생체 감지 취약하고 스푸핑 공격: 인쇄된 사진, 스마트폰에 있는 동영상, 마스크만 있으면 가능 3D는 대부분의 탐지기를 속입니다. 생체 감지로 얼굴 구별 인공물에서 나온 진짜.
스푸핑 공격 유형
| 공격 유형 | 설명 | 방어 난이도 | 완화 기법 |
|---|---|---|---|
| 인쇄 공격 | 종이/광택지에 인쇄된 사진 | 낮은 | 질감 분석, 모아레 패턴 감지 |
| 재생 공격 | 화면 속 얼굴 영상 | 평균 | 화면 반사 감지, 3D 깊이 |
| 3D 마스크 | 사실적인 3D 프린팅 마스크 | 높은 | IR 센서, 도전-응답, 마이크로모션 |
| 딥페이크 동영상 | AI가 생성한 합성 비디오 | 매우 높음 | 딥페이크 탐지기, 혈류 분석 |
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from typing import Optional
class LivenessDetector:
"""
Sistema di liveness detection basato su due segnali complementari:
1. Texture analysis (LBP-based + CNN) - rileva print attacks
2. Micro-motion analysis - rileva replay attacks (video statici non hanno micro-movimenti)
Per deployment serio, considera: SilentFace, FAS-SGTD, CentralDiff-CNN
"""
def __init__(self, model_path: Optional[str] = None,
device: str = 'auto'):
self.device = torch.device(
'cuda' if torch.cuda.is_available() and device == 'auto'
else 'cpu'
)
# Modello CNN per texture analysis (fine-tuned su dataset anti-spoofing)
# Dataset: CelebA-Spoof, OULU-NPU, MSU-MFSD
self.model = self._build_model(model_path)
self.model.eval()
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# Buffer per micro-motion analysis
self.frame_buffer: list[np.ndarray] = []
self.buffer_size = 10 # 10 frame ~= 333ms @ 30FPS
def _build_model(self, model_path: Optional[str]) -> nn.Module:
"""
MobileNetV2 fine-tuned per binary classification: real vs spoof.
MobileNetV2 perchè e leggero (3.4M params) e veloce su CPU/edge.
"""
model = models.mobilenet_v2(pretrained=False)
model.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(model.last_channel, 2) # [spoof, real]
)
if model_path:
state_dict = torch.load(model_path, map_location=self.device)
model.load_state_dict(state_dict)
return model.to(self.device)
def is_live_texture(self, face_roi: np.ndarray,
threshold: float = 0.7) -> tuple[bool, float]:
"""
Analisi texture CNN: classifica il volto come reale o spoof.
face_roi: crop del volto BGR [H, W, 3]
threshold: soglia per considerare il volto reale
Returns: (is_live, confidence_score)
"""
img_rgb = cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)
tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1)
live_prob = probs[0, 1].item() # indice 1 = "real"
return live_prob >= threshold, live_prob
def compute_lbp_features(self, gray: np.ndarray,
radius: int = 3, n_points: int = 24) -> np.ndarray:
"""
Local Binary Pattern (LBP) texture descriptor.
Le stampe su carta hanno pattern LBP caratteristici (moaré).
Feature complementare alla CNN per robustezza.
"""
h, w = gray.shape
lbp = np.zeros_like(gray, dtype=np.uint8)
for r in range(radius, h - radius):
for c in range(radius, w - radius):
center = int(gray[r, c])
code = 0
for p in range(n_points):
angle = 2 * np.pi * p / n_points
nr = r - int(radius * np.sin(angle))
nc = c + int(radius * np.cos(angle))
nr = np.clip(nr, 0, h - 1)
nc = np.clip(nc, 0, w - 1)
code |= (int(gray[nr, nc]) >= center) << p
lbp[r, c] = code % 256
# Istogramma LBP come feature vector
hist, _ = np.histogram(lbp.ravel(), bins=256, range=(0, 256))
hist = hist.astype(float)
hist /= (hist.sum() + 1e-7)
return hist
def analyze_micro_motion(self, frame_bgr: np.ndarray) -> tuple[bool, float]:
"""
Analisi micro-movimento: rileva movimenti naturali del volto (micro-espressioni,
respiro, battito ciglia) assenti in foto/video statici.
Returns: (has_micro_motion, motion_score)
Un video replay di solito ha motion_score < 0.5
"""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
self.frame_buffer.append(gray)
if len(self.frame_buffer) > self.buffer_size:
self.frame_buffer.pop(0)
if len(self.frame_buffer) < 3:
return True, 1.0 # Non abbastanza frame, assumi live
# Optical flow su ultimi 3 frame
flow_magnitudes = []
for i in range(len(self.frame_buffer) - 2, len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
flow_magnitudes.append(np.mean(magnitude))
avg_motion = np.mean(flow_magnitudes)
# Calcola varianza del motion (micro-movimenti irregolari = live)
if len(self.frame_buffer) >= self.buffer_size:
all_flows = []
for i in range(len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
all_flows.append(np.mean(magnitude))
motion_variance = np.var(all_flows)
# Alta varianza = movimenti naturali irregolari = live
motion_score = min(1.0, motion_variance * 100)
else:
motion_score = 0.5
return motion_score > 0.3, float(motion_score)
def predict(self, face_roi: np.ndarray,
frame_bgr: np.ndarray) -> dict:
"""
Prediction combinata: texture CNN + micro-motion.
Fusion con regola AND conservativa per sicurezza.
"""
is_live_tex, tex_score = self.is_live_texture(face_roi)
has_motion, motion_score = self.analyze_micro_motion(frame_bgr)
# Logica di fusione: entrambi i segnali devono concordare
combined_score = 0.6 * tex_score + 0.4 * motion_score
is_live = is_live_tex and (motion_score > 0.2)
return {
'is_live': is_live,
'combined_score': combined_score,
'texture_score': tex_score,
'motion_score': motion_score,
'verdict': 'LIVE' if is_live else 'SPOOF'
}
# Pipeline completa: face recognition + liveness check
def secure_recognition_pipeline(recognizer, liveness_detector, frame_bgr):
"""
Pipeline sicura: prima verifica liveness, poi riconosce.
Se il volto non e live, non procedere con il riconoscimento.
"""
# 1. Rileva volti
faces = recognizer.app.get(frame_bgr)
results = []
for face in faces:
x1, y1, x2, y2 = face.bbox.astype(int)
face_roi = frame_bgr[max(0,y1):y2, max(0,x1):x2]
if face_roi.size == 0:
continue
# 2. Liveness check (PRIMA del riconoscimento)
liveness = liveness_detector.predict(face_roi, frame_bgr)
if not liveness['is_live']:
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'SPOOF',
'identity': None,
'liveness': liveness
})
continue
# 3. Face recognition (solo se live)
from sklearn.preprocessing import normalize
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = recognizer._match_embedding(emb)
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'LIVE',
'identity': identity if confidence > 0.5 else 'unknown',
'confidence': confidence,
'liveness': liveness
})
return results
7. 확장 가능한 데이터베이스: 수백만 개의 임베딩을 위한 FAISS
scikit-learn KNN 시스템은 최대 10,000개의 임베딩까지 잘 작동합니다. 그 너머 임계값을 초과하면 무차별 대입 검색이 병목 현상이 됩니다. FAISS (Facebook AI 유사성 검색)은 대략적인 검색을 통해 수십억 개의 벡터로 확장됩니다. 마이크로초 단위.
import faiss
import numpy as np
import pickle
from pathlib import Path
class FAISSFaceDatabase:
"""
Database di embedding facciali scalabile con FAISS.
Ricerca approssimata (HNSW) per 1M+ embedding in < 1ms.
Installazione: pip install faiss-cpu (o faiss-gpu per GPU)
"""
def __init__(self, embedding_dim: int = 512,
db_path: str = 'faiss_face_db',
index_type: str = 'hnsw'):
"""
index_type:
'flat' - Ricerca esatta, O(n), per < 100K embedding
'hnsw' - Ricerca approssimata HNSW, per 100K - 10M embedding
'ivf' - Inverted File Index, per 10M+ embedding
"""
self.embedding_dim = embedding_dim
self.db_path = Path(db_path)
self.db_path.mkdir(exist_ok=True)
self.index_type = index_type
self.index = self._build_index()
self.id_to_name: dict[int, str] = {} # FAISS ID -> nome persona
self.next_id = 0
# Carica se esiste
if (self.db_path / 'index.faiss').exists():
self._load()
def _build_index(self) -> faiss.Index:
"""Costruisce indice FAISS appropriato."""
if self.index_type == 'flat':
# Ricerca esatta con distanza coseno (IP su vettori normalizzati)
index = faiss.IndexFlatIP(self.embedding_dim)
elif self.index_type == 'hnsw':
# HNSW: Hierarchical Navigable Small World
# M=32: connessioni per nodo (più alto = più accurato ma più RAM)
# ef_construction=200: qualità indice durante build
index = faiss.IndexHNSWFlat(self.embedding_dim, 32)
index.hnsw.efConstruction = 200
index.hnsw.efSearch = 64 # tradeoff accuratezza/velocità a query time
elif self.index_type == 'ivf':
# IVF: divide lo spazio in cluster, cerca solo nei cluster più vicini
n_lists = 100 # numero di cluster (sqrt(N) e una buona regola)
quantizer = faiss.IndexFlatIP(self.embedding_dim)
index = faiss.IndexIVFFlat(quantizer, self.embedding_dim, n_lists,
faiss.METRIC_INNER_PRODUCT)
else:
raise ValueError(f"Tipo indice sconosciuto: {self.index_type}")
return index
def add_embedding(self, name: str,
embedding: np.ndarray) -> int:
"""
Aggiunge un embedding al database.
Normalizza L2 per usare inner product come similarità coseno.
"""
emb_norm = embedding / (np.linalg.norm(embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
# IVF richiede training prima del primo add
if self.index_type == 'ivf' and not self.index.is_trained:
print("Training IVF index... (richiede un batch iniziale)")
# In pratica: train con tutti gli embedding prima di usare
self.index.train(emb_norm)
self.index.add(emb_norm)
self.id_to_name[self.next_id] = name
self.next_id += 1
return self.next_id - 1
def add_person(self, name: str,
embeddings: list[np.ndarray]) -> int:
"""Aggiunge più embedding per la stessa persona."""
for emb in embeddings:
self.add_embedding(name, emb)
return len(embeddings)
def search(self, query_embedding: np.ndarray,
k: int = 1,
min_similarity: float = 0.5) -> list[dict]:
"""
Cerca i k embedding più simili nel database.
Returns: lista di {name, similarity, faiss_id}
Ordinate per similarità decrescente.
"""
if self.next_id == 0:
return []
emb_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
actual_k = min(k, self.next_id)
similarities, indices = self.index.search(emb_norm, actual_k)
results = []
for sim, idx in zip(similarities[0], indices[0]):
if idx == -1 or sim < min_similarity:
continue
results.append({
'name': self.id_to_name.get(int(idx), 'unknown'),
'similarity': float(sim),
'faiss_id': int(idx)
})
return results
def identify(self, query_embedding: np.ndarray,
threshold: float = 0.5) -> tuple[str, float]:
"""Identifica la persona con maggiore similarità."""
results = self.search(query_embedding, k=3)
if not results:
return 'unknown', 0.0
# Voto di maggioranza tra top-3 (robustezza)
from collections import Counter
names = [r['name'] for r in results]
best_name = Counter(names).most_common(1)[0][0]
best_sim = max(r['similarity'] for r in results
if r['name'] == best_name)
if best_sim < threshold:
return 'unknown', best_sim
return best_name, best_sim
def save(self) -> None:
"""Salva indice FAISS e mapping ID->nome su disco."""
faiss.write_index(self.index,
str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'wb') as f:
pickle.dump({'id_to_name': self.id_to_name,
'next_id': self.next_id}, f)
print(f"Database salvato: {self.next_id} embedding")
def _load(self) -> None:
"""Carica indice FAISS e mapping da disco."""
self.index = faiss.read_index(str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'rb') as f:
data = pickle.load(f)
self.id_to_name = data['id_to_name']
self.next_id = data['next_id']
print(f"Database caricato: {self.next_id} embedding, "
f"{len(set(self.id_to_name.values()))} identità")
def stats(self) -> dict:
"""Statistiche del database."""
names = list(self.id_to_name.values())
from collections import Counter
name_counts = Counter(names)
return {
'total_embeddings': self.next_id,
'total_identities': len(name_counts),
'avg_embeddings_per_person': np.mean(list(name_counts.values()))
if name_counts else 0,
'index_type': self.index_type
}
# Benchmark: KNN sklearn vs FAISS per database di varie dimensioni
def benchmark_search_backends(n_identities: int = 10000,
embs_per_person: int = 5) -> None:
"""Confronta tempi di ricerca KNN vs FAISS."""
import time
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
n_total = n_identities * embs_per_person
dim = 512
# Genera embedding sintetici
embeddings = np.random.randn(n_total, dim).astype(np.float32)
embeddings = normalize(embeddings)
labels = np.repeat(np.arange(n_identities), embs_per_person)
query = np.random.randn(1, dim).astype(np.float32)
query = normalize(query)
# KNN sklearn
knn = KNeighborsClassifier(n_neighbors=3, metric='cosine', algorithm='brute')
knn.fit(embeddings, labels)
t0 = time.perf_counter()
for _ in range(100):
knn.predict(query)
knn_time = (time.perf_counter() - t0) / 100 * 1000
# FAISS HNSW
index = faiss.IndexHNSWFlat(dim, 32)
index.add(embeddings)
t0 = time.perf_counter()
for _ in range(100):
index.search(query, 3)
faiss_time = (time.perf_counter() - t0) / 100 * 1000
print(f"\nBenchmark ricerca ({n_total:,} embedding, dim={dim}):")
print(f" KNN sklearn: {knn_time:.2f} ms/query")
print(f" FAISS HNSW: {faiss_time:.3f} ms/query")
print(f" Speedup: {knn_time/faiss_time:.0f}x")
8. 윤리적, 법적 고려사항
주의: GDPR에 따른 생체 인식 데이터
얼굴 데이터는 GDPR(제9조)에 따른 생체 인식 데이터이며 그 처리 과정은 다음과 같습니다. 그리고 엄격한 제한이 적용됩니다. 이탈리아 및 EU:
- 필수 명시적 동의: 생체정보는 각 목적에 대한 구체적인 사전 동의 없이 수집될 수 없습니다.
- 데이터 최소화: 원본 이미지는 제외하고 필요한 임베딩만 유지하세요.
- 삭제 권리: 개인의 모든 데이터를 삭제하는 엔드포인트 구현
- 제한된 목적: 로그인 시스템을 위해 수집된 데이터는 마케팅 분석에 사용될 수 없습니다.
- 필수 바이어스 테스트: 배포하기 전에 다양한 인구통계 그룹(성별, 연령, 민족별 EER)에 대한 측정항목을 확인하세요.
- 공개 감시 없음: AI Act EU 2024 규정은 공공장소에서의 얼굴 인식을 거의 완전히 금지합니다.
바이어스 테스트 알고리즘
배포할 때마다 항상 인구통계학적 하위 평가를 수행하세요. 총 EER이 2%인 시스템은 하위 그룹에 대해 EER이 5%일 수 있습니다. 구체적 - 윤리적으로나 법적으로 용납될 수 없는 것입니다.
9. 모범 사례
생산 준비가 완료된 얼굴 인식 시스템을 위한 체크리스트
- 실시간에는 MediaPipe를 사용하고 높은 정밀도에는 MTCNN을 사용합니다. 경쟁하고 있지 않습니다. 상황에 따라 선택하세요.
- 1인당 최소 5-10개의 이미지: 다양한 조건(빛, 각도, 표현)에서. 이미지가 1개뿐이므로 시스템이 취약합니다.
- 항상 임베딩을 정규화하세요.
emb = emb / np.linalg.norm(emb). 정규화가 없으면 코사인 거리가 올바르게 작동하지 않습니다. - 실제 데이터에 대한 임계값을 보정합니다. 데이터 세트에서 검증하지 않고 0.5를 기본 임계값으로 사용하지 마십시오. 특정 시나리오에서 EER 계산
- 스푸핑 방지: 생체 감지 기능이 없는 시스템은 사진과 비디오에 취약합니다. 생체 감지 모델 통합(스푸핑 데이터 세트에 대해 미세 조정된 MobileNetV2)
- 시간이 지남에 따라 임베딩을 업데이트합니다. 사람들은 외모를 바꾼다. 정기적인 재등록 또는 임베딩의 온라인 업데이트를 계획합니다.
- 개인 정보 보호 로깅: 개인 데이터를 노출하지 않고 디버깅을 위해 ID 해싱을 사용하여 임베딩(이미지 아님)만 기록합니다.
결론
현대적이고 강력한 얼굴 인식 파이프라인은 모듈식이며 접근 가능합니다. 우리는 생산 준비 시스템의 모든 계층을 다루었습니다.
- 미디어파이프: CPU에 대한 초고속 감지로 리소스 제약이 있는 실시간 검색에 적합합니다. 최신 노트북에서 200FPS 이상.
- MTCNN + 얼굴 정렬: 정밀한 인식 시스템을 위한 견고한 기반. 5개의 랜드마크는 표준 112x112 정렬의 기본입니다.
- InsightFace/ArcFace: LFW에서 99.83% 정확도의 512D 임베딩 - pip를 통해 액세스할 수 있는 최첨단 기술입니다.
- ROC/EER로 보정된 임계값: 견고한 시스템과 신뢰할 수 없는 시스템의 차이. 유효성 검사 없이 기본적으로 0.5를 사용하지 마세요.
- 스푸핑 방지 + 활성 감지: 보안 시스템에 필수적입니다. 인쇄/재생 공격에 대한 저항을 위한 텍스처 CNN + 마이크로 모션 분석.
- 스케일링을 위한 FAISS: KNN scikit-learn(10K 임베딩)에서 FAISS HNSW(1M+ 임베딩)까지 100~1000배의 속도 향상.
- 윤리 및 GDPR: 선택사항이 아닌 기본 요구사항입니다. EU AI법 2024는 공공장소에서의 얼굴 인식을 금지합니다.
시리즈 탐색
시리즈 간 리소스
- MLOps: 프로덕션에서 모델 제공 - REST API에 모델 배포
- 고급 딥 러닝: 비전 트랜스포머







