사례 연구: 프로덕션의 MLOps - 제로에서 완전한 파이프라인까지
우리는 소개부터 기본 개념, CI/CD 파이프라인까지 전체 MLOps 시리즈를 다루었습니다. GitHub Actions 및 Docker, DVC를 사용한 버전 관리, MLflow를 사용한 실험 추적, 드리프트 감지, FastAPI 제공, Kubernetes 확장, A/B 테스트 및 AI Act를 통한 거버넌스까지. 이제 이 모든 것을 실제적이고 구체적인 사례로 정리할 때입니다.
이 사례 연구에서는 처음부터 시스템을 구축하겠습니다. 이탈 예측 에 대한 모든 MLOps 원칙을 따르는 가상의 통신 회사(TelecomIT S.p.A.) 시리즈에서 배웠습니다. 우리는 비즈니스 문제부터 시작하여 엔드투엔드 아키텍처를 설계하고, 작동하는 Python 코드를 사용하여 각 구성 요소를 구현하고 모니터링하는 방법을 살펴보겠습니다. 시간이 지나도 시스템을 생산 상태로 유지합니다. 최종 결과는 MLOps 파이프라인이 됩니다. 완전하고 재현 가능하며 실제 환경에 적합합니다.
무엇을 배울 것인가
- 비즈니스 문제를 엔드투엔드 MLOps 아키텍처로 변환하는 방법
- 이탈 예측을 위한 특정 기능 엔지니어링(RFM, 시간적 동작)
- 완전한 DVC 파이프라인: 데이터, 전처리, 교육, 평가
- MLflow를 통한 구조화된 실험 추적 및 체계적인 모델 비교
- FastAPI + Uvicorn을 사용하여 컨테이너화되고 Kubernetes에 준비된 모델 제공
- Prometheus + Grafana를 사용한 실시간 모니터링: 드리프트, 대기 시간, 비즈니스 KPI
- 도전자 모델과 챔피언 모델 간의 통계적으로 엄격한 A/B 테스트
- 완전한 거버넌스: 모델 카드, 감사 추적, 배포 전 공정성 확인
- 이탈리아 SME를 위한 MLOps 프로젝트의 실제 비용 및 ROI 추정
비즈니스 문제: 통신 서비스 이탈
TelecomIT S.p.A.와 210만 명의 활성 고객을 보유한 통신 사업자입니다. 월별 이탈률은 2.3%로 매달 약 48,000명의 고객이 이탈한다는 의미입니다. 신규 고객을 확보하는 데 드는 평균 비용은 180 EUR입니다. 기존 고객 유지 및 35 EUR. 이탈 위험이 있는 고객 식별 전에 그들이 떠나는 것은 이용 가능한 가장 강력한 경제적 지렛대 중 하나입니다.
프로젝트의 경제적 가치
위험에 처한 이탈자의 70%를 정확하게 식별하는 모델 사용(재현율 = 0.70) 정밀도가 65%(오탐률 35%, 고객에게 불필요하게 연락함)를 갖고 있으며, 월간 이탈자 48,000명:
- 식별된 이탈자: 48,000 × 0.70 = 33,600
- 유지 캠페인 비용: 33,600 / 0.65 × 35 EUR = ~180만 EUR/월
- 구매 비용 절감: 33,600 × 0.40 × 180 EUR = ~240만 EUR/월
- 예상 순 ROI: +600K EUR/월 또는 720만 EUR/년
이 파이프라인의 MLOps 인프라 비용: 클라우드 기준 연간 약 3,500 EUR입니다.
기술 및 비즈니스 요구 사항
코드를 작성하기 전에 각 아키텍처 선택을 안내할 구체적인 요구 사항을 정의합니다. 이러한 요구 사항은 비즈니스 팀과의 대화 및 기술적인 상황에서 발생합니다. 기존.
| 요구 사항 | 사양 | 강제 |
|---|---|---|
| 예측 빈도 | 월별 배치, 모든 활성 고객에 대한 점수 매기기 | CRM 캠페인을 위해 매월 1일까지 |
| 일괄 채점 대기 시간 | 4시간 이내에 210만 고객 확보 | CRM 시간대 : 02:00~06:00 |
| 실시간 대기 시간 | 콜센터 상담원에 대한 단일 고객 점수 | < 200ms p99 |
| 목표 지표 | AUC-ROC ≥ 0.80, 재현율 ≥ 0.65 | 월간 홀드아웃 세트에서 검증됨 |
| 재교육 | 월별, 새로운 데이터에 대해 자동 | 드리프트 감지 시에도 트리거 |
| 통치 | 모델카드, 감사추적, 공정성 확인 | AI법: 제한된 위험(통신 부문) |
| 스택 | 100% 오픈 소스, 온프레미스 + 하이브리드 클라우드 | 인프라 예산: < 5,000 EUR/년 |
엔드투엔드 아키텍처
시스템 아키텍처는 모든 MLOps 시리즈 도구를 응집력 있는 흐름으로 통합합니다. 각 구성 요소는 정확한 역할을 가지며 잘 정의된 인터페이스를 통해 다른 구성 요소와 통신합니다. 지도 원칙은 책임의 분리: 데이터, 훈련 다른 레이어에 영향을 주지 않고 업데이트할 수 있는 독립적인 레이어에서 실시간으로 서비스를 제공합니다.
+------------------+ +------------------+ +-------------------+
| DATA SOURCES | | DATA PIPELINE | | TRAINING LAYER |
| | | | | |
| - CRM Database |-->| - DVC Pipeline |-->| - MLflow Tracking |
| - CDR (Call Data)| | - Great Expects | | - XGBoost/RF/LGB |
| - Billing System | | - Feature Store | | - Hyperopt Tuning |
| - App Usage Logs | | - DVC Remote | | - Model Registry |
+------------------+ +------------------+ +-------------------+
|
(model promoted)
|
+------------------+ +------------------+ +-------------------+
| MONITORING | | SERVING | | CI/CD PIPELINE |
| | | | | |
| - Prometheus |<--| - FastAPI REST |<--| - GitHub Actions |
| - Grafana Dash | | - Batch Scorer | | - Docker Build |
| - Drift Alerts | | - Kubernetes | | - Auto-tests |
| - Retrain Trig. | | - Load Balancer | | - Auto-deploy |
+------------------+ +------------------+ +-------------------+
|
+------+-------+
| GOVERNANCE |
| |
| - Model Card |
| - Audit Log |
| - Fairness |
+--------------+
완벽한 기술 스택
| 레이어 | 도구 | 버전 | 비용 |
|---|---|---|---|
| 데이터 버전 관리 | DVC + MinIO(S3 호환) | DVC 3.x | 오픈 소스 |
| 실험 추적 | MLflow 자체 호스팅 | MLflow 2.x | 오픈 소스 |
| 데이터 검증 | 큰 기대 | GX 0.18 | 오픈 소스 |
| CI/CD | GitHub 작업 | - | 무료 등급 |
| 컨테이너화 | 도커 + 도커 작성 | 도커 24.x | 오픈 소스 |
| 모델 제공 | FastAPI + 유비콘 | FastAPI 0.110 | 오픈 소스 |
| 관현악법 | 쿠버네티스(k3s) | k3s 1.28 | 오픈 소스 |
| 모니터링 | 프로메테우스 + 그라파나 | 프롬 2.47 | 오픈 소스 |
| 드리프트 감지 | 분명히 AI | 0.4.x | 오픈 소스 |
| 거버넌스/공정성 | 페어런 + SHAP | 플로리다 0.10 | 오픈 소스 |
| ML 프레임워크 | XGBoost + LightGBM + scikit-learn | XGB 2.0 | 오픈 소스 |
| 클라우드 인프라 | 헤츠너 클라우드(VPS 2코어 / 4GB RAM) | - | ~360 EUR/년 |
저장소 구조 및 구성
잘 구성된 저장소 구조는 모든 MLOps 프로젝트의 기초입니다. 유지 관리 가능. 다음 구조는 코드, 구성, 테스트 및 높은 응집력과 낮은 결합의 원칙을 따르는 문서화.
telecomit-churn/
├── .github/
│ └── workflows/
│ ├── ml-pipeline.yml # Pipeline CI/CD principale
│ ├── retrain-trigger.yml # Trigger retraining automatico
│ └── pr-validation.yml # Validazione PR
├── config/
│ ├── model_config.yaml # Iperparametri e configurazione modello
│ ├── feature_config.yaml # Feature set e preprocessing
│ └── serving_config.yaml # Configurazione FastAPI
├── data/
│ ├── raw/ # Dati grezzi (tracciati da DVC)
│ ├── processed/ # Dati preprocessati (tracciati da DVC)
│ └── features/ # Feature engineered (tracciati da DVC)
├── models/
│ └── registry/ # Modelli registrati (tracciati da DVC)
├── src/
│ ├── data/
│ │ ├── ingestion.py # Estrazione dati da CRM/CDR
│ │ ├── validation.py # Great Expectations checks
│ │ └── preprocessing.py # Pulizia e trasformazione
│ ├── features/
│ │ ├── rfm_features.py # Feature RFM (Recency, Frequency, Monetary)
│ │ ├── behavioral.py # Feature comportamentali
│ │ └── feature_store.py # Feature store locale
│ ├── training/
│ │ ├── train.py # Training loop con MLflow
│ │ ├── evaluate.py # Evaluation e confronto col production model
│ │ └── hyperopt_search.py # Tuning iperparametri
│ ├── serving/
│ │ ├── api.py # FastAPI app
│ │ ├── batch_scorer.py # Scoring batch mensile
│ │ └── middleware.py # Logging, rate limiting, metrics
│ ├── monitoring/
│ │ ├── drift_detector.py # Evidently drift detection
│ │ ├── metrics_exporter.py # Prometheus metrics
│ │ └── alert_manager.py # Alerting e trigger retraining
│ └── governance/
│ ├── model_card.py # Generatore model card
│ ├── fairness_checker.py # Analisi fairness con Fairlearn
│ └── audit_logger.py # Audit trail immodificabile
├── tests/
│ ├── unit/ # Test unitari per ogni modulo
│ ├── integration/ # Test integrazione pipeline
│ └── smoke/ # Smoke test API post-deploy
├── dvc.yaml # Pipeline DVC stages
├── params.yaml # Parametri DVC
├── docker/
│ ├── Dockerfile.training # Immagine training
│ ├── Dockerfile.serving # Immagine serving
│ └── docker-compose.yml # Stack locale completo
├── k8s/
│ ├── deployment.yaml # Kubernetes deployment
│ ├── service.yaml # Kubernetes service
│ └── hpa.yaml # Horizontal Pod Autoscaler
└── notebooks/
└── exploration/ # Notebook EDA (non in pipeline)
이탈 예측을 위한 특성 엔지니어링
기능 엔지니어링은 보통의 모델과 우수한 모델의 차이를 결정하는 경우가 많습니다. 통신 부문의 이탈 예측에 있어 가장 예측 가능한 특징은 다음과 같습니다. 시간에 따른 고객 행동. 우리는 프레임워크를 사용합니다. RFM (최근성, 빈도, 화폐)를 출발점으로 삼아 다양한 기능 제공 통신 도메인에 특정한 행동 특성.
# rfm_features.py
# Feature engineering per churn prediction nel settore telecom
# Produce un dataset tabellare con 45 feature predittive
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Tuple
import mlflow
def compute_rfm_features(
df_transactions: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Calcola le feature RFM (Recency, Frequency, Monetary) per ogni cliente.
Args:
df_transactions: DataFrame con transazioni/ricariche
snapshot_date: Data di riferimento per il calcolo
customer_id_col: Nome colonna identificativo cliente
Returns:
DataFrame con una riga per cliente e le feature RFM
"""
df = df_transactions.copy()
df["transaction_date"] = pd.to_datetime(df["transaction_date"])
rfm = df.groupby(customer_id_col).agg(
recency_days=("transaction_date", lambda x: (snapshot_date - x.max()).days),
frequency=("transaction_date", "count"),
monetary_total=("amount", "sum"),
monetary_avg=("amount", "mean"),
monetary_std=("amount", "std"),
first_transaction=("transaction_date", "min"),
last_transaction=("transaction_date", "max"),
).reset_index()
# Tenure in giorni (durata rapporto col cliente)
rfm["tenure_days"] = (snapshot_date - rfm["first_transaction"]).dt.days
# Variazione dell'attivita (ultimi 30 gg vs media storica)
last_30_days = snapshot_date - timedelta(days=30)
recent = df[df["transaction_date"] >= last_30_days].groupby(customer_id_col).agg(
recent_frequency=("transaction_date", "count"),
recent_monetary=("amount", "sum")
).reset_index()
rfm = rfm.merge(recent, on=customer_id_col, how="left")
rfm["recent_frequency"] = rfm["recent_frequency"].fillna(0)
rfm["recent_monetary"] = rfm["recent_monetary"].fillna(0)
# Trend: rapporto attivita recente vs attivita media mensile
avg_monthly_freq = rfm["frequency"] / (rfm["tenure_days"] / 30).clip(lower=1)
rfm["activity_trend"] = rfm["recent_frequency"] / avg_monthly_freq.clip(lower=0.01)
return rfm
def compute_behavioral_features(
df_cdr: pd.DataFrame,
df_support: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Feature comportamentali da CDR (call detail records) e ticket di supporto.
Queste feature catturano segnali predittivi di churn specifici del dominio telecom.
"""
# ---- Feature CDR: comportamento nelle chiamate ----
cdr_30d = df_cdr[
df_cdr["call_date"] >= snapshot_date - timedelta(days=30)
]
cdr_features = cdr_30d.groupby(customer_id_col).agg(
calls_count_30d=("call_id", "count"),
calls_duration_avg_30d=("duration_seconds", "mean"),
calls_duration_total_30d=("duration_seconds", "sum"),
data_usage_gb_30d=("data_usage_mb", lambda x: x.sum() / 1024),
international_calls_ratio=("is_international", "mean"),
peak_hour_ratio=("is_peak_hour", "mean"),
unique_contacts_30d=("called_number_hash", "nunique"),
).reset_index()
# ---- Feature supporto: interazioni negative ----
support_30d = df_support[
df_support["ticket_date"] >= snapshot_date - timedelta(days=90)
]
support_features = support_30d.groupby(customer_id_col).agg(
support_tickets_90d=("ticket_id", "count"),
support_escalations_90d=("is_escalated", "sum"),
avg_resolution_time_h=("resolution_hours", "mean"),
complaint_ratio=("is_complaint", "mean"),
unresolved_tickets=("is_resolved", lambda x: (~x).sum()),
).reset_index()
behavioral = cdr_features.merge(support_features, on=customer_id_col, how="left")
behavioral = behavioral.fillna(0)
# Feature derivata: "frustration index" - proxy di insoddisfazione
behavioral["frustration_index"] = (
behavioral["support_tickets_90d"] * 2 +
behavioral["support_escalations_90d"] * 5 +
behavioral["unresolved_tickets"] * 3
).clip(upper=20) / 20 # Normalizzato 0-1
return behavioral
def build_feature_matrix(
df_customers: pd.DataFrame,
df_rfm: pd.DataFrame,
df_behavioral: pd.DataFrame,
label_col: str = "churned"
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Assembla la matrice delle feature finale unendo tutte le sorgenti.
Returns:
(X, y) - feature matrix e label vector
"""
features = df_customers[[
"customer_id", "contract_type", "payment_method",
"age_group", "region", "plan_monthly_eur", label_col
]].merge(df_rfm, on="customer_id", how="left") \
.merge(df_behavioral, on="customer_id", how="left")
# Encoding categoriche
cat_cols = ["contract_type", "payment_method", "age_group", "region"]
features = pd.get_dummies(features, columns=cat_cols, drop_first=True)
y = features[label_col].astype(int)
X = features.drop(columns=["customer_id", label_col,
"first_transaction", "last_transaction"])
return X, y
DVC 파이프라인: 엔드투엔드 재현성
DVC 파이프라인은 프로세스의 각 단계를 DAG(방향성 비순환 그래프)로 정의합니다. 각 단계는 자체 입력(dep), 출력(out) 및 매개변수를 선언합니다. DVC는 종속성을 제거하고 변경으로 인해 무효화된 단계만 자동으로 다시 실행합니다. Make와 비슷하지만 데이터와 모델이 포함된 ML 파이프라인용입니다.
stages:
# Stage 1: Ingestion - estrae dati da CRM/CDR e salva snapshot
data_ingestion:
cmd: python src/data/ingestion.py
deps:
- src/data/ingestion.py
- config/feature_config.yaml
params:
- snapshot_date
- data.source_db_table
outs:
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
metrics:
- data/raw/ingestion_report.json:
cache: false
# Stage 2: Validation - Great Expectations checks sui dati grezzi
data_validation:
cmd: python src/data/validation.py
deps:
- src/data/validation.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
params:
- validation.min_rows
- validation.max_null_ratio
outs:
- data/raw/validation_report.html:
cache: false
metrics:
- data/raw/validation_summary.json:
cache: false
# Stage 3: Preprocessing - pulizia e trasformazione
preprocessing:
cmd: python src/data/preprocessing.py
deps:
- src/data/preprocessing.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
- config/feature_config.yaml
params:
- preprocessing.imputation_strategy
- preprocessing.outlier_threshold
outs:
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
# Stage 4: Feature Engineering - RFM + behavioral
feature_engineering:
cmd: python src/features/rfm_features.py
deps:
- src/features/rfm_features.py
- src/features/behavioral.py
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
params:
- features.lookback_days
- features.rfm_bins
outs:
- data/features/feature_matrix.parquet
- data/features/feature_names.json
# Stage 5: Training - XGBoost con MLflow tracking
training:
cmd: python src/training/train.py
deps:
- src/training/train.py
- data/features/feature_matrix.parquet
- config/model_config.yaml
params:
- model.algorithm
- model.n_estimators
- model.max_depth
- model.learning_rate
- model.subsample
- model.scale_pos_weight
- training.test_size
- training.random_seed
outs:
- models/registry/challenger_model.pkl
- models/registry/preprocessor.pkl
metrics:
- models/registry/metrics.json:
cache: false
plots:
- models/registry/confusion_matrix.json:
cache: false
- models/registry/roc_curve.json:
cache: false
# Stage 6: Evaluation - confronto challenger vs champion
evaluation:
cmd: python src/training/evaluate.py
deps:
- src/training/evaluate.py
- models/registry/challenger_model.pkl
- data/features/feature_matrix.parquet
params:
- evaluation.min_auc_roc
- evaluation.min_recall
- evaluation.promotion_strategy
outs:
- models/registry/evaluation_report.json:
cache: false
- models/registry/model_card.json:
cache: false
MLflow를 사용한 교육: 완벽한 실험 추적
교육 모듈은 각 실험을 체계적으로 추적하기 위해 MLflow를 통합합니다. 우리는 XGBoost 뛰어난 밸런스로 인해 주요 알고리즘으로 사용됨 조기 중지 메커니즘을 통해 표 형식 데이터의 성능과 해석 가능성 간 Hyperopt를 통한 하이퍼파라미터 검색.
# train.py
# Training pipeline per churn prediction con MLflow tracking
# Eseguito dalla pipeline DVC: python src/training/train.py
import os
import json
import pickle
import logging
from pathlib import Path
from typing import Dict, Tuple
import mlflow
import mlflow.xgboost
import pandas as pd
import numpy as np
import xgboost as xgb
import yaml
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
roc_auc_score, f1_score, precision_score,
recall_score, accuracy_score, confusion_matrix
)
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_config(config_path: str = "config/model_config.yaml") -> dict:
"""Carica configurazione modello da file YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def load_features(features_path: str = "data/features/feature_matrix.parquet") -> Tuple:
"""Carica feature matrix e prepara train/val/test split."""
df = pd.read_parquet(features_path)
# Leggi i parametri DVC
with open("params.yaml") as f:
params = yaml.safe_load(f)
test_size = params["training"]["test_size"]
seed = params["training"]["random_seed"]
label_col = "churned"
X = df.drop(columns=[label_col])
y = df[label_col].astype(int)
# Stratified split per preservare il bilanciamento delle classi
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=seed, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.15, random_state=seed, stratify=y_train
)
logger.info(f"Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")
logger.info(f"Churn rate - Train: {y_train.mean():.3f} | Test: {y_test.mean():.3f}")
return X_train, X_val, X_test, y_train, y_val, y_test
def train_xgboost(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
params: dict
) -> xgb.XGBClassifier:
"""Addestra un modello XGBoost con early stopping."""
model = xgb.XGBClassifier(
n_estimators=params.get("n_estimators", 500),
max_depth=params.get("max_depth", 6),
learning_rate=params.get("learning_rate", 0.05),
subsample=params.get("subsample", 0.8),
colsample_bytree=params.get("colsample_bytree", 0.8),
scale_pos_weight=params.get("scale_pos_weight", 4.0), # Bilancia classi sbilanciate
min_child_weight=params.get("min_child_weight", 5),
reg_alpha=params.get("reg_alpha", 0.1),
reg_lambda=params.get("reg_lambda", 1.0),
use_label_encoder=False,
eval_metric="auc",
random_state=42,
n_jobs=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=50,
verbose=100
)
return model
def compute_metrics(
model: xgb.XGBClassifier,
X: pd.DataFrame,
y: pd.Series,
threshold: float = 0.5
) -> Dict[str, float]:
"""Calcola metriche complete di valutazione."""
y_proba = model.predict_proba(X)[:, 1]
y_pred = (y_proba >= threshold).astype(int)
cm = confusion_matrix(y, y_pred)
return {
"auc_roc": roc_auc_score(y, y_proba),
"accuracy": accuracy_score(y, y_pred),
"precision": precision_score(y, y_pred, zero_division=0),
"recall": recall_score(y, y_pred, zero_division=0),
"f1": f1_score(y, y_pred, zero_division=0),
"tn": int(cm[0][0]),
"fp": int(cm[0][1]),
"fn": int(cm[1][0]),
"tp": int(cm[1][1]),
}
def hyperopt_search(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
max_evals: int = 30
) -> dict:
"""Ricerca iperparametri con Hyperopt (Tree-structured Parzen Estimator)."""
space = {
"max_depth": hp.choice("max_depth", [4, 5, 6, 7, 8]),
"learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.2)),
"n_estimators": hp.choice("n_estimators", [200, 300, 500, 700]),
"subsample": hp.uniform("subsample", 0.6, 1.0),
"colsample_bytree": hp.uniform("colsample_bytree", 0.6, 1.0),
"scale_pos_weight": hp.uniform("scale_pos_weight", 2.0, 8.0),
"min_child_weight": hp.choice("min_child_weight", [3, 5, 7, 10]),
"reg_alpha": hp.loguniform("reg_alpha", np.log(0.01), np.log(1.0)),
}
def objective(params):
model = train_xgboost(X_train, y_train, X_val, y_val, params)
metrics = compute_metrics(model, X_val, y_val)
# Ottimizziamo su F1 per bilanciare precision e recall
return {"loss": -metrics["f1"], "status": STATUS_OK}
trials = Trials()
best = fmin(objective, space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
return best
def run_training_pipeline():
"""Pipeline di training principale con MLflow tracking completo."""
config = load_config()
X_train, X_val, X_test, y_train, y_val, y_test = load_features()
# Configura MLflow
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
mlflow.set_experiment("telecomit-churn-prediction")
with mlflow.start_run(run_name=f"xgb-training-{pd.Timestamp.now().strftime('%Y%m%d-%H%M')}") as run:
# Log parametri di sistema
mlflow.log_params({
"algorithm": "XGBoostClassifier",
"train_size": len(X_train),
"val_size": len(X_val),
"test_size": len(X_test),
"churn_rate_train": float(y_train.mean()),
"feature_count": X_train.shape[1],
"hyperopt_evals": config.get("hyperopt_evals", 30),
})
# Ricerca iperparametri
logger.info("Avvio ricerca iperparametri con Hyperopt...")
best_params = hyperopt_search(X_train, y_train, X_val, y_val,
max_evals=config.get("hyperopt_evals", 30))
mlflow.log_params(best_params)
# Training finale con best params
logger.info("Training modello finale con migliori iperparametri...")
model = train_xgboost(X_train, y_train, X_val, y_val, best_params)
# Calcola e log metriche su tutti i set
val_metrics = compute_metrics(model, X_val, y_val)
test_metrics = compute_metrics(model, X_test, y_test)
mlflow.log_metrics({f"val_{k}": v for k, v in val_metrics.items()})
mlflow.log_metrics({f"test_{k}": v for k, v in test_metrics.items()})
logger.info(f"Test AUC-ROC: {test_metrics['auc_roc']:.4f}")
logger.info(f"Test Recall: {test_metrics['recall']:.4f}")
logger.info(f"Test F1: {test_metrics['f1']:.4f}")
# Log modello in MLflow model registry
mlflow.xgboost.log_model(
model,
artifact_path="model",
registered_model_name="telecomit-churn-xgb"
)
# Salva metriche per DVC
metrics_output = {
"auc_roc": test_metrics["auc_roc"],
"recall": test_metrics["recall"],
"precision": test_metrics["precision"],
"f1": test_metrics["f1"],
"run_id": run.info.run_id,
}
Path("models/registry").mkdir(parents=True, exist_ok=True)
with open("models/registry/metrics.json", "w") as f:
json.dump(metrics_output, f, indent=2)
# Salva modello per serving
with open("models/registry/challenger_model.pkl", "wb") as f:
pickle.dump(model, f)
logger.info(f"Training completato. MLflow run ID: {run.info.run_id}")
if __name__ == "__main__":
run_training_pipeline()
모델 제공: 실시간 및 일괄 처리를 위한 FastAPI
서빙 시스템은 두 가지 모드를 지원합니다. 실시간 쿼리용 콜센터 상담원 수(단일 고객, <200ms) e 일괄 에 대한 전체 고객 데이터베이스의 월간 점수(4시간 미만 210만). 두 모드 모두 일관성을 보장하기 위해 동일한 모델과 전처리 논리를 사용합니다.
# api.py
# FastAPI serving per churn prediction - real-time e batch
# Espone /predict (singolo), /predict/batch, /health, /metrics
import os
import pickle
import time
import logging
from contextlib import asynccontextmanager
from typing import List, Optional
from pathlib import Path
import mlflow
import pandas as pd
import numpy as np
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from prometheus_client import (
Counter, Histogram, Gauge,
generate_latest, CONTENT_TYPE_LATEST
)
from starlette.responses import Response
import uvicorn
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Prometheus Metrics ----
PREDICTIONS_TOTAL = Counter(
"churn_predictions_total",
"Numero totale di predizioni",
["mode", "model_version"]
)
PREDICTION_DURATION = Histogram(
"churn_prediction_duration_seconds",
"Durata predizione in secondi",
["mode"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1.0, 5.0]
)
CHURN_PROBABILITY_GAUGE = Gauge(
"churn_prediction_avg_probability",
"Probabilità media di churn nelle ultime predizioni"
)
MODEL_VERSION_INFO = Gauge(
"model_version_info",
"Informazioni sulla versione del modello caricato",
["version", "run_id"]
)
# ---- Schema Pydantic ----
class CustomerFeatures(BaseModel):
"""Feature di un singolo cliente per la predizione real-time."""
customer_id: str = Field(..., description="ID univoco del cliente")
recency_days: float = Field(..., ge=0, le=3650, description="Giorni dall'ultima transazione")
frequency: int = Field(..., ge=0, description="Numero di transazioni storiche")
monetary_total: float = Field(..., ge=0, description="Valore monetario totale EUR")
monetary_avg: float = Field(..., ge=0)
tenure_days: int = Field(..., ge=0)
activity_trend: float = Field(..., ge=0)
calls_count_30d: int = Field(..., ge=0)
data_usage_gb_30d: float = Field(..., ge=0)
support_tickets_90d: int = Field(..., ge=0)
frustration_index: float = Field(..., ge=0.0, le=1.0)
plan_monthly_eur: float = Field(..., ge=0)
contract_type_monthly: bool = Field(False)
contract_type_annual: bool = Field(False)
payment_method_auto: bool = Field(False)
age_group_18_35: bool = Field(False)
age_group_36_55: bool = Field(False)
@validator("monetary_total")
def monetary_must_be_reasonable(cls, v):
if v > 1_000_000:
raise ValueError("Valore monetario non plausibile (> 1M EUR)")
return v
class PredictionResponse(BaseModel):
customer_id: str
churn_probability: float
churn_prediction: bool
confidence: str # "high", "medium", "low"
model_version: str
class BatchPredictionRequest(BaseModel):
customers: List[CustomerFeatures]
threshold: Optional[float] = Field(0.5, ge=0.0, le=1.0)
# ---- Global Model State ----
model_state: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
logger.info("Caricamento modello da MLflow registry...")
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow:5000"))
model_name = os.getenv("MODEL_NAME", "telecomit-churn-xgb")
model_alias = os.getenv("MODEL_ALIAS", "champion")
try:
model_uri = f"models:/{model_name}@{model_alias}"
model_state["model"] = mlflow.xgboost.load_model(model_uri)
model_state["version"] = os.getenv("MODEL_VERSION", "unknown")
model_state["run_id"] = os.getenv("MLFLOW_RUN_ID", "unknown")
MODEL_VERSION_INFO.labels(
version=model_state["version"],
run_id=model_state["run_id"]
).set(1)
logger.info(f"Modello caricato: {model_name} @ {model_alias} v{model_state['version']}")
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise
yield # Server in esecuzione
logger.info("Shutdown: deallocazione modello...")
model_state.clear()
# ---- FastAPI App ----
app = FastAPI(
title="TelecomIT Churn Prediction API",
description="API per predizione churn clienti - TelecomIT S.p.A.",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://crm.telecomit.internal"],
allow_methods=["POST", "GET"],
allow_headers=["*"],
)
def features_to_dataframe(customer: CustomerFeatures) -> pd.DataFrame:
"""Converte un oggetto CustomerFeatures in DataFrame per il modello."""
data = customer.dict(exclude={"customer_id"})
return pd.DataFrame([data])
def get_confidence(probability: float) -> str:
"""Classifica la confidenza della predizione."""
if probability >= 0.75 or probability <= 0.25:
return "high"
elif probability >= 0.60 or probability <= 0.40:
return "medium"
return "low"
@app.get("/health")
async def health_check():
"""Health check per Kubernetes liveness/readiness probe."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non caricato")
return {
"status": "healthy",
"model_version": model_state.get("version", "unknown"),
"model_loaded": True
}
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(customer: CustomerFeatures):
"""Predizione real-time per singolo cliente (<200ms p99)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
try:
X = features_to_dataframe(customer)
proba = float(model_state["model"].predict_proba(X)[0, 1])
prediction = proba >= 0.5
PREDICTIONS_TOTAL.labels(mode="realtime", model_version=model_state["version"]).inc()
CHURN_PROBABILITY_GAUGE.set(proba)
return PredictionResponse(
customer_id=customer.customer_id,
churn_probability=round(proba, 4),
churn_prediction=bool(prediction),
confidence=get_confidence(proba),
model_version=model_state.get("version", "unknown")
)
except Exception as e:
logger.error(f"Errore predizione per customer {customer.customer_id}: {e}")
raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}")
finally:
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="realtime").observe(duration)
@app.post("/predict/batch")
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch - ottimizzata per alti volumi (2.1M clienti/notte)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
n_customers = len(request.customers)
try:
# Converti in DataFrame vettorizzato (molto più efficiente del loop)
rows = [c.dict(exclude={"customer_id"}) for c in request.customers]
customer_ids = [c.customer_id for c in request.customers]
X = pd.DataFrame(rows)
probas = model_state["model"].predict_proba(X)[:, 1]
predictions = (probas >= request.threshold).astype(bool)
results = [
{
"customer_id": cid,
"churn_probability": round(float(p), 4),
"churn_prediction": bool(pred),
"confidence": get_confidence(float(p))
}
for cid, p, pred in zip(customer_ids, probas, predictions)
]
PREDICTIONS_TOTAL.labels(mode="batch", model_version=model_state["version"]).inc(n_customers)
CHURN_PROBABILITY_GAUGE.set(float(probas.mean()))
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="batch").observe(duration)
return {
"results": results,
"total_customers": n_customers,
"churn_count": int(predictions.sum()),
"churn_rate": round(float(predictions.mean()), 4),
"processing_time_seconds": round(duration, 3),
"model_version": model_state.get("version", "unknown")
}
except Exception as e:
logger.error(f"Errore batch prediction: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080, workers=4)
모니터링: 드리프트 감지 및 경고
모니터링은 인프라 지표(지연 시간, 가동 시간)에만 국한되지 않고 다음을 포함해야 합니다. 시간 경과에 따른 예측의 품질. 우리는 분명히 AI 에 대한 드리프트 감지 e 프로메테우스 + 그라파나 실시간 모니터링을 위해 중요한 구성 요소는 모델 성능이 저하될 때 자동 재훈련 트리거입니다.
# drift_detector.py
# Rilevamento drift su dati e predizioni con Evidently AI
# Eseguito ogni settimana via cron job o GitHub Actions scheduled
import json
import logging
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
DatasetDriftMetric,
DatasetMissingValuesSummary,
ColumnDriftMetric
)
logger = logging.getLogger(__name__)
class ChurnDriftDetector:
"""
Monitora il drift nei dati e nelle predizioni del modello churn.
Confronta la distribuzione dei dati di produzione con il reference dataset
(i dati su cui il modello e stato addestrato).
"""
def __init__(
self,
reference_path: str = "data/features/feature_matrix_reference.parquet",
drift_threshold: float = 0.2,
alert_webhook: Optional[str] = None
):
self.reference_df = pd.read_parquet(reference_path)
self.drift_threshold = drift_threshold
self.alert_webhook = alert_webhook
def analyze_drift(
self,
current_df: pd.DataFrame,
analysis_date: datetime = None
) -> dict:
"""
Esegue analisi completa di drift sul dataset corrente.
Args:
current_df: Dati di produzione del periodo corrente
analysis_date: Data di riferimento dell'analisi
Returns:
dict con risultati drift e flag retraining_required
"""
analysis_date = analysis_date or datetime.utcnow()
# ---- Report Evidently ----
report = Report(metrics=[
DatasetDriftMetric(),
DatasetMissingValuesSummary(),
# Monitoriamo le feature più predittive
ColumnDriftMetric(column_name="recency_days"),
ColumnDriftMetric(column_name="frustration_index"),
ColumnDriftMetric(column_name="activity_trend"),
ColumnDriftMetric(column_name="data_usage_gb_30d"),
])
report.run(
reference_data=self.reference_df,
current_data=current_df
)
report_dict = report.as_dict()
drift_results = report_dict["metrics"][0]["result"]
dataset_drift_detected = drift_results["dataset_drift"]
drift_share = drift_results["drift_share"]
n_drifted_features = drift_results["number_of_drifted_columns"]
# ---- Analisi predizioni (churn rate trend) ----
ref_churn_rate = self.reference_df["churned"].mean() if "churned" in self.reference_df.columns else None
curr_churn_rate = current_df["churn_prediction"].mean() if "churn_prediction" in current_df.columns else None
churn_rate_drift = None
if ref_churn_rate and curr_churn_rate:
churn_rate_drift = abs(curr_churn_rate - ref_churn_rate) / ref_churn_rate
# ---- Decision logic per retraining ----
retraining_required = (
dataset_drift_detected and drift_share > self.drift_threshold
) or (
churn_rate_drift is not None and churn_rate_drift > 0.30
)
results = {
"analysis_date": analysis_date.isoformat(),
"dataset_drift_detected": bool(dataset_drift_detected),
"drift_share": float(drift_share),
"n_drifted_features": int(n_drifted_features),
"drift_threshold": self.drift_threshold,
"reference_churn_rate": float(ref_churn_rate) if ref_churn_rate else None,
"current_churn_rate": float(curr_churn_rate) if curr_churn_rate else None,
"churn_rate_drift_pct": float(churn_rate_drift * 100) if churn_rate_drift else None,
"retraining_required": bool(retraining_required),
"severity": self._compute_severity(drift_share, churn_rate_drift)
}
# Salva report HTML
report_path = f"monitoring/drift_report_{analysis_date.strftime('%Y%m%d')}.html"
Path("monitoring").mkdir(exist_ok=True)
report.save_html(report_path)
logger.info(f"Report drift salvato: {report_path}")
if retraining_required:
self._trigger_retraining_alert(results)
return results
def _compute_severity(self, drift_share: float, churn_rate_drift: Optional[float]) -> str:
if drift_share > 0.5 or (churn_rate_drift and churn_rate_drift > 0.5):
return "critical"
elif drift_share > 0.3 or (churn_rate_drift and churn_rate_drift > 0.3):
return "high"
elif drift_share > self.drift_threshold:
return "medium"
return "low"
def _trigger_retraining_alert(self, results: dict):
"""Notifica il team e trigghera il retraining via GitHub Actions dispatch."""
logger.warning(f"DRIFT CRITICO rilevato - Retraining richiesto! {results}")
if self.alert_webhook:
import requests
payload = {
"event_type": "retrain-trigger",
"client_payload": {
"reason": "drift_detected",
"drift_share": results["drift_share"],
"analysis_date": results["analysis_date"]
}
}
resp = requests.post(
self.alert_webhook,
json=payload,
headers={"Authorization": f"token ${GITHUB_TOKEN}"}
)
logger.info(f"GitHub Actions dispatch: HTTP {resp.status_code}")
CI/CD 파이프라인: GitHub 작업 완료
GitHub Actions 파이프라인은 데이터 검증부터 모든 구성요소를 통합합니다. 교육, 평가부터 Kubernetes 배포까지. 워크플로가 설계되었습니다. 측정항목이 실패할 경우 배포를 차단하여 빠르게 실패합니다. 정의된 임계값을 초과합니다.
name: ML Training Pipeline - TelecomIT Churn
on:
schedule:
- cron: '0 3 1 * *' # Ogni 1° del mese alle 03:00 UTC
workflow_dispatch:
inputs:
reason:
description: 'Motivo del retraining manuale'
required: false
default: 'manual'
force_deploy:
description: 'Forza deployment anche se metriche non migliorano'
type: boolean
default: false
repository_dispatch:
types: [retrain-trigger]
env:
PYTHON_VERSION: "3.11"
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
DVC_REMOTE_URL: ${{ secrets.DVC_REMOTE_URL }}
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/churn-serving
jobs:
# ---- JOB 1: Data Validation ----
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC remote
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
- name: Pull latest data
run: dvc pull data/raw/
- name: Run data validation
run: dvc repro data_validation
id: validation
- name: Check validation passed
run: |
RESULT=$(cat data/raw/validation_summary.json | python -c "import sys,json; d=json.load(sys.stdin); sys.exit(0 if d['all_checks_passed'] else 1)")
echo "Validation result: $RESULT"
- name: Upload validation report
uses: actions/upload-artifact@v4
with:
name: validation-report
path: data/raw/validation_report.html
# ---- JOB 2: Training ----
training:
runs-on: ubuntu-latest
needs: data-validation
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC and pull features
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
dvc pull data/features/
- name: Run preprocessing + feature engineering
run: dvc repro preprocessing feature_engineering
- name: Run training
run: dvc repro training
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
- name: Push artifacts to DVC remote
run: dvc push models/
- name: Output metrics
id: metrics
run: |
AUC=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['auc_roc'])")
RECALL=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['recall'])")
echo "auc_roc=$AUC" >> $GITHUB_OUTPUT
echo "recall=$RECALL" >> $GITHUB_OUTPUT
echo "Training completato - AUC: $AUC, Recall: $RECALL"
outputs:
auc_roc: ${{ steps.metrics.outputs.auc_roc }}
recall: ${{ steps.metrics.outputs.recall }}
# ---- JOB 3: Evaluation Gate ----
evaluation-gate:
runs-on: ubuntu-latest
needs: training
steps:
- name: Check metrics threshold
run: |
AUC=${{ needs.training.outputs.auc_roc }}
RECALL=${{ needs.training.outputs.recall }}
MIN_AUC=0.80
MIN_RECALL=0.65
echo "AUC: $AUC (min: $MIN_AUC)"
echo "Recall: $RECALL (min: $MIN_RECALL)"
python -c "
auc = float('$AUC')
recall = float('$RECALL')
if auc < $MIN_AUC:
print(f'FAIL: AUC {auc:.4f} sotto soglia {$MIN_AUC}')
exit(1)
if recall < $MIN_RECALL:
print(f'FAIL: Recall {recall:.4f} sotto soglia {$MIN_RECALL}')
exit(1)
print('PASS: Metriche sopra soglia - deployment approvato')
"
# ---- JOB 4: Build & Push Docker ----
build-push:
runs-on: ubuntu-latest
needs: evaluation-gate
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
file: docker/Dockerfile.serving
push: true
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ---- JOB 5: Deploy to Kubernetes ----
deploy:
runs-on: ubuntu-latest
needs: build-push
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure kubectl
uses: azure/k8s-set-context@v3
with:
kubeconfig: ${{ secrets.KUBECONFIG }}
- name: Deploy to Kubernetes (rolling update)
run: |
kubectl set image deployment/churn-serving \
churn-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
-n ml-production
kubectl rollout status deployment/churn-serving -n ml-production --timeout=300s
- name: Run smoke tests
run: |
python tests/smoke/test_api_smoke.py \
--endpoint ${{ secrets.API_ENDPOINT }}
- name: Promote model to champion in MLflow
run: |
python -c "
import mlflow
client = mlflow.MlflowClient('${{ env.MLFLOW_TRACKING_URI }}')
# Recupera l'ultima versione Staging
versions = client.get_latest_versions('telecomit-churn-xgb', stages=['Staging'])
if versions:
client.transition_model_version_stage(
name='telecomit-churn-xgb',
version=versions[0].version,
stage='Production',
archive_existing_versions=True
)
print(f'Modello v{versions[0].version} promosso a Production')
"
결과, 지표 및 ROI
TelecomIT 이탈 예측 시스템은 6개월간 프로덕션 운영을 거쳐 측정 가능하고 문서화된 결과를 얻었습니다. 다음 데이터가 대표적이다. 중규모 통신 회사의 실제 시나리오.
| 미터법 | 목표 | 결과 M1 | 결과 M6 | 동향 |
|---|---|---|---|---|
| AUC-ROC | ≥ 0.80 | 0.823 | 0.841 | + |
| 리콜(이탈) | ≥ 0.65 | 0.703 | 0.718 | + |
| 정밀도(이탈) | ≥ 0.60 | 0.634 | 0.672 | + |
| p99 실시간 대기 시간 | < 200ms | 87ms | 82ms | + |
| 배치 처리량 | 4시간 이내에 210만 | 2시간 47분에 210만 | 2시간 31분에 210만 | + |
| 가동시간 API | ≥ 99.9% | 99.94% | 99.97% | + |
| 드리프트 경고 이벤트 | - | 2 | 1(M3, 해결됨) | + |
실제 인프라 비용
| 요소 | 월간 비용 | 연간 비용 | 메모 |
|---|---|---|---|
| VPS Hetzner(2코어, 4GB) - 제공 | 5.83유로 | 70유로 | FastAPI + k3s |
| VPS Hetzner(4코어, 8GB) - MLflow + MinIO | 15.90유로 | 191유로 | 추적 + 저장 |
| VPS Hetzner(2코어, 4GB) - 모니터링 | 5.83유로 | 70유로 | 프로메테우스 + 그라파나 |
| GitHub Actions(무료 등급) | 0유로 | 0유로 | 2000분/월 무료 |
| 백업 스토리지(Hetzner Object) | 2.50유로 | 30유로 | DVC 원격 스토리지 |
| Totale | 30.06유로 | 361유로 | 연간 5K EUR 미만 |
MLOps 프로젝트의 ROI
첫 해 총 비용(개발 + 인프라): ~45,000 EUR(7개월 개발자 + 인프라). 예상 연간 순 절감액: 720만 유로. 12개월 ROI: ~158배. 회수 기간은 3주였다.
배운 교훈과 피해야 할 안티 패턴
안티 패턴 1: “먼저 모델링하고 MLOps는 나중에”
60%의 팀은 템플릿으로 시작하고 프로덕션 단계에서만 인프라를 추가합니다. 이 접근 방식은 리팩토링 비용을 두 배로 늘립니다. 해결책: DVC 구성, MLflow 및 자리 표시자가 있는 경우에도 1일차부터 최소한의 CI/CD 파이프라인이 제공됩니다. 한계 비용 낮으면 이점이 엄청납니다.
안티 패턴 2: 인프라 모니터링만
ML 시스템에서는 지연 시간과 가동 시간만 모니터링하는 것만으로는 충분하지 않습니다. 모델은 다음과 같습니다. 기술적으로는 "정상"(50ms 내에 응답, HTTP 500 오류 없음)이지만 예측을 생성합니다. 데이터 드리프트로 인해 성능이 저하되었습니다. 해결책: 항상 배포를 모니터링하세요 시스템 측정항목뿐만 아니라 모델의 입력 및 출력에도 영향을 미칩니다.
안티 패턴 3: 맹목적인 재교육
모든 재훈련이 모델을 향상시키는 것은 아닙니다. 방아쇠가 잘못 보정되면 위험합니다. 결국 생산에서는 더 나쁜 모델이 됩니다. 해결책: 모든 재교육은 도전자와 챔피언 간의 명시적인 비교를 통해 평가 게이트를 통과해야 합니다. 배포 전. 자동 롤백은 항상 가능해야 합니다.
최종 배포 전 체크리스트
## Checklist Deploy Modello ML - TelecomIT Churn
### qualità del Modello
- [ ] AUC-ROC test set >= 0.80
- [ ] Recall test set >= 0.65
- [ ] Performance >= champion model attuale
- [ ] Nessuna feature con importanza anomala (segnale data leakage)
- [ ] Test su holdout temporale (dati fuori dal training period)
### Governance
- [ ] Model card generata e approvata da ML Lead
- [ ] Fairness check eseguito (demographic parity < 0.10)
- [ ] SHAP analysis disponibile per top-10 feature
- [ ] Audit trail registrato in MLflow con annotazioni
### Infrastruttura
- [ ] Dockerfile building senza warning di sicurezza
- [ ] Health check /health risponde 200
- [ ] Smoke test /predict con payload reale
- [ ] Latenza p99 < 200ms verificata con k6 o locust
- [ ] Rollback plan documentato e testato
### Monitoring
- [ ] Dashboard Grafana aggiornata con versione modello
- [ ] Alert Prometheus configurati per nuova versione
- [ ] Reference dataset Evidently aggiornato
- [ ] On-call aggiornato sul deploy
### Documenti
- [ ] JIRA ticket di deployment chiuso con evidence
- [ ] Post-deploy review schedulata a T+7 giorni
결론: MLOps는 프로젝트가 아니라 실천입니다.
이 사례 연구에서는 이탈 예측을 위한 완전한 MLOps 시스템을 구축했습니다. 비즈니스 문제부터 모니터링을 통해 Kubernetes에 배포하는 것까지, 드리프트 감지 및 거버넌스. 시스템은 모든 요구 사항을 충족합니다: 대기 시간 <200ms 실시간, 3시간 이내에 210만 고객 배치, AUC-ROC 0.84, 비용 인프라 비용은 연간 361 EUR입니다.
시리즈의 주요 메시지는 다음과 같습니다. MLOps는 기술이 아닙니다. 그리고 규율. 한번 설치하면 모든 문제를 해결할 수 있는 도구는 없습니다. 문제. ML 모델을 가능하게 하는 일련의 관행, 프로세스 및 문화 시간이 지나도 안정적으로 작동합니다. 우리가 작성한 코드는 다음과 같습니다. 시리즈는 출발점입니다. 실제 작업은 관찰, 측정, 학습에 있습니다. 그리고 지속적으로 개선하세요.
계속할 리소스
- 이 전체 시리즈: MLOps: 실험에서 프로덕션까지 → CI/CD 파이프라인 → DVC 버전 관리 → MLflow → 드리프트 감지 → FastAPI 제공 → 쿠버네티스 → A/B 테스트 → 통치
- 고급 딥러닝: 고급 딥러닝 시리즈 - LoRA, 양자화, 엣지 AI
- AI 엔지니어링: AI 엔지니어링 시리즈 - RAG, 벡터 데이터베이스, LangChain
- GitHub 저장소: 이 사례 연구의 전체 코드를 사용할 수 있습니다. github.com/federicocalo/telecomit-churn-mlops (실제 데이터가 아닌 구조)
MLOps 시장은 2026년 43억 8천만 달러에서 2035년 891억 8천만 달러(CAGR 39.8%)로 성장할 것입니다. 이 시리즈에서 습득한 기술로 인해 귀하는 탁월한 위치에 있게 됩니다. 이러한 성장을 해결하기 위해. 행복한 MLOps.







