MLflow를 사용한 실험 추적: 전체 가이드
어떤 초매개변수 조합이 해당 결과를 생성했는지 검색하는 데 시간을 낭비한 적이 있습니까? 3주 전부터 아주 좋았지? 아니면 해당 모델이 왜 생산되고 있는지 궁금하신가요? 로컬에서 테스트한 것과 다르게 동작합니까? 이러한 문제는 매우 일반적입니다. 머신러닝 라이프사이클에서 이들의 공통점은 바로 시스템 부족입니다. 에 의해 구조화됨 실험적 추적.
MLflow 이 문제에 대한 가장 인기 있는 오픈 소스 답변입니다. 출생지 2018년의 Databricks는 2019년에 Apache 프로젝트가 되었고, MLflow는 Python 생태계에서 ML 실험을 추적하기 위한 사실상의 표준입니다. 와 함께 방출 2025년 6월의 MLflow 3 Databricks Data + AI Summit 기간 동안, 플랫폼은 추적 도구에서 ML 및 GenAI 모델의 개발, 평가, 배포를 위한 통합 플랫폼 LoggedModel을 일류 엔터티로 사용하고 로깅 성능이 25% 향상되었습니다. 2.5 버전과 비교.
이 가이드에서는 설치부터 고급 추적까지 MLflow의 엔드투엔드를 살펴보겠습니다. 자동 로깅부터 모델 레지스트리, 모델 제공 및 Docker 통합까지. 각 예제는 테스트를 거쳐 프로덕션에 사용할 준비가 되었습니다.
이 기사에서 배울 내용
- MLflow 아키텍처: 추적 서버, 백엔드 저장소, 아티팩트 저장소 및 MLflow 3의 새로운 기능
- 로컬 설정 및 제작: SQLite, PostgreSQL, S3를 아티팩트 저장소로 사용
- 완전한 실험 추적: 로그 매개변수, 지표, 아티팩트, 태그 및 중첩 실행
- 자동 로깅: scikit-learn, XGBoost, PyTorch, TensorFlow와의 제로 구성 통합
- 모델 레지스트리: 준비, 생산, 보관 및 모델 수명주기 관리
- MLflow를 사용한 모델 제공: REST API, Docker 컨테이너, FastAPI 통합
- 프로덕션 배포를 위해 Docker Compose를 사용하는 MLflow
- 대안과의 비교: W&B, Neptune, ClearML - 언제 무엇을 선택해야 하는지
- 모든 규모의 ML 팀을 위한 모범 사례 및 안티 패턴
MLOps 시리즈 및 프로덕션에서의 기계 학습
| # | Articolo | 집중하다 |
|---|---|---|
| 1 | MLOps: 실험에서 프로덕션까지 | 기초 및 전체 수명주기 |
| 2 | CI/CD가 포함된 ML 파이프라인 | ML용 GitHub Actions 및 Docker |
| 3 | DVC와 LakeFS 버전 관리 | 데이터 세트 및 모델 버전 관리 |
| 4 | 현재 위치 - MLflow를 사용한 실험 추적 | 추적, 등록, 게재 |
| 5 | 모델 드리프트 감지 | 자동 모니터링 및 재교육 |
| 6 | FastAPI + Uvicorn으로 제공 | 프로덕션에 모델 배포 |
| 7 | Kubernetes에서 ML 확장 | KubeFlow 및 Seldon Core |
| 8 | ML 모델의 A/B 테스트 | 방법론 및 구현 |
| 9 | ML 거버넌스 | 규정 준수, AI법 EU, 윤리 |
| 10 | 사례 연구: 이탈 예측 | 프로덕션의 엔드투엔드 파이프라인 |
MLflow 아키텍처: 4가지 기본 구성 요소
코드 줄을 작성하기 전에 MLflow의 구성 요소를 이해하는 것이 중요합니다. 플랫폼은 각각 특정 역할을 가진 네 가지 주요 구성 요소로 구성됩니다. 기계 학습 수명 주기에서:
- MLflow 추적: 실험 로깅 및 쿼리를 위한 API 및 UI입니다. 각 훈련 실행에 대한 매개변수, 지표, 태그, 아티팩트 및 메모를 기록합니다.
- MLflow 프로젝트: ML 코드를 재현 가능한 실행으로 패키징하기 위한 형식 Conda 또는 Docker를 통한 종속성 및 환경 관리가 가능합니다.
- MLflow 모델: 템플릿을 저장하기 위한 표준 형식입니다. 여러 프레임워크(Python 함수, REST API, Spark UDF 등)에서 제공됩니다.
- MLflow 모델 레지스트리: 라이프사이클을 관리하는 중앙 집중식 저장소 모델: 버전 관리, 준비, 생산, 보관 및 감사 추적.
MLflow 3(2025)에는 다섯 번째 기본 요소가 추가되었습니다. 로그된 모델 일류 실체로서. 이전 접근 방식 대신 실행 중심(모델이 실행의 결과물인 경우), LoggedModels 지속 완전한 매개변수 계보를 통해 여러 실행, 환경 및 배포에 걸쳐 측정항목, 추적 및 평가 데이터.
MLflow 스토리지 아키텍처
| 요소 | 포함된 내용 | 백엔드 권장 |
|---|---|---|
| 백엔드 저장소 | 매개변수, 지표, 태그, 메타데이터 실행 | PostgreSQL/MySQL(프로덕션), SQLite(로컬) |
| 유물 상점 | 모델 파일, 이미지, CSV, 평가 데이터 | S3, GCS, Azure Blob(프로덕션), 로컬 파일 시스템 |
| 추적 서버 | 로깅 및 웹 UI용 REST API | Docker/EC2/Kubernetes 포드 컨테이너 |
| 모델 레지스트리 | 템플릿 릴리스, 단계, 주석, 웹후크 | 데이터베이스 필요(파일 시스템에서는 작동하지 않음) |
MLflow 설정: 로컬에서 프로덕션으로
로컬 설치 및 설정
MLflow의 기본 설치에는 단일 pip 명령이 필요합니다. 지역발전을 위해, MLflow는 SQLite를 백엔드 저장소로 사용하고 로컬 파일 시스템을 아티팩트 저장소로 사용합니다. 추가 서버가 필요하지 않습니다.
# Installazione MLflow (versione 2.x/3.x)
pip install mlflow
# Con extras per integrazioni specifiche
pip install mlflow[extras] # scikit-learn, XGBoost, LightGBM
pip install mlflow[databricks] # integrazione Databricks
pip install mlflow[genai] # tools per GenAI e LLM (MLflow 3+)
# Verifica installazione
mlflow --version
# mlflow, version 2.x.x o 3.x.x
# Avvia la UI locale (usa ./mlruns come storage)
mlflow ui
# UI disponibile su http://localhost:5000
# Avvia con database SQLite
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 \
--port 5000
프로덕션 설정: PostgreSQL + S3
여러 사용자가 있고 동시성이 높은 프로덕션 환경에서는 다음을 사용하는 것이 필수적입니다. 백엔드로 관계형 데이터베이스를, 아티팩트 저장소로 객체 스토리지를 사용합니다. 모델 MLflow Registry에는 데이터베이스가 필요합니다(파일 시스템에서는 작동하지 않음).
# ==================== docker-compose.yml ====================
version: '3.8'
services:
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U mlflow"]
interval: 10s
timeout: 5s
retries: 5
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: ${MINIO_ACCESS_KEY}
MINIO_ROOT_PASSWORD: ${MINIO_SECRET_KEY}
volumes:
- minio_data:/data
ports:
- "9000:9000"
- "9001:9001"
mlflow:
image: ghcr.io/mlflow/mlflow:v2.19.0
depends_on:
postgres:
condition: service_healthy
environment:
MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
MLFLOW_ARTIFACT_ROOT: s3://mlflow-artifacts/
AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY}
MLFLOW_S3_ENDPOINT_URL: http://minio:9000
command: >
mlflow server
--backend-store-uri postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
--default-artifact-root s3://mlflow-artifacts/
--host 0.0.0.0
--port 5000
ports:
- "5000:5000"
volumes:
postgres_data:
minio_data:
# .env file (NON commitare in Git!)
POSTGRES_PASSWORD=sicura_password_123
MINIO_ACCESS_KEY=minio_admin
MINIO_SECRET_KEY=minio_password_sicura
# Avvio dello stack completo
docker-compose up -d
# Verifica che MLflow sia in ascolto
curl http://localhost:5000/health
# {"status": "OK"}
# Crea il bucket MinIO per gli artefatti
docker exec -it minio_container mc alias set local http://localhost:9000 admin password
docker exec -it minio_container mc mb local/mlflow-artifacts
중소기업을 위한 예산 <5K EUR/년
예산이 제한된 팀의 경우 단일 VM에서 Docker Compose를 사용하여 설정합니다. 비용은 연간 약 180유로(EC2 t3.small 또는 이에 상응하는 금액)입니다. MinIO는 S3를 로컬로 대체합니다. 완벽하게 호환되는 API를 사용합니다. 영구 스토리지의 경우 AWS S3를 사용할 수도 있습니다. (몇 GB의 아티팩트에 대해 월 2-5 EUR 정도). 플랫폼 대비 비용 절감 W&B 팀과 같은 SaaS(50+ USD/사용자/월) 및 중요: 5명으로 구성된 팀 연간 2500 EUR 이상을 절약하세요.
실험 추적: 로그 매개변수, 지표 및 아티팩트
MLflow 및 추적 API의 핵심입니다. 모든 통화는 mlflow.start_run()
새로운 것을 만들어라 달리다 안에 실험. 에이
실험 그룹 관련 실행(예: 이탈 예측 모델에 대한 모든 실행)
다음 네 가지 유형의 데이터 로그를 실행합니다.
- 매개변수: 실행에 대한 고정값(하이퍼파라미터, 구성)
- 측정항목: 시간에 따라 변할 수 있는 수치(손실, 에포크당 정확도)
- 유물: 임의 파일(템플릿, 이미지, 데이터세트, HTML 보고서)
- 태그: 주석 달기 및 필터링 실행을 위한 키-값 메타데이터
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
accuracy_score, f1_score, roc_auc_score,
confusion_matrix, classification_report, RocCurveDisplay
)
from sklearn.model_selection import train_test_split
import os
# ==================== Configurazione MLflow ====================
# Connessione al tracking server (locale o remoto)
mlflow.set_tracking_uri("http://localhost:5000")
# Crea o usa un experiment esistente
experiment_name = "churn-prediction-gbm"
mlflow.set_experiment(experiment_name)
# Ottieni informazioni sull'experiment
experiment = mlflow.get_experiment_by_name(experiment_name)
print(f"Experiment ID: {experiment.experiment_id}")
# ==================== Training con Tracking Completo ====================
def train_churn_model(X_train, X_val, y_train, y_val, params: dict) -> str:
"""
Allena un GBM per churn prediction con tracking MLflow completo.
Restituisce il run_id del run MLflow.
"""
with mlflow.start_run(run_name=f"gbm-lr{params['learning_rate']}-depth{params['max_depth']}") as run:
# ---- 1. TAG: metadata del run ----
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"dataset_version": "v2.1",
"git_commit": os.popen("git rev-parse HEAD").read().strip(),
"environment": "dev",
})
# ---- 2. PARAMS: iperparametri e configurazione ----
mlflow.log_params(params)
mlflow.log_params({
"train_size": len(X_train),
"val_size": len(X_val),
"n_features": X_train.shape[1],
"target_positive_rate": float(y_train.mean()),
})
# ---- 3. TRAINING ----
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ---- 4. METRICS: step-by-step durante training ----
# Logga la loss per ogni stage del GBM (equivalente all'epoch loss)
train_scores = list(model.staged_predict(X_train))
val_scores = list(model.staged_predict(X_val))
for step, (tr_pred, val_pred) in enumerate(zip(train_scores, val_scores)):
train_acc = accuracy_score(y_train, tr_pred)
val_acc = accuracy_score(y_val, val_pred)
mlflow.log_metrics({
"train_accuracy_step": train_acc,
"val_accuracy_step": val_acc,
}, step=step)
# ---- 5. METRICS FINALI ----
y_pred = model.predict(X_val)
y_prob = model.predict_proba(X_val)[:, 1]
final_metrics = {
"accuracy": accuracy_score(y_val, y_pred),
"f1_score": f1_score(y_val, y_pred),
"auc_roc": roc_auc_score(y_val, y_prob),
"precision": float(np.mean(y_pred[y_pred == 1] == y_val[y_pred == 1])) if sum(y_pred) > 0 else 0.0,
"recall": float(sum((y_pred == 1) & (y_val == 1)) / sum(y_val == 1)),
}
mlflow.log_metrics(final_metrics)
# ---- 6. ARTIFACTS: file del modello e report ----
# Salva e logga confusion matrix come immagine
fig, ax = plt.subplots(figsize=(6, 5))
cm = confusion_matrix(y_val, y_pred)
im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
ax.set_title('Confusion Matrix - Churn Prediction')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
plt.colorbar(im)
plt.tight_layout()
mlflow.log_figure(fig, "confusion_matrix.png")
plt.close()
# Salva ROC curve
fig2, ax2 = plt.subplots(figsize=(6, 5))
RocCurveDisplay.from_predictions(y_val, y_prob, ax=ax2)
ax2.set_title('ROC Curve - Churn Model')
mlflow.log_figure(fig2, "roc_curve.png")
plt.close()
# Logga classification report come file di testo
report = classification_report(y_val, y_pred, target_names=["No Churn", "Churn"])
mlflow.log_text(report, "classification_report.txt")
# Logga feature importance come CSV
feature_imp = pd.DataFrame({
"feature": X_train.columns.tolist(),
"importance": model.feature_importances_
}).sort_values("importance", ascending=False)
mlflow.log_table(feature_imp.to_dict(orient="list"), "feature_importance.json")
# ---- 7. LOG MODEL: salva il modello con firma e input example ----
input_example = X_val.head(3)
signature = mlflow.models.infer_signature(X_val, y_pred)
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="churn-gbm-model", # Registra automaticamente nel Registry
)
print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {final_metrics['accuracy']:.4f}")
print(f"AUC-ROC: {final_metrics['auc_roc']:.4f}")
print(f"UI: http://localhost:5000/#/experiments/{experiment.experiment_id}/runs/{run.info.run_id}")
return run.info.run_id
# ==================== Esempio di utilizzo ====================
if __name__ == "__main__":
# Dati sintetici per demo
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=20, n_informative=10, random_state=42)
X_df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(20)])
X_train, X_val, y_train, y_val = train_test_split(X_df, y, test_size=0.2, random_state=42)
params = {
"n_estimators": 300,
"learning_rate": 0.05,
"max_depth": 4,
"subsample": 0.8,
"min_samples_split": 20,
"random_state": 42,
}
run_id = train_churn_model(X_train, X_val, y_train, y_val, params)
중첩 실행 및 초매개변수 검색
MLflow는 i를 지원합니다 중첩 실행: 하위 실행은 상위 실행 내에서 실행됩니다. 이 패턴은 원하는 곳 어디에서나 하이퍼파라미터 검색(Optuna, GridSearchCV)에 이상적입니다. 전체 검색을 나타내는 상위 실행과 여러 하위 실행(각각 하나씩) 테스트된 구성:
import mlflow
import optuna
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
def hyperparameter_search(X_train, y_train, n_trials: int = 50) -> str:
"""
Hyperparameter search con Optuna + MLflow nested runs.
Run padre: contiene il summary della ricerca
Run figli: ogni trial Optuna e un run MLflow figlio
"""
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("churn-hparam-search")
with mlflow.start_run(run_name="optuna-search-v1") as parent_run:
mlflow.set_tag("search_method", "optuna-tpe")
mlflow.log_param("n_trials", n_trials)
mlflow.log_param("optimization_metric", "auc_roc")
best_auc = 0.0
best_params = {}
def objective(trial) -> float:
"""Funzione obiettivo Optuna - ogni trial e un nested run MLflow."""
params = {
"n_estimators": trial.suggest_int("n_estimators", 100, 1000),
"learning_rate": trial.suggest_float("learning_rate", 0.001, 0.3, log=True),
"max_depth": trial.suggest_int("max_depth", 2, 8),
"subsample": trial.suggest_float("subsample", 0.5, 1.0),
"min_samples_split": trial.suggest_int("min_samples_split", 5, 50),
}
# Ogni trial ottiene il suo run MLflow figlio
with mlflow.start_run(
run_name=f"trial-{trial.number}",
nested=True # <-- indica che e un run figlio
) as child_run:
mlflow.log_params(params)
model = GradientBoostingClassifier(**params, random_state=42)
scores = cross_val_score(model, X_train, y_train, cv=3, scoring="roc_auc")
auc_mean = scores.mean()
auc_std = scores.std()
mlflow.log_metrics({
"cv_auc_mean": auc_mean,
"cv_auc_std": auc_std,
"trial_number": trial.number,
})
return auc_mean
# Esegui la ricerca Optuna
study = optuna.create_study(
direction="maximize",
sampler=optuna.samplers.TPESampler(seed=42)
)
study.optimize(objective, n_trials=n_trials, n_jobs=1)
# Logga i risultati della ricerca nel run padre
best_trial = study.best_trial
mlflow.log_params({f"best_{k}": v for k, v in best_trial.params.items()})
mlflow.log_metrics({
"best_auc": best_trial.value,
"n_trials_completed": len(study.trials),
})
print(f"Miglior AUC: {best_trial.value:.4f}")
print(f"Migliori parametri: {best_trial.params}")
return parent_run.info.run_id
자동 로깅: 제로 구성 추적
자동 로깅은 MLflow의 가장 편리한 기능 중 하나입니다. 단 한 줄의 코드로 MLflow는 기본 ML 프레임워크에 대한 호출을 자동으로 가로채고 매개변수를 기록합니다. 학습 코드를 변경하지 않고도 측정항목 및 아티팩트를 사용할 수 있습니다. 그리고 scikit-learn이 지원합니다. XGBoost, LightGBM, PyTorch Lightning, TensorFlow/Keras, Spark MLlib 등.
import mlflow
import mlflow.sklearn
import mlflow.xgboost
# ==================== Autologging scikit-learn ====================
# Abilita autologging per scikit-learn
# Logga automaticamente: hyperparametri, metriche di training, signature del modello
mlflow.sklearn.autolog(
log_input_examples=True, # Logga esempi di input
log_model_signatures=True, # Inferisce e logga la firma del modello
log_models=True, # Salva il modello come artefatto
log_datasets=False, # Non loggare l'intero dataset (troppo grande)
max_tuning_runs=100, # Per GridSearchCV: max run tracciati
exclusive=False, # Permette log manuali in aggiunta
)
mlflow.set_experiment("autolog-demo")
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
# Con autolog attivo, fit() logga tutto automaticamente
with mlflow.start_run(run_name="rf-gridsearch-autolog"):
param_grid = {
"n_estimators": [100, 300],
"max_depth": [4, 6, 8],
"min_samples_split": [10, 20],
}
model = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=3,
scoring="roc_auc",
n_jobs=-1
)
model.fit(X_train, y_train)
# MLflow ha loggato automaticamente:
# - Tutti i parametri del RandomForest
# - cv=3, scoring, n_jobs
# - Best params dal GridSearchCV
# - Score di validazione incrociata
# - Il modello come artefatto
# ==================== Autologging XGBoost ====================
mlflow.xgboost.autolog(
importance_types=["gain", "weight"], # Logga feature importance
log_model_signatures=True,
log_input_examples=True,
)
import xgboost as xgb
import numpy as np
with mlflow.start_run(run_name="xgb-autolog"):
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
params = {
"objective": "binary:logistic",
"eval_metric": ["logloss", "auc"],
"learning_rate": 0.05,
"max_depth": 6,
"n_estimators": 500,
"subsample": 0.8,
"seed": 42,
}
# Autolog traccia: tutte le metriche per ogni boosting round
# e il modello finale, la feature importance
booster = xgb.train(
params,
dtrain,
num_boost_round=500,
evals=[(dtrain, "train"), (dval, "val")],
early_stopping_rounds=50,
verbose_eval=False,
)
# ==================== Autologging PyTorch Lightning ====================
mlflow.pytorch.autolog(
every_n_iter=10, # Logga ogni 10 iterazioni
log_models=True,
checkpoint_monitor="val_loss",
)
# Con PyTorch Lightning, chiama semplicemente trainer.fit()
# e MLflow cattura automaticamente tutte le loss e le metriche
자동 로깅의 한계
자동 로깅은 신속한 프로토타이핑에 편리하지만 생산에는 몇 가지 제한 사항이 있습니다.
사용자 정의 측정항목, 데이터 세트 정보를 기록하지 않습니다.
(크기, 버전, 클래스 분포) 간의 종속성을 처리하지 않습니다.
실험. 성숙한 MLOps 파이프라인의 경우 자동 로깅을 기본으로 사용하는 것이 좋습니다.
수동 통화를 추가하세요. mlflow.log_param(), mlflow.log_metric()
e mlflow.log_artifact() 도메인별 정보를 확인하세요.
MLflow 모델 레지스트리: 모델 수명 주기
MLflow 모델 레지스트리는 간단한 도구에서 MLflow를 변환하는 구성 요소입니다. 진정한 MLOps 플랫폼으로 추적합니다. 다음을 통해 모델 버전을 관리할 수 있습니다. 표준화된 수명주기: 개발 a 각색 a 생산, 감사 추적, 주석 및 알림이 포함됩니다.
레지스트리는 UI(단계를 변경하려면 드래그 앤 드롭)와 다음을 통해 액세스할 수 있습니다. CI/CD 자동화에 필수적인 Python API. MLflow 3을 사용하면 레지스트리가 상태입니다. 단계를 전환할 때 자동 알림을 위한 웹훅이 풍부해졌습니다.
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry import ModelVersion
import time
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
MODEL_NAME = "churn-gbm-model"
# ==================== 1. REGISTRARE UN MODELLO ====================
# Metodo A: durante il log_model (più comune)
with mlflow.start_run() as run:
# ... training ...
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=MODEL_NAME,
)
# Il modello viene automaticamente registrato come versione 1
# con stage "None" (development)
# Metodo B: da un run esistente tramite URI
run_id = "abc123def456"
model_uri = f"runs:/{run_id}/model"
version = mlflow.register_model(
model_uri=model_uri,
name=MODEL_NAME,
tags={"team": "ml-eng", "algorithm": "gbm"}
)
print(f"Registrato: {MODEL_NAME} versione {version.version}")
# ==================== 2. GESTIRE LE VERSIONI E GLI STAGE ====================
# Aggiungi una descrizione alla versione
client.update_model_version(
name=MODEL_NAME,
version=version.version,
description=(
"GBM per churn prediction v2.1. "
"Accuracy: 0.9423, AUC-ROC: 0.9567. "
"Trainato su dataset 2024-01 to 2025-01, 45k samples."
)
)
# Promuovi a Staging (dopo validazione interna)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Staging",
archive_existing_versions=False, # Mantieni altre versioni staging
)
print(f"Modello v{version.version} promosso a Staging")
# Aggiungi tag per tracciabilita
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validated_by",
value="alice.rossi@company.com"
)
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validation_date",
value="2025-11-15"
)
# Promuovi a Production (dopo approvazione)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Production",
archive_existing_versions=True, # Archivia la versione Production precedente
)
print(f"Modello v{version.version} in produzione!")
# ==================== 3. CARICARE IL MODELLO IN PRODUZIONE ====================
def load_production_model(model_name: str):
"""Carica sempre la versione Production dal registry."""
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
return model
# In uno script di inference o serving
model = load_production_model(MODEL_NAME)
predictions = model.predict(new_data)
# ==================== 4. INTERROGARE IL REGISTRY ====================
# Lista tutte le versioni di un modello
versions = client.search_model_versions(f"name='{MODEL_NAME}'")
for v in versions:
print(f"v{v.version} | Stage: {v.current_stage} | Run: {v.run_id[:8]}...")
# Cerca solo versioni in Production
prod_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
if prod_versions:
latest_prod = prod_versions[0]
print(f"Versione in produzione: v{latest_prod.version}")
print(f"Run ID: {latest_prod.run_id}")
# Ottieni tutte le metriche associate alla versione in produzione
run_data = client.get_run(latest_prod.run_id).data
print(f"AUC-ROC in produzione: {run_data.metrics.get('auc_roc', 'N/A')}")
# ==================== 5. ROLLBACK IN CASO DI PROBLEMA ====================
def rollback_to_previous_production(model_name: str) -> None:
"""
Rollback: archivia la versione Production attuale e
ripristina la versione Archived più recente.
"""
# Trova versione corrente in Production
current_prod = client.get_latest_versions(model_name, stages=["Production"])
if not current_prod:
print("Nessuna versione in produzione trovata")
return
# Trova la versione Archived più recente (precedente Production)
archived = client.search_model_versions(
f"name='{model_name}'",
filter_string="tags.stage_history LIKE '%production%'",
)
if len(archived) < 2:
print("Nessuna versione archiviata disponibile per rollback")
return
# Archivia la versione problematica
client.transition_model_version_stage(
name=model_name,
version=current_prod[0].version,
stage="Archived",
)
# Promuovi la versione precedente
prev_version = archived[1].version
client.transition_model_version_stage(
name=model_name,
version=prev_version,
stage="Production",
)
print(f"Rollback completato: ora in produzione v{prev_version}")
MLflow를 사용한 모델 제공
MLflow에는 등록된 모델을 노출하는 기본 제공 서버가 포함되어 있습니다. 단일 명령을 사용하는 REST API로. 프로토타입 제작을 위한 탁월한 솔루션 및 개발 환경. 대규모 생산을 위해서는 모델 통합을 권장합니다. FastAPI를 사용한 MLflow(시리즈의 다음 문서 참조)
# ==================== Serving via CLI ====================
# Servi l'ultima versione Production del modello
mlflow models serve \
--model-uri "models:/churn-gbm-model/Production" \
--host 0.0.0.0 \
--port 8080 \
--env-manager conda
# Servi un run specifico
mlflow models serve \
--model-uri "runs:/abc123def456/model" \
--port 8080
# Con Docker (raccomandato per produzione)
mlflow models build-docker \
--model-uri "models:/churn-gbm-model/Production" \
--name "churn-model-server" \
--enable-mlserver # Usa MLServer per performance migliori
docker run -p 8080:8080 churn-model-server
# ==================== Test del Serving ====================
# Il server espone l'endpoint /invocations
import requests
import json
import pandas as pd
# Prepara i dati di input nel formato atteso da MLflow
test_data = pd.DataFrame({
"feature_0": [0.5, -1.2],
"feature_1": [1.3, 0.8],
# ... altri 18 features
})
# MLflow accetta JSON in formato "split" o "records"
payload = {
"dataframe_split": {
"columns": test_data.columns.tolist(),
"data": test_data.values.tolist()
}
}
response = requests.post(
"http://localhost:8080/invocations",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
)
predictions = response.json()
print(f"Predizioni: {predictions}")
# {"predictions": [0, 1]}
프로덕션을 위한 MLflow + FastAPI 통합
강력한 프로덕션 제공을 위한 모범 사례는 다음에서 모델을 로드하는 것입니다. FastAPI 애플리케이션을 시작할 때 MLflow 모델 레지스트리 및 업데이트 새 프로덕션 버전이 승격되면 자동으로:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import mlflow
import pandas as pd
import numpy as np
import threading
import time
import logging
from typing import List
logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API", version="2.0.0")
# ==================== Model Manager con Auto-Refresh ====================
class MLflowModelManager:
"""
Gestisce il caricamento e il refresh automatico del modello
dal MLflow Model Registry.
"""
def __init__(self, model_name: str, tracking_uri: str, refresh_interval: int = 300):
self.model_name = model_name
self.tracking_uri = tracking_uri
self.refresh_interval = refresh_interval # secondi
self._model = None
self._model_version = None
self._lock = threading.Lock()
mlflow.set_tracking_uri(tracking_uri)
self._load_model()
self._start_refresh_thread()
def _load_model(self) -> None:
"""Carica la versione Production corrente dal registry."""
try:
model_uri = f"models:/{self.model_name}/Production"
new_model = mlflow.sklearn.load_model(model_uri)
# Ottieni il numero di versione
client = mlflow.MlflowClient()
versions = client.get_latest_versions(self.model_name, stages=["Production"])
version = versions[0].version if versions else "unknown"
with self._lock:
self._model = new_model
self._model_version = version
logger.info(f"Modello {self.model_name} v{version} caricato dal registry")
except Exception as e:
logger.error(f"Errore nel caricamento del modello: {e}")
def _start_refresh_thread(self) -> None:
"""Avvia thread background per refresh periodico del modello."""
def refresh_loop():
while True:
time.sleep(self.refresh_interval)
self._load_model()
thread = threading.Thread(target=refresh_loop, daemon=True)
thread.start()
def predict(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict(features)
def predict_proba(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict_proba(features)[:, 1]
@property
def model_version(self) -> str:
return self._model_version or "unknown"
# Istanza globale del manager
model_manager = MLflowModelManager(
model_name="churn-gbm-model",
tracking_uri="http://mlflow-server:5000",
refresh_interval=300, # Controlla nuove versioni ogni 5 minuti
)
# ==================== API Endpoints ====================
class PredictionRequest(BaseModel):
features: List[List[float]]
feature_names: List[str]
class PredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[float]
model_version: str
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest) -> PredictionResponse:
"""Endpoint di predizione churn."""
try:
df = pd.DataFrame(
request.features,
columns=request.feature_names
)
predictions = model_manager.predict(df).tolist()
probabilities = model_manager.predict_proba(df).tolist()
return PredictionResponse(
predictions=predictions,
probabilities=probabilities,
model_version=model_manager.model_version,
)
except Exception as e:
logger.error(f"Errore nella predizione: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"model_name": "churn-gbm-model",
"model_version": model_manager.model_version,
}
MLflow와 대안: W&B, Neptune, ClearML
MLflow는 실험 추적을 위한 유일한 옵션이 아닙니다. 시장은 다양한 것을 제공합니다 각각 특정한 강점을 지닌 유효한 대안. 선택은 예산에 따라 달라지며, 팀 규모, 기존 인프라 및 거버넌스 요구 사항.
실험 추적 도구의 완전한 비교
| 크기 | MLflow | W&B | 해왕성 | 클리어ML |
|---|---|---|---|---|
| 특허 | 오픈 소스(아파치 2.0) | SaaS + 자체 호스팅 엔터프라이즈 | SaaS + 셀프 호스트 | 오픈소스 + 엔터프라이즈 |
| 비용(5명으로 구성된 팀) | 무료(자체 호스팅) | ~250 USD/월 | ~100달러/월 | 무료(자체 호스팅) |
| 설정 | 약 30분 만에 Docker | 제로 구성(SaaS) | 제로 구성(SaaS) | 복잡함(ClearML 서버) |
| UI/UX | 기능적이지만 아름답지는 않습니다. | 훌륭하고 그래픽이 풍부합니다. | 좋아요, 사용자 정의가 매우 가능합니다 | 완전하고 높은 학습 곡선 |
| 자동 로깅 | 우수함(20개 이상의 프레임워크) | 우수(W&B SDK) | 좋은 | 원숭이 패치를 통한 자동 |
| 모델 레지스트리 | 통합된 스테이징 워크플로우 | W&B 모델 레지스트리 | 모델 레지스트리 사용 가능 | 통합 모델 저장소 |
| 하이퍼파라미터 스윕 | Optuna/Hyperopt 통합 | 네이티브 스윕(우수) | 좋은 | 통합 HPO |
| 거버넌스/규정 준수 | 기본 감사 추적 | 접근 제어, 팀 기능 | 팀워크스페이스 | 고급(RBAC, 감사) |
| GenAI/LLM 지원 | MLflow 3: 추적, 평가 | 프롬프트, LLM 모니터링 | LLM 추적 | LLM 실험 추적 |
| 다음에 이상적입니다. | 자체 인프라를 갖춘 팀, SME, 자체 호스팅 | UX와 협업을 최우선으로 생각하는 팀 | 중간예산팀 | 자동화가 필요한 기업 |
MLflow를 선택해야 하는 경우
MLflow는 다음 시나리오에서 최적의 선택입니다.
- 제한된 예산(<5K EUR/년): 단일 VM의 자체 호스팅 비용은 연간 약 180 EUR입니다.
- 데이터 상주 요구 사항: 회사 인프라 외부로 유출될 수 없는 민감한 데이터
- 기존 Python 생태계와의 통합: MLflow는 scikit-learn, PyTorch, TensorFlow, XGBoost 및 20개 이상의 프레임워크와 기본적으로 통합됩니다.
- 규정 준수 및 감사(AI Act EU): 데이터베이스 및 아티팩트에 대한 전체 액세스, SaaS 종속 없음
- DevOps 중심 팀: MLflow는 다른 서비스와 마찬가지로 Docker 컨테이너입니다.
실험에 대한 질문 및 분석
MLflow의 가장 중요한 기능 중 하나는 프로그래밍 방식으로 쿼리하는 기능입니다. 모든 실험을 통해 최상의 실행을 찾고, 여러 차원에 걸쳐 실행을 비교하거나, 추출을 수행합니다. 자동 보고서용 데이터:
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
# ==================== Ricerca di Run con Filtri ====================
# Trova tutti i run con AUC-ROC > 0.92 nel tuo experiment
runs = mlflow.search_runs(
experiment_names=["churn-prediction-gbm"],
filter_string="metrics.auc_roc > 0.92 and tags.environment = 'dev'",
order_by=["metrics.auc_roc DESC"],
max_results=20,
)
# Il risultato e un DataFrame pandas
print(runs[["run_id", "metrics.auc_roc", "metrics.f1_score",
"params.learning_rate", "params.max_depth"]].head())
# ==================== Confronto Run su Multiple Metriche ====================
def compare_top_runs(experiment_name: str, n: int = 5) -> pd.DataFrame:
"""Restituisce un DataFrame con i top N run per AUC-ROC."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=["metrics.auc_roc DESC"],
max_results=n,
)
# Seleziona le colonne più rilevanti
cols = [
"run_id",
"metrics.accuracy", "metrics.f1_score", "metrics.auc_roc",
"params.n_estimators", "params.learning_rate", "params.max_depth",
"tags.dataset_version", "start_time",
]
# Filtra solo colonne esistenti
existing_cols = [c for c in cols if c in runs.columns]
return runs[existing_cols].copy()
comparison_df = compare_top_runs("churn-prediction-gbm")
print(comparison_df.to_string(index=False))
# ==================== Trovare il Best Run e Caricarlo ====================
def get_best_run(experiment_name: str, metric: str = "metrics.auc_roc") -> dict:
"""Trova il run con la metrica migliore e restituisce run_id e metriche."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=[f"{metric} DESC"],
max_results=1,
)
if runs.empty:
raise ValueError(f"Nessun run trovato in {experiment_name}")
best = runs.iloc[0]
return {
"run_id": best["run_id"],
"auc_roc": best.get("metrics.auc_roc"),
"accuracy": best.get("metrics.accuracy"),
"f1_score": best.get("metrics.f1_score"),
}
best_run = get_best_run("churn-prediction-gbm")
print(f"Best run: {best_run['run_id']}, AUC-ROC: {best_run['auc_roc']:.4f}")
# Carica il modello dal best run
model = mlflow.sklearn.load_model(f"runs:/{best_run['run_id']}/model")
# ==================== Export Metriche per Report ====================
# Carica la history di una metrica step-by-step (es. validation loss per epoch)
run_id = "abc123def456"
metric_history = client.get_metric_history(run_id, "val_accuracy_step")
steps = [m.step for m in metric_history]
values = [m.value for m in metric_history]
accuracy_over_time = pd.DataFrame({"step": steps, "val_accuracy": values})
print(f"Training steps loggati: {len(accuracy_over_time)}")
프로덕션의 MLflow 모범 사례
1. 명명 규칙
일관된 명명 규칙을 사용하면 몇 달 후에도 실험을 검색하고 이해할 수 있습니다.
# Schema naming raccomandato per experiments e run
# EXPERIMENT: [team]-[progetto]-[tipo]
# Esempi:
"ml-eng-churn-prediction-gbm"
"ml-eng-churn-prediction-neural-net"
"research-recommender-collaborative"
# RUN NAME: [algoritmo]-[key-param]-[data]
# Esempi:
"gbm-lr0.05-depth6-2025-11-15"
"xgb-v2-autofeat-2025-11-20"
"baseline-logistic-regression"
# Usa sempre tag per metadata strutturati
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"environment": "dev", # dev | staging | prod
"dataset_version": "v2.1",
"git_branch": git_branch,
"git_commit": git_commit[:8],
"triggered_by": "ci-cd", # manual | ci-cd | scheduled
"approved_by": "", # Compilato prima della promozione
})
2. MLflow 코드의 구조
from contextlib import contextmanager
import mlflow
import functools
# Pattern: decorator per tracking automatico
def mlflow_run(experiment_name: str, run_name: str = None, tags: dict = None):
"""Decorator che wrappa una funzione in un MLflow run."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name or func.__name__) as run:
if tags:
mlflow.set_tags(tags)
result = func(*args, **kwargs)
return result
return wrapper
return decorator
# Uso del decorator
@mlflow_run(
experiment_name="churn-prediction-gbm",
run_name="training-v2",
tags={"team": "ml-eng", "version": "v2"}
)
def train_model(X_train, y_train, params: dict):
mlflow.log_params(params)
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ... metriche e artefatti
return model
# Pattern: context manager per setup/teardown
@contextmanager
def mlflow_experiment(experiment_name: str, run_name: str):
"""Context manager con gestione errori."""
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name) as run:
try:
mlflow.set_tag("status", "running")
yield run
mlflow.set_tag("status", "success")
except Exception as e:
mlflow.set_tag("status", "failed")
mlflow.set_tag("error", str(e))
raise
# Uso
with mlflow_experiment("churn-prediction", "training-run-v3") as run:
mlflow.log_params({"n_estimators": 300})
# ... training
mlflow.log_metric("accuracy", 0.94)
3. CI/CD용 GitHub Actions 통합
# .github/workflows/ml-experiment.yml
name: ML Experiment + Model Promotion
on:
push:
branches: [main]
paths:
- 'src/models/**'
- 'src/features/**'
- 'params.yaml'
jobs:
train-and-evaluate:
runs-on: ubuntu-latest
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run training
id: training
run: |
# Script di training che logga su MLflow e stampa il run_id
RUN_ID=$(python src/models/train.py --output-run-id)
echo "run_id=$RUN_ID" >> $GITHUB_OUTPUT
- name: Evaluate and check metrics
id: evaluation
run: |
python scripts/ci_evaluate.py \
--run-id ${{ steps.training.outputs.run_id }} \
--min-auc 0.92 \
--min-accuracy 0.90
- name: Promote to Staging
if: success()
run: |
python scripts/promote_model.py \
--run-id ${{ steps.training.outputs.run_id }} \
--model-name churn-gbm-model \
--target-stage Staging
- name: Notify team
if: success()
uses: slackapi/slack-github-action@v1.24.0
with:
payload: |
{
"text": "Nuovo modello promosso a Staging: run ${{ steps.training.outputs.run_id }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
피해야 할 안티패턴
-
동일한 측정항목에 대해 서로 다른 키와 함께 mlflow.log_metric()을 사용하세요.:
accuracyeacc이는 MLflow에 대한 두 가지 별도의 측정항목입니다. 표준 이름 사전을 정의하고 항상 사용하십시오. -
활성 실행을 닫지 마십시오.: 활성 실행 내에서 코드가 충돌하는 경우
컨텍스트 관리자 없이
with mlflow.start_run(), 실행이 남아 있습니다 무기한 "RUNNING" 상태입니다. 항상 컨텍스트 관리자를 사용하십시오. - 대규모 아티팩트를 CSV로 로깅: MLflow는 데이터 레이크가 아닙니다. 대규모 데이터세트의 경우 메타데이터(DVC 경로, 해시, 크기)만 기록하고 데이터 버전 관리를 위한 DVC.
- 여러 사용자가 있는 프로덕션에서 SQLite 사용: SQLite는 지원하지 않습니다 동시 저술. 두 개의 병렬 학습 프로세스를 사용하면 잠금이 발생합니다. 오류. 다중 사용자 설정에는 PostgreSQL 또는 MySQL을 사용하십시오.
- 데이터세트 버전을 기록하지 마세요.: 모델 매개변수 버전을 관리하지 않으면 데이터의 재현성이 충분하지 않습니다. 항상 로그인하세요 Git 커밋, DVC 태그 및 데이터 세트 크기.
- 중간 승격 없이 생산으로 승격 준비: 중개 스테이징 워크플로를 통해 통합 및 검증 테스트가 가능합니다. 프로덕션에 배포하기 전 팀의 모습입니다. 이 단계를 건너뛰지 마십시오.
MLflow 3의 새로운 기능: GenAI 및 에이전트를 향하여
2025년 6월 MLflow 3 출시로 플랫폼은 진화적인 도약을 이루었습니다. GenAI 세계를 지향하는 중요한 요소입니다. 모델과 함께 일하는 사람들에게 가장 관련성이 높은 뉴스 기존 ML 및 LLM 사용:
- LoggedModel은 일류 엔터티입니다.: 모델이 더 이상 존재하지 않습니다. 단지 실행의 유물입니다. LoggedModel은 실행, 환경 및 측정항목, 매개변수, 추적 및 평가 데이터에 대한 완전한 계보를 갖춘 배포입니다.
- 성능 25% 향상: MLflow 3.x에는 쿼리가 최적화되어 있습니다. 데이터베이스에 추가하고 로깅 오버헤드를 줄여 로깅 처리량을 높입니다. 버전 2.5(2026년 벤치마크)에 비해 25% 향상되었습니다.
- GenAI 추적: LLM, 체인, 도구 호출 및 에이전트에 대한 자동 추적 LangChain, LlamaIndex, OpenAI SDK, Anthropic 등을 지원합니다.
- 피드백 수집 API: 인간의 피드백을 체계적으로 수집한 것 검토 및 평가를 위해 UI와 통합된 모델 결과.
-
진화된 평가 프레임워크:
mlflow.evaluate()지금 사용자 정의 지표, LLM-판사 및 자동 모델 비교를 지원합니다.
결론 및 다음 단계
MLflow는 생태계에서 가장 널리 퍼진 실험 추적 도구로 자리 잡았습니다. 활발한 커뮤니티와 지속적인 발전을 갖춘 오픈 소스 ML. 의 조합 단일 자체 호스팅 플랫폼에서 추적, 모델 레지스트리 및 서비스를 제공하는 것이 선택입니다. MLOps가 필요 없는 인프라를 완벽하게 제어하려는 팀에게는 자연스러운 일입니다. SaaS 솔루션 비용.
이 기사에서 본 워크플로는 실험 등록부터 모델 레지스트리를 통해 프로덕션으로 승격시키기 위해 사용 사례의 90%를 다룹니다. ML 팀의 현실 데이터 버전 관리를 위해 DVC와 통합(이전 기사) CI/CD 자동화를 위한 GitHub Actions를 사용하면 완전한 MLOps 시스템을 얻을 수 있으며 연간 예산이 250 EUR 미만인 전문가.
다음 글에서는 머신러닝에서 가장 까다로운 문제 중 하나를 다루겠습니다. 생산 중 : 모델 드리프트. 성능 저하를 감지하는 방법을 알아 보겠습니다. 시간 경과에 따른 성능(데이터 드리프트, 개념 드리프트, 예측 드리프트) 및 구현 방법 Grafana 및 Prometheus에 대한 경고가 포함된 자동 재교육 시스템.
리소스 및 다음 단계
- 공식 MLflow 문서: mlflow.org/docs/latest
- MLflow 3 릴리스 노트: mlflow.org/releases/3
- MLflow GitHub: github.com/mlflow/mlflow
- 이전 기사: DVC를 사용하여 데이터 세트 및 모델 버전 관리
- 다음 기사: 모델 드리프트 감지 및 자동 재훈련
- 상호 연결: 고급 딥 러닝 - 고급 교육
- 상호 연결: 컴퓨터 비전 - 객체 감지 파이프라인
MLflow를 포함한 전체 MLOps 스택(예산 <5K EUR/년)
| 요소 | 기구 | 예상 연간 비용 |
|---|---|---|
| 실험 추적 | MLflow 자체 호스팅 | 무료(오픈소스) |
| 백엔드 저장소 | Docker의 PostgreSQL | 무료(동일한 MLflow 서버) |
| 유물 상점 | MinIO(S3 호환) 또는 AWS S3 | 무료 / ~30 EUR/년 |
| MLflow + PostgreSQL용 VM | EC2 t3.small(vCPU 2개, 2GB RAM) | ~180 EUR/년 |
| 데이터 세트 버전 관리 | DVC + DagsHub | 무료 |
| CI/CD 파이프라인 | GitHub 작업 | 무료(2000분/월) |
| 예상 총액 | <220유로/년 |







