ML モデルの提供: 本番環境での FastAPI + Uvicorn
すべてのベースラインを上回るパフォーマンスのモデルをトレーニングしました。MLflow のメトリクスは優れており、チームは と熱心に。次に、避けられない疑問が生じます。 「いつ本番環境で使用できるようになりますか?」。 そして、ここで多くの ML エンジニアが問題に遭遇します。それは、Jupyter ノートブックとサービスの間のギャップです。 HTTP はスケーラブルで信頼性が高く、監視可能であり、見た目よりもはるかに広範囲に渡ります。
ファストAPI 2024 年から 2025 年にかけて Python モデル提供の事実上の標準となりました。 PyPI では毎月 8,000 万以上のダウンロードが行われています。ネイティブタイプのヒントの組み合わせ、検証 Pydantic による自動、自動生成された OpenAPI ドキュメント、およびネイティブの非同期サポート 実稼働対応の推論 API を構築するのに最適です。隣接する ユビコーン (高性能 ASGI サーバー) と Docker コンテナ化のベスト プラクティスにより、FastAPI が可能になります scikit-learn、PyTorch、または Hugging Face モデルをわずか数時間で本番環境に導入できます。
このガイドでは、基本的な予測エンドポイントから完全なモデル サービング サービスを構築します。 バッチ処理による非同期推論、ヘルスチェックから Prometheus と Grafana による監視まで、 Docker と Kubernetes 上でのコンテナ化されたスケーラブルなデプロイメントに対応します。すべての例はテストされ、準備が整っています 実際の環境で使用するために。
何を学ぶか
- ライフサイクル管理を使用してモデルを提供するための FastAPI アプリを構築する
- CPU バウンドのタスク用にスレッド プールを使用して同期および非同期推論を実装する
- 動的バッチ処理を管理して GPU/CPU スループットを最大化します
- ヘルスチェック、readiness Probe、Prometheusモニタリングを追加
- Docker のマルチステージ ビルドを使用してコンテナ化し、運用環境に合わせて最適化します。
- FastAPI を BentoML、TorchServe、Triton Inference Server と比較する
- Locust を使用して負荷テストを実装してパフォーマンスを検証する
モデル提供に FastAPI を使用する理由
コードに入る前に、FastAPI がなぜこのような支配的な地位を獲得したのかを理解する価値があります。 Python モデルの提供。従来の選択肢である Flask との比較は非常に有益です。
Flask は、同期ブロッキング アーキテクチャである WSGI (Web Server Gateway Interface) を使用します。あらゆるリクエスト 完了するまでサーバー スレッドが占有されます。最短50msのモデルの場合 推論の場合、4 つのワーカーを備えた Flask は、劣化が始まる前に約 80 リクエスト/秒を処理します。 FastAPI は ASGI (Asynchronous Server Gateway Interface) を使用し、単一のプロセスで ノンブロッキングな方法で数千の同時接続を管理します。ユビコーンと4人の労働者と一緒に、 同じハードウェアで、光推論の 500 以上のリクエスト/秒を簡単に処理できます。
警告: 非同期は自動的に推論の高速化を意味するわけではありません
よくある間違いは、予測エンドポイントを次のように定義することです。 async def それから電話する
モデルを直接。 ML 推論は CPU バウンド (または GPU バウンド) です。非同期スレッドで実行します。
main はイベント ループをブロックし、パフォーマンスを悪化させます。正しい解決策は使用することです
asyncio.get_event_loop().run_in_executor() またはスターレットの
run_in_threadpool() 別のスレッドで推論を実行します。
プロジェクトのセットアップ
プロジェクトの構造から始めましょう。適切なコード構成は次のような場合に不可欠です 本番環境でのメンテナンス性。
# Struttura del progetto
ml-serving/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app e lifecycle
│ ├── models/
│ │ ├── __init__.py
│ │ ├── predictor.py # Wrapper del modello ML
│ │ └── schemas.py # Pydantic schemas
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── predict.py # Endpoint predizione
│ │ └── health.py # Health check endpoints
│ └── middleware/
│ ├── __init__.py
│ └── metrics.py # Prometheus metrics
├── tests/
│ ├── test_predict.py
│ └── test_health.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── locustfile.py
必要な依存関係をインストールしましょう。
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.10.0
scikit-learn==1.5.2
numpy==1.26.4
pandas==2.2.3
joblib==1.4.2
prometheus-fastapi-instrumentator==7.0.0
prometheus-client==0.21.0
httpx==0.28.0 # per test async
python-multipart==0.0.20
# Installazione
pip install -r requirements.txt
ライフサイクル管理を備えた FastAPI アプリ
モデル提供の核心は、アプリケーションの起動時にモデルを 1 回だけロードすることです。 すべてのリクエストに応じるわけではありません。 FastAPI 0.93+ では i が導入されました ライフスパンコンテキストマネージャー、 起動時に初期化する必要があるリソースを管理する最新のクリーンな方法 シャットダウン時に解放されます。
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
import logging
import time
from app.models.predictor import ModelPredictor
from app.routers import predict, health
# Configurazione logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Stato globale dell'applicazione (immutabile dopo init)
class AppState:
def __init__(self):
self.predictor: ModelPredictor | None = None
self.model_load_time: float = 0.0
self.model_version: str = ""
app_state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Gestione lifecycle: load all'avvio, cleanup allo shutdown"""
# Startup
logger.info("Avvio applicazione - caricamento modello...")
start_time = time.time()
try:
app_state.predictor = ModelPredictor(
model_path="models/churn_model.pkl",
scaler_path="models/scaler.pkl"
)
app_state.model_load_time = time.time() - start_time
app_state.model_version = app_state.predictor.get_version()
logger.info(
f"Modello caricato in {app_state.model_load_time:.2f}s "
f"(versione: {app_state.model_version})"
)
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise RuntimeError(f"Impossibile avviare il servizio: {e}")
yield # L'app e in esecuzione
# Shutdown
logger.info("Shutdown applicazione - cleanup risorse...")
app_state.predictor = None
# Inizializzazione FastAPI
app = FastAPI(
title="ML Model Serving API",
description="Production-ready inference API con FastAPI e Uvicorn",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc"
)
# CORS (configura per il tuo ambiente)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In prod: specifica i domini
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Prometheus instrumentation
Instrumentator().instrument(app).expose(app)
# Router
app.include_router(predict.router, prefix="/api/v1", tags=["prediction"])
app.include_router(health.router, tags=["health"])
# Dependency injection dello stato
app.state.app_state = app_state
モデル予測子: ML モデル ラッパー
Il ModelPredictor そしてサービスの核心。 ML モデルをカプセル化します。
クリーンなインターフェイス、入力の前処理と入力の後処理を処理します。
出力し、監視に役立つメタデータを提供します。
# app/models/predictor.py
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
import logging
from typing import Any
import hashlib
import time
logger = logging.getLogger(__name__)
class ModelPredictor:
"""Wrapper produzione-ready per modelli scikit-learn.
Responsabilità:
- Caricamento e validazione del modello
- Pre/post processing degli input/output
- Raccolta metriche di performance
"""
def __init__(self, model_path: str, scaler_path: str):
model_file = Path(model_path)
scaler_file = Path(scaler_path)
if not model_file.exists():
raise FileNotFoundError(f"Modello non trovato: {model_path}")
if not scaler_file.exists():
raise FileNotFoundError(f"Scaler non trovato: {scaler_path}")
self._model = joblib.load(model_file)
self._scaler = joblib.load(scaler_file)
self._model_hash = self._compute_hash(model_file)
self._load_timestamp = time.time()
# Feature names attese (definite al training)
self._feature_names = [
"tenure_months", "monthly_charges", "total_charges",
"num_products", "has_phone_service", "has_internet",
"contract_type", "payment_method"
]
logger.info(f"ModelPredictor inizializzato - hash: {self._model_hash[:8]}")
def predict(self, features: dict[str, Any]) -> dict[str, Any]:
"""Predizione singola con timing e validazione."""
start_time = time.perf_counter()
# Pre-processing
df = self._preprocess(features)
# Inference
prediction = self._model.predict(df)[0]
probability = self._model.predict_proba(df)[0].tolist()
inference_time_ms = (time.perf_counter() - start_time) * 1000
return {
"prediction": int(prediction),
"probability": {
"no_churn": round(probability[0], 4),
"churn": round(probability[1], 4)
},
"inference_time_ms": round(inference_time_ms, 2),
"model_version": self.get_version()
}
def predict_batch(
self,
batch: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Predizione batch ottimizzata (una sola chiamata al modello)."""
start_time = time.perf_counter()
# Costruisci DataFrame dall'intero batch
rows = [self._preprocess(item).iloc[0] for item in batch]
df_batch = pd.DataFrame(rows)
# Inference batch (una sola chiamata)
predictions = self._model.predict(df_batch)
probabilities = self._model.predict_proba(df_batch)
inference_time_ms = (time.perf_counter() - start_time) * 1000
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
results.append({
"prediction": int(pred),
"probability": {
"no_churn": round(float(prob[0]), 4),
"churn": round(float(prob[1]), 4)
},
"batch_index": i
})
logger.info(
f"Batch inference: {len(batch)} items in {inference_time_ms:.1f}ms "
f"({inference_time_ms/len(batch):.2f}ms/item)"
)
return results
def _preprocess(self, features: dict[str, Any]) -> pd.DataFrame:
"""Preprocessing input: validazione, encoding, scaling."""
df = pd.DataFrame([features])
# Encoding categorico
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
df["contract_type"] = df["contract_type"].map(contract_map).fillna(0)
df["payment_method"] = df["payment_method"].map(payment_map).fillna(0)
# Seleziona features nell'ordine corretto
df = df[self._feature_names]
# Scaling
df_scaled = self._scaler.transform(df)
return pd.DataFrame(df_scaled, columns=self._feature_names)
def get_version(self) -> str:
return self._model_hash[:12]
def get_metadata(self) -> dict[str, Any]:
return {
"model_hash": self._model_hash[:12],
"load_timestamp": self._load_timestamp,
"feature_names": self._feature_names,
"model_type": type(self._model).__name__
}
@staticmethod
def _compute_hash(file_path: Path) -> str:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
Pydantic スキーマ: 入力検証
Pydantic v2 (FastAPI 0.100+ のデフォルト) は、書き換えにより超高速な検証を提供します Rustで。厳密なスキーマを定義すると、不正な入力からモデルが保護され、 自動 API ドキュメント。
# app/models/schemas.py
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Literal
from enum import Enum
class ContractType(str, Enum):
MONTH_TO_MONTH = "month-to-month"
ONE_YEAR = "one-year"
TWO_YEAR = "two-year"
class PaymentMethod(str, Enum):
ELECTRONIC = "electronic"
MAILED = "mailed"
BANK = "bank"
CREDIT = "credit"
class PredictionRequest(BaseModel):
"""Schema input per predizione churn singola."""
tenure_months: int = Field(
..., ge=0, le=120,
description="Mesi di tenure del cliente"
)
monthly_charges: float = Field(
..., ge=0, le=500,
description="Addebito mensile in EUR"
)
total_charges: float = Field(
..., ge=0,
description="Addebito totale storico in EUR"
)
num_products: int = Field(
..., ge=1, le=10,
description="Numero di prodotti sottoscritti"
)
has_phone_service: bool = Field(
..., description="Il cliente ha servizio telefonico"
)
has_internet: bool = Field(
..., description="Il cliente ha servizio internet"
)
contract_type: ContractType = Field(
..., description="Tipo di contratto"
)
payment_method: PaymentMethod = Field(
..., description="Metodo di pagamento"
)
@model_validator(mode='after')
def validate_total_charges(self) -> 'PredictionRequest':
"""total_charges non può essere minore di monthly_charges."""
if self.total_charges < self.monthly_charges:
raise ValueError(
f"total_charges ({self.total_charges}) non può essere "
f"minore di monthly_charges ({self.monthly_charges})"
)
return self
model_config = {
"json_schema_extra": {
"example": {
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
}
}
}
class PredictionResponse(BaseModel):
"""Schema output per predizione singola."""
prediction: Literal[0, 1]
probability: dict[str, float]
inference_time_ms: float
model_version: str
class BatchPredictionRequest(BaseModel):
"""Schema input per batch prediction (max 100 items)."""
items: list[PredictionRequest] = Field(
..., min_length=1, max_length=100,
description="Lista di richieste da processare in batch"
)
class BatchPredictionResponse(BaseModel):
"""Schema output per batch prediction."""
results: list[dict]
batch_size: int
total_inference_time_ms: float
予測エンドポイント: 同期および非同期
CPU 依存タスクの正しいパターンに従って予測エンドポイントを実装します。 推論は、非同期イベント ループをブロックしないように別のスレッド プールで実行されます。
# app/routers/predict.py
from fastapi import APIRouter, Depends, HTTPException, Request
from starlette.concurrency import run_in_threadpool
import logging
import time
from app.models.predictor import ModelPredictor
from app.models.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse
)
from app.middleware.metrics import (
PREDICTION_COUNTER, PREDICTION_LATENCY,
BATCH_SIZE_HISTOGRAM, ERROR_COUNTER
)
logger = logging.getLogger(__name__)
router = APIRouter()
def get_predictor(request: Request) -> ModelPredictor:
"""Dependency injection del predictor."""
predictor = request.app.state.app_state.predictor
if predictor is None:
raise HTTPException(
status_code=503,
detail="Modello non disponibile - servizio in fase di avvio"
)
return predictor
@router.post(
"/predict",
response_model=PredictionResponse,
summary="Predizione singola",
description="Inferenza su un singolo cliente per churn prediction"
)
async def predict_single(
request: PredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> PredictionResponse:
"""
Endpoint di predizione singola.
Usa run_in_threadpool per eseguire l'inference CPU-bound
senza bloccare l'event loop async.
"""
try:
# CORRETTO: esegui task CPU-bound in threadpool
result = await run_in_threadpool(
predictor.predict,
request.model_dump()
)
# Aggiorna metriche Prometheus
PREDICTION_COUNTER.labels(
model_version=result["model_version"],
outcome="success"
).inc()
PREDICTION_LATENCY.observe(result["inference_time_ms"] / 1000)
return PredictionResponse(**result)
except Exception as e:
ERROR_COUNTER.labels(endpoint="predict", error_type=type(e).__name__).inc()
logger.error(f"Errore predizione: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Errore durante l'inference: {str(e)}"
)
@router.post(
"/predict/batch",
response_model=BatchPredictionResponse,
summary="Predizione batch",
description="Inferenza batch ottimizzata (max 100 items per request)"
)
async def predict_batch(
batch_request: BatchPredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> BatchPredictionResponse:
"""
Endpoint batch: una sola chiamata al modello per N items.
Throughput 3-5x superiore rispetto a N chiamate singole.
"""
start_time = time.perf_counter()
batch_size = len(batch_request.items)
try:
items_dicts = [item.model_dump() for item in batch_request.items]
results = await run_in_threadpool(
predictor.predict_batch,
items_dicts
)
total_time_ms = (time.perf_counter() - start_time) * 1000
BATCH_SIZE_HISTOGRAM.observe(batch_size)
PREDICTION_COUNTER.labels(
model_version=predictor.get_version(),
outcome="success_batch"
).inc(batch_size)
return BatchPredictionResponse(
results=results,
batch_size=batch_size,
total_inference_time_ms=round(total_time_ms, 2)
)
except Exception as e:
ERROR_COUNTER.labels(
endpoint="predict_batch",
error_type=type(e).__name__
).inc()
logger.error(f"Errore batch inference: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
ヘルスチェック: 稼働状態と準備完了状態
Kubernetes (またはヘルスチェックを備えた Docker Compose) デプロイメントでは、以下を区別します。 活性プローブ (プロセスは生きていますか?) e レディネスプローブ (サービスはトラフィックを受信する準備ができていますか?) 正しく動作するために不可欠です ルーティングとローリング展開の。
# app/routers/health.py
from fastapi import APIRouter, Request
from pydantic import BaseModel
import time
import psutil
import os
router = APIRouter()
class HealthResponse(BaseModel):
status: str
timestamp: float
uptime_seconds: float
class ReadinessResponse(BaseModel):
status: str
model_loaded: bool
model_version: str
model_load_time_seconds: float
memory_usage_mb: float
cpu_percent: float
_start_time = time.time()
@router.get(
"/health",
response_model=HealthResponse,
summary="Liveness probe",
tags=["health"]
)
async def liveness() -> HealthResponse:
"""
Liveness probe: verifica che il processo sia attivo.
Kubernetes usa questo per decidere se riavviare il pod.
Risponde sempre 200 se il processo e in esecuzione.
"""
return HealthResponse(
status="alive",
timestamp=time.time(),
uptime_seconds=round(time.time() - _start_time, 1)
)
@router.get(
"/health/ready",
response_model=ReadinessResponse,
summary="Readiness probe",
tags=["health"]
)
async def readiness(request: Request) -> ReadinessResponse:
"""
Readiness probe: verifica che il servizio sia pronto a ricevere traffico.
Kubernetes usa questo per il load balancing.
Risponde 503 se il modello non e ancora caricato.
"""
from fastapi import HTTPException
app_state = request.app.state.app_state
model_loaded = app_state.predictor is not None
# Metriche sistema
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent(interval=0.1)
response = ReadinessResponse(
status="ready" if model_loaded else "not_ready",
model_loaded=model_loaded,
model_version=app_state.model_version if model_loaded else "",
model_load_time_seconds=round(app_state.model_load_time, 3),
memory_usage_mb=round(memory_mb, 1),
cpu_percent=round(cpu_percent, 1)
)
if not model_loaded:
raise HTTPException(status_code=503, detail=response.model_dump())
return response
@router.get("/metrics/model", tags=["health"])
async def model_metrics(request: Request):
"""Metadata e statistiche del modello caricato."""
app_state = request.app.state.app_state
if app_state.predictor is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="Modello non disponibile")
return app_state.predictor.get_metadata()
Prometheus と Grafana によるモニタリング
実稼働環境での ML サービスの監視は、標準の HTTP メトリクスをはるかに超えています。
推論レイテンシー、予測分布、エラー率を追跡したい
そして資源の使用。図書館 prometheus-fastapi-instrumentator
基本的な HTTP メトリクスを提供します。 ML に固有のカスタム指標を追加します。
# app/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary
import time
# Contatore predizioni per versione modello e outcome
PREDICTION_COUNTER = Counter(
"ml_predictions_total",
"Numero totale di predizioni eseguite",
["model_version", "outcome"]
)
# Latenza inference (in secondi)
PREDICTION_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Durata inference in secondi",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
# Dimensione batch
BATCH_SIZE_HISTOGRAM = Histogram(
"ml_batch_size",
"Dimensione delle richieste batch",
buckets=[1, 5, 10, 25, 50, 100]
)
# Contatore errori
ERROR_COUNTER = Counter(
"ml_errors_total",
"Numero totale di errori",
["endpoint", "error_type"]
)
# Distribuzione predizioni (gauge aggiornato periodicamente)
CHURN_RATE_GAUGE = Gauge(
"ml_churn_rate_rolling",
"Tasso di churn predetto (finestra mobile 1000 predizioni)"
)
# Utilizzo memoria modello
MODEL_MEMORY_GAUGE = Gauge(
"ml_model_memory_bytes",
"Memoria utilizzata dal modello ML"
)
class PredictionTracker:
"""Tracker per statistiche rolling delle predizioni."""
def __init__(self, window_size: int = 1000):
self._window_size = window_size
self._predictions: list[int] = []
def record(self, prediction: int) -> None:
self._predictions.append(prediction)
if len(self._predictions) > self._window_size:
self._predictions = self._predictions[-self._window_size:]
# Aggiorna gauge churn rate
if self._predictions:
churn_rate = sum(self._predictions) / len(self._predictions)
CHURN_RATE_GAUGE.set(churn_rate)
# Istanza globale
prediction_tracker = PredictionTracker()
完全な監視スタック用の Docker Compose セットアップ:
# docker-compose.yml
version: "3.9"
services:
ml-api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/churn_model.pkl
- SCALER_PATH=/app/models/scaler.pkl
- LOG_LEVEL=INFO
volumes:
- ./models:/app/models:ro
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
cpus: "2.0"
prometheus:
image: prom/prometheus:v2.55.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:11.3.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=mlops2025
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- grafana-data:/var/lib/grafana
- ./monitoring/dashboards:/etc/grafana/provisioning/dashboards:ro
depends_on:
- prometheus
volumes:
prometheus-data:
grafana-data:
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: "ml-api"
static_configs:
- targets: ["ml-api:8000"]
metrics_path: "/metrics"
scrape_interval: 10s
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
Dockerfile のマルチステージ ビルド
本番環境に最適化された Dockerfile は、 多段階ビルド ビルドの依存関係を実行時の依存関係から分離し、大幅な削減を実現します。 最終イメージのサイズ (scikit-learn の場合は約 2GB ~約 400MB)。
# Dockerfile
# Stage 1: Builder - installa dipendenze
FROM python:3.12-slim AS builder
WORKDIR /build
# Installa tool di build
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ && \
rm -rf /var/lib/apt/lists/*
# Copia requirements e installa in directory separata
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Runtime - immagine finale minimale
FROM python:3.12-slim AS runtime
# Utente non-root per sicurezza
RUN useradd --create-home --shell /bin/bash mlserving
WORKDIR /app
# Copia dipendenze dal builder
COPY --from=builder /install /usr/local
# Copia il codice applicativo
COPY --chown=mlserving:mlserving app/ ./app/
# Crea directory modelli (i modelli sono montati come volume)
RUN mkdir -p /app/models && chown mlserving:mlserving /app/models
USER mlserving
# Healthcheck integrato
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import httpx; r = httpx.get('http://localhost:8000/health'); exit(0 if r.status_code == 200 else 1)"
# Esposizione porta
EXPOSE 8000
# Avvio con Uvicorn: 4 worker, timeouts produzione
CMD ["uvicorn", "app.main:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "4",
"--timeout-keep-alive", "30",
"--access-log",
"--log-level", "info"]
本番環境には Uvicorn ワーカーが何人いますか?
経験則e 2 × CPU コア + 1。 2 つの vCPU を備えたポッドの場合は、5 つのワーカーを使用します。 警告: 各ワーカーはモデルのコピーをメモリにロードします。 500MBモデルでは、 ワーカーが 4 人の場合、コンテナーには約 2GB の RAM が必要です。ワーカー数とメモリのバランスを取る 利用可能です。大規模モデル (LLM) の場合、多くの場合、バッチ処理を行う 1 ワーカーが最適な選択です。
BentoML: モデル提供のための特殊なフレームワーク
FastAPI は一般的なサービスには優れていますが、 BentoML そしてデザインされた 特にモデルの提供に特化しており、FastAPI で発生する多くの問題を自動的に解決します。 動的バッチ処理、統合モデルのバージョニング、ランナーの抽象化などを手動で管理する必要があります。 独立した推論スケーリングと自動 Dockerfile 生成用 Kubernetes のマニフェスト。
# bentoml_service.py
import bentoml
import numpy as np
import pandas as pd
from bentoml.io import JSON
from pydantic import BaseModel, Field
from typing import Annotated
# 1. Salva il modello nel BentoML Model Store
# (esegui una sola volta dopo il training)
def save_model_to_store(sklearn_model, scaler):
"""Salva modello e scaler nel BentoML registry locale."""
bento_model = bentoml.sklearn.save_model(
"churn_classifier",
sklearn_model,
signatures={
"predict": {"batchable": True, "batch_dim": 0},
"predict_proba": {"batchable": True, "batch_dim": 0},
},
custom_objects={"scaler": scaler},
metadata={
"framework": "scikit-learn",
"task": "churn_prediction",
"training_date": "2025-01-15",
"metrics": {"auc_roc": 0.89, "f1": 0.82}
}
)
print(f"Modello salvato: {bento_model.tag}")
return bento_model
# 2. Definisci il Runner (inferenza scalabile)
# Il runner e l'astrazione che gestisce il modello ML
churn_runner = bentoml.sklearn.get("churn_classifier:latest").to_runner()
# 3. Definisci i Pydantic schemas
class ChurnRequest(BaseModel):
tenure_months: int = Field(..., ge=0, le=120)
monthly_charges: float = Field(..., ge=0)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
has_phone_service: bool
has_internet: bool
contract_type: str
payment_method: str
class ChurnResponse(BaseModel):
churn_prediction: int
churn_probability: float
model_tag: str
# 4. Definisci il Service BentoML
svc = bentoml.Service(
name="churn-prediction-service",
runners=[churn_runner]
)
@svc.api(
input=JSON(pydantic_model=ChurnRequest),
output=JSON(pydantic_model=ChurnResponse),
route="/predict"
)
async def predict(request: ChurnRequest) -> ChurnResponse:
"""Predizione churn con BentoML - batching automatico."""
# BentoML gestisce il batching automaticamente
# quando batchable=True e configurato nel runner
# Prepara features (stesso preprocessing del training)
features = preprocess(request)
# Chiamata async al runner (BentoML gestisce threading/batching)
prediction = await churn_runner.predict.async_run(features)
probability = await churn_runner.predict_proba.async_run(features)
return ChurnResponse(
churn_prediction=int(prediction[0]),
churn_probability=round(float(probability[0][1]), 4),
model_tag=str(bentoml.sklearn.get("churn_classifier:latest").tag)
)
def preprocess(request: ChurnRequest) -> np.ndarray:
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
return np.array([[
request.tenure_months,
request.monthly_charges,
request.total_charges,
request.num_products,
int(request.has_phone_service),
int(request.has_internet),
contract_map.get(request.contract_type, 0),
payment_map.get(request.payment_method, 0)
]])
BentoML を使用して 3 つのコマンドでデプロイします。
# 1. Build del Bento (artifact deployabile)
bentoml build
# Output:
# Successfully built Bento(tag="churn-prediction-service:a1b2c3d4")
# Bento size: 245MB (modello + codice + deps)
# 2. Genera immagine Docker automaticamente
bentoml containerize churn-prediction-service:latest
# 3. Avvia il container
docker run -p 3000:3000 churn-prediction-service:latest
# Oppure: deploy su BentoCloud (managed)
# bentoml deploy churn-prediction-service:latest --name prod-churn
動的バッチ処理: スループットの最大化
Il 動的バッチ処理 複数の受信リクエストを収集する手法 単一のモデル呼び出しでそれらをまとめて処理します。 GPU ではこれが特に当てはまります GPU は大規模なバッチでの並列操作向けに設計されているため、効果的です。 CPU では、利点は小さくなりますが、固定オーバーヘッドが高いモデルでは依然として重要です。
# app/batching/dynamic_batcher.py
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Any
import logging
logger = logging.getLogger(__name__)
@dataclass
class PendingRequest:
"""Una singola richiesta in attesa di essere processata in batch."""
data: dict[str, Any]
future: asyncio.Future
arrival_time: float
class DynamicBatcher:
"""
Dynamic batcher per ML inference.
Raccoglie richieste per max_wait_ms millisecondi (o fino a
max_batch_size richieste) e poi le processa insieme.
Parametri da tuner per il proprio use case:
- max_batch_size: dipende dalla memoria GPU/CPU disponibile
- max_wait_ms: tradeoff tra latenza singola e throughput
"""
def __init__(
self,
predictor,
max_batch_size: int = 32,
max_wait_ms: float = 10.0
):
self._predictor = predictor
self._max_batch_size = max_batch_size
self._max_wait_ms = max_wait_ms
self._queue: deque[PendingRequest] = deque()
self._lock = asyncio.Lock()
self._batch_task: asyncio.Task | None = None
async def predict(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Aggiunge la richiesta alla coda e attende il risultato.
Chiamata concorrente - sicura per uso multi-thread.
"""
loop = asyncio.get_event_loop()
future = loop.create_future()
pending = PendingRequest(
data=data,
future=future,
arrival_time=time.perf_counter()
)
async with self._lock:
self._queue.append(pending)
# Avvia il task di batch processing se non e già in esecuzione
if self._batch_task is None or self._batch_task.done():
self._batch_task = asyncio.create_task(
self._process_batch()
)
return await future
async def _process_batch(self) -> None:
"""Processa un batch di richieste."""
# Attendi max_wait_ms o fino a max_batch_size richieste
await asyncio.sleep(self._max_wait_ms / 1000)
async with self._lock:
if not self._queue:
return
# Prendi fino a max_batch_size richieste dalla coda
batch = []
while self._queue and len(batch) < self._max_batch_size:
batch.append(self._queue.popleft())
if not batch:
return
# Processa il batch (CPU-bound in threadpool)
try:
from starlette.concurrency import run_in_threadpool
items = [req.data for req in batch]
results = await run_in_threadpool(
self._predictor.predict_batch,
items
)
# Distribuisci i risultati ai rispettivi futures
for pending_req, result in zip(batch, results):
if not pending_req.future.done():
pending_req.future.set_result(result)
wait_times = [
(time.perf_counter() - req.arrival_time) * 1000
for req in batch
]
logger.info(
f"Batch processato: {len(batch)} items, "
f"attesa media {sum(wait_times)/len(wait_times):.1f}ms"
)
except Exception as e:
logger.error(f"Errore batch processing: {e}")
for pending_req in batch:
if not pending_req.future.done():
pending_req.future.set_exception(e)
Locust による負荷テスト
本番環境に入る前に、サービスのパフォーマンスを検証することが重要です 実際の負荷がかかっている状態。 イナゴ 負荷テスト用の標準の Python ツール、 直感的な DSL を使用して、複雑なユーザーの動作をシミュレートします。
# locustfile.py
from locust import HttpUser, task, between
import json
import random
SAMPLE_REQUESTS = [
{
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
},
{
"tenure_months": 60,
"monthly_charges": 45.0,
"total_charges": 2700.0,
"num_products": 2,
"has_phone_service": True,
"has_internet": False,
"contract_type": "two-year",
"payment_method": "bank"
},
]
class MLApiUser(HttpUser):
"""Simula un utente che chiama l'API di predizione."""
wait_time = between(0.1, 0.5) # Attesa tra richieste: 100-500ms
@task(weight=8)
def predict_single(self):
"""80% delle richieste: predizione singola."""
payload = random.choice(SAMPLE_REQUESTS)
with self.client.post(
"/api/v1/predict",
json=payload,
catch_response=True
) as response:
if response.status_code == 200:
data = response.json()
if "prediction" not in data:
response.failure("Response mancante campo 'prediction'")
else:
response.failure(f"Status code: {response.status_code}")
@task(weight=2)
def predict_batch(self):
"""20% delle richieste: batch prediction (10 items)."""
batch_size = random.randint(5, 20)
payload = {
"items": [
random.choice(SAMPLE_REQUESTS)
for _ in range(batch_size)
]
}
with self.client.post(
"/api/v1/predict/batch",
json=payload,
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Batch failed: {response.status_code}")
@task(weight=1)
def health_check(self):
"""Health check periodico."""
self.client.get("/health/ready")
# Avvio load test:
# locust --headless --users 100 --spawn-rate 10 \
# --host http://localhost:8000 --run-time 2m \
# --html report.html
フレームワークの比較: いつ何を使用するか
提供フレームワークの選択はコンテキストによって異なります。実用的なガイドは次のとおりです。
| フレームワーク | 理想的な使用例 | プロ | に対して | レイテンシー (p99) |
|---|---|---|---|---|
| FastAPI + ユビコーン | カスタム API、マイクロサービス、Python チーム | 最大限の柔軟性、充実したエコシステム、優れたドキュメント | 自動バッチ処理なし、手動監視 | 5~20ミリ秒 |
| BentoML | モデルのパッケージ化、ML に重点を置いたチーム | 自動バッチ処理、統合モデル ストア、自動 Docker/K8s 生成 | オーバーヘッド フレームワーク、学習曲線 | 8~30ミリ秒 |
| トーチサーブ | 本番環境の PyTorch モデル | PyTorch、TorchScript サポート、マルチモデル用に最適化 | PyTorch のみ、Java ベースの内部構造 | 3~15ミリ秒 |
| Triton 推論サーバー | 高スループットの GPU サービス | 最大の GPU パフォーマンス、TensorRT、マルチフレームワーク | 非常に複雑で、NVIDIA GPU が必要 | 1~5ms (GPU) |
| MLflow モデル | ラピッドプロトタイピング、MLflow チーム | ネイティブ MLflow 統合、ゼロ構成 | 高トラフィック、限られたカスタマイズには適さない | 20~100ミリ秒 |
中小企業向けの推奨事項 (予算 < 5,000 ユーロ/年)
ほとんどのイタリアの中小企業では、モデルの提供から始まり、スタック FastAPI + Uvicorn + Docker + Prometheus + Grafana そして最適な選択: 100% オープンソースであり、ML フレームワークの専門スキルは必要ありません。 必要に応じて Kubernetes を使用して簡単に実行でき、サポートのための巨大なコミュニティがあります。 チームがさらに多くのモデルを管理し、必要とする場合には、BentoML を検討する価値があります。 梱包を自動化します。 Triton と TorchServe は専用 GPU にのみ関連します および 5ms 未満の遅延要件。
ベストプラクティスとアンチパターン
完全な実装を確認した後、重要なベスト プラクティスをまとめてみましょう。 FastAPI で提供されるモデルで最も一般的なアンチパターン。
絶対に避けるべきアンチパターン
- 各リクエストでモデルをロードします。 ロードには 1 ~ 10 秒かかります そしてパフォーマンスを破壊します。常にライフスパン コンテキスト マネージャーを使用してください。
- run_in_threadpool を使用せずに async def でモデルを呼び出します。 ブロック イベントループが発生し、サービスが事実上シングルスレッドになります。
- 入力検証なし: 異常な値によって引き起こされる可能性がある モデル内のあいまいな例外。常に厳しい制約を設けて Pydantic を使用してください。
- ヘルスチェックの準備ができていません: Kubernetes が送信を開始します モデルがロードされる前にトラフィックが発生し、コールド スタート時に 500 エラーが発生します。
- ホットパス内のログが冗長すぎます: 各予測をログに記録する INFO レベルでは、トラフィックが多い場合にそれ自体がボトルネックになる可能性があります。デバッグを使用する 単一の予測の場合は INFO、集計統計の場合は INFO。
基本的なベストプラクティス
-
API のバージョン管理: 常にプレフィックスを使用する
/api/v1/。 入力に重大な変更を加えてモデルを更新するときは、/api/v2/既存のクライアントとの互換性を保つために、v1 をアクティブなままにします。 -
明示的なタイムアウト: 推論タイムアウトを構成します (例: 5 秒)
と
asyncio.wait_for()遅いリクエストによってスレッド プールが飽和するのを防ぎます。 -
サーキットブレーカー: 停止するためにサーキットブレーカーを設置する
エラー率がしきい値 (例: 50%) を超えた場合にモデルにリクエストを送信します。
60秒以内)。図書館
pybreakerそして良い選択肢です。 -
正常なシャットダウン: Uvicorn を設定する
--timeout-graceful-shutdown 30コンテナがシャットダウンする前に進行中のリクエストを完了します。 -
構造化されたロギング: アメリカ合衆国
structlogまたはJSON形式 本番ログ用。 Elasticsearch/Loki との統合が容易になります。
サービスを開始する
すべてを実装したら、開発と運用でサービスを開始するのは簡単です。
# Sviluppo locale (hot reload)
uvicorn app.main:app --reload --port 8000
# Produzione con Uvicorn diretto (senza Docker)
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--timeout-keep-alive 30 \
--access-log \
--log-level info \
--timeout-graceful-shutdown 30
# Con Docker Compose (raccomandato per produzione locale)
docker compose up -d
# Verifica il servizio
curl http://localhost:8000/health/ready
curl -X POST http://localhost:8000/api/v1/predict \
-H "Content-Type: application/json" \
-d '{"tenure_months": 24, "monthly_charges": 65.5, "total_charges": 1572.0, "num_products": 3, "has_phone_service": true, "has_internet": true, "contract_type": "month-to-month", "payment_method": "electronic"}'
# Documentazione API interattiva
# http://localhost:8000/docs (Swagger UI)
# http://localhost:8000/redoc (ReDoc)
# http://localhost:8000/metrics (Prometheus metrics)
# http://localhost:3000 (Grafana dashboard)
結論と次のステップ
このガイドでは、本番環境に対応したモデル提供サービスを構築しました。 FastAPI と Uvicorn: ライフサイクル管理から Prometheus による監視、バッチ処理まで マルチステージビルドで最適化された動的Dockerfile。私たちも見ました 特化した代替手段としての BentoML と利用可能な主なフレームワークの比較 2025年に。
このガイドの完全なコード (テストを含む)、事前構成された Grafana ダッシュボード および Kubernetes マニフェストがあり、MLOps シリーズの GitHub リポジトリで入手できます。 FastAPI + Uvicorn + Docker + Prometheus スタックで大部分をカバー 最大 20 ~ 30 人の ML エンジニアからなるチーム向けのモデル サービング ユースケースのコスト コンテンツインフラストラクチャと最大限の柔軟性。
サービングとローをマスターした後の自然な次のステップ スケーリング Kubernetes 上で: 水平ポッド オートスケーラーを使用してデプロイし、より多くの管理を行います カナリア リリースを使用したモデル リリース、および複雑な ML パイプラインのオーケストレーション KubeFlowを使用して。それについては、このシリーズの次の記事で説明します。
MLOps シリーズ: 関連記事
- MLOps: 実験から実稼働へ - ML ライフサイクルの基礎
- CI/CD を使用した ML パイプライン: GitHub Actions + Docker - トレーニングと導入を自動化する
- MLflow を使用した実験の追跡 - 実験とモデルのレジストリを管理する
- Kubernetes での ML のスケーリング - シリーズの次のステップ: スケーリングとオーケストレーション







