ML モデルの A/B テスト: 方法論、指標、実装
推奨モデルの 2 つのバージョンをトレーニングしました。新しいモデルは、それをベースに、 トランスでは、ホールドアウト セットで 3% 高い AUC が示されています。明らかに改善されているように見えますが、 しかし、この違いは本当に実際のユーザーにとってプラスの影響をもたらすのでしょうか?モデル 特定の人口統計コホートではパフォーマンスが良くても、他のコホートではパフォーマンスが悪くなる可能性があります。削減できる可能性があります 長期的な満足度を高めながらクリックスルー率を向上させます。遅延が発生する可能性があります 高くすると精度の利点が相殺されます。オフラインの指標は嘘をつきません。 話のほんの一部。
L'ML モデルの A/B テスト そしてそれに対応するための方法論 これらの質問を厳密に検討し、実際のトラフィック上のモデルのバージョンを比較します。 実際のユーザーを対象にして、本当に重要なビジネス指標を測定します。による調査によると、 構造化された A/B テスト戦略を採用する組織、Aimpoint Digital Labs による 2025 年 ML モデルの場合、次のように削減されます。 本番環境の後退のリスクが 40% オフラインのメトリクスのみに基づく直接デプロイメントとの比較。それだけの価値があるMLOps市場 2026 年には 43 億 8000 万ドル、2035 年までに 891 億 8000 万ドルに達すると予想され、A/B テストは、この成長の基本的な構成要素の 1 つです。
このガイドでは、理論に基づいて、ML モデル用の完全な A/B テスト システムを構築します。 FastAPI ルーターへの統計情報、カナリア デプロイメントからシャドウ モードへの情報、Frequentist テストからの情報 トンプソンサンプリングを使用したベイジアンアプローチから、測定中のメトリクスのモニタリングまで Prometheus と Grafana でテストします。
何を学ぶか
- ML A/B テストと従来の Web A/B テストの違い
- 実験計画法: サンプルサイズ、統計検出力、成功指標
- FastAPI ルーターとプログレッシブ カナリア デプロイメントによるトラフィック分割
- シャドウ モード: ユーザーに影響を与えずにテストします。
- 従来の A/B テストの代替としての多腕バンディットとトンプソン サンプリング
- 統計分析: p 値、信頼区間、効果量
- より迅速な意思決定のためのベイジアン A/B テスト
- Prometheus と Grafana を使用したテスト中のモニタリング
- 避けるべきベストプラクティスとアンチパターン
ML A/B テストと Web A/B テスト: 重要な違い
A/B テストは、ランディング ページ、ボタン、およびランディング ページのバリエーションを比較するためにウェブ分析で生まれました。 コピーする。基本的な統計フレームワークは同じですが、ML モデルの A/B テストには次のような特徴があります。 実際には大幅に異なる複雑さが追加されます。
Web テストでは、バリアント A とバリアント B の個別の視覚エクスペリエンスが比較されます。 それらは明確に分けられています。 ML モデルでは、予測は連続的、分散的であり、 多くの場合、時間の経過とともに相関します。同じユーザーにサービスを提供するレコメンデーション モデル 異なるセッションでは独立した予測は生成されません。時間的な相関関係があります。 これは、古典的な統計検定の独立性の仮定に違反します。
ML と Web A/B テストの主な違い
- メトリクス: ウェブでは、CTR またはコンバージョン率が最適化されます。 MLでははい オフライン指標 (AUC、RMSE) とビジネス指標を同時に最適化 (収益、解約率、NPS)、多くの場合矛盾します。
- フィードバックの遅延: Web では結果がすぐに表示されます (クリック)。機械学習で 数日から数週間かかる場合もあります (解約は 30 日後、収益は四半期後に)。
- 効果分布: モデルのパフォーマンスが平均して優れている可能性があります しかし、特定のコホート(年齢差別、地理的偏見)ではさらに悪化し、セグメント化された分析が必要になります。
- システム効果: フィードバック ループ システム (推奨事項、 動的価格設定)、モデル B がデータに影響を与え、その後モデル C をトレーニングします。
- 運用上のリスク: Web バリアントのバグは悪い UX を引き起こします。 不正検出 ML モデルのバグは、重大な経済的損失を引き起こす可能性があります。
実験計画法: コードの前に
適切に設計されていない A/B テストは、A/B テストを行わないよりも悪く、誤った認識を与えます。 科学的厳密さを追求しながら、誤った結論を導き出すこと。実験の計画は次のように行う必要があります。 技術的な実装の前に。
成功指標の定義
すべての実験には必ず 主要な指標 唯一決定するのは 勝者、プラス 0 ~ 2 ガードレールのメトリクス そのモデルB A より悪くなってはなりません。プライマリ メトリックは直接次のようにする必要があります。 ビジネス目標と因果関係がある。
さまざまなシナリオのメトリクスの例:
- チャーンモデル: プライマリ = 30 日の保持率。ガードレール = P95 レイテンシー、キャンペーンコスト
- 推奨モデル: プライマリ = セッションあたりの収益。ガードレール = CTR、多様性の推奨事項
- 詐欺モデル: プライマリ = 検出されていない不正行為の割合。ガードレール = 誤検知率、遅延
- 価格モデル: プライマリー = 粗利益;ガードレール = コンバージョン率、NPS
サンプルサイズの計算
必要なサンプル サイズは、次の 3 つの要素によって決まります。効果の大きさ 最低限の 検出したいもの (最小検出可能効果、MDE)、 重要度のレベル alpha (通常は 0.05) と la 統計的検出力 1-ベータ (通常は 0.80)。
# sample_size_calculator.py
# Calcolo del sample size per A/B test ML
import numpy as np
from scipy import stats
from scipy.stats import norm
import math
def calculate_sample_size(
baseline_rate: float,
minimum_detectable_effect: float,
alpha: float = 0.05,
power: float = 0.80,
two_tailed: bool = True
) -> int:
"""
Calcola il sample size per un A/B test su proporzioni (es. conversion rate).
Args:
baseline_rate: Tasso attuale del modello A (es. 0.15 per 15% churn)
minimum_detectable_effect: Variazione minima relativa da rilevare (es. 0.05 per +5%)
alpha: Livello di significativita (type I error rate)
power: Potenza statistica (1 - type II error rate)
two_tailed: True per test bidirezionale (default raccomandato)
Returns:
Sample size per ciascuna delle due varianti
"""
p1 = baseline_rate
p2 = baseline_rate * (1 + minimum_detectable_effect)
# Calcolo basato su formula di Cohen
z_alpha = norm.ppf(1 - alpha / (2 if two_tailed else 1))
z_beta = norm.ppf(power)
p_avg = (p1 + p2) / 2
q_avg = 1 - p_avg
numerator = (z_alpha * math.sqrt(2 * p_avg * q_avg) + z_beta * math.sqrt(p1 * (1-p1) + p2 * (1-p2))) ** 2
denominator = (p2 - p1) ** 2
n = math.ceil(numerator / denominator)
return n
def calculate_duration_days(
sample_size_per_variant: int,
daily_requests: int,
traffic_split: float = 0.5
) -> float:
"""Stima la durata del test in giorni."""
requests_per_variant_per_day = daily_requests * traffic_split
return sample_size_per_variant / requests_per_variant_per_day
# --- Esempio pratico: modello churn ---
baseline_churn_rate = 0.18 # 18% churn attuale (modello A)
mde = 0.10 # vogliamo rilevare un miglioramento del 10% relativo
# (da 18% a 16.2%)
n_per_variant = calculate_sample_size(
baseline_rate=baseline_churn_rate,
minimum_detectable_effect=-mde, # negativo = riduzione del churn
alpha=0.05,
power=0.80
)
daily_traffic = 5000 # richieste al giorno
test_duration = calculate_duration_days(n_per_variant, daily_traffic, 0.5)
print(f"Sample size per variante: {n_per_variant:,} campioni")
print(f"Durata stimata del test: {test_duration:.1f} giorni")
print(f"Traffico totale necessario: {n_per_variant * 2:,} richieste")
# Output tipico:
# Sample size per variante: 8,744 campioni
# Durata stimata del test: 3.5 giorni
# Traffico totale necessario: 17,488 richieste
覗き見トラップ: 結果をすぐに確認しないでください
「覗き見」(またはオプションの停止) 問題は、A/B テストで最も一般的なエラーの 1 つです。 中間結果を確認し、有意性に達したらすぐにテストを停止します 統計。これにより、誤検知率が劇的に増加します。データを見ると、 重要な結果が偶然見つかる確率は毎日、 2 つのバリエーションが同一であっても 30%。常にサンプルサイズを使用する あらかじめ決めておき、試験終了時にのみ結果を確認する、または方法を採用する 逐次確率比テスト (SPRT) などの逐次テストの。
FastAPI によるトラフィック分割
A/B テスト ルーターは、インフラストラクチャの中心的なコンポーネントです。配布する必要があります 決定論的な方法でトラフィックを転送します (同じユーザーは常に同じものを使用する必要があります) テスト期間全体を通じてバリアントを使用します)、どのバリアントに割り当てられたかを記録します すべてのユーザーとすべての予測を実行し、レイテンシーを追加しないように非常に高速です クリティカルパスへ。
# ab_router.py
# Router A/B testing per modelli ML con FastAPI
from fastapi import FastAPI, Request, Header
from pydantic import BaseModel
import hashlib
import json
import time
import logging
from typing import Optional, Literal
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
logger = logging.getLogger(__name__)
app = FastAPI(title="ML A/B Testing Router")
# --- Prometheus Metrics ---
AB_REQUESTS = Counter(
"ab_test_requests_total",
"Numero totale di richieste per variante",
labelnames=["experiment_id", "variant", "model_version"]
)
AB_LATENCY = Histogram(
"ab_test_latency_seconds",
"Latenza inference per variante",
labelnames=["experiment_id", "variant"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0]
)
AB_PREDICTIONS = Counter(
"ab_test_predictions_total",
"Distribuzione delle predizioni per variante",
labelnames=["experiment_id", "variant", "prediction_bucket"]
)
# --- Configurazione esperimento ---
ACTIVE_EXPERIMENT = {
"experiment_id": "churn_model_v2_vs_v3",
"model_a": {
"name": "churn-model-v2",
"endpoint": "http://model-a-service:8080/predict",
"traffic_weight": 0.5
},
"model_b": {
"name": "churn-model-v3",
"endpoint": "http://model-b-service:8080/predict",
"traffic_weight": 0.5
},
"start_time": "2025-03-01T00:00:00Z",
"end_time": "2025-03-15T00:00:00Z"
}
class PredictionRequest(BaseModel):
user_id: str
features: dict
def assign_variant(user_id: str, experiment_id: str, traffic_split: float = 0.5) -> str:
"""
Assegna deterministicamente un utente a una variante.
Lo stesso user_id + experiment_id producono sempre lo stesso risultato.
"""
hash_input = f"{user_id}:{experiment_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
normalized = (hash_value % 10000) / 10000.0
if normalized < traffic_split:
return "A"
else:
return "B"
async def call_model(endpoint: str, features: dict) -> dict:
"""Chiama il servizio del modello."""
import httpx
async with httpx.AsyncClient(timeout=2.0) as client:
response = await client.post(endpoint, json=features)
response.raise_for_status()
return response.json()
@app.post("/predict")
async def predict(request: PredictionRequest):
"""
Endpoint principale: smista le richieste alle varianti A/B.
"""
exp = ACTIVE_EXPERIMENT
exp_id = exp["experiment_id"]
# Assegna variante in modo deterministico
variant = assign_variant(
user_id=request.user_id,
experiment_id=exp_id,
traffic_split=exp["model_a"]["traffic_weight"]
)
# Seleziona il modello corretto
model_config = exp["model_a"] if variant == "A" else exp["model_b"]
# Registra richiesta
AB_REQUESTS.labels(
experiment_id=exp_id,
variant=variant,
model_version=model_config["name"]
).inc()
# Chiama il modello con misura della latenza
start_time = time.time()
try:
result = await call_model(model_config["endpoint"], request.features)
except Exception as e:
logger.error(f"Errore chiamata modello {variant}: {e}")
raise
latency = time.time() - start_time
AB_LATENCY.labels(experiment_id=exp_id, variant=variant).observe(latency)
# Bucketing della predizione per distribuzione
score = result.get("churn_probability", 0)
bucket = "high" if score > 0.7 else ("medium" if score > 0.3 else "low")
AB_PREDICTIONS.labels(
experiment_id=exp_id, variant=variant, prediction_bucket=bucket
).inc()
return {
"prediction": result,
"variant": variant,
"model_version": model_config["name"],
"experiment_id": exp_id,
"latency_ms": round(latency * 1000, 2)
}
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.get("/experiment/status")
async def experiment_status():
"""Ritorna lo stato corrente dell'esperimento."""
return {
"experiment": ACTIVE_EXPERIMENT["experiment_id"],
"active": True
}
カナリア デプロイメント: プログレッシブ リリース
Il カナリア展開 そしてプログレッシブリリース戦略では、 新しいモデル (「カナリア」) は、最初はほんのわずかな割合しか受け取りません 本番トラフィックの 1 ~ 5%。メトリクスが安定している場合、 割合は5%→10%→25%→50%→100%と徐々に増加します。 異常が発生した場合、ロールバックが即座に行われ、すべてのトラフィックが安定したモデルに戻されます。
古典的な 50/50 A/B テストとは異なり、カナリアは方向性を重視しています リスクの軽減 以上 差異の統計的検出。目的はデモンストレーションではありません 新しいモデルは統計的に有意な点で優れているが、そうではないことを検証する デプロイメントを拡張する前に、顕著な技術的問題や回帰が発生する可能性があります。
# canary_deployment.py
# Implementazione canary deployment con rollback automatico
import asyncio
import time
import logging
from dataclasses import dataclass, field
from typing import Optional
from prometheus_client import Gauge
logger = logging.getLogger(__name__)
# Gauge per monitorare la percentuale di traffico canary
CANARY_TRAFFIC_WEIGHT = Gauge(
"canary_traffic_weight_percent",
"Percentuale di traffico al modello canary",
labelnames=["experiment_id"]
)
ERROR_RATE_GAUGE = Gauge(
"canary_error_rate",
"Tasso di errore del modello canary",
labelnames=["experiment_id"]
)
@dataclass
class CanaryConfig:
experiment_id: str
stable_model_endpoint: str
canary_model_endpoint: str
initial_canary_weight: float = 0.05 # Inizia al 5%
max_canary_weight: float = 1.0 # Target finale: 100%
step_size: float = 0.10 # Incremento per ogni step
step_interval_minutes: int = 30 # Ogni 30 minuti aumenta
max_error_rate: float = 0.02 # Rollback se errori > 2%
max_latency_p99_ms: float = 500.0 # Rollback se P99 > 500ms
current_weight: float = field(init=False)
def __post_init__(self):
self.current_weight = self.initial_canary_weight
class CanaryController:
"""
Controlla progressivamente il traffico al modello canary.
Esegue rollback automatico se le metriche superano le soglie.
"""
def __init__(self, config: CanaryConfig):
self.config = config
self.error_count = 0
self.total_count = 0
self.latencies = []
self.is_rolled_back = False
self.is_promoted = False
def should_route_to_canary(self, user_id: str) -> bool:
"""Determina se questa richiesta va al canary."""
if self.is_rolled_back:
return False
hash_val = int(hashlib.md5(
f"{user_id}:{self.config.experiment_id}".encode()
).hexdigest(), 16)
normalized = (hash_val % 10000) / 10000.0
return normalized < self.config.current_weight
def record_outcome(self, is_canary: bool, success: bool, latency_ms: float):
"""Registra l'esito di una chiamata al canary."""
if not is_canary:
return
self.total_count += 1
if not success:
self.error_count += 1
self.latencies.append(latency_ms)
# Aggiorna metriche Prometheus
error_rate = self.error_count / max(self.total_count, 1)
ERROR_RATE_GAUGE.labels(
experiment_id=self.config.experiment_id
).set(error_rate)
# Controlla soglie per rollback automatico
if error_rate > self.config.max_error_rate and self.total_count > 100:
logger.critical(
f"Error rate {error_rate:.2%} exceeded threshold "
f"{self.config.max_error_rate:.2%}. Initiating rollback."
)
self.rollback()
if len(self.latencies) >= 100:
p99 = sorted(self.latencies)[-1] # semplificato
if p99 > self.config.max_latency_p99_ms:
logger.critical(f"P99 latency {p99:.0f}ms exceeded threshold. Rollback.")
self.rollback()
def advance_canary(self):
"""Incrementa il peso del canary se le metriche sono OK."""
if self.is_rolled_back or self.is_promoted:
return
new_weight = min(
self.config.current_weight + self.config.step_size,
self.config.max_canary_weight
)
self.config.current_weight = new_weight
CANARY_TRAFFIC_WEIGHT.labels(
experiment_id=self.config.experiment_id
).set(new_weight * 100)
logger.info(
f"Canary weight increased to {new_weight:.0%} "
f"for experiment {self.config.experiment_id}"
)
if new_weight >= self.config.max_canary_weight:
self.is_promoted = True
logger.info("Canary fully promoted to production!")
def rollback(self):
"""Esegue rollback immediato al modello stabile."""
self.config.current_weight = 0.0
self.is_rolled_back = True
CANARY_TRAFFIC_WEIGHT.labels(
experiment_id=self.config.experiment_id
).set(0)
logger.warning(f"ROLLBACK executed for {self.config.experiment_id}")
シャドウ モード: ユーザーに影響を与えないテスト
Lo シャドウモード (またはシャドウ展開) は最も保守的な手法です 同時に、新しいモデルをユーザーに公開する前に検証するのに最も強力です。 運用トラフィックが複製される: モデル A は実際のリクエストと独自のリクエストを処理します。 予測はユーザーに返されますが、モデル B は同じリクエストを受け取ります 並行して、しかし彼の予測は来る 破棄されたかログインのみ.
このアプローチにより、何もせずに実際のトラフィックで 2 つのモデルを比較できます。 ユーザーまたはビジネスに対するリスク。新しいモデルが次のことを行うことを検証するのに最適です。 重大なバグがなく、実際の負荷の下でレイテンシー要件を満たし、 異常な予測や分布から外れた予測を生成せず、期待どおりに動作します すべてのユーザーセグメントにわたって。
# shadow_mode.py
# Implementazione shadow deployment con logging asincrono
import asyncio
import httpx
import logging
import json
from datetime import datetime
from typing import Any
logger = logging.getLogger(__name__)
class ShadowModeRouter:
"""
Router che invia le richieste sia al modello produzione che al modello shadow.
Il modello produzione risponde agli utenti; il shadow solo logga.
"""
def __init__(
self,
production_endpoint: str,
shadow_endpoint: str,
shadow_log_file: str = "shadow_predictions.jsonl"
):
self.production_endpoint = production_endpoint
self.shadow_endpoint = shadow_endpoint
self.shadow_log_file = shadow_log_file
async def predict(self, request_data: dict, request_id: str) -> dict:
"""
Invia la richiesta al modello produzione e in parallelo al shadow.
Restituisce solo la risposta del modello produzione.
"""
# Esegui produzione e shadow in parallelo
prod_task = asyncio.create_task(
self._call_model(self.production_endpoint, request_data, "production")
)
shadow_task = asyncio.create_task(
self._call_model(self.shadow_endpoint, request_data, "shadow")
)
# Aspetta la risposta produzione (non bloccante per shadow)
prod_result = await prod_task
# Logga la risposta shadow in background senza bloccare
asyncio.create_task(
self._log_shadow_result(shadow_task, request_id, request_data, prod_result)
)
return prod_result
async def _call_model(
self, endpoint: str, data: dict, label: str
) -> dict:
"""Chiama un endpoint modello con gestione degli errori."""
start = asyncio.get_event_loop().time()
try:
async with httpx.AsyncClient(timeout=2.0) as client:
response = await client.post(endpoint, json=data)
response.raise_for_status()
result = response.json()
result["_latency_ms"] = (asyncio.get_event_loop().time() - start) * 1000
result["_model"] = label
return result
except Exception as e:
logger.error(f"Error calling {label} model: {e}")
return {"error": str(e), "_model": label, "_latency_ms": -1}
async def _log_shadow_result(
self,
shadow_task: asyncio.Task,
request_id: str,
input_data: dict,
production_result: dict
):
"""Logga la risposta shadow per analisi offline."""
try:
shadow_result = await shadow_task
except Exception as e:
shadow_result = {"error": str(e)}
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"request_id": request_id,
"input_features": input_data,
"production_prediction": production_result.get("prediction"),
"production_latency_ms": production_result.get("_latency_ms"),
"shadow_prediction": shadow_result.get("prediction"),
"shadow_latency_ms": shadow_result.get("_latency_ms"),
"shadow_error": shadow_result.get("error"),
"predictions_agree": (
production_result.get("prediction") == shadow_result.get("prediction")
)
}
# Scrivi su file JSONL per analisi batch
with open(self.shadow_log_file, "a") as f:
f.write(json.dumps(log_entry) + "\n")
# --- Analisi dei risultati shadow ---
def analyze_shadow_results(log_file: str):
"""Analizza i log shadow per validare il nuovo modello."""
import pandas as pd
records = []
with open(log_file) as f:
for line in f:
records.append(json.loads(line))
df = pd.DataFrame(records)
total = len(df)
agreement_rate = df["predictions_agree"].mean()
shadow_errors = df["shadow_error"].notna().sum()
print(f"Totale richieste analizzate: {total:,}")
print(f"Tasso di accordo produzione/shadow: {agreement_rate:.1%}")
print(f"Errori modello shadow: {shadow_errors} ({shadow_errors/total:.1%})")
print(f"Latenza media produzione: {df['production_latency_ms'].mean():.1f}ms")
print(f"Latenza media shadow: {df['shadow_latency_ms'].mean():.1f}ms")
# Identifica i casi di disaccordo per analisi manuale
disagreements = df[~df["predictions_agree"]]
print(f"\nCasi di disaccordo: {len(disagreements)}")
return df
多腕の盗賊: 従来の A/B テストを超えて
従来の A/B テストの主な制限は次のとおりです。 探査費用: テスト期間中、一部のユーザーがモデルを受け取る可能性があります もっと悪い。モデル B が明らかに優れている場合、変換を「無駄に」していることになります。 テスト週に A に割り当てられたユーザーの数。
I 多腕盗賊 (MAB) 彼らは問題を解決します 探査-悪用: テスト期間全体にわたって固定分割を維持する代わりに、 アルゴリズム 動的に適応する 実行中のモデルへのトラフィック テスト自体中の総コンバージョンを最大化することで、より良い結果が得られます。検索 Aimpoint Digital Labs による 2025 年の調査では、トンプソン サンプリングなどの盗賊によるアプローチが実証されています。 ~の蓄積された後悔を減らすことができる 従来の A/B テストと比較して 20 ~ 35% 強力な効果を持つシナリオで。
# thompson_sampling_bandit.py
# Multi-Armed Bandit con Thompson Sampling per selezione modello ML
import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class ModelArm:
"""Rappresenta un modello come braccio del bandit."""
name: str
endpoint: str
alpha: float = 1.0 # Successi (Beta distribution prior)
beta: float = 1.0 # Fallimenti (Beta distribution prior)
@property
def estimated_success_rate(self) -> float:
"""Stima puntuale del tasso di successo (media della distribuzione Beta)."""
return self.alpha / (self.alpha + self.beta)
@property
def total_observations(self) -> int:
return int(self.alpha + self.beta - 2) # sottrai i prior
def sample(self) -> float:
"""Campiona dalla distribuzione Beta posteriore (Thompson Sampling)."""
return np.random.beta(self.alpha, self.beta)
def update(self, reward: float):
"""
Aggiorna la distribuzione con il nuovo outcome.
reward = 1.0 per successo (churn evitato, conversione, etc.)
reward = 0.0 per fallimento
"""
if reward >= 0.5: # successo
self.alpha += 1
else: # fallimento
self.beta += 1
class ThompsonSamplingBandit:
"""
Multi-Armed Bandit con Thompson Sampling.
Ottimale per selezione adattiva di modelli ML.
"""
def __init__(self, models: List[ModelArm]):
self.models = models
self.selection_history = []
def select_model(self) -> Tuple[int, ModelArm]:
"""
Seleziona il modello campionando dalle distribuzioni Beta.
Il modello con il sample più alto viene selezionato.
"""
samples = [arm.sample() for arm in self.models]
best_idx = int(np.argmax(samples))
self.selection_history.append(best_idx)
return best_idx, self.models[best_idx]
def update(self, arm_idx: int, reward: float):
"""Aggiorna la distribuzione del braccio selezionato."""
self.models[arm_idx].update(reward)
def get_traffic_allocation(self) -> dict:
"""
Stima la distribuzione del traffico corrente
basata sulla storia delle selezioni recenti.
"""
if not self.selection_history:
return {arm.name: 1/len(self.models) for arm in self.models}
recent = self.selection_history[-1000:] # ultime 1000 selezioni
total = len(recent)
allocation = {}
for i, arm in enumerate(self.models):
allocation[arm.name] = recent.count(i) / total
return allocation
def get_status(self) -> dict:
"""Ritorna lo stato corrente del bandit."""
return {
"models": [
{
"name": arm.name,
"estimated_rate": round(arm.estimated_success_rate, 4),
"alpha": arm.alpha,
"beta": arm.beta,
"observations": arm.total_observations
}
for arm in self.models
],
"traffic_allocation": self.get_traffic_allocation(),
"total_selections": len(self.selection_history)
}
def check_convergence(self, min_observations: int = 500) -> Optional[str]:
"""
Verifica se il bandit e convergito verso un vincitore chiaro.
Restituisce il nome del modello vincitore o None se ancora incerto.
"""
for arm in self.models:
if arm.total_observations < min_observations:
return None # Non abbastanza dati
# Controlla se un modello domina chiaramente
rates = [(arm.name, arm.estimated_success_rate) for arm in self.models]
rates.sort(key=lambda x: x[1], reverse=True)
best_name, best_rate = rates[0]
second_name, second_rate = rates[1]
# Margine di 3% di distanza per dichiarare un vincitore
if best_rate - second_rate > 0.03:
logger.info(f"Bandit converged: {best_name} wins ({best_rate:.2%} vs {second_rate:.2%})")
return best_name
return None
# --- Esempio di utilizzo ---
models = [
ModelArm(name="churn-model-v2", endpoint="http://model-v2:8080/predict"),
ModelArm(name="churn-model-v3", endpoint="http://model-v3:8080/predict"),
]
bandit = ThompsonSamplingBandit(models)
# Simulazione di 1000 interazioni
np.random.seed(42)
true_rates = {"churn-model-v2": 0.72, "churn-model-v3": 0.78} # v3 e migliore
for i in range(1000):
arm_idx, selected_model = bandit.select_model()
# Simula outcome (in produzione viene dal feedback reale)
reward = float(np.random.random() < true_rates[selected_model.name])
bandit.update(arm_idx, reward)
if (i + 1) % 200 == 0:
status = bandit.get_status()
print(f"\nStep {i+1}:")
for m in status["models"]:
print(f" {m['name']}: rate={m['estimated_rate']:.3f}, obs={m['observations']}")
winner = bandit.check_convergence(min_observations=100)
if winner:
print(f" => WINNER: {winner}")
統計分析: p 値、信頼区間、および効果量
テスト期間の終了時には、統計分析によって次の 3 つの異なる質問に答える必要があります。 観察された差は統計的に有意ですか?効果はどれくらいですか? その効果はビジネスに実質的に関連していますか?
# statistical_analysis.py
# Analisi statistica dei risultati di un A/B test ML
import numpy as np
import pandas as pd
from scipy import stats
from scipy.stats import norm, t
import math
from typing import Tuple, Optional
def analyze_ab_test_results(
conversions_a: int,
total_a: int,
conversions_b: int,
total_b: int,
alpha: float = 0.05
) -> dict:
"""
Analisi statistica completa di un A/B test su proporzioni.
Returns:
Dizionario con tutti i risultati statistici
"""
p_a = conversions_a / total_a
p_b = conversions_b / total_b
# --- Test z per differenza di proporzioni ---
# Pooled proportion sotto H0 (le due proporzioni sono uguali)
p_pool = (conversions_a + conversions_b) / (total_a + total_b)
se_pool = math.sqrt(p_pool * (1 - p_pool) * (1/total_a + 1/total_b))
z_statistic = (p_b - p_a) / se_pool
p_value = 2 * (1 - norm.cdf(abs(z_statistic))) # two-tailed
# --- Confidence Interval per la differenza ---
se_diff = math.sqrt(p_a * (1-p_a)/total_a + p_b * (1-p_b)/total_b)
z_critical = norm.ppf(1 - alpha/2)
diff = p_b - p_a
ci_lower = diff - z_critical * se_diff
ci_upper = diff + z_critical * se_diff
# --- Effect size (Cohen's h per proporzioni) ---
phi_a = 2 * math.asin(math.sqrt(p_a))
phi_b = 2 * math.asin(math.sqrt(p_b))
cohens_h = phi_b - phi_a
effect_magnitude = (
"negligible" if abs(cohens_h) < 0.2
else "small" if abs(cohens_h) < 0.5
else "medium" if abs(cohens_h) < 0.8
else "large"
)
# --- Relative lift ---
relative_lift = (p_b - p_a) / p_a if p_a > 0 else 0
# --- Potenza statistica osservata ---
z_beta = (abs(z_statistic) - z_critical)
observed_power = norm.cdf(z_beta)
is_significant = p_value < alpha
return {
"variant_a": {
"conversions": conversions_a,
"total": total_a,
"rate": round(p_a, 4),
"rate_pct": f"{p_a:.2%}"
},
"variant_b": {
"conversions": conversions_b,
"total": total_b,
"rate": round(p_b, 4),
"rate_pct": f"{p_b:.2%}"
},
"difference": {
"absolute": round(diff, 4),
"relative_lift": round(relative_lift, 4),
"relative_lift_pct": f"{relative_lift:.2%}",
"confidence_interval_95": (round(ci_lower, 4), round(ci_upper, 4))
},
"statistics": {
"z_statistic": round(z_statistic, 4),
"p_value": round(p_value, 6),
"is_significant": is_significant,
"alpha": alpha,
"cohens_h": round(cohens_h, 4),
"effect_magnitude": effect_magnitude,
"observed_power": round(observed_power, 4)
},
"conclusion": (
f"Modello B e statisticamente migliore (p={p_value:.4f}, lift={relative_lift:.2%})"
if is_significant and diff > 0
else f"Nessuna differenza significativa rilevata (p={p_value:.4f})"
)
}
# --- Esempio pratico ---
results = analyze_ab_test_results(
conversions_a=1380, # Modello A: 1380 churn evitati
total_a=8500, # su 8500 utenti a rischio
conversions_b=1545, # Modello B: 1545 churn evitati
total_b=8200 # su 8200 utenti
)
print("=== RISULTATI A/B TEST ===")
print(f"Modello A: {results['variant_a']['rate_pct']} retention rate")
print(f"Modello B: {results['variant_b']['rate_pct']} retention rate")
print(f"Lift relativo: {results['difference']['relative_lift_pct']}")
print(f"CI 95%: {results['difference']['confidence_interval_95']}")
print(f"p-value: {results['statistics']['p_value']}")
print(f"Significativo: {results['statistics']['is_significant']}")
print(f"Effect size: {results['statistics']['effect_magnitude']} (h={results['statistics']['cohens_h']})")
print(f"\nConclusione: {results['conclusion']}")
ベイジアン A/B テスト
頻度主義の p 値アプローチには既知の制限があります。p 値は確率ではありません。 モデル B の方が優れているということ (そして、このような極端なデータが観察される確率) H0 が true の場合)。アプローチ ベイジアン 直接反応する 私たちが興味を持っている質問: モデル B が A よりも優れている確率はどれくらいですか、 そしてどれくらいですか?
ベイジアン アプローチでは、テストが限界に達したときにテストを停止することもできます。 モデルが最適である確率が十分に高い (例: 95%)、 頻繁に起こりがちな覗き見の問題もありません。
# bayesian_ab_test.py
# A/B Testing Bayesiano per modelli ML
import numpy as np
from scipy import stats
from scipy.stats import beta as beta_dist
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
def bayesian_ab_test(
successes_a: int, trials_a: int,
successes_b: int, trials_b: int,
prior_alpha: float = 1.0,
prior_beta: float = 1.0,
n_samples: int = 100_000,
credible_interval: float = 0.95
) -> dict:
"""
A/B test bayesiano usando distribuzione Beta come prior/posterior.
Modella il tasso di successo come Beta(alpha, beta).
Returns:
Risultati con probabilità che B > A e credible intervals
"""
# Aggiorna i prior con i dati osservati (prior Beta + dati binomiali = posterior Beta)
alpha_a = prior_alpha + successes_a
beta_a = prior_beta + (trials_a - successes_a)
alpha_b = prior_alpha + successes_b
beta_b = prior_beta + (trials_b - successes_b)
# Campiona dalle distribuzioni posterior
samples_a = np.random.beta(alpha_a, beta_a, n_samples)
samples_b = np.random.beta(alpha_b, beta_b, n_samples)
# Probabilità che B sia migliore di A
prob_b_better = np.mean(samples_b > samples_a)
# Distribuzione del lift relativo
lift_samples = (samples_b - samples_a) / samples_a
lift_mean = np.mean(lift_samples)
lift_std = np.std(lift_samples)
# Credible interval per il lift
ci_lower = float(np.percentile(lift_samples, (1 - credible_interval) / 2 * 100))
ci_upper = float(np.percentile(lift_samples, (1 - (1 - credible_interval) / 2) * 100))
# Probabilità di un lift minimo (es. almeno +2%)
prob_lift_2pct = np.mean(lift_samples > 0.02)
# Expected loss: quanto perdiamo se scegliamo il modello sbagliato
expected_loss_a = np.mean(np.maximum(samples_b - samples_a, 0)) # perdita se scegliamo A
expected_loss_b = np.mean(np.maximum(samples_a - samples_b, 0)) # perdita se scegliamo B
return {
"posterior_a": {"alpha": alpha_a, "beta": beta_a, "mean": alpha_a/(alpha_a+beta_a)},
"posterior_b": {"alpha": alpha_b, "beta": beta_b, "mean": alpha_b/(alpha_b+beta_b)},
"prob_b_better_than_a": round(float(prob_b_better), 4),
"lift": {
"mean": round(float(lift_mean), 4),
"std": round(float(lift_std), 4),
f"credible_interval_{int(credible_interval*100)}pct": (
round(ci_lower, 4), round(ci_upper, 4)
),
"prob_lift_above_2pct": round(float(prob_lift_2pct), 4)
},
"expected_loss": {
"choose_a": round(float(expected_loss_a), 6),
"choose_b": round(float(expected_loss_b), 6),
"recommended_choice": "B" if expected_loss_b < expected_loss_a else "A"
},
"decision": (
"Scegli B" if prob_b_better > 0.95
else "Scegli A" if prob_b_better < 0.05
else f"Incerto (P(B>A) = {prob_b_better:.1%}) - continua a raccogliere dati"
)
}
# --- Esempio ---
result = bayesian_ab_test(
successes_a=1380, trials_a=8500,
successes_b=1545, trials_b=8200,
credible_interval=0.95
)
print("=== A/B TEST BAYESIANO ===")
print(f"P(B > A) = {result['prob_b_better_than_a']:.1%}")
print(f"Lift medio: {result['lift']['mean']:.2%}")
print(f"Credible interval 95%: {result['lift']['credible_interval_95pct']}")
print(f"P(lift > 2%): {result['lift']['prob_lift_above_2pct']:.1%}")
print(f"Expected loss se scegli A: {result['expected_loss']['choose_a']:.6f}")
print(f"Expected loss se scegli B: {result['expected_loss']['choose_b']:.6f}")
print(f"Decisione: {result['decision']}")
Prometheus と Grafana によるテスト中のモニタリング
実稼働環境でのアクティブな A/B テストは継続的に監視する必要があります。十分ではありません テストが終了するまで待って結果を分析します。両方の結果が保証される必要があります。 バリアントは技術レベル (遅延、エラー率、可用性) で正しく動作します。 そして、ビジネス指標が当初の期待と一致していること。
# ab_test_monitoring.yml
# Dashboard Grafana per A/B test ML - configurazione panel
# Esempio di PromQL queries per i panel Grafana:
# 1. Distribuzione del traffico tra varianti (dovrebbe essere ~50/50)
# sum by (variant) (rate(ab_test_requests_total[5m]))
# 2. Latenza P95 per variante
# histogram_quantile(0.95, sum by (variant, le) (rate(ab_test_latency_seconds_bucket[5m])))
# 3. Error rate per variante
# sum by (variant) (rate(ab_test_errors_total[5m])) /
# sum by (variant) (rate(ab_test_requests_total[5m]))
# 4. Distribuzione delle predizioni per variante (prediction drift indicator)
# sum by (variant, prediction_bucket) (rate(ab_test_predictions_total[1h]))
---
# prometheus_ab_alerts.yml
groups:
- name: ab_test_alerts
rules:
# Alert se il traffico non e bilanciato (sbilanciamento > 10%)
- alert: ABTestTrafficImbalance
expr: |
abs(
sum(rate(ab_test_requests_total{variant="A"}[10m]))
/
sum(rate(ab_test_requests_total[10m]))
- 0.5
) > 0.10
for: 5m
labels:
severity: warning
annotations:
summary: "A/B test traffic imbalance detected"
description: "Traffic split deviates more than 10% from 50/50"
# Alert se error rate variante B supera il doppio di A
- alert: ABTestVariantBHighErrors
expr: |
(
sum(rate(ab_test_errors_total{variant="B"}[5m]))
/
sum(rate(ab_test_requests_total{variant="B"}[5m]))
) > 2 * (
sum(rate(ab_test_errors_total{variant="A"}[5m]))
/
sum(rate(ab_test_requests_total{variant="A"}[5m]))
)
for: 10m
labels:
severity: critical
annotations:
summary: "Variant B has significantly higher error rate than A"
description: "Consider rolling back variant B"
# Alert se latenza P95 di B supera 200ms più di A
- alert: ABTestVariantBHighLatency
expr: |
(
histogram_quantile(0.95, sum by (le) (
rate(ab_test_latency_seconds_bucket{variant="B"}[5m])
))
-
histogram_quantile(0.95, sum by (le) (
rate(ab_test_latency_seconds_bucket{variant="A"}[5m])
))
) > 0.2
for: 5m
labels:
severity: warning
annotations:
summary: "Variant B P95 latency is 200ms+ higher than A"
中小企業向け予算 <5,000 ユーロ/年: 完全な A/B スタック テスト
ML モデルの完全な A/B テスト システムには企業の予算は必要ありません。 オープンソース スタックと小規模 VPS を使用すると、必要なものがすべて手に入ります。
- FastAPI ルーター + Python の統計: オープンソース、無料
- プロメテウス + グラファナ: オープンソース、無料
- ホスティング用 VPS (Hetzner/OVH): 20~40 ユーロ/月 (240~480 ユーロ/年)
- 機能フラグ サービス (Unleash セルフホスト型): オープンソース、無料
- モデル レジストリの MLflow: オープンソース、無料
- 推定インフラストラクチャの合計: 300~600ユーロ/年
ベストプラクティスとアンチパターン
実験前のチェックリスト
- 開始する前にサンプル サイズを計算します。 「までは決してテストを受けないでください」 何も面白いものは見当たりません。」サンプルサイズは固定されており、交渉の余地はありません。
- プライマリ メトリックを 1 つ定義します。 2 つの指標に合わせて最適化する 同時に決定を曖昧にします。ガードレール メトリクスが存在するのは、 勝者を選出することではなく、後退を防ぐことです。
- 割り当ての有効性をテストします。 実際のテストを開始する前に、 A/A テスト (両方のバリエーションで同じモデル) を実行して、そうでないことを確認します。 割り当てには人為的な違いを引き起こすバグがあります。
- 仮説を文書化します。 モデル B を期待する理由に注目してください どれくらい優れているか。これにより、「答えを見たので今すぐ」というバイアスが回避されます 説明を考えてみます。」
- 新しいユーザーを個別に確認します。 新規ユーザーにはありません どのモデルにも以前の履歴がなく、動作が異なります。分析する それらの結果は個別に示します。
絶対に避けるべきアンチパターン
- 継続的な覗き見: 毎日結果をチェックする 最初の統計的有意性でテストを停止すると偽陽性が増加します 最大30%。早期に停止する必要がある場合は、逐次テスト (SPRT) を使用してください。
- HARKing (結果がわかった後に仮説を立てる): 分析する データから大きな違いを見つけてストーリーを伝える あたかもそれがアプリオリに仮説されていたかのように。 20 個のセグメントをテストすると、1 個のセグメントが際立ちます α = 0.05 で偶然にのみ有意です。
- メトリックの差異を無視する: のようないくつかの指標 ユーザーあたりの収益には非常に重いキューが必要です。 1 人のクジラ ユーザーがこれを行うことができます 存在しない効果が重要であるように見えます。ブートストラップまたは非テストを使用する 非ガウス分布のメトリクスのパラメトリック。
- テストが短すぎます: 毎週の効果 ( 月曜日のみのサービス)とノベルティ効果(ユーザーの反応が良い) 1 ~ 2 日間新規性を認めた後、ベースラインに戻ります)少なくとも以下のテストが必要です 補償期間は2週間。
- ML システムのフィードバック ループ: フィードバックループを備えたシステムで (ユーザーの行動を変える推奨事項)、 2 つのバリアントは独立していません。この相関関係を明示的にモデル化します。
いつどのアプローチを使用するか
戦略を選択するためのガイド
- シャドウモード: モデルが完全に新しい場合に使用します。 まだ検証されていない場合、またはバグのリスクが高すぎる場合。そしていつも、 実際のユーザーによるテストの前の最初のステップ。
- カナリア展開: 運用リスクを軽減したい場合に使用します 新しい展開の。重要なモデル (詐欺、価格設定) に最適です。 後退は直ちに経済的な影響を与えるでしょう。
- クラシック A/B テスト (50/50): 効果を測定したい場合に使用します 最大限の統計力と低い運用リスクを備えたビジネス。 十分なサンプル サイズと高速フィードバック ループが必要です。
- 多腕の盗賊: フィードバックが早い場合(数時間または数日以内)に使用します。 探索コストが高いため、コンバージョンを最大化したいと考えている テスト中。フィードバックが遅い小さなエフェクトには理想的ではありません。
- ベイジアン A/B: いつでも柔軟な停止ルールを使用できます。 確率を直接解釈するか、実験からの事前情報を得る 前例。 p 値がわかりにくいと感じるチームに最適です。
結論と次のステップ
ML モデルの A/B テストは、単純なトラフィック分割をはるかに超えています。 各実装、選択の前に厳密な統計的設計が必要です 状況に基づいた適切な戦略 (シャドウ、カナリア、50/50、バンディット)、 テスト中の継続的なモニタリングと、終了時の正しい統計分析。
A/B テストを正しく行うチームと不十分に行うチームの違い それはコードの複雑さではなく、プロセスの規律にあります。 最初に仮説を立て、途中でデータを見ずに、後ですべてを正しく分析します。 このガイドで説明されているオープンソース スタック (FastAPI、Prometheus、Grafana、 scipy、numpy) を使用すると、最小限の予算で運用グレードのシステムを実装できます。
自然な次のステップは、A/B テストと ML ガバナンスを統合することです。 モデルを運用環境に移行するためのすべての決定は文書化する必要があります。 監査可能であり、倫理基準および規制基準に準拠しています。記事の中で見ていきます 次に ML ガバナンスについて説明します。
MLOps シリーズは継続します
- 前の記事: Kubernetes での ML のスケーリング - KubeFlow と Seldon Core を使用してデプロイメントを調整します
- 次の記事: ML ガバナンス: コンプライアンス、監査、倫理 - EU 法 AI、説明可能性と公平性
- 関連している: モデルのドリフト検出と自動再トレーニング - モデルの劣化を検出して対応する
- 関連している: サービス提供モデル: FastAPI + Uvicorn の実稼働環境 - スケーラブルな推論 API を構築する
- 関連シリーズ: 高度なディープラーニング - 複雑なニューラル モデルの A/B テスト







