顔の検出と認識: MediaPipe、MTCNN、FaceNet
顔認識は、最も成熟したコンピュータ ビジョン アプリケーションの 1 つであり、 広く普及: セキュリティ システムからスマートフォン、アクセス制御から分析まで 小売業の人口統計。ただし、慎重に正しく実装してください 正確さ、スピード、そして何よりも倫理への理解が必要です 含まれる技術の奥深さ。
この記事では、スタック全体、つまり顔検出 (顔の検索) について説明します。 画像)、顔の位置合わせ(幾何学的正規化)、顔の埋め込み(表現) ベクトル)と顔の検証/識別。リアルタイムには MediaPipe を使用します。 精度には MTCNN、認識には FaceNet/ArcFace を使用します。
何を学ぶか
- 顔検出と顔認識パイプライン: 違いと使用例
- MediaPipe 顔検出: 高速、軽量、クロスプラットフォーム
- MediaPipe Face Mesh: リアルタイムの 468 個の顔のランドマーク
- MTCNN: 正確な検出のためのマルチタスク カスケード CNN
- 顔の位置合わせ: ランドマークを使用した幾何学的正規化
- 顔の埋め込み: コンパクトな表現のための FaceNet および ArcFace
- 顔認証(1:1)と本人確認(1:N)
- 顔データベースを使用して認識システムをゼロから構築する
- 倫理的および法的考慮事項: GDPR、偏見、同意
1. 顔検出と顔認識: 完全なパイプライン
「顔認識」という用語は、要件を備えた 2 つの異なるタスクをひとまとめにすることがよくあります。 非常に異なる技術者:
フェイシャルパイプラインのコンポーネント
| 段階 | タスク | 出力 | 代表的なモデル |
|---|---|---|---|
| 検出 | 顔の位置を検索 | 境界ボックス | メディアパイプ、MTCNN、RetinaFace |
| 位置合わせ | ジオメトリを正規化する | 正規化された 112x112 画像 | ランドマークとの類似点 |
| 埋め込み | 機能記述子の抽出 | キャリア 128-512D | FaceNet、ArcFace、AdaFace |
| 検証 | 同一人物ですか? (1:1) | 類似性スコア、ブール値 | 埋め込み間のコサイン距離 |
| 識別 | 誰ですか? (1:N) | アイデンティティ + 自信 | 組み込みデータベースに関する KNN |
2. MediaPipe: 顔検出と顔メッシュ
メディアパイプ Google による最も実用的な顔検出フレームワーク CPU上でリアルタイム。 BlazeFace モデルは特に速度を重視して最適化されています モバイルおよび組み込みデバイスでは、最新のラップトップでは 200+ FPS に達します。
2.1 MediaPipe による顔検出
import mediapipe as mp
import cv2
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class FaceDetection:
"""Risultato di detection per un singolo volto."""
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
confidence: float
keypoints: dict[str, tuple[int, int]] # nome -> (x, y) in pixel
class MediaPipeFaceDetector:
"""
Face detector basato su MediaPipe BlazeFace.
Velocissimo su CPU: 200+ FPS su immagini 640x480.
Ottimo per real-time, non per immagini ad alta densita di volti.
"""
KEYPOINT_NAMES = [
'right_eye', 'left_eye', 'nose_tip',
'mouth_center', 'right_ear_tragion', 'left_ear_tragion'
]
def __init__(self, min_confidence: float = 0.5,
model_selection: int = 0):
"""
model_selection:
0 = short range (entro 2m, più veloce)
1 = full range (fino a 5m, più accurato)
"""
self.mp_face = mp.solutions.face_detection
self.detector = self.mp_face.FaceDetection(
model_selection=model_selection,
min_detection_confidence=min_confidence
)
self.mp_draw = mp.solutions.drawing_utils
def detect(self, img_bgr: np.ndarray) -> list[FaceDetection]:
"""Rileva volti in un'immagine BGR."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.detector.process(img_rgb)
faces = []
if not results.detections:
return faces
for detection in results.detections:
score = detection.score[0]
bbox_rel = detection.location_data.relative_bounding_box
# Coordinate relative -> pixel
x1 = max(0, int(bbox_rel.xmin * w))
y1 = max(0, int(bbox_rel.ymin * h))
x2 = min(w, int((bbox_rel.xmin + bbox_rel.width) * w))
y2 = min(h, int((bbox_rel.ymin + bbox_rel.height) * h))
# Keypoints (occhi, naso, bocca, orecchie)
keypoints = {}
for idx, name in enumerate(self.KEYPOINT_NAMES):
kp = detection.location_data.relative_keypoints[idx]
keypoints[name] = (int(kp.x * w), int(kp.y * h))
faces.append(FaceDetection(
bbox=(x1, y1, x2, y2),
confidence=float(score),
keypoints=keypoints
))
return faces
def draw(self, img_bgr: np.ndarray,
faces: list[FaceDetection]) -> np.ndarray:
"""Annota immagine con i risultati della detection."""
annotated = img_bgr.copy()
for face in faces:
x1, y1, x2, y2 = face.bbox
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(annotated, f"{face.confidence:.2f}",
(x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (0, 255, 0), 2)
# Disegna keypoints
for name, (kx, ky) in face.keypoints.items():
color = (0, 0, 255) if 'eye' in name else (255, 0, 0)
cv2.circle(annotated, (kx, ky), 4, color, -1)
return annotated
# Utilizzo: detection su webcam in real-time
def run_face_detection_webcam() -> None:
detector = MediaPipeFaceDetector(min_confidence=0.5)
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
break
faces = detector.detect(frame)
annotated = detector.draw(frame, faces)
cv2.putText(annotated, f"Faces: {len(faces)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('MediaPipe Face Detection', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
2.2 顔メッシュ: リアルタイムの 468 ランドマーク
モデル フェイスメッシュ の MediaPipe が 468 個の 3D ランドマーク (x、y、z) を抽出します 顔の。顔の位置合わせ、感情推定、AR フィルター、視線、 眠気の検出 (目のアスペクト比)。
import mediapipe as mp
import cv2
import numpy as np
class FaceMeshAnalyzer:
"""
MediaPipe Face Mesh: 468 landmark 3D in real-time.
Utilita incluse: eye aspect ratio (sonnolenza), head pose, ecc.
"""
# Indici dei landmark MediaPipe per occhi
LEFT_EYE_IDX = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_IDX = [33, 160, 158, 133, 153, 144]
def __init__(self, max_faces: int = 1,
refine_landmarks: bool = True):
"""
refine_landmarks=True: aggiunge landmark attorno agli occhi
e alle iridi (468 -> 478 punti totali).
"""
self.mp_mesh = mp.solutions.face_mesh
self.face_mesh = self.mp_mesh.FaceMesh(
max_num_faces=max_faces,
refine_landmarks=refine_landmarks,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.mp_draw = mp.solutions.drawing_utils
self.mp_styles = mp.solutions.drawing_styles
def process(self, img_bgr: np.ndarray) -> Optional[list]:
"""Processa immagine e restituisce lista di landmark per ogni volto."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(img_rgb)
if not results.multi_face_landmarks:
return None
all_faces_lm = []
for face_landmarks in results.multi_face_landmarks:
# Converti da coordinate normalizzate a pixel
lm_pixels = []
for lm in face_landmarks.landmark:
lm_pixels.append((int(lm.x * w), int(lm.y * h), lm.z))
all_faces_lm.append(lm_pixels)
return all_faces_lm
def eye_aspect_ratio(self, landmarks: list,
eye_indices: list) -> float:
"""
Eye Aspect Ratio (EAR) - indicatore di sonnolenza.
EAR < 0.2 per 20+ frame consecutivi = occhio chiuso.
Formula: EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|)
"""
pts = [np.array(landmarks[i][:2]) for i in eye_indices]
# Distanze verticali
A = np.linalg.norm(pts[1] - pts[5])
B = np.linalg.norm(pts[2] - pts[4])
# Distanza orizzontale
C = np.linalg.norm(pts[0] - pts[3])
return (A + B) / (2.0 * C) if C > 0 else 0.0
def draw_mesh(self, img_bgr: np.ndarray,
results_raw) -> np.ndarray:
"""Disegna la mesh completa con stili MediaPipe predefiniti."""
annotated = img_bgr.copy()
if results_raw and results_raw.multi_face_landmarks:
for face_lm in results_raw.multi_face_landmarks:
self.mp_draw.draw_landmarks(
image=annotated,
landmark_list=face_lm,
connections=self.mp_mesh.FACEMESH_TESSELATION,
landmark_drawing_spec=None,
connection_drawing_spec=self.mp_styles
.get_default_face_mesh_tesselation_style()
)
return annotated
# Rilevamento sonnolenza con Eye Aspect Ratio
def drowsiness_detector(threshold: float = 0.22,
consec_frames: int = 20) -> None:
"""Sistema di alert sonnolenza basato su EAR."""
analyzer = FaceMeshAnalyzer(max_faces=1)
cap = cv2.VideoCapture(0)
ear_counter = 0
alert_active = False
while True:
ret, frame = cap.read()
if not ret:
break
landmarks_list = analyzer.process(frame)
if landmarks_list:
lms = landmarks_list[0] # primo volto
ear_l = analyzer.eye_aspect_ratio(lms, analyzer.LEFT_EYE_IDX)
ear_r = analyzer.eye_aspect_ratio(lms, analyzer.RIGHT_EYE_IDX)
avg_ear = (ear_l + ear_r) / 2.0
if avg_ear < threshold:
ear_counter += 1
if ear_counter >= consec_frames:
alert_active = True
cv2.putText(frame, "ALERT: SONNOLENZA!",
(50, 200), cv2.FONT_HERSHEY_SIMPLEX,
1.5, (0, 0, 255), 3)
else:
ear_counter = 0
alert_active = False
cv2.putText(frame, f"EAR: {avg_ear:.3f}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.8, (0, 255, 0), 2)
cv2.imshow('Drowsiness Detector', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
3. MTCNN: マルチタスクカスケード CNN
MTCNN バランスをとる 3 段階の検出器 (P-Net、R-Net、O-Net) スピードと正確さ。システムにおける正確な検出のゴールドスタンダードです 認識: 5 つの必要なランドマーク (目、鼻、口角) で顔を識別します。 顔の調整に。 MediaPipe よりも遅いですが、過酷な条件下ではより堅牢です。
from mtcnn import MTCNN
import cv2
import numpy as np
from PIL import Image
class MTCNNFaceProcessor:
"""
MTCNN per detection precisa + face alignment.
Produce immagini 112x112 normalizzate, ottimali per FaceNet/ArcFace.
"""
def __init__(self, min_face_size: int = 40,
thresholds: list = None,
scale_factor: float = 0.709):
self.detector = MTCNN(
min_face_size=min_face_size,
thresholds=thresholds or [0.6, 0.7, 0.7],
scale_factor=scale_factor
)
def detect_and_align(self, img_bgr: np.ndarray,
output_size: int = 112) -> list[np.ndarray]:
"""
Rileva volti e li restituisce allineati (112x112 default).
L'allineamento usa una trasformazione affine sui 5 landmark
per portare gli occhi in posizione canonica.
Returns: lista di immagini volto allineate (BGR, float32 [0,1])
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
aligned_faces = []
for det in detections:
if det['confidence'] < 0.90:
continue
keypoints = det['keypoints']
src_pts = np.array([
keypoints['left_eye'],
keypoints['right_eye'],
keypoints['nose'],
keypoints['mouth_left'],
keypoints['mouth_right']
], dtype=np.float32)
# Punti di destinazione canonici per 112x112
dst_pts = np.array([
[38.2946, 51.6963],
[73.5318, 51.6963],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.3655]
], dtype=np.float32)
# Scale per output_size diversi da 112
scale = output_size / 112.0
dst_pts *= scale
# Trasformazione affine -> immagine allineata
M = cv2.estimateAffinePartial2D(src_pts, dst_pts)[0]
aligned = cv2.warpAffine(img_bgr, M, (output_size, output_size))
aligned_faces.append(aligned.astype(np.float32) / 255.0)
return aligned_faces
def detect_with_info(self, img_bgr: np.ndarray) -> list[dict]:
"""Rileva volti con tutte le informazioni MTCNN."""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
results = []
h, w = img_bgr.shape[:2]
for det in detections:
x, y, bw, bh = det['box']
x1 = max(0, x)
y1 = max(0, y)
x2 = min(w, x + bw)
y2 = min(h, y + bh)
results.append({
'bbox': (x1, y1, x2, y2),
'confidence': det['confidence'],
'keypoints': det['keypoints']
})
return results
4. 顔認識: FaceNet と ArcFace
検出と位置合わせの後、認識システムの中心となるのは、 顔埋め込みモデル: 画像を変換するニューラル ネットワーク 128 ~ 512 次元のベクトルの 112x112。同一人物の顔が生み出す 空間内の近くのベクトル。さまざまな顔が遠くにあります。
顔埋め込みモデルの比較
| モデル | 薄暗い埋め込み | 損失 | LFW準拠 | サイズ |
|---|---|---|---|---|
| フェイスネット (Google) | 128 | トリプレット損失 | 99.63% | 90MB |
| ArcFace (インサイトフェイス) | 512 | アークフェイスロス | 99.83% | 249MB |
| エイダフェイス | 512 | AdaFace の損失 | 99.82% | 249MB |
| MobileFaceNet (エッジ) | 128 | アークフェイスロス | 99.55% | 4MB |
import insightface
from insightface.app import FaceAnalysis
import numpy as np
import cv2
import pickle
from pathlib import Path
from sklearn.preprocessing import normalize
from sklearn.neighbors import KNeighborsClassifier
class FaceRecognitionSystem:
"""
Sistema completo di face recognition basato su InsightFace (ArcFace).
Supporta registrazione di nuove identità e riconoscimento real-time.
Installazione: pip install insightface onnxruntime scikit-learn
"""
def __init__(self, db_path: str = 'face_db.pkl',
recognition_threshold: float = 0.5):
"""
recognition_threshold: soglia coseno per considerare un match
(0.5 e un buon default per ArcFace 512D)
"""
# Inizializza FaceAnalysis (detection + embedding in un'unica API)
self.app = FaceAnalysis(
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
self.app.prepare(ctx_id=0, det_size=(640, 640))
self.db_path = Path(db_path)
self.threshold = recognition_threshold
self.database: dict[str, list[np.ndarray]] = {}
self.knn: Optional[KNeighborsClassifier] = None
if self.db_path.exists():
self._load_database()
def register_person(self, name: str,
images: list[np.ndarray],
max_faces_per_image: int = 1) -> int:
"""
Registra una nuova persona nel database.
name: identificatore della persona
images: lista di immagini BGR (almeno 5 per robustezza)
Returns: numero di embedding registrati con successo
"""
embeddings = []
for img in images:
faces = self.app.get(img)
if not faces:
continue
# Prendi il volto più grande (per immagini con una persona)
face = max(faces, key=lambda f: (f.bbox[2]-f.bbox[0]) *
(f.bbox[3]-f.bbox[1]))
emb = normalize(face.embedding.reshape(1, -1))[0]
embeddings.append(emb)
if len(embeddings) >= max_faces_per_image * len(images):
break
if not embeddings:
print(f"[WARN] Nessun volto rilevato per {name}")
return 0
if name not in self.database:
self.database[name] = []
self.database[name].extend(embeddings)
self._rebuild_knn()
self._save_database()
print(f"Registrato {name}: {len(embeddings)} embedding")
return len(embeddings)
def recognize(self, img_bgr: np.ndarray) -> list[dict]:
"""
Riconosce tutti i volti in un'immagine.
Returns: lista di dict con bbox, identity, confidence per ogni volto
"""
faces = self.app.get(img_bgr)
results = []
for face in faces:
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = self._match_embedding(emb)
x1, y1, x2, y2 = face.bbox.astype(int)
results.append({
'bbox': (x1, y1, x2, y2),
'identity': identity,
'confidence': confidence,
'is_known': confidence >= self.threshold
})
return results
def _match_embedding(self, emb: np.ndarray) -> tuple[str, float]:
"""Trova la corrispondenza migliore nel database."""
if not self.database or self.knn is None:
return ('unknown', 0.0)
# Usa KNN con metrica coseno (1 - cosine_similarity = cosine_distance)
dist, idx = self.knn.kneighbors([emb], n_neighbors=1)
labels = [name for name, embs in self.database.items()
for _ in embs]
best_name = labels[idx[0][0]]
similarity = 1.0 - dist[0][0] # da distanza coseno a similarità
return (best_name, float(similarity))
def _rebuild_knn(self) -> None:
"""Ricostruisce il classificatore KNN dopo aggiornamenti al DB."""
all_embs = []
all_labels = []
for name, embs in self.database.items():
all_embs.extend(embs)
all_labels.extend([name] * len(embs))
if len(all_embs) < 2:
return
self.knn = KNeighborsClassifier(
n_neighbors=min(3, len(all_embs)),
metric='cosine',
algorithm='brute'
)
self.knn.fit(np.array(all_embs), all_labels)
def _save_database(self) -> None:
with open(self.db_path, 'wb') as f:
pickle.dump(self.database, f)
def _load_database(self) -> None:
with open(self.db_path, 'rb') as f:
self.database = pickle.load(f)
self._rebuild_knn()
print(f"Database caricato: {len(self.database)} identità")
def annotate(self, img_bgr: np.ndarray,
results: list[dict]) -> np.ndarray:
"""Annota l'immagine con i risultati del riconoscimento."""
annotated = img_bgr.copy()
for r in results:
x1, y1, x2, y2 = r['bbox']
color = (0, 255, 0) if r['is_known'] else (0, 0, 255)
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = (f"{r['identity']} ({r['confidence']:.2f})"
if r['is_known'] else "Unknown")
cv2.putText(annotated, label, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return annotated
5. 顔検証: しきい値と ROC 曲線
La 顔認証 質問の答えは次のとおりです。「これらの 2 枚の写真は、 同じ人?」これは、識別 (1:N) とは異なり、1:1 のマッチングの問題です。 重要なのは、ROC 曲線分析によって正しい類似性しきい値を選択することです。
import numpy as np
from sklearn.metrics import roc_curve, auc
import matplotlib
matplotlib.use('Agg') # Per ambienti senza display
import matplotlib.pyplot as plt
def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""Similarità coseno tra due embedding normalizzati."""
emb1_n = emb1 / (np.linalg.norm(emb1) + 1e-10)
emb2_n = emb2 / (np.linalg.norm(emb2) + 1e-10)
return float(np.dot(emb1_n, emb2_n))
def find_optimal_threshold(same_person_pairs: list[tuple],
diff_person_pairs: list[tuple]) -> dict:
"""
Trova la soglia ottimale analizzando la ROC curve.
same_person_pairs: lista di coppie (emb1, emb2) della stessa persona
diff_person_pairs: lista di coppie (emb1, emb2) di persone diverse
Returns: {threshold, eer, auc, far, frr}
"""
scores = []
labels = []
for emb1, emb2 in same_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(1) # stessa persona
for emb1, emb2 in diff_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(0) # persone diverse
scores_arr = np.array(scores)
labels_arr = np.array(labels)
# ROC curve
fpr, tpr, thresholds = roc_curve(labels_arr, scores_arr)
roc_auc = auc(fpr, tpr)
# Equal Error Rate (EER): punto dove FAR = FRR
fnr = 1 - tpr
eer_idx = np.argmin(np.abs(fpr - fnr))
eer = (fpr[eer_idx] + fnr[eer_idx]) / 2.0
optimal_threshold = thresholds[eer_idx]
# Metriche a soglia ottimale
predictions = (scores_arr >= optimal_threshold).astype(int)
tp = np.sum((predictions == 1) & (labels_arr == 1))
fp = np.sum((predictions == 1) & (labels_arr == 0))
fn = np.sum((predictions == 0) & (labels_arr == 1))
tn = np.sum((predictions == 0) & (labels_arr == 0))
far = fp / (fp + tn) if (fp + tn) > 0 else 0 # False Accept Rate
frr = fn / (fn + tp) if (fn + tp) > 0 else 0 # False Reject Rate
print(f"=== Face Verification Metrics ===")
print(f"AUC-ROC: {roc_auc:.4f}")
print(f"EER: {eer:.4f} ({eer*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FAR @ EER: {far:.4f} ({far*100:.2f}%)")
print(f"FRR @ EER: {frr:.4f} ({frr*100:.2f}%)")
return {
'threshold': float(optimal_threshold),
'eer': float(eer),
'auc': float(roc_auc),
'far': float(far),
'frr': float(frr)
}
6. スプーフィング対策と生存検出
顔認証システムなし 生存検出 そして脆弱な なりすまし攻撃に対抗するには、印刷した写真、スマートフォンのビデオ、マスクだけで十分です 3D はほとんどの検出器を騙します。生体検出により顔を識別する 人工物からの本物。
スプーフィング攻撃の種類
| 攻撃タイプ | 説明 | 防御難易度 | 緩和手法 |
|---|---|---|---|
| 印刷攻撃 | 紙/光沢紙に印刷された写真 | 低い | テクスチャ解析、モアレパターン検出 |
| リプレイアタック | 画面上の顔のビデオ | 平均 | 画面反射検出、3D深度 |
| 3Dマスク | リアルな3Dプリントマスク | 高い | IRセンサー、チャレンジレスポンス、マイクロモーション |
| ディープフェイク動画 | AI生成の合成動画 | 非常に高い | ディープフェイク検出器、血流解析 |
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from typing import Optional
class LivenessDetector:
"""
Sistema di liveness detection basato su due segnali complementari:
1. Texture analysis (LBP-based + CNN) - rileva print attacks
2. Micro-motion analysis - rileva replay attacks (video statici non hanno micro-movimenti)
Per deployment serio, considera: SilentFace, FAS-SGTD, CentralDiff-CNN
"""
def __init__(self, model_path: Optional[str] = None,
device: str = 'auto'):
self.device = torch.device(
'cuda' if torch.cuda.is_available() and device == 'auto'
else 'cpu'
)
# Modello CNN per texture analysis (fine-tuned su dataset anti-spoofing)
# Dataset: CelebA-Spoof, OULU-NPU, MSU-MFSD
self.model = self._build_model(model_path)
self.model.eval()
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# Buffer per micro-motion analysis
self.frame_buffer: list[np.ndarray] = []
self.buffer_size = 10 # 10 frame ~= 333ms @ 30FPS
def _build_model(self, model_path: Optional[str]) -> nn.Module:
"""
MobileNetV2 fine-tuned per binary classification: real vs spoof.
MobileNetV2 perchè e leggero (3.4M params) e veloce su CPU/edge.
"""
model = models.mobilenet_v2(pretrained=False)
model.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(model.last_channel, 2) # [spoof, real]
)
if model_path:
state_dict = torch.load(model_path, map_location=self.device)
model.load_state_dict(state_dict)
return model.to(self.device)
def is_live_texture(self, face_roi: np.ndarray,
threshold: float = 0.7) -> tuple[bool, float]:
"""
Analisi texture CNN: classifica il volto come reale o spoof.
face_roi: crop del volto BGR [H, W, 3]
threshold: soglia per considerare il volto reale
Returns: (is_live, confidence_score)
"""
img_rgb = cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)
tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1)
live_prob = probs[0, 1].item() # indice 1 = "real"
return live_prob >= threshold, live_prob
def compute_lbp_features(self, gray: np.ndarray,
radius: int = 3, n_points: int = 24) -> np.ndarray:
"""
Local Binary Pattern (LBP) texture descriptor.
Le stampe su carta hanno pattern LBP caratteristici (moaré).
Feature complementare alla CNN per robustezza.
"""
h, w = gray.shape
lbp = np.zeros_like(gray, dtype=np.uint8)
for r in range(radius, h - radius):
for c in range(radius, w - radius):
center = int(gray[r, c])
code = 0
for p in range(n_points):
angle = 2 * np.pi * p / n_points
nr = r - int(radius * np.sin(angle))
nc = c + int(radius * np.cos(angle))
nr = np.clip(nr, 0, h - 1)
nc = np.clip(nc, 0, w - 1)
code |= (int(gray[nr, nc]) >= center) << p
lbp[r, c] = code % 256
# Istogramma LBP come feature vector
hist, _ = np.histogram(lbp.ravel(), bins=256, range=(0, 256))
hist = hist.astype(float)
hist /= (hist.sum() + 1e-7)
return hist
def analyze_micro_motion(self, frame_bgr: np.ndarray) -> tuple[bool, float]:
"""
Analisi micro-movimento: rileva movimenti naturali del volto (micro-espressioni,
respiro, battito ciglia) assenti in foto/video statici.
Returns: (has_micro_motion, motion_score)
Un video replay di solito ha motion_score < 0.5
"""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
self.frame_buffer.append(gray)
if len(self.frame_buffer) > self.buffer_size:
self.frame_buffer.pop(0)
if len(self.frame_buffer) < 3:
return True, 1.0 # Non abbastanza frame, assumi live
# Optical flow su ultimi 3 frame
flow_magnitudes = []
for i in range(len(self.frame_buffer) - 2, len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
flow_magnitudes.append(np.mean(magnitude))
avg_motion = np.mean(flow_magnitudes)
# Calcola varianza del motion (micro-movimenti irregolari = live)
if len(self.frame_buffer) >= self.buffer_size:
all_flows = []
for i in range(len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
all_flows.append(np.mean(magnitude))
motion_variance = np.var(all_flows)
# Alta varianza = movimenti naturali irregolari = live
motion_score = min(1.0, motion_variance * 100)
else:
motion_score = 0.5
return motion_score > 0.3, float(motion_score)
def predict(self, face_roi: np.ndarray,
frame_bgr: np.ndarray) -> dict:
"""
Prediction combinata: texture CNN + micro-motion.
Fusion con regola AND conservativa per sicurezza.
"""
is_live_tex, tex_score = self.is_live_texture(face_roi)
has_motion, motion_score = self.analyze_micro_motion(frame_bgr)
# Logica di fusione: entrambi i segnali devono concordare
combined_score = 0.6 * tex_score + 0.4 * motion_score
is_live = is_live_tex and (motion_score > 0.2)
return {
'is_live': is_live,
'combined_score': combined_score,
'texture_score': tex_score,
'motion_score': motion_score,
'verdict': 'LIVE' if is_live else 'SPOOF'
}
# Pipeline completa: face recognition + liveness check
def secure_recognition_pipeline(recognizer, liveness_detector, frame_bgr):
"""
Pipeline sicura: prima verifica liveness, poi riconosce.
Se il volto non e live, non procedere con il riconoscimento.
"""
# 1. Rileva volti
faces = recognizer.app.get(frame_bgr)
results = []
for face in faces:
x1, y1, x2, y2 = face.bbox.astype(int)
face_roi = frame_bgr[max(0,y1):y2, max(0,x1):x2]
if face_roi.size == 0:
continue
# 2. Liveness check (PRIMA del riconoscimento)
liveness = liveness_detector.predict(face_roi, frame_bgr)
if not liveness['is_live']:
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'SPOOF',
'identity': None,
'liveness': liveness
})
continue
# 3. Face recognition (solo se live)
from sklearn.preprocessing import normalize
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = recognizer._match_embedding(emb)
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'LIVE',
'identity': identity if confidence > 0.5 else 'unknown',
'confidence': confidence,
'liveness': liveness
})
return results
7. スケーラブルなデータベース: 何百万もの埋め込みに対応する FAISS
scikit-learn KNN システムは、最大 10,000 個の埋め込みまで適切に動作します。それを超えて しきい値を超えると、総当たり検索がボトルネックになります。 フェイス (Facebook AI 類似性検索) 近似検索により数十億のベクトルに拡張 マイクロ秒単位で。
import faiss
import numpy as np
import pickle
from pathlib import Path
class FAISSFaceDatabase:
"""
Database di embedding facciali scalabile con FAISS.
Ricerca approssimata (HNSW) per 1M+ embedding in < 1ms.
Installazione: pip install faiss-cpu (o faiss-gpu per GPU)
"""
def __init__(self, embedding_dim: int = 512,
db_path: str = 'faiss_face_db',
index_type: str = 'hnsw'):
"""
index_type:
'flat' - Ricerca esatta, O(n), per < 100K embedding
'hnsw' - Ricerca approssimata HNSW, per 100K - 10M embedding
'ivf' - Inverted File Index, per 10M+ embedding
"""
self.embedding_dim = embedding_dim
self.db_path = Path(db_path)
self.db_path.mkdir(exist_ok=True)
self.index_type = index_type
self.index = self._build_index()
self.id_to_name: dict[int, str] = {} # FAISS ID -> nome persona
self.next_id = 0
# Carica se esiste
if (self.db_path / 'index.faiss').exists():
self._load()
def _build_index(self) -> faiss.Index:
"""Costruisce indice FAISS appropriato."""
if self.index_type == 'flat':
# Ricerca esatta con distanza coseno (IP su vettori normalizzati)
index = faiss.IndexFlatIP(self.embedding_dim)
elif self.index_type == 'hnsw':
# HNSW: Hierarchical Navigable Small World
# M=32: connessioni per nodo (più alto = più accurato ma più RAM)
# ef_construction=200: qualità indice durante build
index = faiss.IndexHNSWFlat(self.embedding_dim, 32)
index.hnsw.efConstruction = 200
index.hnsw.efSearch = 64 # tradeoff accuratezza/velocità a query time
elif self.index_type == 'ivf':
# IVF: divide lo spazio in cluster, cerca solo nei cluster più vicini
n_lists = 100 # numero di cluster (sqrt(N) e una buona regola)
quantizer = faiss.IndexFlatIP(self.embedding_dim)
index = faiss.IndexIVFFlat(quantizer, self.embedding_dim, n_lists,
faiss.METRIC_INNER_PRODUCT)
else:
raise ValueError(f"Tipo indice sconosciuto: {self.index_type}")
return index
def add_embedding(self, name: str,
embedding: np.ndarray) -> int:
"""
Aggiunge un embedding al database.
Normalizza L2 per usare inner product come similarità coseno.
"""
emb_norm = embedding / (np.linalg.norm(embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
# IVF richiede training prima del primo add
if self.index_type == 'ivf' and not self.index.is_trained:
print("Training IVF index... (richiede un batch iniziale)")
# In pratica: train con tutti gli embedding prima di usare
self.index.train(emb_norm)
self.index.add(emb_norm)
self.id_to_name[self.next_id] = name
self.next_id += 1
return self.next_id - 1
def add_person(self, name: str,
embeddings: list[np.ndarray]) -> int:
"""Aggiunge più embedding per la stessa persona."""
for emb in embeddings:
self.add_embedding(name, emb)
return len(embeddings)
def search(self, query_embedding: np.ndarray,
k: int = 1,
min_similarity: float = 0.5) -> list[dict]:
"""
Cerca i k embedding più simili nel database.
Returns: lista di {name, similarity, faiss_id}
Ordinate per similarità decrescente.
"""
if self.next_id == 0:
return []
emb_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
actual_k = min(k, self.next_id)
similarities, indices = self.index.search(emb_norm, actual_k)
results = []
for sim, idx in zip(similarities[0], indices[0]):
if idx == -1 or sim < min_similarity:
continue
results.append({
'name': self.id_to_name.get(int(idx), 'unknown'),
'similarity': float(sim),
'faiss_id': int(idx)
})
return results
def identify(self, query_embedding: np.ndarray,
threshold: float = 0.5) -> tuple[str, float]:
"""Identifica la persona con maggiore similarità."""
results = self.search(query_embedding, k=3)
if not results:
return 'unknown', 0.0
# Voto di maggioranza tra top-3 (robustezza)
from collections import Counter
names = [r['name'] for r in results]
best_name = Counter(names).most_common(1)[0][0]
best_sim = max(r['similarity'] for r in results
if r['name'] == best_name)
if best_sim < threshold:
return 'unknown', best_sim
return best_name, best_sim
def save(self) -> None:
"""Salva indice FAISS e mapping ID->nome su disco."""
faiss.write_index(self.index,
str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'wb') as f:
pickle.dump({'id_to_name': self.id_to_name,
'next_id': self.next_id}, f)
print(f"Database salvato: {self.next_id} embedding")
def _load(self) -> None:
"""Carica indice FAISS e mapping da disco."""
self.index = faiss.read_index(str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'rb') as f:
data = pickle.load(f)
self.id_to_name = data['id_to_name']
self.next_id = data['next_id']
print(f"Database caricato: {self.next_id} embedding, "
f"{len(set(self.id_to_name.values()))} identità")
def stats(self) -> dict:
"""Statistiche del database."""
names = list(self.id_to_name.values())
from collections import Counter
name_counts = Counter(names)
return {
'total_embeddings': self.next_id,
'total_identities': len(name_counts),
'avg_embeddings_per_person': np.mean(list(name_counts.values()))
if name_counts else 0,
'index_type': self.index_type
}
# Benchmark: KNN sklearn vs FAISS per database di varie dimensioni
def benchmark_search_backends(n_identities: int = 10000,
embs_per_person: int = 5) -> None:
"""Confronta tempi di ricerca KNN vs FAISS."""
import time
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
n_total = n_identities * embs_per_person
dim = 512
# Genera embedding sintetici
embeddings = np.random.randn(n_total, dim).astype(np.float32)
embeddings = normalize(embeddings)
labels = np.repeat(np.arange(n_identities), embs_per_person)
query = np.random.randn(1, dim).astype(np.float32)
query = normalize(query)
# KNN sklearn
knn = KNeighborsClassifier(n_neighbors=3, metric='cosine', algorithm='brute')
knn.fit(embeddings, labels)
t0 = time.perf_counter()
for _ in range(100):
knn.predict(query)
knn_time = (time.perf_counter() - t0) / 100 * 1000
# FAISS HNSW
index = faiss.IndexHNSWFlat(dim, 32)
index.add(embeddings)
t0 = time.perf_counter()
for _ in range(100):
index.search(query, 3)
faiss_time = (time.perf_counter() - t0) / 100 * 1000
print(f"\nBenchmark ricerca ({n_total:,} embedding, dim={dim}):")
print(f" KNN sklearn: {knn_time:.2f} ms/query")
print(f" FAISS HNSW: {faiss_time:.3f} ms/query")
print(f" Speedup: {knn_time/faiss_time:.0f}x")
8. 倫理的および法的考慮事項
注意: GDPR に基づく生体認証データ
顔データは、GDPR (第 9 条) およびその処理に基づく生体データです。 そして厳しい制限を受けることになります。イタリアとEUでは:
- 必須の明示的な同意: 目的ごとに特定のインフォームドコンセントがなければ生体認証データを収集することはできません
- データの最小化: 元の画像ではなく、必要な埋め込みのみを保持します
- 消去する権利: 個人のデータをすべて削除するエンドポイントを実装する
- 限定された目的: ログイン システム用に収集されたデータはマーケティング分析には使用できません
- 必須のバイアステスト: 導入前に、さまざまな人口統計グループの指標 (性別、年齢、民族別の EER) を確認してください。
- 公衆監視なし: AI法EU 2024年規制により、公共空間での顔認識がほぼ完全に禁止される
バイアステストアルゴリズム
各展開の前に、必ず人口統計のサブアセスメントを実行してください。 総 EER が 2% のシステムでは、サブグループの EER が 5% になる可能性があります 特定 - これは倫理的にも法的にも受け入れられません。
9. ベストプラクティス
実稼働対応の顔認識システムのチェックリスト
- リアルタイムには MediaPipe を使用し、高精度には MTCNN を使用します。 競合するものではありません - コンテキストに基づいて選択してください
- 1 人あたり最低 5 ~ 10 枚の画像: さまざまな条件(光、角度、表現)で。画像が 1 つしかないため、システムは脆弱です
- 埋め込みを常に正規化します。
emb = emb / np.linalg.norm(emb)。正規化しないとコサイン距離は正しく機能しません - 実際のデータでしきい値を調整します。 データセットで検証せずに、デフォルトのしきい値として 0.5 を使用しないでください。特定のシナリオで EER を計算する
- スプーフィング対策: 活性検出機能のないシステムは、写真やビデオに対して脆弱です。活性検出モデルを統合 (スプーフィング データセットに合わせて微調整された MobileNetV2)
- 時間の経過とともに埋め込みを更新します。 人は見た目を変える。埋め込みの定期的な再登録またはオンライン更新を計画する
- プライバシーを保ったロギング: 個人データを公開せずにデバッグ用の ID ハッシュを使用して、埋め込み (画像ではなく) のみをログに記録します
結論
モジュール式でアクセスしやすい最新の堅牢な顔認識パイプライン。私たちは持っています 本番環境に対応したシステムのすべての層をカバーしました。
- メディアパイプ: CPU での超高速検出。リソース制約のあるリアルタイムに最適です。最新のラップトップで 200+ FPS。
- MTCNN + フェイスアライメント: 正確な認識システムのための強固な基盤。 5 つのランドマークは、標準的な 112x112 の位置合わせの基本です。
- インサイトフェイス/アークフェイス: LFW で 99.83% の精度を備えた 512D 埋め込み - pip 経由でアクセスできる最先端の技術。
- ROC/EERで閾値を校正: 堅牢なシステムと信頼性の低いシステムの違い。検証せずにデフォルトで 0.5 を使用しないでください。
- スプーフィング対策 + 生存検出: セキュリティ システムに不可欠です。プリント/リプレイ攻撃に対する耐性のためのテクスチャ CNN + マイクロモーション分析。
- スケーリング用の FAISS: KNN scikit-learn (10K 埋め込み) から FAISS HNSW (1M+ 埋め込み) まで、100 ~ 1000 倍のスピードアップ。
- 倫理とGDPR: オプションではなく、基本的な要件です。 EU AI 法 2024 では、公共の場での顔認識が禁止されています。
シリーズナビゲーション
シリーズ間のリソース
- MLOps: 本番環境で提供されるモデル - REST API 上にモデルをデプロイする
- 高度な深層学習: ビジョン トランスフォーマー







