아이디어에서 생산까지: 완전한 ML 파이프라인
Jupyter 노트북에서 실행되는 ML 모델을 구축하는 것은 전체 작업의 10%에 불과합니다. 나머지 90% 시스템으로 변환하는 것으로 구성됩니다. 생산 준비 완료: 신뢰성, 모니터링 가능, 재현 가능 유지 관리가 가능합니다. 에이 엔드투엔드 ML 파이프라인 원시 데이터부터 데이터까지 전체 흐름을 자동화합니다. 생산 예측을 통해 모든 실행에서 일관성과 품질을 보장합니다. 이 문서는 빌드 실제 문제에 대한 완전한 파이프라인: 이탈 예측 (포기의 예측 고객).
파이프라인은 데이터 로딩, 전처리, 기능 엔지니어링, 교육, 평가, 모델 선택 및 배포. 각 단계는 모듈식이며 테스트 가능하고 조정된 구성 요소입니다. 실험 추적을 위한 MLflow와 컨테이너화를 위한 Docker와 같은 도구를 사용합니다.
이 기사에서 배울 내용
- 완전한 ML 파이프라인의 아키텍처
- 사례 연구: 엔드 투 엔드 이탈 예측
- MLflow를 사용한 실험 추적
- 데이터 버전 관리 및 재현성
- FastAPI를 사용한 모델 제공
- 생산 모니터링 및 재교육
1단계: 데이터 로드 및 검증
첫 번째 단계에서는 소스에서 데이터를 로드하고 품질을 확인합니다. 컨트롤에는 다음이 포함됩니다. 스키마 유효성 검사(예상 열, 데이터 유형), 데이터 품질 검사(누락된 값의 비율, 유효한 범위, 비정상적인 분포) 및 데이터 프로파일링(기술 통계). 데이터가 검사에 실패하면 신뢰할 수 없는 모델을 생성하는 대신 파이프라인이 명확한 오류로 인해 중지됩니다.
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class DataValidationResult:
"""Risultato della validazione dati."""
is_valid: bool
errors: List[str]
warnings: List[str]
stats: Dict[str, float]
def load_and_validate(filepath: str, expected_columns: List[str]) -> tuple:
"""Carica e valida il dataset."""
errors = []
warnings = []
# Caricamento
df = pd.read_csv(filepath)
# Schema validation
missing_cols = set(expected_columns) - set(df.columns)
if missing_cols:
errors.append(f"Colonne mancanti: {missing_cols}")
# Data quality checks
null_pct = df.isnull().mean()
high_null_cols = null_pct[null_pct > 0.3].index.tolist()
if high_null_cols:
warnings.append(f"Colonne con >30% null: {high_null_cols}")
# Duplicati
n_dupes = df.duplicated().sum()
if n_dupes > 0:
warnings.append(f"{n_dupes} righe duplicate trovate")
# Statistiche
stats = {
'n_rows': len(df),
'n_cols': len(df.columns),
'null_pct_avg': null_pct.mean(),
'n_duplicates': n_dupes
}
result = DataValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
stats=stats
)
return df, result
# Uso
# df, validation = load_and_validate('data/churn.csv', expected_columns)
# if not validation.is_valid:
# raise ValueError(f"Validazione fallita: {validation.errors}")
print("Pipeline Stage 1: Data Loading & Validation - OK")
2-3단계: 전처리 및 특성 엔지니어링
전처리 및 기능 엔지니어링이 캡슐화되어 있습니다. scikit-learn 파이프라인 e
맞춤형 변압기. 맞춤형 변압기 확장 BaseEstimator e
TransformerMixin 파이프라인에 원활하게 통합됩니다. 이는 동일한 것을 보장합니다.
변환은 교육과 프로덕션 모두에 적용되므로 불일치의 위험이 제거됩니다.
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import numpy as np
import pandas as pd
class FeatureEngineer(BaseEstimator, TransformerMixin):
"""Custom transformer per feature engineering."""
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
# Feature derivate (esempio churn prediction)
if 'tenure' in df.columns and 'monthly_charges' in df.columns:
df['total_spent'] = df['tenure'] * df['monthly_charges']
df['avg_monthly_ratio'] = df['monthly_charges'] / (df['tenure'] + 1)
if 'tenure' in df.columns:
df['tenure_group'] = pd.cut(
df['tenure'], bins=[0, 12, 24, 48, 72],
labels=['new', 'developing', 'mature', 'loyal']
)
return df
def build_preprocessing_pipeline(
numeric_features: list,
categorical_features: list
) -> Pipeline:
"""Costruisce la pipeline di preprocessing."""
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
return Pipeline([
('feature_engineer', FeatureEngineer()),
('preprocessor', preprocessor)
])
print("Pipeline Stage 2-3: Preprocessing & Feature Engineering - OK")
4-5단계: 훈련, 평가 및 모델 선택
훈련 단계에서는 교차 검증을 통해 여러 알고리즘을 체계적으로 비교하고 더 나은. 그만큼'실험적 추적 MLflow를 사용하면 매개변수, 측정항목을 자동으로 기록합니다. 각 실험의 인공물을 통해 프로세스를 재현하고 비교할 수 있습니다. 선택한 모델 배포를 위해 직렬화되고 버전이 지정됩니다.
from sklearn.ensemble import (
RandomForestClassifier, GradientBoostingClassifier
)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
import numpy as np
from datetime import datetime
class ExperimentTracker:
"""Tracker semplificato per esperimenti ML."""
def __init__(self):
self.experiments = []
def log_experiment(self, name, params, metrics, model):
self.experiments.append({
'name': name,
'params': params,
'metrics': metrics,
'model': model,
'timestamp': datetime.now().isoformat()
})
def get_best(self, metric='f1'):
return max(self.experiments, key=lambda x: x['metrics'].get(metric, 0))
def train_and_evaluate(X, y, tracker):
"""Addestra e valuta più' modelli."""
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoring = {
'accuracy': 'accuracy',
'f1': 'f1',
'roc_auc': 'roc_auc',
'precision': 'precision',
'recall': 'recall'
}
models = {
'LogisticRegression': LogisticRegression(max_iter=10000, random_state=42),
'RandomForest': RandomForestClassifier(n_estimators=200, random_state=42),
'GradientBoosting': GradientBoostingClassifier(
n_estimators=200, learning_rate=0.05, max_depth=5, random_state=42
)
}
for name, model in models.items():
results = cross_validate(
model, X, y, cv=cv, scoring=scoring, return_train_score=True
)
metrics = {
metric: results[f'test_{metric}'].mean()
for metric in scoring.keys()
}
tracker.log_experiment(name, model.get_params(), metrics, model)
print(f"{name:<25s} F1={metrics['f1']:.3f} AUC={metrics['roc_auc']:.3f}")
best = tracker.get_best('f1')
print(f"\nMiglior modello: {best['name']} (F1={best['metrics']['f1']:.3f})")
return best
# Uso
tracker = ExperimentTracker()
# best_model = train_and_evaluate(X_preprocessed, y, tracker)
print("Pipeline Stage 4-5: Training & Evaluation - OK")
6단계: FastAPI를 사용한 모델 제공
모델 배포 REST API FastAPI를 사용하면 모든 애플리케이션에서 실시간 예측을 받아보세요. API는 고객 기능 및 반환이 포함된 JSON 요청을 수락합니다. 이탈 확률과 예측 클래스. 직렬화된 모델(joblib 또는 pickle 포함)은 다음과 같습니다. 서버가 시작될 때 로드됩니다.
# api.py - Deployment del modello con FastAPI
# pip install fastapi uvicorn
from dataclasses import dataclass
from typing import Dict, Any
import json
@dataclass
class PredictionRequest:
"""Schema della richiesta di predizione."""
tenure: int
monthly_charges: float
total_charges: float
contract: str
payment_method: str
@dataclass
class PredictionResponse:
"""Schema della risposta."""
churn_probability: float
prediction: str
confidence: float
model_version: str
class ModelServer:
"""Server per il modello ML."""
def __init__(self, model_path: str, version: str = "1.0.0"):
self.version = version
# In produzione: self.model = joblib.load(model_path)
# self.pipeline = joblib.load(f"{model_path}/pipeline.pkl")
print(f"Modello v{version} caricato da {model_path}")
def predict(self, request: PredictionRequest) -> PredictionResponse:
"""Genera predizione per un singolo cliente."""
# features = self.pipeline.transform(request_to_dataframe(request))
# proba = self.model.predict_proba(features)[0]
# Simulazione
proba = [0.3, 0.7]
return PredictionResponse(
churn_probability=round(proba[1], 3),
prediction="churn" if proba[1] > 0.5 else "no_churn",
confidence=round(max(proba), 3),
model_version=self.version
)
# FastAPI app (in produzione):
# app = FastAPI(title="Churn Prediction API")
# server = ModelServer("models/best_model")
#
# @app.post("/predict")
# async def predict(request: PredictionRequest):
# return server.predict(request)
#
# Avvio: uvicorn api:app --host 0.0.0.0 --port 8000
print("Pipeline Stage 6: Model Serving - OK")
7단계: 모니터링 및 재교육
생산 중인 모델은 시간이 지남에 따라 성능이 저하됩니다. 데이터 드리프트 (생산 데이터 훈련과 비교하여 변화) 및 컨셉 드리프트 (기능과 대상 변경 간의 관계) 지속적인 모니터링은 모델 성능, 입력 분포 및 분포를 추적합니다. 예측의. 측정항목이 임계값 아래로 떨어지면 재교육 자동.
ML 프로덕션 체크리스트: (1) 데이터 및 모델 버전 관리, (2) 재현 가능한 파이프라인, (3) 전처리 및 예측을 위한 자동화된 테스트, (4) 생산 중 지표 모니터링, (5) 데이터 드리프트 및 성능 저하에 대한 경고, (6) 이전 버전으로의 빠른 롤백, (7) 디버깅을 위한 전체 로깅, (8) 새 모델 버전에 대한 A/B 테스트.
MLOps 도구
MLOps 생태계는 각 단계에 특화된 도구를 제공합니다. MLflow 실험적 추적을 위해 그리고 모델 레지스트리. DVC(데이터 버전 관리) 데이터 및 파이프라인 버전 관리용. 도커 컨테이너화를 위해. GitHub 작업 CI/CD용. 프로메테우스 + 그라파나 모니터링을 위해. 큰 기대 데이터 품질을 위해. 선택 도구는 프로젝트 규모와 기존 인프라에 따라 달라집니다.
핵심 사항
- 엔드 투 엔드 ML 파이프라인은 원시 데이터부터 프로덕션에서 모니터링되는 모델까지 7단계를 포함합니다.
- scikit-learn의 맞춤형 변환기는 훈련과 추론 간의 일관성을 보장합니다.
- 실험 추적(MLflow)을 통해 실험을 재현 및 비교할 수 있습니다.
- FastAPI + Docker는 모델 제공을 위한 표준 스택입니다.
- 데이터 드리프트 및 개념 드리프트에는 지속적인 모니터링과 자동 재교육이 필요합니다.
- 재현성은 프로덕션 ML 파이프라인에서 가장 중요한 요구 사항입니다.







