アイデアから本番まで: 完全な ML パイプライン
Jupyter ノートブック上で実行される ML モデルの構築は、作業のわずか 10% です。残りの90%は それをシステムに変換することにあります 本番環境に対応: 信頼性があり、監視されており、再現可能 そしてメンテナンス可能。あ エンドツーエンドの ML パイプライン 生データからデータに至るまでのフロー全体を自動化します。 本番環境で予測し、実行ごとに一貫性と品質を保証します。この記事のビルド 実際の問題に対する完全なパイプライン: チャーン予測 (放棄の予測 お客様)。
パイプラインは、データの読み込み、前処理、特徴エンジニアリング、トレーニング、評価、 モデルの選択と展開。各フェーズはモジュール式でテスト可能な、調整されたコンポーネントです 実験追跡用の MLflow やコンテナ化用の Docker などのツールから。
この記事で学べること
- 完全な ML パイプラインのアーキテクチャ
- ケーススタディ: エンドツーエンドのチャーン予測
- MLflow による実験の追跡
- データのバージョン管理と再現性
- FastAPI を使用したモデル提供
- 本番環境でのモニタリングと再トレーニング
フェーズ 1: データのロードと検証
最初のフェーズでは、ソースからデータをロードし、その品質をチェックします。コントロールには次のものが含まれます: スキーマ 検証 (予期される列、データ型)、データ品質チェック (欠損値の割合、有効な範囲、 異常分布)、およびデータプロファイリング(記述統計)。データがチェックに失敗した場合、 パイプラインは信頼性の低いモデルを生成するのではなく、明らかなエラーで停止します。
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class DataValidationResult:
"""Risultato della validazione dati."""
is_valid: bool
errors: List[str]
warnings: List[str]
stats: Dict[str, float]
def load_and_validate(filepath: str, expected_columns: List[str]) -> tuple:
"""Carica e valida il dataset."""
errors = []
warnings = []
# Caricamento
df = pd.read_csv(filepath)
# Schema validation
missing_cols = set(expected_columns) - set(df.columns)
if missing_cols:
errors.append(f"Colonne mancanti: {missing_cols}")
# Data quality checks
null_pct = df.isnull().mean()
high_null_cols = null_pct[null_pct > 0.3].index.tolist()
if high_null_cols:
warnings.append(f"Colonne con >30% null: {high_null_cols}")
# Duplicati
n_dupes = df.duplicated().sum()
if n_dupes > 0:
warnings.append(f"{n_dupes} righe duplicate trovate")
# Statistiche
stats = {
'n_rows': len(df),
'n_cols': len(df.columns),
'null_pct_avg': null_pct.mean(),
'n_duplicates': n_dupes
}
result = DataValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
stats=stats
)
return df, result
# Uso
# df, validation = load_and_validate('data/churn.csv', expected_columns)
# if not validation.is_valid:
# raise ValueError(f"Validazione fallita: {validation.errors}")
print("Pipeline Stage 1: Data Loading & Validation - OK")
フェーズ 2-3: 前処理と特徴エンジニアリング
前処理と特徴エンジニアリングは次のようにカプセル化されています。 scikit-learn パイプライン e
カスタムトランス。カスタムトランスの拡張 BaseEstimator e
TransformerMixin パイプラインにシームレスに統合します。これにより、同じことが保証されます
変換はトレーニングと本番環境の両方に適用されるため、不一致のリスクが排除されます。
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import numpy as np
import pandas as pd
class FeatureEngineer(BaseEstimator, TransformerMixin):
"""Custom transformer per feature engineering."""
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
# Feature derivate (esempio churn prediction)
if 'tenure' in df.columns and 'monthly_charges' in df.columns:
df['total_spent'] = df['tenure'] * df['monthly_charges']
df['avg_monthly_ratio'] = df['monthly_charges'] / (df['tenure'] + 1)
if 'tenure' in df.columns:
df['tenure_group'] = pd.cut(
df['tenure'], bins=[0, 12, 24, 48, 72],
labels=['new', 'developing', 'mature', 'loyal']
)
return df
def build_preprocessing_pipeline(
numeric_features: list,
categorical_features: list
) -> Pipeline:
"""Costruisce la pipeline di preprocessing."""
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
return Pipeline([
('feature_engineer', FeatureEngineer()),
('preprocessor', preprocessor)
])
print("Pipeline Stage 2-3: Preprocessing & Feature Engineering - OK")
フェーズ 4 ~ 5: トレーニング、評価、モデルの選択
トレーニング フェーズでは、複数のアルゴリズムを相互検証で体系的に比較し、 もっと良い。ザ」実験的な追跡 MLflow を使用すると、パラメータ、メトリクスが自動的に記録されます 各実験から得られるアーティファクトにより、プロセスを再現可能かつ比較可能にします。選択したモデル デプロイメント用にシリアル化され、バージョン管理されます。
from sklearn.ensemble import (
RandomForestClassifier, GradientBoostingClassifier
)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
import numpy as np
from datetime import datetime
class ExperimentTracker:
"""Tracker semplificato per esperimenti ML."""
def __init__(self):
self.experiments = []
def log_experiment(self, name, params, metrics, model):
self.experiments.append({
'name': name,
'params': params,
'metrics': metrics,
'model': model,
'timestamp': datetime.now().isoformat()
})
def get_best(self, metric='f1'):
return max(self.experiments, key=lambda x: x['metrics'].get(metric, 0))
def train_and_evaluate(X, y, tracker):
"""Addestra e valuta più' modelli."""
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoring = {
'accuracy': 'accuracy',
'f1': 'f1',
'roc_auc': 'roc_auc',
'precision': 'precision',
'recall': 'recall'
}
models = {
'LogisticRegression': LogisticRegression(max_iter=10000, random_state=42),
'RandomForest': RandomForestClassifier(n_estimators=200, random_state=42),
'GradientBoosting': GradientBoostingClassifier(
n_estimators=200, learning_rate=0.05, max_depth=5, random_state=42
)
}
for name, model in models.items():
results = cross_validate(
model, X, y, cv=cv, scoring=scoring, return_train_score=True
)
metrics = {
metric: results[f'test_{metric}'].mean()
for metric in scoring.keys()
}
tracker.log_experiment(name, model.get_params(), metrics, model)
print(f"{name:<25s} F1={metrics['f1']:.3f} AUC={metrics['roc_auc']:.3f}")
best = tracker.get_best('f1')
print(f"\nMiglior modello: {best['name']} (F1={best['metrics']['f1']:.3f})")
return best
# Uso
tracker = ExperimentTracker()
# best_model = train_and_evaluate(X_preprocessed, y, tracker)
print("Pipeline Stage 4-5: Training & Evaluation - OK")
フェーズ 6: FastAPI を使用したモデル提供
モデルの展開 REST API FastAPI を使用すると、あらゆるアプリケーションで次のことが可能になります。 リアルタイムの予測を取得します。 API は顧客の機能を含む JSON リクエストを受け入れ、返します チャーンの確率と予測されたクラス。シリアル化されたモデル (joblib または pickle を使用) が付属します サーバーの起動時にロードされます。
# api.py - Deployment del modello con FastAPI
# pip install fastapi uvicorn
from dataclasses import dataclass
from typing import Dict, Any
import json
@dataclass
class PredictionRequest:
"""Schema della richiesta di predizione."""
tenure: int
monthly_charges: float
total_charges: float
contract: str
payment_method: str
@dataclass
class PredictionResponse:
"""Schema della risposta."""
churn_probability: float
prediction: str
confidence: float
model_version: str
class ModelServer:
"""Server per il modello ML."""
def __init__(self, model_path: str, version: str = "1.0.0"):
self.version = version
# In produzione: self.model = joblib.load(model_path)
# self.pipeline = joblib.load(f"{model_path}/pipeline.pkl")
print(f"Modello v{version} caricato da {model_path}")
def predict(self, request: PredictionRequest) -> PredictionResponse:
"""Genera predizione per un singolo cliente."""
# features = self.pipeline.transform(request_to_dataframe(request))
# proba = self.model.predict_proba(features)[0]
# Simulazione
proba = [0.3, 0.7]
return PredictionResponse(
churn_probability=round(proba[1], 3),
prediction="churn" if proba[1] > 0.5 else "no_churn",
confidence=round(max(proba), 3),
model_version=self.version
)
# FastAPI app (in produzione):
# app = FastAPI(title="Churn Prediction API")
# server = ModelServer("models/best_model")
#
# @app.post("/predict")
# async def predict(request: PredictionRequest):
# return server.predict(request)
#
# Avvio: uvicorn api:app --host 0.0.0.0 --port 8000
print("Pipeline Stage 6: Model Serving - OK")
フェーズ 7: モニタリングと再トレーニング
実稼働モデルは時間の経過とともに次のような理由で劣化します。 データドリフト (生産データ トレーニングと比較した変化)と コンセプトドリフト (機能とターゲットの関係が変わります)。 継続的なモニタリングにより、モデルのパフォーマンス、入力の分布、分布を追跡します。 予測の。メトリクスがしきい値を下回ると、 再訓練 自動.
ML 制作チェックリスト: (1) データとモデルのバージョニング、(2) 再現可能なパイプライン、 (3) 前処理と予測のための自動テスト、(4) 本番環境でのメトリクスの監視、 (5) データのドリフトとパフォーマンスの低下に関するアラート、(6) 以前のバージョンへの迅速なロールバック、 (7) デバッグ用の完全なログ記録、(8) 新しいモデル バージョンの A/B テスト。
MLOps ツール
MLOps エコシステムは、各段階に特化したツールを提供します。 MLフロー 実験的な追跡用 そしてモデルレジストリ。 DVC (データ バージョン コントロール) データとパイプラインのバージョン管理用。 ドッカー コンテナ化のため。 GitHub アクション CI/CDの場合。 プロメテウス + グラファナ モニタリング用。 大きな期待 データ品質のために。の選択 ツールはプロジェクトの規模と既存のインフラストラクチャによって異なります。
重要なポイント
- エンドツーエンドの ML パイプラインは、生データから本番環境で監視されるモデルまでの 7 つの段階をカバーします。
- scikit-learn のカスタムトランスフォーマーはトレーニングと推論の間の一貫性を確保します
- 実験追跡 (MLflow) により、実験を再現可能かつ比較可能にします
- FastAPI + Docker はモデル提供のための標準スタックです
- データドリフトとコンセプトドリフトには継続的なモニタリングと自動再トレーニングが必要です
- 再現性は本番環境の ML パイプラインで最も重要な要件です







