Face Detection and Recognition: MediaPipe, MTCNN, and FaceNet
Facial recognition is one of the most mature and widespread computer vision applications: from security systems to smartphones, from access control to retail demographic analytics. Yet implementing it correctly - with attention to accuracy, speed, and above all ethics - requires deep understanding of the techniques involved.
In this article we'll explore the entire stack: face detection (finding faces in an image), face alignment (geometric normalization), face embedding (vector representation), and face verification/identification. We'll use MediaPipe for real-time scenarios, MTCNN for precision, and FaceNet/ArcFace for recognition.
What You'll Learn
- Face detection vs face recognition pipeline: differences and use cases
- MediaPipe Face Detection: fast, lightweight, cross-platform
- MediaPipe Face Mesh: 468 facial landmarks in real-time
- MTCNN: Multi-task Cascaded CNN for precise detection
- Face alignment: geometric normalization with landmarks
- Face embedding: FaceNet and ArcFace for compact representations
- Face verification (1:1) and identification (1:N)
- Building a recognition system from scratch with a face database
- Ethical and legal considerations: GDPR, bias, consent
1. Face Detection vs Face Recognition: The Complete Pipeline
The term "facial recognition" often bundles two distinct tasks with very different technical requirements. Understanding this distinction is crucial for system design:
Facial Pipeline Components
| Stage | Task | Output | Typical Model |
|---|---|---|---|
| Detection | Find face positions | Bounding boxes | MediaPipe, MTCNN, RetinaFace |
| Alignment | Normalize geometry | Normalized 112x112 image | Affine transform with landmarks |
| Embedding | Extract feature descriptor | 128-512D vector | FaceNet, ArcFace, AdaFace |
| Verification | Same person? (1:1) | Similarity score, boolean | Cosine distance between embeddings |
| Identification | Who is it? (1:N) | Identity + confidence | KNN on embedding database |
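The verification/identification distinction in the last two rows can be made concrete in a few lines of NumPy. This is a toy sketch on random vectors standing in for real embeddings; the names, 128 dimensions, and the 0.5 threshold are illustrative only:

```python
import numpy as np

rng = np.random.default_rng(0)

def unit(v: np.ndarray) -> np.ndarray:
    """L2-normalize so that the dot product equals cosine similarity."""
    return v / np.linalg.norm(v)

# Toy "embeddings": a second capture of the same person is modeled
# as a small perturbation of the first capture.
alice_1 = unit(rng.normal(size=128))
alice_2 = unit(alice_1 + 0.05 * rng.normal(size=128))
bob = unit(rng.normal(size=128))

THRESHOLD = 0.5  # illustrative; calibrate on real data

# Verification (1:1): are these two captures the same person?
same_person = float(alice_1 @ alice_2) >= THRESHOLD

# Identification (1:N): which enrolled identity is closest to the query?
database = {'alice': alice_1, 'bob': bob}
identity = max(database, key=lambda name: float(alice_2 @ database[name]))
```

The same two primitives (a similarity function and a threshold) underpin every stage that follows; only the embedding model and the search backend change.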
2. MediaPipe: Face Detection and Face Mesh
Google's MediaPipe is the most practical framework for real-time face detection on CPU. The BlazeFace model is specifically optimized for speed on mobile and embedded devices, achieving 200+ FPS on a modern laptop. For production real-time scenarios - surveillance, access control, driver monitoring - MediaPipe is the right starting point.
2.1 Face Detection with MediaPipe
import mediapipe as mp
import cv2
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class FaceDetection:
"""Detection result for a single face."""
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
confidence: float
keypoints: dict[str, tuple[int, int]] # name -> (x, y) in pixels
class MediaPipeFaceDetector:
"""
Face detector based on MediaPipe BlazeFace.
Very fast on CPU: 200+ FPS on 640x480 images.
Great for real-time, not ideal for high face density images.
"""
KEYPOINT_NAMES = [
'right_eye', 'left_eye', 'nose_tip',
'mouth_center', 'right_ear_tragion', 'left_ear_tragion'
]
def __init__(self, min_confidence: float = 0.5,
model_selection: int = 0):
"""
model_selection:
0 = short range (within 2m, faster)
1 = full range (up to 5m, more accurate)
"""
self.mp_face = mp.solutions.face_detection
self.detector = self.mp_face.FaceDetection(
model_selection=model_selection,
min_detection_confidence=min_confidence
)
self.mp_draw = mp.solutions.drawing_utils
def detect(self, img_bgr: np.ndarray) -> list[FaceDetection]:
"""Detect faces in a BGR image."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.detector.process(img_rgb)
faces = []
if not results.detections:
return faces
for detection in results.detections:
score = detection.score[0]
bbox_rel = detection.location_data.relative_bounding_box
# Relative coordinates -> pixels
x1 = max(0, int(bbox_rel.xmin * w))
y1 = max(0, int(bbox_rel.ymin * h))
x2 = min(w, int((bbox_rel.xmin + bbox_rel.width) * w))
y2 = min(h, int((bbox_rel.ymin + bbox_rel.height) * h))
# Keypoints (eyes, nose, mouth, ears)
keypoints = {}
for idx, name in enumerate(self.KEYPOINT_NAMES):
kp = detection.location_data.relative_keypoints[idx]
keypoints[name] = (int(kp.x * w), int(kp.y * h))
faces.append(FaceDetection(
bbox=(x1, y1, x2, y2),
confidence=float(score),
keypoints=keypoints
))
return faces
def draw(self, img_bgr: np.ndarray,
faces: list[FaceDetection]) -> np.ndarray:
"""Annotate image with detection results."""
annotated = img_bgr.copy()
for face in faces:
x1, y1, x2, y2 = face.bbox
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(annotated, f"{face.confidence:.2f}",
(x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (0, 255, 0), 2)
# Draw keypoints
for name, (kx, ky) in face.keypoints.items():
color = (0, 0, 255) if 'eye' in name else (255, 0, 0)
cv2.circle(annotated, (kx, ky), 4, color, -1)
return annotated
def run_face_detection_webcam() -> None:
"""Real-time detection on webcam."""
detector = MediaPipeFaceDetector(min_confidence=0.5)
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
break
faces = detector.detect(frame)
annotated = detector.draw(frame, faces)
cv2.putText(annotated, f"Faces: {len(faces)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('MediaPipe Face Detection', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
2.2 Face Mesh: 468 Landmarks in Real-Time
MediaPipe's Face Mesh model extracts 468 3D landmarks (x, y, z) from a face. Useful for face alignment, emotion estimation, AR filters, eye gaze tracking, and drowsiness detection (eye aspect ratio). The optional landmark refinement mode extends this to 478 points including iris landmarks.
import mediapipe as mp
import cv2
import numpy as np
from typing import Optional
class FaceMeshAnalyzer:
"""
MediaPipe Face Mesh: 468 3D landmarks in real-time.
Includes utilities: eye aspect ratio (drowsiness), head pose, etc.
"""
# MediaPipe landmark indices for eyes
LEFT_EYE_IDX = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_IDX = [33, 160, 158, 133, 153, 144]
def __init__(self, max_faces: int = 1,
refine_landmarks: bool = True):
"""
refine_landmarks=True: adds landmarks around eyes and irises
(468 -> 478 total points)
"""
self.mp_mesh = mp.solutions.face_mesh
self.face_mesh = self.mp_mesh.FaceMesh(
max_num_faces=max_faces,
refine_landmarks=refine_landmarks,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.mp_styles = mp.solutions.drawing_styles
def process(self, img_bgr: np.ndarray) -> Optional[list]:
"""Process image and return landmark list per face."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(img_rgb)
if not results.multi_face_landmarks:
return None
all_faces_lm = []
for face_landmarks in results.multi_face_landmarks:
lm_pixels = []
for lm in face_landmarks.landmark:
lm_pixels.append((int(lm.x * w), int(lm.y * h), lm.z))
all_faces_lm.append(lm_pixels)
return all_faces_lm
def eye_aspect_ratio(self, landmarks: list,
eye_indices: list) -> float:
"""
Eye Aspect Ratio (EAR) - drowsiness indicator.
EAR < 0.2 for 20+ consecutive frames = closed eye.
Formula: EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|)
"""
pts = [np.array(landmarks[i][:2]) for i in eye_indices]
# Vertical distances
A = np.linalg.norm(pts[1] - pts[5])
B = np.linalg.norm(pts[2] - pts[4])
# Horizontal distance
C = np.linalg.norm(pts[0] - pts[3])
return (A + B) / (2.0 * C) if C > 0 else 0.0
def drowsiness_detector(threshold: float = 0.22,
consec_frames: int = 20) -> None:
"""Drowsiness alert system based on EAR."""
analyzer = FaceMeshAnalyzer(max_faces=1)
cap = cv2.VideoCapture(0)
ear_counter = 0
while True:
ret, frame = cap.read()
if not ret:
break
landmarks_list = analyzer.process(frame)
if landmarks_list:
lms = landmarks_list[0] # first face
ear_l = analyzer.eye_aspect_ratio(lms, analyzer.LEFT_EYE_IDX)
ear_r = analyzer.eye_aspect_ratio(lms, analyzer.RIGHT_EYE_IDX)
avg_ear = (ear_l + ear_r) / 2.0
if avg_ear < threshold:
ear_counter += 1
if ear_counter >= consec_frames:
cv2.putText(frame, "ALERT: DROWSINESS!",
(50, 200), cv2.FONT_HERSHEY_SIMPLEX,
1.5, (0, 0, 255), 3)
else:
ear_counter = 0
cv2.putText(frame, f"EAR: {avg_ear:.3f}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.8, (0, 255, 0), 2)
cv2.imshow('Drowsiness Detector', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
3. MTCNN: Multi-task Cascaded CNN
MTCNN is a three-stage detector (P-Net, R-Net, O-Net) that balances speed and precision. It's the gold standard for accurate detection in recognition systems: it detects faces with 5 landmarks (eyes, nose, mouth corners), required for face alignment. Slower than MediaPipe but significantly more robust in difficult conditions (low lighting, partial occlusion, extreme angles).
from mtcnn import MTCNN
import cv2
import numpy as np
class MTCNNFaceProcessor:
"""
MTCNN for precise detection + face alignment.
Produces normalized 112x112 images, optimal for FaceNet/ArcFace.
"""
def __init__(self, min_face_size: int = 40,
thresholds: list = None,
scale_factor: float = 0.709):
self.detector = MTCNN(
min_face_size=min_face_size,
thresholds=thresholds or [0.6, 0.7, 0.7],
scale_factor=scale_factor
)
def detect_and_align(self, img_bgr: np.ndarray,
output_size: int = 112) -> list[np.ndarray]:
"""
Detect faces and return them aligned (112x112 by default).
Alignment uses an affine transform on 5 landmarks to bring
eyes into canonical position.
Returns: list of aligned face images (BGR, float32 [0,1])
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
aligned_faces = []
for det in detections:
if det['confidence'] < 0.90:
continue
keypoints = det['keypoints']
src_pts = np.array([
keypoints['left_eye'],
keypoints['right_eye'],
keypoints['nose'],
keypoints['mouth_left'],
keypoints['mouth_right']
], dtype=np.float32)
# Canonical destination points for 112x112
dst_pts = np.array([
[38.2946, 51.6963],
[73.5318, 51.6963],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.3655]
], dtype=np.float32)
# Scale for output sizes other than 112
scale = output_size / 112.0
dst_pts *= scale
            # Affine (similarity) transform -> aligned image
            M, _ = cv2.estimateAffinePartial2D(src_pts, dst_pts)
            if M is None:  # estimation can fail on degenerate landmarks
                continue
            aligned = cv2.warpAffine(img_bgr, M,
                                     (output_size, output_size))
aligned_faces.append(aligned.astype(np.float32) / 255.0)
return aligned_faces
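Under the hood, cv2.estimateAffinePartial2D fits a similarity transform (uniform scale, rotation, translation), applying RANSAC outlier rejection by default. A NumPy-only sketch of the underlying least-squares step (Umeyama's method, without RANSAC) clarifies what the 2x3 matrix M contains; the landmark coordinates below are just the canonical template reused as sample data:

```python
import numpy as np

def similarity_transform(src: np.ndarray, dst: np.ndarray) -> np.ndarray:
    """
    Least-squares similarity transform mapping src (N,2) onto dst (N,2).
    Returns a 2x3 matrix M, the same shape cv2.estimateAffinePartial2D
    returns, so that dst ~= src @ M[:, :2].T + M[:, 2].
    """
    src_mean, dst_mean = src.mean(axis=0), dst.mean(axis=0)
    src_c, dst_c = src - src_mean, dst - dst_mean
    # Cross-covariance between the centered point sets
    cov = dst_c.T @ src_c / len(src)
    U, S, Vt = np.linalg.svd(cov)
    # Reflection guard: force a proper rotation (det = +1)
    d = np.sign(np.linalg.det(U @ Vt))
    D = np.diag([1.0, d])
    R = U @ D @ Vt
    scale = np.trace(np.diag(S) @ D) / src_c.var(axis=0).sum()
    t = dst_mean - scale * R @ src_mean
    return np.hstack([scale * R, t.reshape(2, 1)])

# Sanity check: recover a known transform exactly from 5 landmark pairs
theta, s = 0.3, 1.5
R_true = np.array([[np.cos(theta), -np.sin(theta)],
                   [np.sin(theta),  np.cos(theta)]])
src = np.array([[38.3, 51.7], [73.5, 51.7], [56.0, 71.7],
                [41.5, 92.4], [70.7, 92.4]])
dst = (s * src @ R_true.T) + np.array([10.0, -4.0])
M = similarity_transform(src, dst)
mapped = src @ M[:, :2].T + M[:, 2]
```

Restricting the fit to scale + rotation + translation (rather than a full 6-DOF affine) is what keeps the aligned face from being sheared, which matters because embedding models are trained on this exact canonical geometry.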
4. Face Recognition: FaceNet and ArcFace
After detection and alignment, the heart of the recognition system is the face embedding model: a neural network that transforms a 112x112 image into a 128-512 dimensional vector. Faces of the same person produce nearby vectors in the space; faces of different people are far apart. This is what enables both 1:1 verification and 1:N identification.
Face Embedding Models Comparison
| Model | Embedding Dim | Loss | LFW Acc. | Size |
|---|---|---|---|---|
| FaceNet (Google) | 128 | Triplet Loss | 99.63% | 90 MB |
| ArcFace (InsightFace) | 512 | ArcFace Loss | 99.83% | 249 MB |
| AdaFace | 512 | AdaFace Loss | 99.82% | 249 MB |
| MobileFaceNet (edge) | 128 | ArcFace Loss | 99.55% | 4 MB |
import insightface
from insightface.app import FaceAnalysis
import numpy as np
import cv2
import pickle
from pathlib import Path
from sklearn.preprocessing import normalize
from sklearn.neighbors import KNeighborsClassifier
from typing import Optional
class FaceRecognitionSystem:
"""
Complete face recognition system based on InsightFace (ArcFace).
Supports registration of new identities and real-time recognition.
Install: pip install insightface onnxruntime scikit-learn
"""
def __init__(self, db_path: str = 'face_db.pkl',
recognition_threshold: float = 0.5):
"""
recognition_threshold: cosine threshold for a valid match
(0.5 is a good default for 512D ArcFace embeddings)
"""
self.app = FaceAnalysis(
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
self.app.prepare(ctx_id=0, det_size=(640, 640))
self.db_path = Path(db_path)
self.threshold = recognition_threshold
self.database: dict[str, list[np.ndarray]] = {}
self.knn: Optional[KNeighborsClassifier] = None
if self.db_path.exists():
self._load_database()
def register_person(self, name: str,
images: list[np.ndarray],
max_faces_per_image: int = 1) -> int:
"""
Register a new person in the database.
name: person identifier
images: list of BGR images (at least 5 for robustness)
Returns: number of embeddings successfully registered
"""
embeddings = []
for img in images:
faces = self.app.get(img)
if not faces:
continue
# Take the largest face (for single-person images)
face = max(faces,
key=lambda f: (f.bbox[2]-f.bbox[0]) *
(f.bbox[3]-f.bbox[1]))
emb = normalize(face.embedding.reshape(1, -1))[0]
embeddings.append(emb)
if not embeddings:
print(f"[WARN] No face detected for {name}")
return 0
if name not in self.database:
self.database[name] = []
self.database[name].extend(embeddings)
self._rebuild_knn()
self._save_database()
print(f"Registered {name}: {len(embeddings)} embeddings")
return len(embeddings)
def recognize(self, img_bgr: np.ndarray) -> list[dict]:
"""
Recognize all faces in an image.
Returns: list of dicts with bbox, identity, confidence per face
"""
faces = self.app.get(img_bgr)
results = []
for face in faces:
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = self._match_embedding(emb)
x1, y1, x2, y2 = face.bbox.astype(int)
results.append({
'bbox': (x1, y1, x2, y2),
'identity': identity,
'confidence': confidence,
'is_known': confidence >= self.threshold
})
return results
def _match_embedding(self, emb: np.ndarray) -> tuple[str, float]:
"""Find the best match in the database."""
if not self.database or self.knn is None:
return ('unknown', 0.0)
dist, idx = self.knn.kneighbors([emb], n_neighbors=1)
labels = [name for name, embs in self.database.items()
for _ in embs]
best_name = labels[idx[0][0]]
similarity = 1.0 - dist[0][0]
return (best_name, float(similarity))
def _rebuild_knn(self) -> None:
"""Rebuild KNN classifier after database updates."""
all_embs = []
all_labels = []
for name, embs in self.database.items():
all_embs.extend(embs)
all_labels.extend([name] * len(embs))
if len(all_embs) < 2:
return
self.knn = KNeighborsClassifier(
n_neighbors=min(3, len(all_embs)),
metric='cosine',
algorithm='brute'
)
self.knn.fit(np.array(all_embs), all_labels)
def _save_database(self) -> None:
with open(self.db_path, 'wb') as f:
pickle.dump(self.database, f)
def _load_database(self) -> None:
with open(self.db_path, 'rb') as f:
self.database = pickle.load(f)
self._rebuild_knn()
print(f"Database loaded: {len(self.database)} identities")
def annotate(self, img_bgr: np.ndarray,
results: list[dict]) -> np.ndarray:
"""Annotate image with recognition results."""
annotated = img_bgr.copy()
for r in results:
x1, y1, x2, y2 = r['bbox']
color = (0, 255, 0) if r['is_known'] else (0, 0, 255)
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = (f"{r['identity']} ({r['confidence']:.2f})"
if r['is_known'] else "Unknown")
cv2.putText(annotated, label, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return annotated
5. Face Verification: Threshold and ROC Curve
Face verification answers: "do these two photos show the same person?". It's a 1:1 matching problem, different from identification (1:N). The key is choosing the right similarity threshold by analyzing the ROC curve. The Equal Error Rate (EER) - where False Accept Rate equals False Reject Rate - provides a principled starting point.
import numpy as np
from sklearn.metrics import roc_curve, auc
def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""Cosine similarity between two normalized embeddings."""
emb1_n = emb1 / (np.linalg.norm(emb1) + 1e-10)
emb2_n = emb2 / (np.linalg.norm(emb2) + 1e-10)
return float(np.dot(emb1_n, emb2_n))
def find_optimal_threshold(same_person_pairs: list[tuple],
diff_person_pairs: list[tuple]) -> dict:
"""
Find optimal threshold by analyzing the ROC curve.
same_person_pairs: list of (emb1, emb2) pairs - same person
diff_person_pairs: list of (emb1, emb2) pairs - different people
Returns: {threshold, eer, auc, far, frr}
"""
scores = []
labels = []
for emb1, emb2 in same_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(1) # same person
for emb1, emb2 in diff_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(0) # different people
scores_arr = np.array(scores)
labels_arr = np.array(labels)
# ROC curve
fpr, tpr, thresholds = roc_curve(labels_arr, scores_arr)
roc_auc = auc(fpr, tpr)
# Equal Error Rate (EER): point where FAR = FRR
fnr = 1 - tpr
eer_idx = np.argmin(np.abs(fpr - fnr))
eer = (fpr[eer_idx] + fnr[eer_idx]) / 2.0
optimal_threshold = thresholds[eer_idx]
# Metrics at optimal threshold
predictions = (scores_arr >= optimal_threshold).astype(int)
tp = np.sum((predictions == 1) & (labels_arr == 1))
fp = np.sum((predictions == 1) & (labels_arr == 0))
fn = np.sum((predictions == 0) & (labels_arr == 1))
tn = np.sum((predictions == 0) & (labels_arr == 0))
far = fp / (fp + tn) if (fp + tn) > 0 else 0 # False Accept Rate
frr = fn / (fn + tp) if (fn + tp) > 0 else 0 # False Reject Rate
print(f"=== Face Verification Metrics ===")
print(f"AUC-ROC: {roc_auc:.4f}")
print(f"EER: {eer*100:.2f}%")
print(f"Optimal threshold: {optimal_threshold:.4f}")
print(f"FAR @ EER: {far*100:.2f}%")
print(f"FRR @ EER: {frr*100:.2f}%")
return {
'threshold': float(optimal_threshold),
'eer': float(eer),
'auc': float(roc_auc),
'far': float(far),
'frr': float(frr)
}
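Because ArcFace embeddings are L2-normalized, cosine similarity and Euclidean distance carry the same information, linked by the identity ||a - b||^2 = 2 - 2*cos(a, b). This small sketch (random unit vectors, illustrative names) verifies the identity and shows how a cosine threshold translates for a search backend that returns squared L2 distances:

```python
import numpy as np

rng = np.random.default_rng(42)

def unit(v: np.ndarray) -> np.ndarray:
    return v / np.linalg.norm(v)

a, b = unit(rng.normal(size=512)), unit(rng.normal(size=512))

cos_sim = float(a @ b)
l2_sq = float(np.sum((a - b) ** 2))

# For unit vectors: ||a - b||^2 = 2 - 2 * cos(a, b)
gap = abs(l2_sq - (2.0 - 2.0 * cos_sim))

def cosine_to_l2sq_threshold(t: float) -> float:
    """A cosine threshold t becomes: accept if squared L2 distance <= this."""
    return 2.0 - 2.0 * t
```

Practical consequence: a threshold calibrated on cosine similarity (e.g. via the EER procedure above) can be reused unchanged with an L2-based index, as long as embeddings are normalized before insertion and query.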
6. Anti-Spoofing and Liveness Detection
A face recognition system without liveness detection is vulnerable to spoofing attacks: a printed photo, a video on a smartphone screen, or a 3D mask can fool most detectors. Liveness detection distinguishes a real face from an artifact - it's a security prerequisite, not an optional feature.
Types of Spoofing Attacks
| Attack Type | Description | Defense Difficulty | Mitigation Technique |
|---|---|---|---|
| Print Attack | Printed photo on paper/glossy surface | Low | Texture analysis, moiré pattern detection |
| Replay Attack | Face video on screen | Medium | Screen reflection detection, 3D depth |
| 3D Mask | Realistic 3D-printed mask | High | IR sensor, challenge-response, micromotion |
| Deepfake Video | AI-generated synthetic video | Very High | Deepfake detector, blood flow analysis (rPPG) |
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from typing import Optional
class LivenessDetector:
"""
Liveness detection based on two complementary signals:
1. Texture analysis (CNN) - detects print attacks
2. Micro-motion analysis - detects replay attacks (static videos lack natural micro-movements)
    For serious deployment, consider: Silent-Face-Anti-Spoofing, FAS-SGTD, CDCN (Central Difference CNN)
Datasets: CelebA-Spoof, OULU-NPU, MSU-MFSD
"""
def __init__(self, model_path: Optional[str] = None,
device: str = 'auto'):
self.device = torch.device(
'cuda' if torch.cuda.is_available() and device == 'auto'
else 'cpu'
)
self.model = self._build_model(model_path)
self.model.eval()
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
self.frame_buffer: list[np.ndarray] = []
self.buffer_size = 10 # 10 frames ~= 333ms @ 30FPS
def _build_model(self, model_path: Optional[str]) -> nn.Module:
"""
MobileNetV2 fine-tuned for binary classification: real vs spoof.
Lightweight (3.4M params) - suitable for edge deployment.
"""
        model = models.mobilenet_v2(weights=None)  # random init; the 'pretrained' kwarg is deprecated
model.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(model.last_channel, 2) # [spoof, real]
)
if model_path:
state_dict = torch.load(model_path, map_location=self.device)
model.load_state_dict(state_dict)
return model.to(self.device)
def is_live_texture(self, face_roi: np.ndarray,
threshold: float = 0.7) -> tuple[bool, float]:
"""
CNN texture analysis: classify face as real or spoof.
face_roi: BGR face crop [H, W, 3]
Returns: (is_live, confidence_score)
"""
img_rgb = cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)
tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1)
live_prob = probs[0, 1].item() # index 1 = "real"
return live_prob >= threshold, live_prob
def analyze_micro_motion(self, frame_bgr: np.ndarray) -> tuple[bool, float]:
"""
Micro-motion analysis: detects natural face movements (micro-expressions,
breathing, eye blinks) absent in photos/static videos.
Returns: (has_micro_motion, motion_score)
A replay video typically has motion_score < 0.5
"""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
self.frame_buffer.append(gray)
if len(self.frame_buffer) > self.buffer_size:
self.frame_buffer.pop(0)
if len(self.frame_buffer) < 3:
return True, 1.0 # Not enough frames, assume live
if len(self.frame_buffer) >= self.buffer_size:
all_flows = []
for i in range(len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
all_flows.append(np.mean(magnitude))
# High variance = irregular natural movements = live
motion_variance = np.var(all_flows)
motion_score = min(1.0, motion_variance * 100)
else:
motion_score = 0.5
return motion_score > 0.3, float(motion_score)
def predict(self, face_roi: np.ndarray,
frame_bgr: np.ndarray) -> dict:
"""
Combined prediction: texture CNN + micro-motion.
Conservative AND fusion rule for security.
"""
is_live_tex, tex_score = self.is_live_texture(face_roi)
has_motion, motion_score = self.analyze_micro_motion(frame_bgr)
combined_score = 0.6 * tex_score + 0.4 * motion_score
is_live = is_live_tex and (motion_score > 0.2)
return {
'is_live': is_live,
'combined_score': combined_score,
'texture_score': tex_score,
'motion_score': motion_score,
'verdict': 'LIVE' if is_live else 'SPOOF'
}
7. Scalable Database: FAISS for Millions of Embeddings
The scikit-learn KNN system works well up to ~10,000 embeddings. Beyond that, brute-force search becomes a bottleneck. FAISS (Facebook AI Similarity Search) scales to billions of vectors with approximate nearest-neighbor searches that complete in well under a millisecond.
import faiss
import numpy as np
import pickle
from pathlib import Path
from collections import Counter
class FAISSFaceDatabase:
"""
Scalable face embedding database using FAISS.
Approximate search (HNSW) for 1M+ embeddings in < 1ms.
Install: pip install faiss-cpu (or faiss-gpu for GPU)
"""
def __init__(self, embedding_dim: int = 512,
db_path: str = 'faiss_face_db',
index_type: str = 'hnsw'):
"""
index_type:
'flat' - Exact search, O(n), for < 100K embeddings
'hnsw' - Approximate HNSW, for 100K - 10M embeddings
'ivf' - Inverted File Index, for 10M+ embeddings
"""
self.embedding_dim = embedding_dim
self.db_path = Path(db_path)
self.db_path.mkdir(exist_ok=True)
self.index_type = index_type
self.index = self._build_index()
self.id_to_name: dict[int, str] = {}
self.next_id = 0
if (self.db_path / 'index.faiss').exists():
self._load()
def _build_index(self) -> faiss.Index:
"""Build appropriate FAISS index."""
if self.index_type == 'flat':
return faiss.IndexFlatIP(self.embedding_dim)
        elif self.index_type == 'hnsw':
            # M=32: connections per node (higher = more accurate but more RAM)
            # efConstruction=200: index quality during build
            # Inner-product metric so search scores are cosine similarities
            # (embeddings are L2-normalized before insertion); the default
            # L2 metric would invert the meaning of the threshold comparison
            index = faiss.IndexHNSWFlat(self.embedding_dim, 32,
                                        faiss.METRIC_INNER_PRODUCT)
            index.hnsw.efConstruction = 200
            index.hnsw.efSearch = 64  # accuracy/speed tradeoff at query time
            return index
        elif self.index_type == 'ivf':
            n_lists = 100  # number of clusters
            quantizer = faiss.IndexFlatIP(self.embedding_dim)
            index = faiss.IndexIVFFlat(quantizer, self.embedding_dim,
                                       n_lists, faiss.METRIC_INNER_PRODUCT)
            # Note: IVF indexes must be trained on a representative sample
            # of embeddings (index.train(sample)) before add() will work
            return index
        raise ValueError(f"Unknown index type: {self.index_type}")
def add_person(self, name: str,
embeddings: list[np.ndarray]) -> int:
"""Add multiple embeddings for the same person."""
for emb in embeddings:
emb_norm = emb / (np.linalg.norm(emb) + 1e-10)
self.index.add(emb_norm.astype(np.float32).reshape(1, -1))
self.id_to_name[self.next_id] = name
self.next_id += 1
return len(embeddings)
def identify(self, query_embedding: np.ndarray,
threshold: float = 0.5) -> tuple[str, float]:
"""Identify person with highest similarity (top-3 majority vote)."""
if self.next_id == 0:
return 'unknown', 0.0
emb_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
k = min(3, self.next_id)
similarities, indices = self.index.search(emb_norm, k)
candidates = []
for sim, idx in zip(similarities[0], indices[0]):
if idx != -1 and sim >= threshold:
candidates.append((self.id_to_name[int(idx)], float(sim)))
if not candidates:
return 'unknown', 0.0
names = [c[0] for c in candidates]
best_name = Counter(names).most_common(1)[0][0]
best_sim = max(c[1] for c in candidates if c[0] == best_name)
return best_name, best_sim
def save(self) -> None:
"""Save FAISS index and ID->name mapping to disk."""
faiss.write_index(self.index,
str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'wb') as f:
pickle.dump({'id_to_name': self.id_to_name,
'next_id': self.next_id}, f)
def _load(self) -> None:
"""Load FAISS index and mapping from disk."""
self.index = faiss.read_index(str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'rb') as f:
data = pickle.load(f)
self.id_to_name = data['id_to_name']
self.next_id = data['next_id']
print(f"Database loaded: {self.next_id} embeddings, "
f"{len(set(self.id_to_name.values()))} identities")
def stats(self) -> dict:
names = list(self.id_to_name.values())
name_counts = Counter(names)
return {
'total_embeddings': self.next_id,
'total_identities': len(name_counts),
'avg_per_person': np.mean(list(name_counts.values()))
if name_counts else 0,
'index_type': self.index_type
}
# Benchmark: sklearn KNN vs FAISS
def benchmark_backends(n_identities: int = 10000,
embs_per_person: int = 5) -> None:
"""Compare search times: sklearn KNN vs FAISS HNSW."""
import time
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
n_total = n_identities * embs_per_person
dim = 512
embeddings = np.random.randn(n_total, dim).astype(np.float32)
embeddings = normalize(embeddings)
labels = np.repeat(np.arange(n_identities), embs_per_person)
query = normalize(np.random.randn(1, dim).astype(np.float32))
# sklearn KNN
knn = KNeighborsClassifier(n_neighbors=3, metric='cosine',
algorithm='brute')
knn.fit(embeddings, labels)
t0 = time.perf_counter()
for _ in range(100):
knn.predict(query)
knn_ms = (time.perf_counter() - t0) / 100 * 1000
# FAISS HNSW
index = faiss.IndexHNSWFlat(dim, 32)
index.add(embeddings)
t0 = time.perf_counter()
for _ in range(100):
index.search(query, 3)
faiss_ms = (time.perf_counter() - t0) / 100 * 1000
print(f"Search benchmark ({n_total:,} embeddings, dim={dim}):")
print(f" sklearn KNN: {knn_ms:.2f} ms/query")
print(f" FAISS HNSW: {faiss_ms:.3f} ms/query")
print(f" Speedup: {knn_ms/faiss_ms:.0f}x")
8. Ethical and Legal Considerations
Warning: Biometric Data under GDPR
Facial data is biometric data under GDPR (Art. 9) and its processing is subject to strict restrictions in the EU:
- Explicit consent required: Biometric data cannot be collected without specific informed consent for each purpose
- Data minimization: Store only the necessary embeddings, not the original images
- Right to erasure: Implement an endpoint to delete all data for a specific person
- Purpose limitation: Data collected for an access system cannot be used for marketing analytics
- Mandatory bias testing: Before deployment, verify metrics across demographic groups (EER by gender, age, ethnicity)
- No indiscriminate public surveillance: the EU AI Act (2024) prohibits real-time remote biometric identification in publicly accessible spaces for law enforcement purposes, with only narrow exceptions
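The right-to-erasure point above can be sketched in code. This is a minimal illustration assuming the pickle-based {name: [embeddings]} layout used earlier in the article; a real deployment must also purge backups, logs, and any FAISS index (HNSW indexes do not support in-place removal, so they need a full rebuild):

```python
import pickle
import tempfile
from pathlib import Path

def erase_person(db_path: str, name: str) -> bool:
    """
    GDPR Art. 17 (right to erasure) sketch: delete every stored
    embedding for one identity from the pickle-based database.
    Returns True if the identity existed and was removed.
    """
    path = Path(db_path)
    if not path.exists():
        return False
    with open(path, 'rb') as f:
        database = pickle.load(f)
    if name not in database:
        return False
    del database[name]  # removes all embeddings for this identity
    with open(path, 'wb') as f:
        pickle.dump(database, f)
    return True

# Usage sketch on a throwaway database file
db_file = Path(tempfile.mkdtemp()) / 'face_db.pkl'
with open(db_file, 'wb') as f:
    pickle.dump({'alice': [[0.1, 0.2]], 'bob': [[0.3, 0.4]]}, f)
erased = erase_person(str(db_file), 'alice')
with open(db_file, 'rb') as f:
    remaining = pickle.load(f)
```

Exposing this as an authenticated endpoint (and recording the deletion in an audit log) turns the legal requirement into a testable system property.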
9. Best Practices
Production-Ready Face Recognition Checklist
- Use MediaPipe for real-time, MTCNN for high precision: they're complementary - choose based on context and constraints
- Minimum 5-10 images per person: under different conditions (lighting, angle, expression). With a single image, the system is brittle
- ALWAYS normalize embeddings: emb = emb / np.linalg.norm(emb). Without L2 normalization, inner-product search scores and fixed similarity thresholds are not comparable across faces
- Calibrate the threshold on real data: don't use 0.5 as a default without validating it on your dataset. Compute the EER for your specific scenario
- Anti-spoofing: systems without liveness detection are vulnerable to photos and videos. Integrate a liveness detection model (MobileNetV2 fine-tuned on spoofing datasets)
- Update embeddings over time: people change appearance. Plan periodic re-enrollment or online embedding updates
- Privacy-preserving logging: log only embeddings (not images), with identity hashing for debugging without exposing personal data
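The last point, identity hashing for logs, can be sketched with the standard library. A keyed hash (HMAC) rather than a plain hash is used so that log entries cannot be reversed by hashing a list of known names; the key name and truncation length here are illustrative choices:

```python
import hashlib
import hmac

# Secret key kept outside the logging pipeline (env var, KMS, ...).
# The value below is a placeholder, never hardcode a real key.
LOG_PEPPER = b'replace-with-a-real-secret'

def pseudonymize(identity: str) -> str:
    """
    Keyed hash (HMAC-SHA256) of an identity for log lines: stable, so
    events for the same person can be correlated while debugging, but
    not reversible without the key and resistant to dictionary attacks
    that defeat plain hashes of known names.
    """
    digest = hmac.new(LOG_PEPPER, identity.encode('utf-8'),
                      hashlib.sha256)
    return digest.hexdigest()[:16]

log_line = f"match id={pseudonymize('alice')} score=0.81"
```

Rotating LOG_PEPPER periodically breaks long-term linkability across log archives while keeping short-term debugging intact.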
Conclusions
The modern face recognition pipeline is robust, modular, and accessible. We covered every layer of a production-ready system:
- MediaPipe: ultra-fast detection on CPU, great for real-time with resource constraints. 200+ FPS on a modern laptop.
- MTCNN + Face Alignment: solid foundation for precise recognition systems. The 5 landmarks are essential for canonical 112x112 alignment.
- InsightFace/ArcFace: 512D embeddings with 99.83% accuracy on LFW - accessible state-of-the-art via pip install.
- ROC/EER threshold calibration: the difference between a robust and an unreliable system. Never use 0.5 as default without validation on your specific data.
- Anti-Spoofing + Liveness Detection: essential for security systems. CNN texture analysis + micro-motion variance for resistance to print and replay attacks.
- FAISS for scaling: from sklearn KNN (10K embeddings) to FAISS HNSW (1M+ embeddings) with 100-1000x speedup in search latency.
- Ethics and GDPR compliance: not optional but a fundamental requirement. The EU AI Act (2024) heavily restricts real-time remote biometric identification in publicly accessible spaces.
Cross-Series Resources
- MLOps: Model Serving in Production - deploy models on REST APIs
- Deep Learning Advanced: Vision Transformers