De l'Idée à la Production : Le Pipeline ML Complet
Construire un modèle ML dans un notebook Jupyter n'est que 10% du travail. Les 90% restants consistent à le transformer en système production-ready. Un pipeline ML end-to-end automatise tout le flux.
Phase 1 : Chargement et Validation
Cette section explore en détail phase 1 : chargement et validation.
Phases 2-3 : Preprocessing et Feature Engineering
Cette section explore en détail phases 2-3 : preprocessing et feature engineering.
Phases 4-5 : Entraînement et Évaluation
Cette section explore en détail phases 4-5 : entraînement et évaluation.
Phase 6 : Model Serving avec FastAPI
Cette section explore en détail phase 6 : model serving avec fastapi.
Phase 7 : Monitoring et Réentraînement
Cette section explore en détail phase 7 : monitoring et réentraînement.
Outils MLOps
Cette section explore en détail outils mlops.
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class DataValidationResult:
"""Resultado de la validación de datos."""
is_valid: bool
errors: List[str]
warnings: List[str]
stats: Dict[str, float]
def load_and_validate(filepath: str, expected_columns: List[str]) -> tuple:
"""Carga y valida el dataset."""
errors = []
warnings = []
# Carga
df = pd.read_csv(filepath)
# Validación de esquema
missing_cols = set(expected_columns) - set(df.columns)
if missing_cols:
errors.append(f"Columnas faltantes: {missing_cols}")
# Controles de calidad de datos
null_pct = df.isnull().mean()
high_null_cols = null_pct[null_pct > 0.3].index.tolist()
if high_null_cols:
warnings.append(f"Columnas con >30% null: {high_null_cols}")
# Duplicados
n_dupes = df.duplicated().sum()
if n_dupes > 0:
warnings.append(f"{n_dupes} filas duplicadas encontradas")
# Estadísticas
stats = {
'n_rows': len(df),
'n_cols': len(df.columns),
'null_pct_avg': null_pct.mean(),
'n_duplicates': n_dupes
}
result = DataValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
stats=stats
)
return df, result
# Uso
# df, validation = load_and_validate('data/churn.csv', expected_columns)
# if not validation.is_valid:
# raise ValueError(f"Validación fallida: {validation.errors}")
print("Pipeline Stage 1: Data Loading & Validation - OK")
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import numpy as np
import pandas as pd
class FeatureEngineer(BaseEstimator, TransformerMixin):
"""Custom transformer para feature engineering."""
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
# Features derivadas (ejemplo churn prediction)
if 'tenure' in df.columns and 'monthly_charges' in df.columns:
df['total_spent'] = df['tenure'] * df['monthly_charges']
df['avg_monthly_ratio'] = df['monthly_charges'] / (df['tenure'] + 1)
if 'tenure' in df.columns:
df['tenure_group'] = pd.cut(
df['tenure'], bins=[0, 12, 24, 48, 72],
labels=['nuevo', 'en_desarrollo', 'maduro', 'fiel']
)
return df
def build_preprocessing_pipeline(
numeric_features: list,
categorical_features: list
) -> Pipeline:
"""Construye el pipeline de preprocesamiento."""
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
return Pipeline([
('feature_engineer', FeatureEngineer()),
('preprocessor', preprocessor)
])
print("Pipeline Stage 2-3: Preprocessing & Feature Engineering - OK")
Points Clés
- Un pipeline ML end-to-end couvre 7 phases
- Les custom transformers garantissent la cohérence
- Experiment tracking (MLflow) rend les expériences reproductibles
- FastAPI + Docker est le stack standard pour le model serving
- Data drift et concept drift requièrent un monitoring continu
- La reproductibilité est l'exigence la plus importante







