ケーススタディ: 本番環境における MLOps - ゼロから完全なパイプラインまで
概要から基本概念、CI/CD パイプラインまで、MLOps シリーズ全体をカバーしました。 GitHub Actions と Docker、DVC によるバージョン管理、MLflow による実験追跡、ドリフト検出、 FastAPI での提供、Kubernetes でのスケーリング、A/B テスト、AI 法によるガバナンスまで。 今こそ、実際の具体的なケースにすべてをまとめるべき時です。
この事例ではシステムを一から構築します チャーン予測 のために すべての MLOps 原則に従う架空の電気通信会社 (TelecomIT S.p.A.) シリーズで学びました。ビジネス上の問題からスタートし、エンドツーエンドのアーキテクチャを設計し、 動作する Python コードを使用して各コンポーネントを実装し、監視および監視する方法を見ていきます。 システムを長期にわたって運用状態に保ちます。最終結果は MLOps パイプラインになります 完全で再現可能で、実際の環境に対応できます。
何を学ぶか
- ビジネス上の問題をエンドツーエンドの MLOps アーキテクチャに変換する方法
- チャーン予測のための特定の特徴エンジニアリング (RFM、時間的動作)
- 完全な DVC パイプライン: データ、前処理、トレーニング、評価
- MLflow による構造化された実験追跡と体系的なモデル比較
- FastAPI + Uvicorn で提供されるモデルはコンテナ化され、Kubernetes に対応可能です
- Prometheus + Grafana によるリアルタイム監視: ドリフト、レイテンシー、ビジネス KPI
- チャレンジャーモデルとチャンピオンモデル間の統計的に厳密なA/Bテスト
- 完全なガバナンス: モデルカード、監査証跡、導入前の公平性チェック
- イタリアの中小企業向け MLOps プロジェクトの実際のコストと ROI の見積もり
ビジネス上の問題: 通信サービスのチャーン
TelecomIT S.p.A. は、210 万人のアクティブな顧客を持つ電気通信事業者です。 月間解約率は 2.3% で、これは毎月約 48,000 人の顧客を失っていることを意味します。 新規顧客を獲得するための平均コストは 180 ユーロですが、 既存顧客の維持と 35 ユーロ。離脱のリスクがある顧客を特定する 前に 彼らが去ることは、利用可能な最も強力な経済手段の 1 つです。
プロジェクトの経済的価値
リスクのある解約者の 70% を正確に特定するモデルを使用 (再現率 = 0.70) 精度は 65% (誤検知は 35%、顧客は不必要に連絡)、 月間 48,000 人の解約者:
- チャーン者は次のように特定しました。 48,000 × 0.70 = 33,600
- 維持キャンペーン費用: 33,600 / 0.65 × 35 ユーロ = ~180 万ユーロ/月
- 回避された取得コストの削減: 33,600 × 0.40 × 180 ユーロ = ~240 万ユーロ/月
- 推定純ROI: +60万ユーロ/月、または720万ユーロ/年
このパイプラインの MLOps インフラストラクチャのコスト: クラウドで約 3,500 ユーロ/年。
技術的要件とビジネス要件
コードを記述する前に、アーキテクチャの各選択の指針となる具体的な要件を定義します。 これらの要件は、ビジネス チームとの会話や技術的な背景から生じます。 存在する。
| 要件 | 仕様 | 制約 |
|---|---|---|
| 予測頻度 | 毎月のバッチ、すべてのアクティブな顧客のスコアリング | CRMキャンペーンは毎月1日まで |
| バッチスコアリングのレイテンシー | 4 時間以内に 210 万人の顧客 | CRMスロット時間: 02:00-06:00 |
| リアルタイム遅延 | コールセンターエージェントの単一顧客スコアリング | < 200ms p99 |
| 目標指標 | AUC-ROC ≥ 0.80、リコール ≥ 0.65 | 月次ホールドアウトセットで検証済み |
| 再訓練 | 毎月、新しいデータで自動 | ドリフト検出時にもトリガー |
| ガバナンス | モデルカード、監査証跡、公平性チェック | AI 法: リスクは限定的 (電気通信分野) |
| スタック | 100% オープンソース、オンプレミス + ハイブリッド クラウド | インフラストラクチャ予算: < 5,000 ユーロ/年 |
エンドツーエンドのアーキテクチャ
システム アーキテクチャでは、すべての MLOps シリーズ ツールが一貫したフローに統合されています。 各コンポーネントには正確な役割があり、明確に定義されたインターフェイスを通じて他のコンポーネントと通信します。 指針となる原則は、 責任の分離: データ、トレーニング 他のレイヤーに影響を与えることなく更新できる独立したレイヤーでライブで提供されます。
+------------------+ +------------------+ +-------------------+
| DATA SOURCES | | DATA PIPELINE | | TRAINING LAYER |
| | | | | |
| - CRM Database |-->| - DVC Pipeline |-->| - MLflow Tracking |
| - CDR (Call Data)| | - Great Expects | | - XGBoost/RF/LGB |
| - Billing System | | - Feature Store | | - Hyperopt Tuning |
| - App Usage Logs | | - DVC Remote | | - Model Registry |
+------------------+ +------------------+ +-------------------+
|
(model promoted)
|
+------------------+ +------------------+ +-------------------+
| MONITORING | | SERVING | | CI/CD PIPELINE |
| | | | | |
| - Prometheus |<--| - FastAPI REST |<--| - GitHub Actions |
| - Grafana Dash | | - Batch Scorer | | - Docker Build |
| - Drift Alerts | | - Kubernetes | | - Auto-tests |
| - Retrain Trig. | | - Load Balancer | | - Auto-deploy |
+------------------+ +------------------+ +-------------------+
|
+------+-------+
| GOVERNANCE |
| |
| - Model Card |
| - Audit Log |
| - Fairness |
+--------------+
完全なテクノロジースタック
| レイヤー | ツール | バージョン | 料金 |
|---|---|---|---|
| データのバージョン管理 | DVC + MinIO (S3 互換) | DVC 3.x | オープンソース |
| 実験の追跡 | MLflow セルフホスト型 | MLflow 2.x | オープンソース |
| データの検証 | 大きな期待 | GX0.18 | オープンソース |
| CI/CD | GitHub アクション | - | 無料枠 |
| コンテナ化 | Docker + Docker Compose | ドッカー 24.x | オープンソース |
| モデルの提供 | FastAPI + ユビコーン | FastAPI 0.110 | オープンソース |
| オーケストレーション | Kubernetes (k3s) | k3s 1.28 | オープンソース |
| 監視 | プロメテウス + グラファナ | プロム 2.47 | オープンソース |
| ドリフト検出 | 明らかにAI | 0.4.x | オープンソース |
| ガバナンス・公平性 | フェアラーン + SHAP | フロリダ0.10 | オープンソース |
| ML フレームワーク | XGBoost + LightGBM + scikit-learn | XGB2.0 | オープンソース |
| クラウドインフラストラクチャ | Hetzner クラウド (VPS 2 コア / 4GB RAM) | - | ~360 ユーロ/年 |
リポジトリの構造と構成
適切に整理されたリポジトリ構造は、すべての MLOps プロジェクトの基盤です メンテナンス可能。次の構造では、コード、構成、テスト、およびテストが明確に分離されています。 高凝集性と低結合性の原則に従って、ドキュメントを作成します。
telecomit-churn/
├── .github/
│ └── workflows/
│ ├── ml-pipeline.yml # Pipeline CI/CD principale
│ ├── retrain-trigger.yml # Trigger retraining automatico
│ └── pr-validation.yml # Validazione PR
├── config/
│ ├── model_config.yaml # Iperparametri e configurazione modello
│ ├── feature_config.yaml # Feature set e preprocessing
│ └── serving_config.yaml # Configurazione FastAPI
├── data/
│ ├── raw/ # Dati grezzi (tracciati da DVC)
│ ├── processed/ # Dati preprocessati (tracciati da DVC)
│ └── features/ # Feature engineered (tracciati da DVC)
├── models/
│ └── registry/ # Modelli registrati (tracciati da DVC)
├── src/
│ ├── data/
│ │ ├── ingestion.py # Estrazione dati da CRM/CDR
│ │ ├── validation.py # Great Expectations checks
│ │ └── preprocessing.py # Pulizia e trasformazione
│ ├── features/
│ │ ├── rfm_features.py # Feature RFM (Recency, Frequency, Monetary)
│ │ ├── behavioral.py # Feature comportamentali
│ │ └── feature_store.py # Feature store locale
│ ├── training/
│ │ ├── train.py # Training loop con MLflow
│ │ ├── evaluate.py # Evaluation e confronto col production model
│ │ └── hyperopt_search.py # Tuning iperparametri
│ ├── serving/
│ │ ├── api.py # FastAPI app
│ │ ├── batch_scorer.py # Scoring batch mensile
│ │ └── middleware.py # Logging, rate limiting, metrics
│ ├── monitoring/
│ │ ├── drift_detector.py # Evidently drift detection
│ │ ├── metrics_exporter.py # Prometheus metrics
│ │ └── alert_manager.py # Alerting e trigger retraining
│ └── governance/
│ ├── model_card.py # Generatore model card
│ ├── fairness_checker.py # Analisi fairness con Fairlearn
│ └── audit_logger.py # Audit trail immodificabile
├── tests/
│ ├── unit/ # Test unitari per ogni modulo
│ ├── integration/ # Test integrazione pipeline
│ └── smoke/ # Smoke test API post-deploy
├── dvc.yaml # Pipeline DVC stages
├── params.yaml # Parametri DVC
├── docker/
│ ├── Dockerfile.training # Immagine training
│ ├── Dockerfile.serving # Immagine serving
│ └── docker-compose.yml # Stack locale completo
├── k8s/
│ ├── deployment.yaml # Kubernetes deployment
│ ├── service.yaml # Kubernetes service
│ └── hpa.yaml # Horizontal Pod Autoscaler
└── notebooks/
└── exploration/ # Notebook EDA (non in pipeline)
チャーン予測のための特徴エンジニアリング
多くの場合、特徴量エンジニアリングが平凡なモデルと優れたモデルの違いになります。 通信分野における解約予測の場合、最も予測可能な機能は次のとおりです。 時間の経過に伴う顧客の行動。フレームワークを使用します RFM (最新性、頻度、金額) を出発点として、豊富な機能を追加 通信ドメインに特有の動作特性。
# rfm_features.py
# Feature engineering per churn prediction nel settore telecom
# Produce un dataset tabellare con 45 feature predittive
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Tuple
import mlflow
def compute_rfm_features(
df_transactions: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Calcola le feature RFM (Recency, Frequency, Monetary) per ogni cliente.
Args:
df_transactions: DataFrame con transazioni/ricariche
snapshot_date: Data di riferimento per il calcolo
customer_id_col: Nome colonna identificativo cliente
Returns:
DataFrame con una riga per cliente e le feature RFM
"""
df = df_transactions.copy()
df["transaction_date"] = pd.to_datetime(df["transaction_date"])
rfm = df.groupby(customer_id_col).agg(
recency_days=("transaction_date", lambda x: (snapshot_date - x.max()).days),
frequency=("transaction_date", "count"),
monetary_total=("amount", "sum"),
monetary_avg=("amount", "mean"),
monetary_std=("amount", "std"),
first_transaction=("transaction_date", "min"),
last_transaction=("transaction_date", "max"),
).reset_index()
# Tenure in giorni (durata rapporto col cliente)
rfm["tenure_days"] = (snapshot_date - rfm["first_transaction"]).dt.days
# Variazione dell'attivita (ultimi 30 gg vs media storica)
last_30_days = snapshot_date - timedelta(days=30)
recent = df[df["transaction_date"] >= last_30_days].groupby(customer_id_col).agg(
recent_frequency=("transaction_date", "count"),
recent_monetary=("amount", "sum")
).reset_index()
rfm = rfm.merge(recent, on=customer_id_col, how="left")
rfm["recent_frequency"] = rfm["recent_frequency"].fillna(0)
rfm["recent_monetary"] = rfm["recent_monetary"].fillna(0)
# Trend: rapporto attivita recente vs attivita media mensile
avg_monthly_freq = rfm["frequency"] / (rfm["tenure_days"] / 30).clip(lower=1)
rfm["activity_trend"] = rfm["recent_frequency"] / avg_monthly_freq.clip(lower=0.01)
return rfm
def compute_behavioral_features(
df_cdr: pd.DataFrame,
df_support: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Feature comportamentali da CDR (call detail records) e ticket di supporto.
Queste feature catturano segnali predittivi di churn specifici del dominio telecom.
"""
# ---- Feature CDR: comportamento nelle chiamate ----
cdr_30d = df_cdr[
df_cdr["call_date"] >= snapshot_date - timedelta(days=30)
]
cdr_features = cdr_30d.groupby(customer_id_col).agg(
calls_count_30d=("call_id", "count"),
calls_duration_avg_30d=("duration_seconds", "mean"),
calls_duration_total_30d=("duration_seconds", "sum"),
data_usage_gb_30d=("data_usage_mb", lambda x: x.sum() / 1024),
international_calls_ratio=("is_international", "mean"),
peak_hour_ratio=("is_peak_hour", "mean"),
unique_contacts_30d=("called_number_hash", "nunique"),
).reset_index()
# ---- Feature supporto: interazioni negative ----
support_30d = df_support[
df_support["ticket_date"] >= snapshot_date - timedelta(days=90)
]
support_features = support_30d.groupby(customer_id_col).agg(
support_tickets_90d=("ticket_id", "count"),
support_escalations_90d=("is_escalated", "sum"),
avg_resolution_time_h=("resolution_hours", "mean"),
complaint_ratio=("is_complaint", "mean"),
unresolved_tickets=("is_resolved", lambda x: (~x).sum()),
).reset_index()
behavioral = cdr_features.merge(support_features, on=customer_id_col, how="left")
behavioral = behavioral.fillna(0)
# Feature derivata: "frustration index" - proxy di insoddisfazione
behavioral["frustration_index"] = (
behavioral["support_tickets_90d"] * 2 +
behavioral["support_escalations_90d"] * 5 +
behavioral["unresolved_tickets"] * 3
).clip(upper=20) / 20 # Normalizzato 0-1
return behavioral
def build_feature_matrix(
df_customers: pd.DataFrame,
df_rfm: pd.DataFrame,
df_behavioral: pd.DataFrame,
label_col: str = "churned"
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Assembla la matrice delle feature finale unendo tutte le sorgenti.
Returns:
(X, y) - feature matrix e label vector
"""
features = df_customers[[
"customer_id", "contract_type", "payment_method",
"age_group", "region", "plan_monthly_eur", label_col
]].merge(df_rfm, on="customer_id", how="left") \
.merge(df_behavioral, on="customer_id", how="left")
# Encoding categoriche
cat_cols = ["contract_type", "payment_method", "age_group", "region"]
features = pd.get_dummies(features, columns=cat_cols, drop_first=True)
y = features[label_col].astype(int)
X = features.drop(columns=["customer_id", label_col,
"first_transaction", "last_transaction"])
return X, y
DVC パイプライン: エンドツーエンドの再現性
DVC パイプラインは、プロセスの各ステージを有向非巡回グラフ (DAG) として定義します。 各ステージは、独自の入力 (deps)、出力 (outs)、およびパラメーターを宣言します。 DVC は、 依存関係を保持し、変更によって無効になったステージのみを自動的に再実行します。 Make と似ていますが、データとモデルを含む ML パイプラインが対象です。
stages:
# Stage 1: Ingestion - estrae dati da CRM/CDR e salva snapshot
data_ingestion:
cmd: python src/data/ingestion.py
deps:
- src/data/ingestion.py
- config/feature_config.yaml
params:
- snapshot_date
- data.source_db_table
outs:
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
metrics:
- data/raw/ingestion_report.json:
cache: false
# Stage 2: Validation - Great Expectations checks sui dati grezzi
data_validation:
cmd: python src/data/validation.py
deps:
- src/data/validation.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
params:
- validation.min_rows
- validation.max_null_ratio
outs:
- data/raw/validation_report.html:
cache: false
metrics:
- data/raw/validation_summary.json:
cache: false
# Stage 3: Preprocessing - pulizia e trasformazione
preprocessing:
cmd: python src/data/preprocessing.py
deps:
- src/data/preprocessing.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
- config/feature_config.yaml
params:
- preprocessing.imputation_strategy
- preprocessing.outlier_threshold
outs:
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
# Stage 4: Feature Engineering - RFM + behavioral
feature_engineering:
cmd: python src/features/rfm_features.py
deps:
- src/features/rfm_features.py
- src/features/behavioral.py
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
params:
- features.lookback_days
- features.rfm_bins
outs:
- data/features/feature_matrix.parquet
- data/features/feature_names.json
# Stage 5: Training - XGBoost con MLflow tracking
training:
cmd: python src/training/train.py
deps:
- src/training/train.py
- data/features/feature_matrix.parquet
- config/model_config.yaml
params:
- model.algorithm
- model.n_estimators
- model.max_depth
- model.learning_rate
- model.subsample
- model.scale_pos_weight
- training.test_size
- training.random_seed
outs:
- models/registry/challenger_model.pkl
- models/registry/preprocessor.pkl
metrics:
- models/registry/metrics.json:
cache: false
plots:
- models/registry/confusion_matrix.json:
cache: false
- models/registry/roc_curve.json:
cache: false
# Stage 6: Evaluation - confronto challenger vs champion
evaluation:
cmd: python src/training/evaluate.py
deps:
- src/training/evaluate.py
- models/registry/challenger_model.pkl
- data/features/feature_matrix.parquet
params:
- evaluation.min_auc_roc
- evaluation.min_recall
- evaluation.promotion_strategy
outs:
- models/registry/evaluation_report.json:
cache: false
- models/registry/model_card.json:
cache: false
MLflow を使用したトレーニング: 完全な実験追跡
トレーニング モジュールには、各実験を体系的に追跡するための MLflow が統合されています。 私たちが使用するのは XGブースト 優れたバランスによりメインアルゴリズムとして採用 早期停止メカニズムを使用して、表形式データのパフォーマンスと解釈可能性の間の調整を行う Hyperopt によるハイパーパラメータ検索。
# train.py
# Training pipeline per churn prediction con MLflow tracking
# Eseguito dalla pipeline DVC: python src/training/train.py
import os
import json
import pickle
import logging
from pathlib import Path
from typing import Dict, Tuple
import mlflow
import mlflow.xgboost
import pandas as pd
import numpy as np
import xgboost as xgb
import yaml
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
roc_auc_score, f1_score, precision_score,
recall_score, accuracy_score, confusion_matrix
)
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_config(config_path: str = "config/model_config.yaml") -> dict:
"""Carica configurazione modello da file YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def load_features(features_path: str = "data/features/feature_matrix.parquet") -> Tuple:
"""Carica feature matrix e prepara train/val/test split."""
df = pd.read_parquet(features_path)
# Leggi i parametri DVC
with open("params.yaml") as f:
params = yaml.safe_load(f)
test_size = params["training"]["test_size"]
seed = params["training"]["random_seed"]
label_col = "churned"
X = df.drop(columns=[label_col])
y = df[label_col].astype(int)
# Stratified split per preservare il bilanciamento delle classi
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=seed, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.15, random_state=seed, stratify=y_train
)
logger.info(f"Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")
logger.info(f"Churn rate - Train: {y_train.mean():.3f} | Test: {y_test.mean():.3f}")
return X_train, X_val, X_test, y_train, y_val, y_test
def train_xgboost(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
params: dict
) -> xgb.XGBClassifier:
"""Addestra un modello XGBoost con early stopping."""
model = xgb.XGBClassifier(
n_estimators=params.get("n_estimators", 500),
max_depth=params.get("max_depth", 6),
learning_rate=params.get("learning_rate", 0.05),
subsample=params.get("subsample", 0.8),
colsample_bytree=params.get("colsample_bytree", 0.8),
scale_pos_weight=params.get("scale_pos_weight", 4.0), # Bilancia classi sbilanciate
min_child_weight=params.get("min_child_weight", 5),
reg_alpha=params.get("reg_alpha", 0.1),
reg_lambda=params.get("reg_lambda", 1.0),
use_label_encoder=False,
eval_metric="auc",
random_state=42,
n_jobs=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=50,
verbose=100
)
return model
def compute_metrics(
model: xgb.XGBClassifier,
X: pd.DataFrame,
y: pd.Series,
threshold: float = 0.5
) -> Dict[str, float]:
"""Calcola metriche complete di valutazione."""
y_proba = model.predict_proba(X)[:, 1]
y_pred = (y_proba >= threshold).astype(int)
cm = confusion_matrix(y, y_pred)
return {
"auc_roc": roc_auc_score(y, y_proba),
"accuracy": accuracy_score(y, y_pred),
"precision": precision_score(y, y_pred, zero_division=0),
"recall": recall_score(y, y_pred, zero_division=0),
"f1": f1_score(y, y_pred, zero_division=0),
"tn": int(cm[0][0]),
"fp": int(cm[0][1]),
"fn": int(cm[1][0]),
"tp": int(cm[1][1]),
}
def hyperopt_search(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
max_evals: int = 30
) -> dict:
"""Ricerca iperparametri con Hyperopt (Tree-structured Parzen Estimator)."""
space = {
"max_depth": hp.choice("max_depth", [4, 5, 6, 7, 8]),
"learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.2)),
"n_estimators": hp.choice("n_estimators", [200, 300, 500, 700]),
"subsample": hp.uniform("subsample", 0.6, 1.0),
"colsample_bytree": hp.uniform("colsample_bytree", 0.6, 1.0),
"scale_pos_weight": hp.uniform("scale_pos_weight", 2.0, 8.0),
"min_child_weight": hp.choice("min_child_weight", [3, 5, 7, 10]),
"reg_alpha": hp.loguniform("reg_alpha", np.log(0.01), np.log(1.0)),
}
def objective(params):
model = train_xgboost(X_train, y_train, X_val, y_val, params)
metrics = compute_metrics(model, X_val, y_val)
# Ottimizziamo su F1 per bilanciare precision e recall
return {"loss": -metrics["f1"], "status": STATUS_OK}
trials = Trials()
best = fmin(objective, space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
return best
def run_training_pipeline():
"""Pipeline di training principale con MLflow tracking completo."""
config = load_config()
X_train, X_val, X_test, y_train, y_val, y_test = load_features()
# Configura MLflow
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
mlflow.set_experiment("telecomit-churn-prediction")
with mlflow.start_run(run_name=f"xgb-training-{pd.Timestamp.now().strftime('%Y%m%d-%H%M')}") as run:
# Log parametri di sistema
mlflow.log_params({
"algorithm": "XGBoostClassifier",
"train_size": len(X_train),
"val_size": len(X_val),
"test_size": len(X_test),
"churn_rate_train": float(y_train.mean()),
"feature_count": X_train.shape[1],
"hyperopt_evals": config.get("hyperopt_evals", 30),
})
# Ricerca iperparametri
logger.info("Avvio ricerca iperparametri con Hyperopt...")
best_params = hyperopt_search(X_train, y_train, X_val, y_val,
max_evals=config.get("hyperopt_evals", 30))
mlflow.log_params(best_params)
# Training finale con best params
logger.info("Training modello finale con migliori iperparametri...")
model = train_xgboost(X_train, y_train, X_val, y_val, best_params)
# Calcola e log metriche su tutti i set
val_metrics = compute_metrics(model, X_val, y_val)
test_metrics = compute_metrics(model, X_test, y_test)
mlflow.log_metrics({f"val_{k}": v for k, v in val_metrics.items()})
mlflow.log_metrics({f"test_{k}": v for k, v in test_metrics.items()})
logger.info(f"Test AUC-ROC: {test_metrics['auc_roc']:.4f}")
logger.info(f"Test Recall: {test_metrics['recall']:.4f}")
logger.info(f"Test F1: {test_metrics['f1']:.4f}")
# Log modello in MLflow model registry
mlflow.xgboost.log_model(
model,
artifact_path="model",
registered_model_name="telecomit-churn-xgb"
)
# Salva metriche per DVC
metrics_output = {
"auc_roc": test_metrics["auc_roc"],
"recall": test_metrics["recall"],
"precision": test_metrics["precision"],
"f1": test_metrics["f1"],
"run_id": run.info.run_id,
}
Path("models/registry").mkdir(parents=True, exist_ok=True)
with open("models/registry/metrics.json", "w") as f:
json.dump(metrics_output, f, indent=2)
# Salva modello per serving
with open("models/registry/challenger_model.pkl", "wb") as f:
pickle.dump(model, f)
logger.info(f"Training completato. MLflow run ID: {run.info.run_id}")
if __name__ == "__main__":
run_training_pipeline()
モデル提供: リアルタイムおよびバッチ用の FastAPI
サービス提供システムは 2 つのモードをサポートしています。 リアルタイム クエリ用 コール センター エージェントの数 (単一顧客、<200ms) e バッチ のために 顧客データベース全体の月次スコア (4 時間未満で 210 万)。両方のモード 一貫性を確保するために同じモデルと前処理ロジックを使用します。
# api.py
# FastAPI serving per churn prediction - real-time e batch
# Espone /predict (singolo), /predict/batch, /health, /metrics
import os
import pickle
import time
import logging
from contextlib import asynccontextmanager
from typing import List, Optional
from pathlib import Path
import mlflow
import pandas as pd
import numpy as np
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from prometheus_client import (
Counter, Histogram, Gauge,
generate_latest, CONTENT_TYPE_LATEST
)
from starlette.responses import Response
import uvicorn
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Prometheus Metrics ----
PREDICTIONS_TOTAL = Counter(
"churn_predictions_total",
"Numero totale di predizioni",
["mode", "model_version"]
)
PREDICTION_DURATION = Histogram(
"churn_prediction_duration_seconds",
"Durata predizione in secondi",
["mode"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1.0, 5.0]
)
CHURN_PROBABILITY_GAUGE = Gauge(
"churn_prediction_avg_probability",
"Probabilità media di churn nelle ultime predizioni"
)
MODEL_VERSION_INFO = Gauge(
"model_version_info",
"Informazioni sulla versione del modello caricato",
["version", "run_id"]
)
# ---- Schema Pydantic ----
class CustomerFeatures(BaseModel):
"""Feature di un singolo cliente per la predizione real-time."""
customer_id: str = Field(..., description="ID univoco del cliente")
recency_days: float = Field(..., ge=0, le=3650, description="Giorni dall'ultima transazione")
frequency: int = Field(..., ge=0, description="Numero di transazioni storiche")
monetary_total: float = Field(..., ge=0, description="Valore monetario totale EUR")
monetary_avg: float = Field(..., ge=0)
tenure_days: int = Field(..., ge=0)
activity_trend: float = Field(..., ge=0)
calls_count_30d: int = Field(..., ge=0)
data_usage_gb_30d: float = Field(..., ge=0)
support_tickets_90d: int = Field(..., ge=0)
frustration_index: float = Field(..., ge=0.0, le=1.0)
plan_monthly_eur: float = Field(..., ge=0)
contract_type_monthly: bool = Field(False)
contract_type_annual: bool = Field(False)
payment_method_auto: bool = Field(False)
age_group_18_35: bool = Field(False)
age_group_36_55: bool = Field(False)
@validator("monetary_total")
def monetary_must_be_reasonable(cls, v):
if v > 1_000_000:
raise ValueError("Valore monetario non plausibile (> 1M EUR)")
return v
class PredictionResponse(BaseModel):
customer_id: str
churn_probability: float
churn_prediction: bool
confidence: str # "high", "medium", "low"
model_version: str
class BatchPredictionRequest(BaseModel):
customers: List[CustomerFeatures]
threshold: Optional[float] = Field(0.5, ge=0.0, le=1.0)
# ---- Global Model State ----
model_state: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
logger.info("Caricamento modello da MLflow registry...")
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow:5000"))
model_name = os.getenv("MODEL_NAME", "telecomit-churn-xgb")
model_alias = os.getenv("MODEL_ALIAS", "champion")
try:
model_uri = f"models:/{model_name}@{model_alias}"
model_state["model"] = mlflow.xgboost.load_model(model_uri)
model_state["version"] = os.getenv("MODEL_VERSION", "unknown")
model_state["run_id"] = os.getenv("MLFLOW_RUN_ID", "unknown")
MODEL_VERSION_INFO.labels(
version=model_state["version"],
run_id=model_state["run_id"]
).set(1)
logger.info(f"Modello caricato: {model_name} @ {model_alias} v{model_state['version']}")
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise
yield # Server in esecuzione
logger.info("Shutdown: deallocazione modello...")
model_state.clear()
# ---- FastAPI App ----
app = FastAPI(
title="TelecomIT Churn Prediction API",
description="API per predizione churn clienti - TelecomIT S.p.A.",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://crm.telecomit.internal"],
allow_methods=["POST", "GET"],
allow_headers=["*"],
)
def features_to_dataframe(customer: CustomerFeatures) -> pd.DataFrame:
"""Converte un oggetto CustomerFeatures in DataFrame per il modello."""
data = customer.dict(exclude={"customer_id"})
return pd.DataFrame([data])
def get_confidence(probability: float) -> str:
"""Classifica la confidenza della predizione."""
if probability >= 0.75 or probability <= 0.25:
return "high"
elif probability >= 0.60 or probability <= 0.40:
return "medium"
return "low"
@app.get("/health")
async def health_check():
"""Health check per Kubernetes liveness/readiness probe."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non caricato")
return {
"status": "healthy",
"model_version": model_state.get("version", "unknown"),
"model_loaded": True
}
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(customer: CustomerFeatures):
"""Predizione real-time per singolo cliente (<200ms p99)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
try:
X = features_to_dataframe(customer)
proba = float(model_state["model"].predict_proba(X)[0, 1])
prediction = proba >= 0.5
PREDICTIONS_TOTAL.labels(mode="realtime", model_version=model_state["version"]).inc()
CHURN_PROBABILITY_GAUGE.set(proba)
return PredictionResponse(
customer_id=customer.customer_id,
churn_probability=round(proba, 4),
churn_prediction=bool(prediction),
confidence=get_confidence(proba),
model_version=model_state.get("version", "unknown")
)
except Exception as e:
logger.error(f"Errore predizione per customer {customer.customer_id}: {e}")
raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}")
finally:
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="realtime").observe(duration)
@app.post("/predict/batch")
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch - ottimizzata per alti volumi (2.1M clienti/notte)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
n_customers = len(request.customers)
try:
# Converti in DataFrame vettorizzato (molto più efficiente del loop)
rows = [c.dict(exclude={"customer_id"}) for c in request.customers]
customer_ids = [c.customer_id for c in request.customers]
X = pd.DataFrame(rows)
probas = model_state["model"].predict_proba(X)[:, 1]
predictions = (probas >= request.threshold).astype(bool)
results = [
{
"customer_id": cid,
"churn_probability": round(float(p), 4),
"churn_prediction": bool(pred),
"confidence": get_confidence(float(p))
}
for cid, p, pred in zip(customer_ids, probas, predictions)
]
PREDICTIONS_TOTAL.labels(mode="batch", model_version=model_state["version"]).inc(n_customers)
CHURN_PROBABILITY_GAUGE.set(float(probas.mean()))
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="batch").observe(duration)
return {
"results": results,
"total_customers": n_customers,
"churn_count": int(predictions.sum()),
"churn_rate": round(float(predictions.mean()), 4),
"processing_time_seconds": round(duration, 3),
"model_version": model_state.get("version", "unknown")
}
except Exception as e:
logger.error(f"Errore batch prediction: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080, workers=4)
モニタリング: ドリフトの検出と警告
モニタリングはインフラストラクチャのメトリクス (レイテンシー、稼働時間) に限定されません。モニタリングには次のものが含まれる必要があります。 時間の経過に伴う予測の品質。私たちが使用するのは 明らかにAI のために ドリフト検出 プロメテウス + グラファナ リアルタイム監視用。 重要なコンポーネントは、モデルが劣化したときの自動再トレーニング トリガーです。
# drift_detector.py
# Rilevamento drift su dati e predizioni con Evidently AI
# Eseguito ogni settimana via cron job o GitHub Actions scheduled
import json
import logging
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
DatasetDriftMetric,
DatasetMissingValuesSummary,
ColumnDriftMetric
)
logger = logging.getLogger(__name__)
class ChurnDriftDetector:
"""
Monitora il drift nei dati e nelle predizioni del modello churn.
Confronta la distribuzione dei dati di produzione con il reference dataset
(i dati su cui il modello e stato addestrato).
"""
def __init__(
self,
reference_path: str = "data/features/feature_matrix_reference.parquet",
drift_threshold: float = 0.2,
alert_webhook: Optional[str] = None
):
self.reference_df = pd.read_parquet(reference_path)
self.drift_threshold = drift_threshold
self.alert_webhook = alert_webhook
def analyze_drift(
self,
current_df: pd.DataFrame,
analysis_date: datetime = None
) -> dict:
"""
Esegue analisi completa di drift sul dataset corrente.
Args:
current_df: Dati di produzione del periodo corrente
analysis_date: Data di riferimento dell'analisi
Returns:
dict con risultati drift e flag retraining_required
"""
analysis_date = analysis_date or datetime.utcnow()
# ---- Report Evidently ----
report = Report(metrics=[
DatasetDriftMetric(),
DatasetMissingValuesSummary(),
# Monitoriamo le feature più predittive
ColumnDriftMetric(column_name="recency_days"),
ColumnDriftMetric(column_name="frustration_index"),
ColumnDriftMetric(column_name="activity_trend"),
ColumnDriftMetric(column_name="data_usage_gb_30d"),
])
report.run(
reference_data=self.reference_df,
current_data=current_df
)
report_dict = report.as_dict()
drift_results = report_dict["metrics"][0]["result"]
dataset_drift_detected = drift_results["dataset_drift"]
drift_share = drift_results["drift_share"]
n_drifted_features = drift_results["number_of_drifted_columns"]
# ---- Analisi predizioni (churn rate trend) ----
ref_churn_rate = self.reference_df["churned"].mean() if "churned" in self.reference_df.columns else None
curr_churn_rate = current_df["churn_prediction"].mean() if "churn_prediction" in current_df.columns else None
churn_rate_drift = None
if ref_churn_rate and curr_churn_rate:
churn_rate_drift = abs(curr_churn_rate - ref_churn_rate) / ref_churn_rate
# ---- Decision logic per retraining ----
retraining_required = (
dataset_drift_detected and drift_share > self.drift_threshold
) or (
churn_rate_drift is not None and churn_rate_drift > 0.30
)
results = {
"analysis_date": analysis_date.isoformat(),
"dataset_drift_detected": bool(dataset_drift_detected),
"drift_share": float(drift_share),
"n_drifted_features": int(n_drifted_features),
"drift_threshold": self.drift_threshold,
"reference_churn_rate": float(ref_churn_rate) if ref_churn_rate else None,
"current_churn_rate": float(curr_churn_rate) if curr_churn_rate else None,
"churn_rate_drift_pct": float(churn_rate_drift * 100) if churn_rate_drift else None,
"retraining_required": bool(retraining_required),
"severity": self._compute_severity(drift_share, churn_rate_drift)
}
# Salva report HTML
report_path = f"monitoring/drift_report_{analysis_date.strftime('%Y%m%d')}.html"
Path("monitoring").mkdir(exist_ok=True)
report.save_html(report_path)
logger.info(f"Report drift salvato: {report_path}")
if retraining_required:
self._trigger_retraining_alert(results)
return results
def _compute_severity(self, drift_share: float, churn_rate_drift: Optional[float]) -> str:
if drift_share > 0.5 or (churn_rate_drift and churn_rate_drift > 0.5):
return "critical"
elif drift_share > 0.3 or (churn_rate_drift and churn_rate_drift > 0.3):
return "high"
elif drift_share > self.drift_threshold:
return "medium"
return "low"
def _trigger_retraining_alert(self, results: dict):
"""Notifica il team e trigghera il retraining via GitHub Actions dispatch."""
logger.warning(f"DRIFT CRITICO rilevato - Retraining richiesto! {results}")
if self.alert_webhook:
import requests
payload = {
"event_type": "retrain-trigger",
"client_payload": {
"reason": "drift_detected",
"drift_share": results["drift_share"],
"analysis_date": results["analysis_date"]
}
}
resp = requests.post(
self.alert_webhook,
json=payload,
headers={"Authorization": f"token ${GITHUB_TOKEN}"}
)
logger.info(f"GitHub Actions dispatch: HTTP {resp.status_code}")
CI/CD パイプライン: GitHub アクションの完了
GitHub Actions パイプラインは、データ検証からすべてのコンポーネントを統合します。 トレーニングから評価、Kubernetes へのデプロイまで。ワークフローが設計されている メトリクスが満たさない場合にデプロイメントをブロックすることで迅速に失敗する 定義されたしきい値を超えています。
name: ML Training Pipeline - TelecomIT Churn
on:
schedule:
- cron: '0 3 1 * *' # Ogni 1° del mese alle 03:00 UTC
workflow_dispatch:
inputs:
reason:
description: 'Motivo del retraining manuale'
required: false
default: 'manual'
force_deploy:
description: 'Forza deployment anche se metriche non migliorano'
type: boolean
default: false
repository_dispatch:
types: [retrain-trigger]
env:
PYTHON_VERSION: "3.11"
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
DVC_REMOTE_URL: ${{ secrets.DVC_REMOTE_URL }}
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/churn-serving
jobs:
# ---- JOB 1: Data Validation ----
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC remote
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
- name: Pull latest data
run: dvc pull data/raw/
- name: Run data validation
run: dvc repro data_validation
id: validation
- name: Check validation passed
run: |
RESULT=$(cat data/raw/validation_summary.json | python -c "import sys,json; d=json.load(sys.stdin); sys.exit(0 if d['all_checks_passed'] else 1)")
echo "Validation result: $RESULT"
- name: Upload validation report
uses: actions/upload-artifact@v4
with:
name: validation-report
path: data/raw/validation_report.html
# ---- JOB 2: Training ----
training:
runs-on: ubuntu-latest
needs: data-validation
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC and pull features
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
dvc pull data/features/
- name: Run preprocessing + feature engineering
run: dvc repro preprocessing feature_engineering
- name: Run training
run: dvc repro training
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
- name: Push artifacts to DVC remote
run: dvc push models/
- name: Output metrics
id: metrics
run: |
AUC=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['auc_roc'])")
RECALL=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['recall'])")
echo "auc_roc=$AUC" >> $GITHUB_OUTPUT
echo "recall=$RECALL" >> $GITHUB_OUTPUT
echo "Training completato - AUC: $AUC, Recall: $RECALL"
outputs:
auc_roc: ${{ steps.metrics.outputs.auc_roc }}
recall: ${{ steps.metrics.outputs.recall }}
# ---- JOB 3: Evaluation Gate ----
evaluation-gate:
runs-on: ubuntu-latest
needs: training
steps:
- name: Check metrics threshold
run: |
AUC=${{ needs.training.outputs.auc_roc }}
RECALL=${{ needs.training.outputs.recall }}
MIN_AUC=0.80
MIN_RECALL=0.65
echo "AUC: $AUC (min: $MIN_AUC)"
echo "Recall: $RECALL (min: $MIN_RECALL)"
python -c "
auc = float('$AUC')
recall = float('$RECALL')
if auc < $MIN_AUC:
print(f'FAIL: AUC {auc:.4f} sotto soglia {$MIN_AUC}')
exit(1)
if recall < $MIN_RECALL:
print(f'FAIL: Recall {recall:.4f} sotto soglia {$MIN_RECALL}')
exit(1)
print('PASS: Metriche sopra soglia - deployment approvato')
"
# ---- JOB 4: Build & Push Docker ----
build-push:
runs-on: ubuntu-latest
needs: evaluation-gate
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
file: docker/Dockerfile.serving
push: true
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ---- JOB 5: Deploy to Kubernetes ----
deploy:
runs-on: ubuntu-latest
needs: build-push
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure kubectl
uses: azure/k8s-set-context@v3
with:
kubeconfig: ${{ secrets.KUBECONFIG }}
- name: Deploy to Kubernetes (rolling update)
run: |
kubectl set image deployment/churn-serving \
churn-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
-n ml-production
kubectl rollout status deployment/churn-serving -n ml-production --timeout=300s
- name: Run smoke tests
run: |
python tests/smoke/test_api_smoke.py \
--endpoint ${{ secrets.API_ENDPOINT }}
- name: Promote model to champion in MLflow
run: |
python -c "
import mlflow
client = mlflow.MlflowClient('${{ env.MLFLOW_TRACKING_URI }}')
# Recupera l'ultima versione Staging
versions = client.get_latest_versions('telecomit-churn-xgb', stages=['Staging'])
if versions:
client.transition_model_version_stage(
name='telecomit-churn-xgb',
version=versions[0].version,
stage='Production',
archive_existing_versions=True
)
print(f'Modello v{versions[0].version} promosso a Production')
"
結果、指標、ROI
TelecomIT チャーン予測システムは、実稼働環境で 6 か月間稼働した後、 測定可能で文書化された結果が得られました。以下のデータは代表的なものです 中規模の電気通信会社における実際のシナリオを説明します。
| メトリック | ターゲット | 結果 M1 | 結果 M6 | トレンド |
|---|---|---|---|---|
| AUC-ROC | ≧ 0.80 | 0.823 | 0.841 | + |
| リコール(チャーン) | ≧ 0.65 | 0.703 | 0.718 | + |
| 精度 (チャーン) | ≧ 0.60 | 0.634 | 0.672 | + |
| p99 リアルタイム レイテンシー | < 200ms | 87ミリ秒 | 82ミリ秒 | + |
| バッチのスループット | 2.1M で 4 時間未満 | 2時間47分で2.1メートル | 2時間31分で2.1メートル | + |
| アップタイムAPI | ≥ 99.9% | 99.94% | 99.97% | + |
| ドリフト警報イベント | - | 2 | 1 (M3、修正済み) | + |
実際のインフラストラクチャコスト
| 成分 | 月額費用 | 年間コスト | 注意事項 |
|---|---|---|---|
| VPS Hetzner (2 コア、4GB) - サービス提供 | 5.83ユーロ | 70ユーロ | FastAPI + k3s |
| VPS Hetzner (4 コア、8GB) - MLflow + MinIO | 15.90ユーロ | 191ユーロ | 追跡+保管 |
| VPS Hetzner (2 コア、4GB) - モニタリング | 5.83ユーロ | 70ユーロ | プロメテウス + グラファナ |
| GitHub アクション (無料枠) | 0ユーロ | 0ユーロ | 2000分/月無料 |
| バックアップストレージ (Hetzner Object) | 2.50ユーロ | 30ユーロ | DVCリモートストレージ |
| 合計 | 30.06ユーロ | 361ユーロ | 年間5,000ユーロ未満 |
MLOps プロジェクトの ROI
初年度の総コスト (開発 + インフラストラクチャ): ~45,000 ユーロ (7 か月の開発者 + インフラストラクチャ)。 推定年間純節約額: 720 万ユーロ。 12 か月の ROI: ~158 倍。 回収期間は 3 週間でした。
学んだ教訓と回避すべきアンチパターン
アンチパターン 1: 「最初にモデル化して、MLOps は後でしましょう」
チームの 60% はテンプレートから開始し、本番環境に移行するときにのみインフラストラクチャを追加します。 このアプローチでは、リファクタリングのコストが 2 倍になります。 解決策: DVCを設定し、 MLflow と、プレースホルダーを含む、初日からの最小限の CI/CD パイプライン。限界費用 低ければ、その利点は非常に大きいです。
アンチパターン 2: インフラストラクチャ監視のみ
ML システムでは、遅延と稼働時間を監視するだけでは十分ではありません。モデルは次のとおりです 技術的には「健全」 (50 ミリ秒で応答、HTTP 500 エラーなし) ですが、予測が生成されます。 データドリフトにより劣化しました。 解決策: 常に配信を監視する システム メトリクスだけでなく、モデルの入力と出力も含めます。
アンチパターン 3: ブラインド再トレーニング
すべての再トレーニングでモデルが改善されるわけではありません。トリガーが適切に調整されていない場合、次のような危険があります。 実稼働環境では、より悪いモデルが作成されることになります。 解決策: ごとに 再トレーニングは、挑戦者とチャンピオンを明確に比較して評価ゲートを通過する必要があります 導入前。自動ロールバックは常に利用可能である必要があります。
導入前の最終チェックリスト
## Checklist Deploy Modello ML - TelecomIT Churn
### qualità del Modello
- [ ] AUC-ROC test set >= 0.80
- [ ] Recall test set >= 0.65
- [ ] Performance >= champion model attuale
- [ ] Nessuna feature con importanza anomala (segnale data leakage)
- [ ] Test su holdout temporale (dati fuori dal training period)
### Governance
- [ ] Model card generata e approvata da ML Lead
- [ ] Fairness check eseguito (demographic parity < 0.10)
- [ ] SHAP analysis disponibile per top-10 feature
- [ ] Audit trail registrato in MLflow con annotazioni
### Infrastruttura
- [ ] Dockerfile building senza warning di sicurezza
- [ ] Health check /health risponde 200
- [ ] Smoke test /predict con payload reale
- [ ] Latenza p99 < 200ms verificata con k6 o locust
- [ ] Rollback plan documentato e testato
### Monitoring
- [ ] Dashboard Grafana aggiornata con versione modello
- [ ] Alert Prometheus configurati per nuova versione
- [ ] Reference dataset Evidently aggiornato
- [ ] On-call aggiornato sul deploy
### Documenti
- [ ] JIRA ticket di deployment chiuso con evidence
- [ ] Post-deploy review schedulata a T+7 giorni
結論: MLOps はプロジェクトではなく、実践です
このケーススタディでは、チャーン予測のための完全な MLOps システムを構築しました。 ビジネス上の問題から監視を伴う Kubernetes へのデプロイメントまで、 ドリフトの検出とガバナンス。システムはすべての要件を満たしています: 遅延 <200ms リアルタイム、バッチ処理で 3 時間未満で 210 万人の顧客、AUC-ROC 0.84、コスト 年間 361 ユーロのインフラストラクチャ。
このシリーズの重要なメッセージは次のとおりです。 MLOps はテクノロジーではありません。 そして規律。一度インストールすればすべてが解決するツールはありません 問題。そして、ML モデルを可能にする一連の実践、プロセス、文化 時間が経っても確実に機能します。この中で書いたコードは シリーズは出発点です。実際の仕事は観察、測定、学習です。 そして継続的に改善します。
継続するリソース
- この完全なシリーズ: MLOps: 実験から実稼働へ → CI/CD パイプライン → DVC のバージョン管理 → MLフロー → ドリフト検出 → FastAPI の提供 → Kubernetes → A/B テスト → ガバナンス
- 高度な深層学習: 高度な深層学習シリーズ - LoRA、量子化、エッジAI
- AIエンジニアリング: AIエンジニアリングシリーズ - RAG、ベクトルデータベース、LangChain
- GitHub リポジトリ: このケーススタディの完全なコードは利用可能です github.com/federicolo/telecomit-churn-mlops で (実際のデータではなく構造)
MLOps 市場は、2026 年の 43 億 8000 万ドルから 2035 年には 891 億 8000 万ドルに成長すると予想されます (CAGR 39.8%)。 このシリーズで習得したスキルにより、あなたは優れた立場に立つことができます この成長に対処するために。幸せな MLOps。







