Case Study: MLOps in Production - From Zero to Full Pipeline
We have covered the entire MLOps series: from foundational concepts to CI/CD pipelines with GitHub Actions and Docker, data versioning with DVC, experiment tracking with MLflow, drift detection, FastAPI serving, Kubernetes scaling, A/B testing, and AI governance. Now it is time to bring everything together in a concrete, real-world scenario.
In this case study, we will build a complete churn prediction system for a fictional telecommunications company (TelecomIT Ltd.) that applies all the MLOps principles learned throughout this series. We will start from the business problem, design the end-to-end architecture, implement every component with working Python code, and explore how to monitor and maintain the system in production over time. The final result is a complete, reproducible MLOps pipeline ready for real-world deployment.
What You Will Learn
- How to translate a business problem into an end-to-end MLOps architecture
- Feature engineering for churn prediction: RFM and behavioral signals
- Complete DVC pipeline: data ingestion, preprocessing, training, evaluation
- Structured experiment tracking with MLflow and systematic model comparison
- Model serving with containerized FastAPI + Uvicorn, Kubernetes-ready
- Real-time monitoring with Prometheus + Grafana: drift, latency, business KPIs
- Statistically rigorous A/B testing between challenger and champion models
- Full governance: model cards, audit trails, pre-deploy fairness checks
- Realistic cost estimation and ROI calculation for a mid-size company
The Business Problem: Churn in Telecom Services
TelecomIT Ltd. is a telecommunications operator with 2.1 million active customers. The monthly churn rate stands at 2.3%, meaning roughly 48,000 customers are lost every month. The average cost to acquire a new customer is EUR 180, while the cost to retain an existing one is EUR 35. Identifying at-risk customers before they leave is one of the most powerful economic levers available.
The Economic Value of the Project
With a model that correctly identifies 70% of at-risk churners (recall = 0.70) at 65% precision (i.e., 35% of contacted customers are false positives, contacted unnecessarily), out of 48,000 monthly churners:
- Identified churners: 48,000 × 0.70 = 33,600
- Retention campaign cost: 33,600 / 0.65 × EUR 35 = ~EUR 1.8M/month
- Avoided acquisition cost (assuming a 40% retention-campaign success rate): 33,600 × 0.40 × EUR 180 = ~EUR 2.4M/month
- Estimated net ROI: +EUR 600K/month, or EUR 7.2M/year
The MLOps infrastructure cost for this pipeline: approximately EUR 3,500/year on cloud.
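These back-of-the-envelope figures are easy to sanity-check in code. A minimal sketch, using the numbers stated above (the 40% campaign success rate is the assumption called out in the list):

```python
# Sanity-check of the churn ROI estimate (figures from the text above)
monthly_churners = 48_000
recall = 0.70            # share of churners the model catches
precision = 0.65         # share of contacted customers who would really churn
retention_cost = 35      # EUR per contacted customer
acquisition_cost = 180   # EUR to acquire a replacement customer
campaign_success = 0.40  # assumed share of contacted churners actually retained

identified = monthly_churners * recall                 # 33,600 churners caught
contacted = identified / precision                     # ~51,700 incl. false positives
campaign_cost = contacted * retention_cost             # ~EUR 1.8M/month
avoided_acquisition = identified * campaign_success * acquisition_cost  # ~EUR 2.4M/month
net_monthly = avoided_acquisition - campaign_cost      # ~EUR 0.6M/month

print(f"Net ROI: EUR {net_monthly:,.0f}/month, EUR {net_monthly * 12:,.0f}/year")
```

Even halving the assumed campaign success rate leaves the project comfortably above the ~EUR 3,500/year infrastructure cost.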
Technical and Business Requirements
Before writing code, we define concrete requirements that will guide every architectural decision. These requirements emerge from conversations with the business team and the existing technical context.
| Requirement | Specification | Constraint |
|---|---|---|
| Prediction frequency | Monthly batch, scoring all active customers | Ready by the 1st of each month for CRM campaign |
| Batch scoring latency | 2.1M customers in under 4 hours | CRM time slot: 02:00-06:00 UTC |
| Real-time latency | Single-customer scoring for call center agents | < 200ms p99 |
| Target metric | AUC-ROC ≥ 0.80, Recall ≥ 0.65 | Validated on monthly holdout set |
| Retraining | Monthly, automatic on new data | Also triggered by drift detection |
| Governance | Model card, audit trail, fairness check | AI Act: limited-risk (telecom sector) |
| Stack | 100% open-source, on-premise + hybrid cloud | Infrastructure budget: < EUR 5,000/year |
End-to-End Architecture
The system architecture integrates all tools from the MLOps series into a coherent workflow. Each component has a precise role and communicates through well-defined interfaces. The guiding principle is separation of concerns: data, training, and serving live in independent layers that can be updated without affecting each other.
+------------------+ +------------------+ +-------------------+
| DATA SOURCES | | DATA PIPELINE | | TRAINING LAYER |
| | | | | |
| - CRM Database |-->| - DVC Pipeline |-->| - MLflow Tracking |
| - CDR (Call Data)| | - Great Expects | | - XGBoost/RF/LGB |
| - Billing System | | - Feature Store | | - Hyperopt Tuning |
| - App Usage Logs | | - DVC Remote | | - Model Registry |
+------------------+ +------------------+ +-------------------+
|
(model promoted)
|
+------------------+ +------------------+ +-------------------+
| MONITORING | | SERVING | | CI/CD PIPELINE |
| | | | | |
| - Prometheus |<--| - FastAPI REST |<--| - GitHub Actions |
| - Grafana Dash | | - Batch Scorer | | - Docker Build |
| - Drift Alerts | | - Kubernetes | | - Auto-tests |
| - Retrain Trig. | | - Load Balancer | | - Auto-deploy |
+------------------+ +------------------+ +-------------------+
|
+------+-------+
| GOVERNANCE |
| |
| - Model Card |
| - Audit Log |
| - Fairness |
+--------------+
Full Technology Stack
| Layer | Tool | Version | Cost |
|---|---|---|---|
| Data Versioning | DVC + MinIO (S3-compatible) | DVC 3.x | Open-source |
| Experiment Tracking | MLflow self-hosted | MLflow 2.x | Open-source |
| Data Validation | Great Expectations | GX 0.18 | Open-source |
| CI/CD | GitHub Actions | - | Free tier |
| Containerization | Docker + Docker Compose | Docker 24.x | Open-source |
| Model Serving | FastAPI + Uvicorn | FastAPI 0.110 | Open-source |
| Orchestration | Kubernetes (k3s) | k3s 1.28 | Open-source |
| Monitoring | Prometheus + Grafana | Prom 2.47 | Open-source |
| Drift Detection | Evidently AI | 0.4.x | Open-source |
| Governance/Fairness | Fairlearn + SHAP | FL 0.10 | Open-source |
| ML Framework | XGBoost + LightGBM + scikit-learn | XGB 2.0 | Open-source |
| Cloud infrastructure | Hetzner Cloud (2 core / 4GB RAM VPS) | - | ~EUR 360/year |
Repository Structure and Configuration
A well-organized repository structure is the foundation of any maintainable MLOps project. The following layout clearly separates code, configuration, tests, and documentation, following the principles of high cohesion and low coupling.
telecomit-churn/
├── .github/
│ └── workflows/
│ ├── ml-pipeline.yml # Main CI/CD pipeline
│ ├── retrain-trigger.yml # Automated retraining trigger
│ └── pr-validation.yml # PR validation workflow
├── config/
│ ├── model_config.yaml # Hyperparameters and model config
│ ├── feature_config.yaml # Feature set and preprocessing
│ └── serving_config.yaml # FastAPI configuration
├── data/
│ ├── raw/ # Raw data (DVC tracked)
│ ├── processed/ # Preprocessed data (DVC tracked)
│ └── features/ # Engineered features (DVC tracked)
├── models/
│ └── registry/ # Registered models (DVC tracked)
├── src/
│ ├── data/
│ │ ├── ingestion.py # Data extraction from CRM/CDR
│ │ ├── validation.py # Great Expectations checks
│ │ └── preprocessing.py # Cleaning and transformation
│ ├── features/
│ │ ├── rfm_features.py # RFM features (Recency, Frequency, Monetary)
│ │ ├── behavioral.py # Behavioral features
│ │ └── feature_store.py # Local feature store
│ ├── training/
│ │ ├── train.py # Training loop with MLflow
│ │ ├── evaluate.py # Evaluation and champion comparison
│ │ └── hyperopt_search.py # Hyperparameter tuning
│ ├── serving/
│ │ ├── api.py # FastAPI application
│ │ ├── batch_scorer.py # Monthly batch scoring
│ │ └── middleware.py # Logging, rate limiting, metrics
│ ├── monitoring/
│ │ ├── drift_detector.py # Evidently drift detection
│ │ ├── metrics_exporter.py # Prometheus metrics
│ │ └── alert_manager.py # Alerting and retraining trigger
│ └── governance/
│ ├── model_card.py # Model card generator
│ ├── fairness_checker.py # Fairness analysis with Fairlearn
│ └── audit_logger.py # Immutable audit trail
├── tests/
│ ├── unit/ # Unit tests for each module
│ ├── integration/ # Integration tests
│ └── smoke/ # Post-deploy smoke tests
├── dvc.yaml # DVC pipeline stages
├── params.yaml # DVC parameters
├── docker/
│ ├── Dockerfile.training # Training image
│ ├── Dockerfile.serving # Serving image
│ └── docker-compose.yml # Local full stack
├── k8s/
│ ├── deployment.yaml # Kubernetes deployment
│ ├── service.yaml # Kubernetes service
│ └── hpa.yaml # Horizontal Pod Autoscaler
└── notebooks/
└── exploration/ # EDA notebooks (not in pipeline)
Feature Engineering for Churn Prediction
Feature engineering is often the difference between a mediocre and an excellent model. For telecom churn prediction, the most predictive features come from customer behavior over time. We use the RFM framework (Recency, Frequency, Monetary) as a baseline, enriched with domain-specific behavioral signals from call records and support interactions.
# rfm_features.py
# Feature engineering for telecom churn prediction
# Produces a tabular dataset with 45 predictive features
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Tuple
import mlflow
def compute_rfm_features(
df_transactions: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Calculates RFM (Recency, Frequency, Monetary) features per customer.
Args:
df_transactions: DataFrame with transactions/top-ups
snapshot_date: Reference date for calculations
customer_id_col: Customer identifier column name
Returns:
DataFrame with one row per customer and RFM features
"""
df = df_transactions.copy()
df["transaction_date"] = pd.to_datetime(df["transaction_date"])
rfm = df.groupby(customer_id_col).agg(
recency_days=("transaction_date", lambda x: (snapshot_date - x.max()).days),
frequency=("transaction_date", "count"),
monetary_total=("amount", "sum"),
monetary_avg=("amount", "mean"),
monetary_std=("amount", "std"),
first_transaction=("transaction_date", "min"),
last_transaction=("transaction_date", "max"),
).reset_index()
# Customer tenure in days
rfm["tenure_days"] = (snapshot_date - rfm["first_transaction"]).dt.days
# Activity trend: last 30 days vs historical average
last_30_days = snapshot_date - timedelta(days=30)
recent = df[df["transaction_date"] >= last_30_days].groupby(customer_id_col).agg(
recent_frequency=("transaction_date", "count"),
recent_monetary=("amount", "sum")
).reset_index()
rfm = rfm.merge(recent, on=customer_id_col, how="left")
rfm["recent_frequency"] = rfm["recent_frequency"].fillna(0)
rfm["recent_monetary"] = rfm["recent_monetary"].fillna(0)
# Trend ratio: recent activity vs monthly historical average
avg_monthly_freq = rfm["frequency"] / (rfm["tenure_days"] / 30).clip(lower=1)
rfm["activity_trend"] = rfm["recent_frequency"] / avg_monthly_freq.clip(lower=0.01)
return rfm
def compute_behavioral_features(
df_cdr: pd.DataFrame,
df_support: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Behavioral features from CDR (call detail records) and support tickets.
These features capture telecom-specific churn signals.
"""
# ---- CDR Features: calling behavior ----
cdr_30d = df_cdr[
df_cdr["call_date"] >= snapshot_date - timedelta(days=30)
]
cdr_features = cdr_30d.groupby(customer_id_col).agg(
calls_count_30d=("call_id", "count"),
calls_duration_avg_30d=("duration_seconds", "mean"),
calls_duration_total_30d=("duration_seconds", "sum"),
data_usage_gb_30d=("data_usage_mb", lambda x: x.sum() / 1024),
international_calls_ratio=("is_international", "mean"),
peak_hour_ratio=("is_peak_hour", "mean"),
unique_contacts_30d=("called_number_hash", "nunique"),
).reset_index()
# ---- Support Features: negative interactions ----
support_90d = df_support[
df_support["ticket_date"] >= snapshot_date - timedelta(days=90)
]
support_features = support_90d.groupby(customer_id_col).agg(
support_tickets_90d=("ticket_id", "count"),
support_escalations_90d=("is_escalated", "sum"),
avg_resolution_time_h=("resolution_hours", "mean"),
complaint_ratio=("is_complaint", "mean"),
unresolved_tickets=("is_resolved", lambda x: (~x).sum()),
).reset_index()
behavioral = cdr_features.merge(support_features, on=customer_id_col, how="left")
behavioral = behavioral.fillna(0)
# Derived: "frustration index" - proxy for customer dissatisfaction
behavioral["frustration_index"] = (
behavioral["support_tickets_90d"] * 2 +
behavioral["support_escalations_90d"] * 5 +
behavioral["unresolved_tickets"] * 3
).clip(upper=20) / 20 # Normalized 0-1
return behavioral
def build_feature_matrix(
df_customers: pd.DataFrame,
df_rfm: pd.DataFrame,
df_behavioral: pd.DataFrame,
label_col: str = "churned"
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Assembles the final feature matrix by joining all data sources.
Returns:
(X, y) - feature matrix and label vector
"""
features = df_customers[[
"customer_id", "contract_type", "payment_method",
"age_group", "region", "plan_monthly_eur", label_col
]].merge(df_rfm, on="customer_id", how="left") \
.merge(df_behavioral, on="customer_id", how="left")
cat_cols = ["contract_type", "payment_method", "age_group", "region"]
features = pd.get_dummies(features, columns=cat_cols, drop_first=True)
y = features[label_col].astype(int)
X = features.drop(columns=["customer_id", label_col,
"first_transaction", "last_transaction"])
return X, y
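To make the RFM aggregation concrete, here is a tiny self-contained example of the same groupby pattern on two hypothetical customers (synthetic data, not part of the pipeline):

```python
import pandas as pd
from datetime import datetime

# Two hypothetical customers: C1 active recently, C2 quiet since January
df = pd.DataFrame({
    "customer_id": ["C1", "C1", "C2"],
    "transaction_date": pd.to_datetime(["2024-05-20", "2024-06-10", "2024-01-15"]),
    "amount": [20.0, 25.0, 15.0],
})
snapshot = datetime(2024, 6, 30)

rfm = df.groupby("customer_id").agg(
    recency_days=("transaction_date", lambda x: (snapshot - x.max()).days),
    frequency=("transaction_date", "count"),
    monetary_total=("amount", "sum"),
).reset_index()

print(rfm)
# C1: recency 20 days, frequency 2, monetary 45.0
# C2: recency 167 days, frequency 1, monetary 15.0
```

C2's long recency and low frequency are exactly the kind of signal the churn model picks up on.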
DVC Pipeline: End-to-End Reproducibility
The DVC pipeline defines every processing stage as a directed acyclic graph (DAG). Each stage declares its inputs (deps), outputs (outs), and parameters. DVC tracks dependencies and re-runs only the stages invalidated by changes, exactly like Make but designed for ML pipelines with data and model artifacts.
stages:
# Stage 1: Ingestion - extract data from CRM/CDR
data_ingestion:
cmd: python src/data/ingestion.py
deps:
- src/data/ingestion.py
- config/feature_config.yaml
params:
- snapshot_date
- data.source_db_table
outs:
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
metrics:
- data/raw/ingestion_report.json:
cache: false
# Stage 2: Validation - Great Expectations checks
data_validation:
cmd: python src/data/validation.py
deps:
- src/data/validation.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
params:
- validation.min_rows
- validation.max_null_ratio
outs:
- data/raw/validation_report.html:
cache: false
metrics:
- data/raw/validation_summary.json:
cache: false
# Stage 3: Preprocessing - cleaning and transformation
preprocessing:
cmd: python src/data/preprocessing.py
deps:
- src/data/preprocessing.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
- config/feature_config.yaml
params:
- preprocessing.imputation_strategy
- preprocessing.outlier_threshold
outs:
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
# Stage 4: Feature Engineering - RFM + behavioral
feature_engineering:
cmd: python src/features/rfm_features.py
deps:
- src/features/rfm_features.py
- src/features/behavioral.py
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
params:
- features.lookback_days
- features.rfm_bins
outs:
- data/features/feature_matrix.parquet
- data/features/feature_names.json
# Stage 5: Training - XGBoost with MLflow tracking
training:
cmd: python src/training/train.py
deps:
- src/training/train.py
- data/features/feature_matrix.parquet
- config/model_config.yaml
params:
- model.algorithm
- model.n_estimators
- model.max_depth
- model.learning_rate
- model.subsample
- model.scale_pos_weight
- training.test_size
- training.random_seed
outs:
- models/registry/challenger_model.pkl
- models/registry/preprocessor.pkl
metrics:
- models/registry/metrics.json:
cache: false
plots:
- models/registry/confusion_matrix.json:
cache: false
- models/registry/roc_curve.json:
cache: false
# Stage 6: Evaluation - challenger vs champion comparison
evaluation:
cmd: python src/training/evaluate.py
deps:
- src/training/evaluate.py
- models/registry/challenger_model.pkl
- data/features/feature_matrix.parquet
params:
- evaluation.min_auc_roc
- evaluation.min_recall
- evaluation.promotion_strategy
outs:
- models/registry/evaluation_report.json:
cache: false
- models/registry/model_card.json:
cache: false
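The `params` entries above resolve against `params.yaml`, the single source of truth for the pipeline. A plausible sketch of that file, with illustrative values matching the keys referenced in the stages:

```yaml
# params.yaml - illustrative values; keys mirror those used in dvc.yaml
snapshot_date: "2024-06-30"
data:
  source_db_table: crm.active_customers
validation:
  min_rows: 1000000
  max_null_ratio: 0.05
preprocessing:
  imputation_strategy: median
  outlier_threshold: 3.0
features:
  lookback_days: 90
  rfm_bins: 5
model:
  algorithm: xgboost
  n_estimators: 500
  max_depth: 6
  learning_rate: 0.05
  subsample: 0.8
  scale_pos_weight: 4.0
training:
  test_size: 0.2
  random_seed: 42
evaluation:
  min_auc_roc: 0.80
  min_recall: 0.65
  promotion_strategy: challenger_beats_champion
```

Changing any of these values invalidates exactly the stages that declare them, and `dvc repro` re-runs only that part of the DAG.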
Training with MLflow: Complete Experiment Tracking
The training module integrates MLflow for systematic tracking of every experiment. We use XGBoost as the primary algorithm for its excellent balance between performance and interpretability on tabular data, with early stopping and Hyperopt-based hyperparameter search.
# train.py
# Training pipeline for churn prediction with MLflow tracking
# Executed by DVC pipeline: python src/training/train.py
import os
import json
import pickle
import logging
from pathlib import Path
from typing import Dict, Tuple
import mlflow
import mlflow.xgboost
import pandas as pd
import numpy as np
import xgboost as xgb
import yaml
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
roc_auc_score, f1_score, precision_score,
recall_score, accuracy_score, confusion_matrix
)
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_features(features_path: str = "data/features/feature_matrix.parquet") -> Tuple:
"""Load feature matrix and prepare stratified train/val/test splits."""
df = pd.read_parquet(features_path)
with open("params.yaml") as f:
params = yaml.safe_load(f)
test_size = params["training"]["test_size"]
seed = params["training"]["random_seed"]
label_col = "churned"
X = df.drop(columns=[label_col])
y = df[label_col].astype(int)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=seed, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.15, random_state=seed, stratify=y_train
)
logger.info(f"Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")
logger.info(f"Churn rate - Train: {y_train.mean():.3f} | Test: {y_test.mean():.3f}")
return X_train, X_val, X_test, y_train, y_val, y_test
def train_xgboost(X_train, y_train, X_val, y_val, params: dict):
    """Train an XGBoost model with early stopping.

    Note: in XGBoost >= 2.0, early_stopping_rounds is a constructor
    argument (fit() no longer accepts it) and use_label_encoder was removed.
    """
    model = xgb.XGBClassifier(
        n_estimators=params.get("n_estimators", 500),
        max_depth=params.get("max_depth", 6),
        learning_rate=params.get("learning_rate", 0.05),
        subsample=params.get("subsample", 0.8),
        colsample_bytree=params.get("colsample_bytree", 0.8),
        scale_pos_weight=params.get("scale_pos_weight", 4.0),
        min_child_weight=params.get("min_child_weight", 5),
        reg_alpha=params.get("reg_alpha", 0.1),
        reg_lambda=params.get("reg_lambda", 1.0),
        eval_metric="auc",
        early_stopping_rounds=50,
        random_state=42,
        n_jobs=-1
    )
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        verbose=100
    )
    return model
def compute_metrics(model, X, y, threshold: float = 0.5) -> Dict[str, float]:
"""Compute comprehensive evaluation metrics."""
y_proba = model.predict_proba(X)[:, 1]
y_pred = (y_proba >= threshold).astype(int)
cm = confusion_matrix(y, y_pred)
return {
"auc_roc": roc_auc_score(y, y_proba),
"accuracy": accuracy_score(y, y_pred),
"precision": precision_score(y, y_pred, zero_division=0),
"recall": recall_score(y, y_pred, zero_division=0),
"f1": f1_score(y, y_pred, zero_division=0),
"tn": int(cm[0][0]),
"fp": int(cm[0][1]),
"fn": int(cm[1][0]),
"tp": int(cm[1][1]),
}
def run_training_pipeline():
"""Main training pipeline with full MLflow tracking."""
X_train, X_val, X_test, y_train, y_val, y_test = load_features()
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
mlflow.set_experiment("telecomit-churn-prediction")
with mlflow.start_run(
run_name=f"xgb-training-{pd.Timestamp.now().strftime('%Y%m%d-%H%M')}"
) as run:
mlflow.log_params({
"algorithm": "XGBoostClassifier",
"train_size": len(X_train),
"val_size": len(X_val),
"test_size": len(X_test),
"churn_rate_train": float(y_train.mean()),
"feature_count": X_train.shape[1],
})
# Hyperparameter search
space = {
"max_depth": hp.choice("max_depth", [4, 5, 6, 7, 8]),
"learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.2)),
"n_estimators": hp.choice("n_estimators", [200, 300, 500, 700]),
"subsample": hp.uniform("subsample", 0.6, 1.0),
"colsample_bytree": hp.uniform("colsample_bytree", 0.6, 1.0),
"scale_pos_weight": hp.uniform("scale_pos_weight", 2.0, 8.0),
}
def objective(params):
m = train_xgboost(X_train, y_train, X_val, y_val, params)
return {"loss": -compute_metrics(m, X_val, y_val)["f1"], "status": STATUS_OK}
        from hyperopt import space_eval  # fmin returns indices for hp.choice dims
        best = fmin(objective, space, algo=tpe.suggest, max_evals=30, trials=Trials())
        best_params = space_eval(space, best)  # map indices back to actual values
        mlflow.log_params(best_params)
        model = train_xgboost(X_train, y_train, X_val, y_val, best_params)
test_metrics = compute_metrics(model, X_test, y_test)
mlflow.log_metrics({f"test_{k}": v for k, v in test_metrics.items()})
mlflow.xgboost.log_model(
model,
artifact_path="model",
registered_model_name="telecomit-churn-xgb"
)
logger.info(f"AUC-ROC: {test_metrics['auc_roc']:.4f}")
logger.info(f"Recall: {test_metrics['recall']:.4f}")
# Save metrics for DVC
Path("models/registry").mkdir(parents=True, exist_ok=True)
with open("models/registry/metrics.json", "w") as f:
json.dump({**test_metrics, "run_id": run.info.run_id}, f, indent=2)
with open("models/registry/challenger_model.pkl", "wb") as f:
pickle.dump(model, f)
if __name__ == "__main__":
run_training_pipeline()
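The evaluation stage (`evaluate.py`) then decides whether the freshly trained challenger replaces the current champion. A minimal pure-Python sketch of that gate, with the absolute thresholds taken from the evaluation params (the function name and exact tie-breaking rule are illustrative, not the file's literal contents):

```python
from typing import Optional

def should_promote(challenger: dict, champion: Optional[dict],
                   min_auc_roc: float = 0.80, min_recall: float = 0.65) -> bool:
    """Promote the challenger only if it clears the absolute quality bar
    AND beats the current champion on AUC-ROC (when one exists)."""
    clears_bar = (challenger["auc_roc"] >= min_auc_roc
                  and challenger["recall"] >= min_recall)
    if not clears_bar:
        return False
    if champion is None:  # first deployment: no champion yet
        return True
    return challenger["auc_roc"] > champion["auc_roc"]

print(should_promote({"auc_roc": 0.84, "recall": 0.68},
                     {"auc_roc": 0.82, "recall": 0.66}))  # True
```

Keeping the absolute bar (AUC-ROC ≥ 0.80, recall ≥ 0.65) separate from the relative comparison prevents a weak challenger from being promoted just because the champion has degraded.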
Model Serving: FastAPI for Real-Time and Batch
The serving system supports two modes: real-time for call center agent queries (single customer, <200ms) and batch for monthly scoring of the entire customer database (2.1M in under 4 hours). Both modes use the same model and preprocessing logic to ensure consistency.
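Before looking at the API itself, a quick feasibility check on the batch requirement (the 10,000-row chunk size is an assumption for illustration):

```python
# Feasibility check for the batch window (02:00-06:00 UTC)
customers = 2_100_000
window_seconds = 4 * 3600                        # 4-hour CRM slot
required_throughput = customers / window_seconds # ~146 customers/sec sustained

chunk_size = 10_000                              # assumed scoring chunk
chunks = customers // chunk_size                 # 210 chunks
max_seconds_per_chunk = window_seconds / chunks  # ~68.6 s budget per chunk

print(f"{required_throughput:.0f} customers/sec, "
      f"{max_seconds_per_chunk:.1f} s budget per {chunk_size}-row chunk")
```

Since a vectorized `predict_proba` scores tens of thousands of rows per second on a single core, the binding constraint is feature assembly and I/O, not inference itself.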
# api.py - FastAPI serving for churn prediction
# Exposes /predict (single), /predict/batch, /health, /metrics
import os
import pickle
import time
import logging
from contextlib import asynccontextmanager
from typing import List, Optional
import mlflow
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import uvicorn
logger = logging.getLogger(__name__)
# ---- Prometheus Metrics ----
PREDICTIONS_TOTAL = Counter(
"churn_predictions_total", "Total number of predictions",
["mode", "model_version"]
)
PREDICTION_DURATION = Histogram(
"churn_prediction_duration_seconds", "Prediction latency",
["mode"], buckets=[0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1.0, 5.0]
)
CHURN_PROB_GAUGE = Gauge(
"churn_prediction_avg_probability", "Average churn probability"
)
# ---- Input Schema ----
class CustomerFeatures(BaseModel):
customer_id: str
recency_days: float = Field(..., ge=0, le=3650)
frequency: int = Field(..., ge=0)
monetary_total: float = Field(..., ge=0)
tenure_days: int = Field(..., ge=0)
activity_trend: float = Field(..., ge=0)
calls_count_30d: int = Field(..., ge=0)
data_usage_gb_30d: float = Field(..., ge=0)
support_tickets_90d: int = Field(..., ge=0)
frustration_index: float = Field(..., ge=0.0, le=1.0)
plan_monthly_eur: float = Field(..., ge=0)
class PredictionResponse(BaseModel):
customer_id: str
churn_probability: float
churn_prediction: bool
confidence: str
model_version: str
class BatchRequest(BaseModel):
customers: List[CustomerFeatures]
threshold: Optional[float] = Field(0.5, ge=0.0, le=1.0)
model_state: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Load model from MLflow registry on startup."""
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow:5000"))
model_name = os.getenv("MODEL_NAME", "telecomit-churn-xgb")
model_alias = os.getenv("MODEL_ALIAS", "champion")
model_uri = f"models:/{model_name}@{model_alias}"
model_state["model"] = mlflow.xgboost.load_model(model_uri)
model_state["version"] = os.getenv("MODEL_VERSION", "unknown")
logger.info(f"Model loaded: {model_name} @ {model_alias}")
yield
model_state.clear()
app = FastAPI(
title="TelecomIT Churn Prediction API",
version="1.0.0",
lifespan=lifespan
)
def get_confidence(p: float) -> str:
if p >= 0.75 or p <= 0.25:
return "high"
elif p >= 0.60 or p <= 0.40:
return "medium"
return "low"
@app.get("/health")
async def health():
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Model not loaded")
return {"status": "healthy", "model_version": model_state["version"]}
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(customer: CustomerFeatures):
"""Real-time prediction for single customer (<200ms p99)."""
start = time.time()
    X = pd.DataFrame([customer.model_dump(exclude={"customer_id"})])  # pydantic v2: model_dump() replaces dict()
proba = float(model_state["model"].predict_proba(X)[0, 1])
PREDICTIONS_TOTAL.labels(mode="realtime", model_version=model_state["version"]).inc()
PREDICTION_DURATION.labels(mode="realtime").observe(time.time() - start)
CHURN_PROB_GAUGE.set(proba)
return PredictionResponse(
customer_id=customer.customer_id,
churn_probability=round(proba, 4),
churn_prediction=proba >= 0.5,
confidence=get_confidence(proba),
model_version=model_state["version"]
)
@app.post("/predict/batch")
async def predict_batch(request: BatchRequest):
"""Batch prediction - optimized for high volume (2.1M customers/night)."""
start = time.time()
    rows = [c.model_dump(exclude={"customer_id"}) for c in request.customers]
ids = [c.customer_id for c in request.customers]
X = pd.DataFrame(rows)
probas = model_state["model"].predict_proba(X)[:, 1]
preds = (probas >= request.threshold).astype(bool)
PREDICTIONS_TOTAL.labels(mode="batch", model_version=model_state["version"]).inc(len(ids))
return {
"results": [
{"customer_id": cid, "churn_probability": round(float(p), 4),
"churn_prediction": bool(pred), "confidence": get_confidence(float(p))}
for cid, p, pred in zip(ids, probas, preds)
],
"total": len(ids),
"churn_count": int(preds.sum()),
"churn_rate": round(float(preds.mean()), 4),
"processing_time_seconds": round(time.time() - start, 3),
"model_version": model_state["version"]
}
Monitoring: Drift Detection and Automated Alerting
Monitoring a production ML system goes far beyond infrastructure metrics like latency and uptime. A model can appear technically healthy (responding in 50ms, no HTTP 500 errors) while producing degraded predictions due to data drift. We use Evidently AI for distribution-level drift detection and Prometheus + Grafana for real-time operational monitoring.
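Evidently computes the drift statistics for us, but the underlying idea is worth seeing in isolation. A standalone Population Stability Index (PSI) sketch, one of the standard drift scores (the bin count and the 0.1/0.2 thresholds are common rules of thumb, not Evidently's exact internals):

```python
import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index for one feature.
    Rule of thumb: < 0.1 stable, 0.1-0.2 moderate shift, > 0.2 significant drift."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Clip to avoid log(0) and division by zero in empty bins
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

rng = np.random.default_rng(42)
ref = rng.normal(30, 10, 50_000)       # e.g. recency_days at training time
stable = rng.normal(30, 10, 50_000)    # same distribution: PSI near 0
shifted = rng.normal(45, 10, 50_000)   # customers going quiet: PSI well above 0.2

print(f"stable: {psi(ref, stable):.4f}, shifted: {psi(ref, shifted):.4f}")
```

The production detector below delegates this kind of per-column statistic (and the dataset-level "drift share") to Evidently's presets.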
# drift_detector.py
# Data and prediction drift monitoring with Evidently AI
# Run weekly via cron or GitHub Actions scheduled workflow
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from evidently.metrics import DatasetDriftMetric, ColumnDriftMetric
logger = logging.getLogger(__name__)
class ChurnDriftDetector:
"""
Monitors data and prediction drift for the churn model.
Compares the production data distribution against the reference dataset
(the data the model was trained on).
"""
def __init__(
self,
reference_path: str = "data/features/feature_matrix_reference.parquet",
drift_threshold: float = 0.2,
retrain_webhook: Optional[str] = None
):
self.reference_df = pd.read_parquet(reference_path)
self.drift_threshold = drift_threshold
self.retrain_webhook = retrain_webhook
def analyze_drift(self, current_df: pd.DataFrame) -> dict:
"""
Run comprehensive drift analysis on current production data.
Triggers retraining if drift exceeds configured thresholds.
"""
report = Report(metrics=[
DatasetDriftMetric(),
ColumnDriftMetric(column_name="recency_days"),
ColumnDriftMetric(column_name="frustration_index"),
ColumnDriftMetric(column_name="activity_trend"),
ColumnDriftMetric(column_name="data_usage_gb_30d"),
])
report.run(reference_data=self.reference_df, current_data=current_df)
report_dict = report.as_dict()
drift_results = report_dict["metrics"][0]["result"]
drift_detected = drift_results["dataset_drift"]
drift_share = drift_results["drift_share"]
        # Churn rate comparison: actual rate in the reference vs predicted rate now.
        # Guard against missing columns: .mean() on an empty Series is NaN, and
        # NaN is truthy, so a plain `if ref_churn_rate` check would let it through.
        ref_churn_rate = self.reference_df.get("churned", pd.Series(dtype=float)).mean()
        curr_churn_rate = current_df.get("churn_prediction", pd.Series(dtype=float)).mean()
        churn_rate_drift = (
            abs(curr_churn_rate - ref_churn_rate) / ref_churn_rate
            if pd.notna(ref_churn_rate) and ref_churn_rate > 0 and pd.notna(curr_churn_rate)
            else None
        )
retraining_required = (
drift_detected and drift_share > self.drift_threshold
) or (churn_rate_drift is not None and churn_rate_drift > 0.30)
results = {
"analysis_date": datetime.utcnow().isoformat(),
"dataset_drift_detected": bool(drift_detected),
"drift_share": float(drift_share),
"drift_threshold": self.drift_threshold,
"churn_rate_drift_pct": float(churn_rate_drift * 100) if churn_rate_drift else None,
"retraining_required": bool(retraining_required),
"severity": "critical" if drift_share > 0.5 else "high" if drift_share > 0.3 else "medium"
}
# Save HTML report
Path("monitoring").mkdir(exist_ok=True)
report.save_html(f"monitoring/drift_report_{datetime.utcnow().strftime('%Y%m%d')}.html")
if retraining_required and self.retrain_webhook:
self._trigger_retraining(results)
return results
    def _trigger_retraining(self, results: dict):
        """Dispatch retraining event via GitHub Actions repository_dispatch."""
        import os
        import requests
        # The personal access token is read from the environment; never hard-code it
        requests.post(
            self.retrain_webhook,
            json={"event_type": "retrain-trigger", "client_payload": results},
            headers={"Authorization": f"token {os.environ['GITHUB_TOKEN']}"},
            timeout=10,
        )
        logger.warning("Retraining trigger dispatched (severity: %s)", results["severity"])