MLflow による実験追跡: 完全ガイド
どのハイパーパラメータの組み合わせがその結果を生み出したかを探すのに何時間も無駄にしたことはありませんか 3週間前から素晴らしいですか?あるいは、なぜそのモデルが実稼働環境にあるのか疑問に思ったことはありませんか? ローカルでテストしたものとは動作が異なりますか?これらの問題は非常に一般的です 機械学習のライフサイクルにおいては、システムの欠如という共通の根源があります。 によって構造化された 実験的な追跡.
MLフロー この問題に対する最も一般的なオープンソースの答えです。生まれ 2018 年の Databricks は 2019 年に Apache プロジェクトになり、MLflow はその地位を確立しました。 Python エコシステムにおける ML 実験を追跡するための事実上の標準です。と のリリース MLflow 3 (2025 年 6 月) Databricks Data + AI サミット中に、 このプラットフォームは、追跡ツールから ML および GenAI モデルの開発、評価、展開のための統合プラットフォーム、 LoggedModel をファーストクラス エンティティとして使用し、ロギング パフォーマンスが 25% 向上 バージョン2.5との比較。
このガイドでは、インストールから高度な追跡まで、MLflow をエンドツーエンドで見ていきます。 モデル レジストリへの自動ロギングから、モデルの提供と Docker の統合まで。 各サンプルはテストされており、実稼働環境で使用する準備ができています。
この記事で学べること
- MLflow アーキテクチャ: トラッキング サーバー、バックエンド ストア、アーティファクト ストア、および MLflow 3 の新機能
- ローカルセットアップとプロダクション: アーティファクトストアとしての SQLite、PostgreSQL、S3
- 完全な実験追跡: ログパラメータ、メトリクス、アーティファクト、タグ、ネストされた実行
- 自動ロギング: scikit-learn、XGBoost、PyTorch、TensorFlow との設定不要の統合
- モデル レジストリ: ステージング、実稼働、アーカイブ、およびモデルのライフサイクル管理
- MLflow によるモデル提供: REST API、Docker コンテナ、FastAPI 統合
- 実稼働デプロイのための Docker Compose を使用した MLflow
- 代替手段との比較: W&B、Neptune、ClearML - いつ何を選択するか
- あらゆる規模の ML チーム向けのベスト プラクティスとアンチパターン
MLOps シリーズと本番環境における機械学習
| # | アイテム | 集中 |
|---|---|---|
| 1 | MLOps: 実験から実稼働へ | 基盤と完全なライフサイクル |
| 2 | CI/CD を使用した ML パイプライン | ML 用の GitHub アクションと Docker |
| 3 | DVC と LakeFS のバージョン管理 | データセットとモデルのバージョン管理 |
| 4 | 現在地 - MLflow による実験追跡 | 追跡、登録、提供 |
| 5 | モデルドリフトの検出 | 自動監視と再トレーニング |
| 6 | FastAPI + Uvicorn によるサービスの提供 | 本番環境へのモデルのデプロイメント |
| 7 | Kubernetes での ML のスケーリング | KubeFlow と Seldon コア |
| 8 | ML モデルの A/B テスト | 方法論と実装 |
| 9 | ML ガバナンス | コンプライアンス、EU の AI 法、倫理 |
| 10 | ケーススタディ: チャーン予測 | 本番環境におけるエンドツーエンドのパイプライン |
MLflow アーキテクチャ: 4 つの基本コンポーネント
コード行を記述する前に、MLflow の構成要素を理解することが重要です。 プラットフォームは 4 つの主要コンポーネントで構成され、それぞれに特定の役割があります 機械学習のライフサイクルでは:
- MLflow 追跡: 実験のログ記録とクエリを実行するための API と UI。 各トレーニング実行のパラメータ、メトリクス、タグ、アーティファクト、およびメモを記録します。
- MLflow プロジェクト: ML コードを再現可能な実行にパッケージ化するための形式、 Conda または Docker を介した依存関係と環境管理を備えています。
- MLflow モデル: テンプレートを保存するための標準形式。 複数のフレームワーク (Python 関数、REST API、Spark UDF など) によって提供されます。
- MLflow モデル レジストリ: ライフサイクルを管理する集中ストア モデルのバージョン管理、ステージング、実稼働、アーカイブ、監査証跡。
MLflow 3 (2025) では、5 番目の基本要素である概念が追加されています。 ログモデル 一流の存在として。以前のアプローチの代わりに 実行中心 (モデルは実行の単なるアーティファクト)、LoggedModels は永続的 複数の実行、環境、デプロイメントにわたって、完全なパラメータ系統を使用して、 メトリクス、トレース、評価データ。
MLflow ストレージ アーキテクチャ
| 成分 | 含まれるもの | 推奨されるバックエンド |
|---|---|---|
| バックエンドストア | 実行パラメータ、メトリクス、タグ、メタデータ | PostgreSQL/MySQL (本番環境)、SQLite (ローカル) |
| アーティファクトストア | モデルファイル、画像、CSV、評価データ | S3、GCS、Azure BLOB (運用)、ローカル ファイル システム |
| 追跡サーバー | ロギングとWeb UI用のREST API | Docker/EC2/Kubernetes ポッドコンテナ |
| モデルレジストリ | テンプレートのリリース、ステージ、アノテーション、Webhook | データベースが必要です (ファイル システムでは機能しません) |
MLflow セットアップ: ローカルから本番環境まで
ローカルでのインストールとセットアップ
MLflow の基本インストールには、1 つの pip コマンドが必要です。地域発展のために、 MLflow は SQLite をバックエンド ストアとして使用し、ローカル ファイルシステムをアーティファクト ストアとして使用します。 追加のサーバーは必要ありません。
# Installazione MLflow (versione 2.x/3.x)
pip install mlflow
# Con extras per integrazioni specifiche
pip install mlflow[extras] # scikit-learn, XGBoost, LightGBM
pip install mlflow[databricks] # integrazione Databricks
pip install mlflow[genai] # tools per GenAI e LLM (MLflow 3+)
# Verifica installazione
mlflow --version
# mlflow, version 2.x.x o 3.x.x
# Avvia la UI locale (usa ./mlruns come storage)
mlflow ui
# UI disponibile su http://localhost:5000
# Avvia con database SQLite
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 \
--port 5000
本番環境のセットアップ: PostgreSQL + S3
複数のユーザーが存在し、同時実行性が高い運用環境では、次の使用が不可欠です。 バックエンドとしてのリレーショナル データベースと、アーティファクト ストアとしてのオブジェクト ストレージです。モデル MLflow レジストリにはデータベースが必要です (ファイル システムでは機能しません)。
# ==================== docker-compose.yml ====================
version: '3.8'
services:
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U mlflow"]
interval: 10s
timeout: 5s
retries: 5
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: ${MINIO_ACCESS_KEY}
MINIO_ROOT_PASSWORD: ${MINIO_SECRET_KEY}
volumes:
- minio_data:/data
ports:
- "9000:9000"
- "9001:9001"
mlflow:
image: ghcr.io/mlflow/mlflow:v2.19.0
depends_on:
postgres:
condition: service_healthy
environment:
MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
MLFLOW_ARTIFACT_ROOT: s3://mlflow-artifacts/
AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY}
MLFLOW_S3_ENDPOINT_URL: http://minio:9000
command: >
mlflow server
--backend-store-uri postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
--default-artifact-root s3://mlflow-artifacts/
--host 0.0.0.0
--port 5000
ports:
- "5000:5000"
volumes:
postgres_data:
minio_data:
# .env file (NON commitare in Git!)
POSTGRES_PASSWORD=sicura_password_123
MINIO_ACCESS_KEY=minio_admin
MINIO_SECRET_KEY=minio_password_sicura
# Avvio dello stack completo
docker-compose up -d
# Verifica che MLflow sia in ascolto
curl http://localhost:5000/health
# {"status": "OK"}
# Crea il bucket MinIO per gli artefatti
docker exec -it minio_container mc alias set local http://localhost:9000 admin password
docker exec -it minio_container mc mb local/mlflow-artifacts
中小企業の予算 <5,000 ユーロ/年
予算が限られているチームの場合は、単一 VM 上で Docker Compose を使用するこのセットアップを使用します。 費用は年間約 180 ユーロ (EC2 t3.small または同等)。 MinIO はローカルで S3 を置き換えます 完全に互換性のある API を備えています。永続ストレージの場合は、AWS S3 を使用することもできます (数 GB のアーティファクトの場合、月額約 2 ~ 5 ユーロ)。プラットフォームと比較した節約 W&B Teams のような SaaS (ユーザーあたり月額 50 米ドル以上) および重要: 5 人のチーム 年間 2500 ユーロ以上節約できます。
実験の追跡: ログパラメータ、メトリクス、アーティファクト
MLflow と追跡 API の中心。すべての電話 mlflow.start_run()
新しいものを作成する 走る の中に 実験。あ
実験グループに関連する実行 (例: チャーン予測モデルのすべての実行)。
実行ログの 4 種類のデータ:
- パラメータ: 実行の固定値 (ハイパーパラメータ、構成)
- メトリクス: 時間の経過とともに変化する可能性のある数値 (エポックごとの損失、精度)
- アーティファクト: 任意のファイル (テンプレート、画像、データセット、HTML レポート)
- タグ: 実行の注釈付けとフィルタリングのためのキーと値のメタデータ
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
accuracy_score, f1_score, roc_auc_score,
confusion_matrix, classification_report, RocCurveDisplay
)
from sklearn.model_selection import train_test_split
import os
# ==================== Configurazione MLflow ====================
# Connessione al tracking server (locale o remoto)
mlflow.set_tracking_uri("http://localhost:5000")
# Crea o usa un experiment esistente
experiment_name = "churn-prediction-gbm"
mlflow.set_experiment(experiment_name)
# Ottieni informazioni sull'experiment
experiment = mlflow.get_experiment_by_name(experiment_name)
print(f"Experiment ID: {experiment.experiment_id}")
# ==================== Training con Tracking Completo ====================
def train_churn_model(X_train, X_val, y_train, y_val, params: dict) -> str:
"""
Allena un GBM per churn prediction con tracking MLflow completo.
Restituisce il run_id del run MLflow.
"""
with mlflow.start_run(run_name=f"gbm-lr{params['learning_rate']}-depth{params['max_depth']}") as run:
# ---- 1. TAG: metadata del run ----
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"dataset_version": "v2.1",
"git_commit": os.popen("git rev-parse HEAD").read().strip(),
"environment": "dev",
})
# ---- 2. PARAMS: iperparametri e configurazione ----
mlflow.log_params(params)
mlflow.log_params({
"train_size": len(X_train),
"val_size": len(X_val),
"n_features": X_train.shape[1],
"target_positive_rate": float(y_train.mean()),
})
# ---- 3. TRAINING ----
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ---- 4. METRICS: step-by-step durante training ----
# Logga la loss per ogni stage del GBM (equivalente all'epoch loss)
train_scores = list(model.staged_predict(X_train))
val_scores = list(model.staged_predict(X_val))
for step, (tr_pred, val_pred) in enumerate(zip(train_scores, val_scores)):
train_acc = accuracy_score(y_train, tr_pred)
val_acc = accuracy_score(y_val, val_pred)
mlflow.log_metrics({
"train_accuracy_step": train_acc,
"val_accuracy_step": val_acc,
}, step=step)
# ---- 5. METRICS FINALI ----
y_pred = model.predict(X_val)
y_prob = model.predict_proba(X_val)[:, 1]
final_metrics = {
"accuracy": accuracy_score(y_val, y_pred),
"f1_score": f1_score(y_val, y_pred),
"auc_roc": roc_auc_score(y_val, y_prob),
"precision": float(np.mean(y_pred[y_pred == 1] == y_val[y_pred == 1])) if sum(y_pred) > 0 else 0.0,
"recall": float(sum((y_pred == 1) & (y_val == 1)) / sum(y_val == 1)),
}
mlflow.log_metrics(final_metrics)
# ---- 6. ARTIFACTS: file del modello e report ----
# Salva e logga confusion matrix come immagine
fig, ax = plt.subplots(figsize=(6, 5))
cm = confusion_matrix(y_val, y_pred)
im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
ax.set_title('Confusion Matrix - Churn Prediction')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
plt.colorbar(im)
plt.tight_layout()
mlflow.log_figure(fig, "confusion_matrix.png")
plt.close()
# Salva ROC curve
fig2, ax2 = plt.subplots(figsize=(6, 5))
RocCurveDisplay.from_predictions(y_val, y_prob, ax=ax2)
ax2.set_title('ROC Curve - Churn Model')
mlflow.log_figure(fig2, "roc_curve.png")
plt.close()
# Logga classification report come file di testo
report = classification_report(y_val, y_pred, target_names=["No Churn", "Churn"])
mlflow.log_text(report, "classification_report.txt")
# Logga feature importance come CSV
feature_imp = pd.DataFrame({
"feature": X_train.columns.tolist(),
"importance": model.feature_importances_
}).sort_values("importance", ascending=False)
mlflow.log_table(feature_imp.to_dict(orient="list"), "feature_importance.json")
# ---- 7. LOG MODEL: salva il modello con firma e input example ----
input_example = X_val.head(3)
signature = mlflow.models.infer_signature(X_val, y_pred)
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="churn-gbm-model", # Registra automaticamente nel Registry
)
print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {final_metrics['accuracy']:.4f}")
print(f"AUC-ROC: {final_metrics['auc_roc']:.4f}")
print(f"UI: http://localhost:5000/#/experiments/{experiment.experiment_id}/runs/{run.info.run_id}")
return run.info.run_id
# ==================== Esempio di utilizzo ====================
if __name__ == "__main__":
# Dati sintetici per demo
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=20, n_informative=10, random_state=42)
X_df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(20)])
X_train, X_val, y_train, y_val = train_test_split(X_df, y, test_size=0.2, random_state=42)
params = {
"n_estimators": 300,
"learning_rate": 0.05,
"max_depth": 4,
"subsample": 0.8,
"min_samples_split": 20,
"random_state": 42,
}
run_id = train_churn_model(X_train, X_val, y_train, y_val, params)
ネストされた実行とハイパーパラメータ検索
MLflow は i をサポートします ネストされた実行: 子は親の実行内で実行されます。 このパターンは、どこでもハイパーパラメータ検索 (Optuna、GridSearchCV) に最適です。 検索全体を表す親実行と、それぞれに 1 つずつの多数の子実行 テスト済みの構成:
import mlflow
import optuna
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
def hyperparameter_search(X_train, y_train, n_trials: int = 50) -> str:
"""
Hyperparameter search con Optuna + MLflow nested runs.
Run padre: contiene il summary della ricerca
Run figli: ogni trial Optuna e un run MLflow figlio
"""
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("churn-hparam-search")
with mlflow.start_run(run_name="optuna-search-v1") as parent_run:
mlflow.set_tag("search_method", "optuna-tpe")
mlflow.log_param("n_trials", n_trials)
mlflow.log_param("optimization_metric", "auc_roc")
best_auc = 0.0
best_params = {}
def objective(trial) -> float:
"""Funzione obiettivo Optuna - ogni trial e un nested run MLflow."""
params = {
"n_estimators": trial.suggest_int("n_estimators", 100, 1000),
"learning_rate": trial.suggest_float("learning_rate", 0.001, 0.3, log=True),
"max_depth": trial.suggest_int("max_depth", 2, 8),
"subsample": trial.suggest_float("subsample", 0.5, 1.0),
"min_samples_split": trial.suggest_int("min_samples_split", 5, 50),
}
# Ogni trial ottiene il suo run MLflow figlio
with mlflow.start_run(
run_name=f"trial-{trial.number}",
nested=True # <-- indica che e un run figlio
) as child_run:
mlflow.log_params(params)
model = GradientBoostingClassifier(**params, random_state=42)
scores = cross_val_score(model, X_train, y_train, cv=3, scoring="roc_auc")
auc_mean = scores.mean()
auc_std = scores.std()
mlflow.log_metrics({
"cv_auc_mean": auc_mean,
"cv_auc_std": auc_std,
"trial_number": trial.number,
})
return auc_mean
# Esegui la ricerca Optuna
study = optuna.create_study(
direction="maximize",
sampler=optuna.samplers.TPESampler(seed=42)
)
study.optimize(objective, n_trials=n_trials, n_jobs=1)
# Logga i risultati della ricerca nel run padre
best_trial = study.best_trial
mlflow.log_params({f"best_{k}": v for k, v in best_trial.params.items()})
mlflow.log_metrics({
"best_auc": best_trial.value,
"n_trials_completed": len(study.trials),
})
print(f"Miglior AUC: {best_trial.value:.4f}")
print(f"Migliori parametri: {best_trial.params}")
return parent_run.info.run_id
自動ロギング: ゼロ構成追跡
自動ロギングは MLflow の最も便利な機能の 1 つです。1 行のコードで、 MLflow は、メインの ML フレームワークへの呼び出しを自動的にインターセプトし、パラメーターをログに記録します。 トレーニング コードを変更することなく、メトリクスとアーティファクトを取得できます。そして scikit-learn によってサポートされており、 XGBoost、LightGBM、PyTorch Lightning、TensorFlow/Keras、Spark MLlib など。
import mlflow
import mlflow.sklearn
import mlflow.xgboost
# ==================== Autologging scikit-learn ====================
# Abilita autologging per scikit-learn
# Logga automaticamente: hyperparametri, metriche di training, signature del modello
mlflow.sklearn.autolog(
log_input_examples=True, # Logga esempi di input
log_model_signatures=True, # Inferisce e logga la firma del modello
log_models=True, # Salva il modello come artefatto
log_datasets=False, # Non loggare l'intero dataset (troppo grande)
max_tuning_runs=100, # Per GridSearchCV: max run tracciati
exclusive=False, # Permette log manuali in aggiunta
)
mlflow.set_experiment("autolog-demo")
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
# Con autolog attivo, fit() logga tutto automaticamente
with mlflow.start_run(run_name="rf-gridsearch-autolog"):
param_grid = {
"n_estimators": [100, 300],
"max_depth": [4, 6, 8],
"min_samples_split": [10, 20],
}
model = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=3,
scoring="roc_auc",
n_jobs=-1
)
model.fit(X_train, y_train)
# MLflow ha loggato automaticamente:
# - Tutti i parametri del RandomForest
# - cv=3, scoring, n_jobs
# - Best params dal GridSearchCV
# - Score di validazione incrociata
# - Il modello come artefatto
# ==================== Autologging XGBoost ====================
mlflow.xgboost.autolog(
importance_types=["gain", "weight"], # Logga feature importance
log_model_signatures=True,
log_input_examples=True,
)
import xgboost as xgb
import numpy as np
with mlflow.start_run(run_name="xgb-autolog"):
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
params = {
"objective": "binary:logistic",
"eval_metric": ["logloss", "auc"],
"learning_rate": 0.05,
"max_depth": 6,
"n_estimators": 500,
"subsample": 0.8,
"seed": 42,
}
# Autolog traccia: tutte le metriche per ogni boosting round
# e il modello finale, la feature importance
booster = xgb.train(
params,
dtrain,
num_boost_round=500,
evals=[(dtrain, "train"), (dval, "val")],
early_stopping_rounds=50,
verbose_eval=False,
)
# ==================== Autologging PyTorch Lightning ====================
mlflow.pytorch.autolog(
every_n_iter=10, # Logga ogni 10 iterazioni
log_models=True,
checkpoint_monitor="val_loss",
)
# Con PyTorch Lightning, chiama semplicemente trainer.fit()
# e MLflow cattura automaticamente tutte le loss e le metriche
自動ロギングの制限事項
自動ログはラピッド プロトタイピングには便利ですが、運用環境ではいくつかの制限があります。
ユーザー定義のカスタムメトリクスをログに記録しません。データセット情報もログに記録しません。
(サイズ、バージョン、クラス分布)、間の依存関係は処理されません。
実験。成熟した MLOps パイプラインの場合、基礎として自動ロギングを使用することをお勧めします
手動呼び出しを追加します mlflow.log_param(), mlflow.log_metric()
e mlflow.log_artifact() ドメイン固有の情報については。
MLflow モデル レジストリ: モデルのライフサイクル
MLflow モデル レジストリは、単純なツールから MLflow を変換するコンポーネントです 真の MLOps プラットフォームへの追跡。モデルのバージョンを管理できます。 標準化されたライフサイクル: から 発達 a ステージング a 生産、 監査証跡、注釈、通知が含まれます。
レジストリには、UI (ステージを変更するためのドラッグ アンド ドロップ) と経由の両方からアクセスできます。 CI/CD 自動化に不可欠な Python API。 MLflow 3 では、レジストリは状態です ステージ切り替え時の自動通知のための Webhook が強化されています。
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry import ModelVersion
import time
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
MODEL_NAME = "churn-gbm-model"
# ==================== 1. REGISTRARE UN MODELLO ====================
# Metodo A: durante il log_model (più comune)
with mlflow.start_run() as run:
# ... training ...
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=MODEL_NAME,
)
# Il modello viene automaticamente registrato come versione 1
# con stage "None" (development)
# Metodo B: da un run esistente tramite URI
run_id = "abc123def456"
model_uri = f"runs:/{run_id}/model"
version = mlflow.register_model(
model_uri=model_uri,
name=MODEL_NAME,
tags={"team": "ml-eng", "algorithm": "gbm"}
)
print(f"Registrato: {MODEL_NAME} versione {version.version}")
# ==================== 2. GESTIRE LE VERSIONI E GLI STAGE ====================
# Aggiungi una descrizione alla versione
client.update_model_version(
name=MODEL_NAME,
version=version.version,
description=(
"GBM per churn prediction v2.1. "
"Accuracy: 0.9423, AUC-ROC: 0.9567. "
"Trainato su dataset 2024-01 to 2025-01, 45k samples."
)
)
# Promuovi a Staging (dopo validazione interna)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Staging",
archive_existing_versions=False, # Mantieni altre versioni staging
)
print(f"Modello v{version.version} promosso a Staging")
# Aggiungi tag per tracciabilita
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validated_by",
value="alice.rossi@company.com"
)
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validation_date",
value="2025-11-15"
)
# Promuovi a Production (dopo approvazione)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Production",
archive_existing_versions=True, # Archivia la versione Production precedente
)
print(f"Modello v{version.version} in produzione!")
# ==================== 3. CARICARE IL MODELLO IN PRODUZIONE ====================
def load_production_model(model_name: str):
"""Carica sempre la versione Production dal registry."""
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
return model
# In uno script di inference o serving
model = load_production_model(MODEL_NAME)
predictions = model.predict(new_data)
# ==================== 4. INTERROGARE IL REGISTRY ====================
# Lista tutte le versioni di un modello
versions = client.search_model_versions(f"name='{MODEL_NAME}'")
for v in versions:
print(f"v{v.version} | Stage: {v.current_stage} | Run: {v.run_id[:8]}...")
# Cerca solo versioni in Production
prod_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
if prod_versions:
latest_prod = prod_versions[0]
print(f"Versione in produzione: v{latest_prod.version}")
print(f"Run ID: {latest_prod.run_id}")
# Ottieni tutte le metriche associate alla versione in produzione
run_data = client.get_run(latest_prod.run_id).data
print(f"AUC-ROC in produzione: {run_data.metrics.get('auc_roc', 'N/A')}")
# ==================== 5. ROLLBACK IN CASO DI PROBLEMA ====================
def rollback_to_previous_production(model_name: str) -> None:
"""
Rollback: archivia la versione Production attuale e
ripristina la versione Archived più recente.
"""
# Trova versione corrente in Production
current_prod = client.get_latest_versions(model_name, stages=["Production"])
if not current_prod:
print("Nessuna versione in produzione trovata")
return
# Trova la versione Archived più recente (precedente Production)
archived = client.search_model_versions(
f"name='{model_name}'",
filter_string="tags.stage_history LIKE '%production%'",
)
if len(archived) < 2:
print("Nessuna versione archiviata disponibile per rollback")
return
# Archivia la versione problematica
client.transition_model_version_stage(
name=model_name,
version=current_prod[0].version,
stage="Archived",
)
# Promuovi la versione precedente
prev_version = archived[1].version
client.transition_model_version_stage(
name=model_name,
version=prev_version,
stage="Production",
)
print(f"Rollback completato: ora in produzione v{prev_version}")
MLflow を使用したモデル提供
MLflow には、登録されたモデルを公開する組み込みのサービング サーバーが含まれています 単一のコマンドで REST API として使用できます。プロトタイピングのための優れたソリューション そして開発環境。大規模生産の場合は、モデルを統合することをお勧めします FastAPI を使用した MLflow (シリーズの次の記事を参照)。
# ==================== Serving via CLI ====================
# Servi l'ultima versione Production del modello
mlflow models serve \
--model-uri "models:/churn-gbm-model/Production" \
--host 0.0.0.0 \
--port 8080 \
--env-manager conda
# Servi un run specifico
mlflow models serve \
--model-uri "runs:/abc123def456/model" \
--port 8080
# Con Docker (raccomandato per produzione)
mlflow models build-docker \
--model-uri "models:/churn-gbm-model/Production" \
--name "churn-model-server" \
--enable-mlserver # Usa MLServer per performance migliori
docker run -p 8080:8080 churn-model-server
# ==================== Test del Serving ====================
# Il server espone l'endpoint /invocations
import requests
import json
import pandas as pd
# Prepara i dati di input nel formato atteso da MLflow
test_data = pd.DataFrame({
"feature_0": [0.5, -1.2],
"feature_1": [1.3, 0.8],
# ... altri 18 features
})
# MLflow accetta JSON in formato "split" o "records"
payload = {
"dataframe_split": {
"columns": test_data.columns.tolist(),
"data": test_data.values.tolist()
}
}
response = requests.post(
"http://localhost:8080/invocations",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
)
predictions = response.json()
print(f"Predizioni: {predictions}")
# {"predictions": [0, 1]}
本番環境向けの MLflow + FastAPI の統合
堅牢な運用サービスを提供するには、ベスト プラクティスは、モデルを次からロードすることです。 FastAPI アプリケーションの起動時に MLflow モデル レジストリを更新し、更新する 新しい製品バージョンがプロモートされると、自動的に次のようになります。
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import mlflow
import pandas as pd
import numpy as np
import threading
import time
import logging
from typing import List
logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API", version="2.0.0")
# ==================== Model Manager con Auto-Refresh ====================
class MLflowModelManager:
"""
Gestisce il caricamento e il refresh automatico del modello
dal MLflow Model Registry.
"""
def __init__(self, model_name: str, tracking_uri: str, refresh_interval: int = 300):
self.model_name = model_name
self.tracking_uri = tracking_uri
self.refresh_interval = refresh_interval # secondi
self._model = None
self._model_version = None
self._lock = threading.Lock()
mlflow.set_tracking_uri(tracking_uri)
self._load_model()
self._start_refresh_thread()
def _load_model(self) -> None:
"""Carica la versione Production corrente dal registry."""
try:
model_uri = f"models:/{self.model_name}/Production"
new_model = mlflow.sklearn.load_model(model_uri)
# Ottieni il numero di versione
client = mlflow.MlflowClient()
versions = client.get_latest_versions(self.model_name, stages=["Production"])
version = versions[0].version if versions else "unknown"
with self._lock:
self._model = new_model
self._model_version = version
logger.info(f"Modello {self.model_name} v{version} caricato dal registry")
except Exception as e:
logger.error(f"Errore nel caricamento del modello: {e}")
def _start_refresh_thread(self) -> None:
"""Avvia thread background per refresh periodico del modello."""
def refresh_loop():
while True:
time.sleep(self.refresh_interval)
self._load_model()
thread = threading.Thread(target=refresh_loop, daemon=True)
thread.start()
def predict(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict(features)
def predict_proba(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict_proba(features)[:, 1]
@property
def model_version(self) -> str:
return self._model_version or "unknown"
# Istanza globale del manager
model_manager = MLflowModelManager(
model_name="churn-gbm-model",
tracking_uri="http://mlflow-server:5000",
refresh_interval=300, # Controlla nuove versioni ogni 5 minuti
)
# ==================== API Endpoints ====================
class PredictionRequest(BaseModel):
features: List[List[float]]
feature_names: List[str]
class PredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[float]
model_version: str
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest) -> PredictionResponse:
"""Endpoint di predizione churn."""
try:
df = pd.DataFrame(
request.features,
columns=request.feature_names
)
predictions = model_manager.predict(df).tolist()
probabilities = model_manager.predict_proba(df).tolist()
return PredictionResponse(
predictions=predictions,
probabilities=probabilities,
model_version=model_manager.model_version,
)
except Exception as e:
logger.error(f"Errore nella predizione: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"model_name": "churn-gbm-model",
"model_version": model_manager.model_version,
}
MLflow と代替案: W&B、Neptune、ClearML
MLflow が実験追跡の唯一のオプションではありません。市場はさまざまな製品を提供しています 有効な代替案であり、それぞれに特有の長所があります。選ぶのは予算次第ですが、 チームの規模、既存のインフラストラクチャ、ガバナンスの要件。
実験追跡ツールの完全な比較
| サイズ | MLフロー | W&B | ネプチューン | クリアML |
|---|---|---|---|---|
| ライセンス | オープンソース (Apache 2.0) | SaaS + セルフホスト型エンタープライズ | SaaS + セルフホスト | オープンソース + エンタープライズ |
| 費用(5人チーム) | 無料(自己ホスト型) | ~250 米ドル/月 | ~100 米ドル/月 | 無料(自己ホスト型) |
| 設定 | 約 30 分で Docker を起動 | ゼロ構成 (SaaS) | ゼロ構成 (SaaS) | 複合体 (ClearML サーバー) |
| UI/UX | 機能的だが美しくない | 優れた、グラフィックスが豊富 | 良い、非常にカスタマイズ可能 | 完全かつ高い学習曲線 |
| 自動ロギング | 優れた (20 以上のフレームワーク) | 優れています (W&B SDK) | 良い | モンキーパッチによる自動化 |
| モデルレジストリ | 統合されたステージング ワークフロー | W&B モデル レジストリ | モデルレジストリが利用可能 | 統合モデルリポジトリ |
| ハイパーパラメータスイープ | Optuna/Hyperopt の統合 | ネイティブ スイープ (優れた) | 良い | 統合された HPO |
| ガバナンス/コンプライアンス | 基本的な監査証跡 | アクセス制御、チーム機能 | チームワークスペース | 高度な (RBAC、監査) |
| GenAI/LLM サポート | MLflow 3: トレース、評価 | プロンプト、LLM モニタリング | LLM追跡 | LLM 実験の追跡 |
| に最適 | 独自のインフラストラクチャ、中小企業、セルフホスト型チーム | UXとコラボレーションを優先するチーム | 中程度の予算のチーム | 自動化のニーズがある企業 |
MLflow を選択する場合
MLflow は、次のシナリオに最適な選択肢です。
- 限られた予算 (<5,000 ユーロ/年): 単一 VM でのセルフホスティングの費用は年間約 180 ユーロです
- データ所在地の要件: 企業インフラストラクチャから離れることができない機密データ
- 既存の Python エコシステムとの統合: MLflow は、scikit-learn、PyTorch、TensorFlow、XGBoost、および 20 以上のフレームワークとネイティブに統合します
- コンプライアンスと監査 (AI 法 EU): データベースとアーティファクトへのフルアクセス、SaaS ロックインなし
- DevOps指向のチーム: MLflow は他のサービスと同様に Docker コンテナです
実験の調査と分析
MLflow の最も価値のある機能の 1 つは、プログラムでクエリを実行できることです。 最適な実行を見つけるためのすべての実験、複数の次元にわたる実行の比較、または抽出 自動レポートのデータ:
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
# ==================== Ricerca di Run con Filtri ====================
# Trova tutti i run con AUC-ROC > 0.92 nel tuo experiment
runs = mlflow.search_runs(
experiment_names=["churn-prediction-gbm"],
filter_string="metrics.auc_roc > 0.92 and tags.environment = 'dev'",
order_by=["metrics.auc_roc DESC"],
max_results=20,
)
# Il risultato e un DataFrame pandas
print(runs[["run_id", "metrics.auc_roc", "metrics.f1_score",
"params.learning_rate", "params.max_depth"]].head())
# ==================== Confronto Run su Multiple Metriche ====================
def compare_top_runs(experiment_name: str, n: int = 5) -> pd.DataFrame:
"""Restituisce un DataFrame con i top N run per AUC-ROC."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=["metrics.auc_roc DESC"],
max_results=n,
)
# Seleziona le colonne più rilevanti
cols = [
"run_id",
"metrics.accuracy", "metrics.f1_score", "metrics.auc_roc",
"params.n_estimators", "params.learning_rate", "params.max_depth",
"tags.dataset_version", "start_time",
]
# Filtra solo colonne esistenti
existing_cols = [c for c in cols if c in runs.columns]
return runs[existing_cols].copy()
comparison_df = compare_top_runs("churn-prediction-gbm")
print(comparison_df.to_string(index=False))
# ==================== Trovare il Best Run e Caricarlo ====================
def get_best_run(experiment_name: str, metric: str = "metrics.auc_roc") -> dict:
"""Trova il run con la metrica migliore e restituisce run_id e metriche."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=[f"{metric} DESC"],
max_results=1,
)
if runs.empty:
raise ValueError(f"Nessun run trovato in {experiment_name}")
best = runs.iloc[0]
return {
"run_id": best["run_id"],
"auc_roc": best.get("metrics.auc_roc"),
"accuracy": best.get("metrics.accuracy"),
"f1_score": best.get("metrics.f1_score"),
}
best_run = get_best_run("churn-prediction-gbm")
print(f"Best run: {best_run['run_id']}, AUC-ROC: {best_run['auc_roc']:.4f}")
# Carica il modello dal best run
model = mlflow.sklearn.load_model(f"runs:/{best_run['run_id']}/model")
# ==================== Export Metriche per Report ====================
# Carica la history di una metrica step-by-step (es. validation loss per epoch)
run_id = "abc123def456"
metric_history = client.get_metric_history(run_id, "val_accuracy_step")
steps = [m.step for m in metric_history]
values = [m.value for m in metric_history]
accuracy_over_time = pd.DataFrame({"step": steps, "val_accuracy": values})
print(f"Training steps loggati: {len(accuracy_over_time)}")
本番環境における MLflow のベスト プラクティス
1. 命名規則
一貫した命名規則により、数か月後も実験を検索して理解できるようになります。
# Schema naming raccomandato per experiments e run
# EXPERIMENT: [team]-[progetto]-[tipo]
# Esempi:
"ml-eng-churn-prediction-gbm"
"ml-eng-churn-prediction-neural-net"
"research-recommender-collaborative"
# RUN NAME: [algoritmo]-[key-param]-[data]
# Esempi:
"gbm-lr0.05-depth6-2025-11-15"
"xgb-v2-autofeat-2025-11-20"
"baseline-logistic-regression"
# Usa sempre tag per metadata strutturati
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"environment": "dev", # dev | staging | prod
"dataset_version": "v2.1",
"git_branch": git_branch,
"git_commit": git_commit[:8],
"triggered_by": "ci-cd", # manual | ci-cd | scheduled
"approved_by": "", # Compilato prima della promozione
})
2. MLflow コードの構造
from contextlib import contextmanager
import mlflow
import functools
# Pattern: decorator per tracking automatico
def mlflow_run(experiment_name: str, run_name: str = None, tags: dict = None):
"""Decorator che wrappa una funzione in un MLflow run."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name or func.__name__) as run:
if tags:
mlflow.set_tags(tags)
result = func(*args, **kwargs)
return result
return wrapper
return decorator
# Uso del decorator
@mlflow_run(
experiment_name="churn-prediction-gbm",
run_name="training-v2",
tags={"team": "ml-eng", "version": "v2"}
)
def train_model(X_train, y_train, params: dict):
mlflow.log_params(params)
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ... metriche e artefatti
return model
# Pattern: context manager per setup/teardown
@contextmanager
def mlflow_experiment(experiment_name: str, run_name: str):
"""Context manager con gestione errori."""
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name) as run:
try:
mlflow.set_tag("status", "running")
yield run
mlflow.set_tag("status", "success")
except Exception as e:
mlflow.set_tag("status", "failed")
mlflow.set_tag("error", str(e))
raise
# Uso
with mlflow_experiment("churn-prediction", "training-run-v3") as run:
mlflow.log_params({"n_estimators": 300})
# ... training
mlflow.log_metric("accuracy", 0.94)
3. CI/CD 用の GitHub アクションとの統合
# .github/workflows/ml-experiment.yml
name: ML Experiment + Model Promotion
on:
push:
branches: [main]
paths:
- 'src/models/**'
- 'src/features/**'
- 'params.yaml'
jobs:
train-and-evaluate:
runs-on: ubuntu-latest
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run training
id: training
run: |
# Script di training che logga su MLflow e stampa il run_id
RUN_ID=$(python src/models/train.py --output-run-id)
echo "run_id=$RUN_ID" >> $GITHUB_OUTPUT
- name: Evaluate and check metrics
id: evaluation
run: |
python scripts/ci_evaluate.py \
--run-id ${{ steps.training.outputs.run_id }} \
--min-auc 0.92 \
--min-accuracy 0.90
- name: Promote to Staging
if: success()
run: |
python scripts/promote_model.py \
--run-id ${{ steps.training.outputs.run_id }} \
--model-name churn-gbm-model \
--target-stage Staging
- name: Notify team
if: success()
uses: slackapi/slack-github-action@v1.24.0
with:
payload: |
{
"text": "Nuovo modello promosso a Staging: run ${{ steps.training.outputs.run_id }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
避けるべきアンチパターン
-
同じメトリックに対して異なるキーで mlflow.log_metric() を使用する:
accuracyeaccこれらは MLflow の 2 つの別個のメトリクスです。 正規名の辞書を定義し、常にそれを使用します。 -
アクティブな実行を閉じないでください: アクティブな実行中にコードがクラッシュした場合
コンテキストマネージャーなしで
with mlflow.start_run()、ランは残ります 無限に「実行中」状態になります。常にコンテキスト マネージャーを使用してください。 - 巨大な成果物を CSV として記録する: MLflow はデータレイクではありません。 大規模なデータセットの場合は、メタデータ (DVC パス、ハッシュ、サイズ) のみをログに記録し、次を使用します。 データのバージョン管理のための DVC。
- 本番環境で複数のユーザーが SQLite を使用する: SQLite はサポートしていません 同時執筆。 2 つの並列トレーニング プロセスではロックが発生します エラー。マルチユーザー設定には PostgreSQL または MySQL を使用してください。
- データセットのバージョンをログに記録しない: モデルパラメータ データのバージョン管理を行わないと、再現性が十分ではありません。常にログインする Git コミット、DVC タグ、およびデータセットのサイズ。
- 中間昇格ステージングなしで本番環境に昇格: 中間のステージング ワークフローにより、統合テストと検証テストが可能になります 運用環境に展開する前にチームのメンバーを決定します。このステップを飛ばさないでください。
MLflow 3 の新機能: GenAI とエージェントに向けて
2025 年 6 月の MLflow 3 のリリースにより、プラットフォームは進化の飛躍を遂げました。 GenAI の世界を重視しています。モデルを扱う人にとって最も重要なニュース 従来の ML と LLM を使用した場合:
- 最高級エンティティとしての LoggedModel: モデルはもう存在しません 単なる実行の産物です。 LoggedModel は、実行、環境、および環境全体にわたって保持されます。 メトリクス、パラメータ、トレース、評価データへの完全な系統を備えた展開。
- パフォーマンスが 25% 向上: MLflow 3.x にはクエリが最適化されています データベースへの負荷が軽減され、ロギングのオーバーヘッドが削減され、ロギングのスループットが向上します。 バージョン 2.5 (ベンチマーク 2026) と比較して 25%。
- GenAI トレース: LLM、チェーン、ツール呼び出し、エージェントの自動トレース LangChain、LlamaIndex、OpenAI SDK、Anthropic などをサポートします。
- フィードバック収集API: に関する人間のフィードバックの構造化された収集 モデルの結果をレビューと評価のために UI に統合します。
-
進化した評価フレームワーク:
mlflow.evaluate()今 カスタムメトリクス、LLM-as-judge、および自動モデル比較をサポートします。
結論と次のステップ
MLflow は、エコシステム内で最も普及している実験的な追跡ツールとしての地位を確立しました 活発なコミュニティと絶え間ない進化を備えたオープンソース ML。の組み合わせ 追跡、モデル レジストリ、および単一の自己ホスト型プラットフォームでの提供により、選択が可能になります。 MLOps を使用しないインフラストラクチャを完全に制御したいチームにとっては自然なことです SaaS ソリューションのコスト。
この記事で見た実験の登録からワークフロー モデル レジストリを介して運用環境に昇格するまで、ユース ケースの 90% をカバーします ML チームの現実。データのバージョン管理のために DVC と統合する (前の記事) CI/CD 自動化のための GitHub Actions を使用すると、完全な MLOps システムが得られ、 年間 250 ユーロ未満の予算でプロフェッショナルをサポートします。
次の記事では、機械学習における最も厄介な問題の 1 つに取り組みます。 本番環境: モデルドリフト。の劣化を検出する方法を見ていきます。 経時的なパフォーマンス (データ ドリフト、コンセプト ドリフト、予測ドリフト) とその実装方法 Grafana と Prometheus に関するアラートを備えた自動再トレーニング システム。
リソースと次のステップ
- MLflow の公式ドキュメント: mlflow.org/docs/latest
- MLflow 3 リリースノート: mlflow.org/releases/3
- MLflow GitHub: github.com/mlflow/mlflow
- 前の記事: DVC を使用したデータセットとモデルのバージョン管理
- 次の記事: モデルのドリフト検出と自動再トレーニング
- クロスリンク: 高度なディープラーニング - 高度なトレーニング
- クロスリンク: コンピューター ビジョン - 物体検出パイプライン
MLflow を使用した完全な MLOps スタック (予算 <5,000 ユーロ/年)
| 成分 | 楽器 | 推定年間コスト |
|---|---|---|
| 実験の追跡 | MLflow セルフホスト型 | 無料(オープンソース) |
| バックエンドストア | Docker 上の PostgreSQL | 無料(同じMLflowサーバー) |
| アーティファクトストア | MinIO (S3 互換) または AWS S3 | 無料 / 年間最大 30 ユーロ |
| MLflow + PostgreSQL 用の VM | EC2 t3.small (2 vCPU、2 GB RAM) | ~180ユーロ/年 |
| データセットのバージョニング | DVC + ダグスハブ | 無料 |
| CI/CD パイプライン | GitHub アクション | 無料(2000分/月) |
| 推定合計額 | <220 ユーロ/年 |







