ML 모델 제공: 프로덕션의 FastAPI + Uvicorn
모든 기준을 능가하는 모델을 훈련했으며 MLflow의 측정항목이 훌륭하고 팀이 그리고 열정적이다. 그러면 피할 수 없는 질문이 나옵니다. “언제 프로덕션에 사용할 수 있나요?”. 그리고 이것이 바로 많은 ML 엔지니어가 문제에 직면하는 부분입니다. Jupyter 노트북과 서비스 간의 격차입니다. HTTP는 확장 가능하고 안정적이며 모니터링이 가능하며 보이는 것보다 훨씬 더 광범위합니다.
FastAPI 2024~2025년에 제공되는 Python 모델의 사실상 표준이 되었습니다. PyPI에서는 매달 8천만 건 이상의 다운로드가 이루어지고 있습니다. 기본 유형 힌트, 유효성 검사의 조합 Pydantic을 통한 자동, 자동으로 생성된 OpenAPI 문서 및 기본 비동기 지원 lo 프로덕션에 바로 사용할 수 있는 추론 API를 구축하는 데 이상적입니다. 측면에 유비콘 (고성능 ASGI 서버) 및 Docker 컨테이너화 모범 사례를 통해 FastAPI를 통해 단 몇 시간 만에 scikit-learn, PyTorch 또는 Hugging Face 모델을 프로덕션에 적용할 수 있습니다.
이 가이드에서는 기본 예측 엔드포인트에서 완전한 모델 제공 서비스를 구축합니다. 상태 확인부터 Prometheus 및 Grafana를 통한 모니터링까지 일괄 처리를 통한 비동기식 추론에 이르기까지 Docker 및 Kubernetes에서 컨테이너화되고 확장 가능한 배포가 가능합니다. 모든 예시는 테스트를 거쳐 준비되었습니다. 실제 환경에서 사용하기 위해.
무엇을 배울 것인가
- 수명 주기 관리를 통해 모델 제공을 위한 FastAPI 앱 구조화
- CPU 바인딩된 작업을 위해 스레드 풀을 사용하여 동기 및 비동기 추론 구현
- GPU/CPU 처리량을 극대화하기 위해 동적 일괄 처리를 관리합니다.
- 상태 확인, 준비 상태 프로브 및 Prometheus 모니터링 추가
- Docker 다단계 빌드로 컨테이너화하고 프로덕션에 최적화
- FastAPI를 BentoML, TorchServe 및 Triton 추론 서버와 비교
- 성능 검증을 위해 Locust로 부하 테스트 구현
모델 제공을 위해 FastAPI를 사용하는 이유
코드에 들어가기 전에 FastAPI가 왜 이러한 지배적인 위치를 차지했는지 이해하는 것이 좋습니다. Python 모델 제공에서. 전통적인 선택인 플라스크와의 비교는 계몽적입니다.
Flask는 동기식 차단 아키텍처인 WSGI(Web Server Gateway Interface)를 사용합니다. 모든 요청 완료될 때까지 서버 스레드를 차지합니다. 50ms 정도의 짧은 시간이 필요한 모델의 경우 추론 결과, 작업자가 4개인 Flask는 성능 저하가 시작되기 전에 초당 약 80개의 요청을 처리합니다. FastAPI는 ASGI(Asynchronous Server Gateway Interface)를 사용하여 단일 프로세스가 비차단 방식으로 수천 개의 동시 연결을 관리합니다. 유비콘과 일꾼 4명이서 동일한 하드웨어는 광 추론을 위해 초당 500개 이상의 요청을 쉽게 처리합니다.
경고: 비동기가 자동으로 추론 속도가 빨라지는 것을 의미하지는 않습니다.
일반적인 실수는 예측 끝점을 다음과 같이 정의하는 것입니다. async def 그런 다음 전화
모델이 직접. ML 추론은 CPU 바인딩(또는 GPU 바인딩)입니다. 비동기 스레드에서 실행합니다.
main은 이벤트 루프를 차단하고 성능을 저하시킵니다. 올바른 해결책은 다음을 사용하는 것입니다.
asyncio.get_event_loop().run_in_executor() 아니면 스타렛의
run_in_threadpool() 별도의 스레드에서 추론을 실행합니다.
프로젝트 설정
프로젝트 구조부터 시작해 보겠습니다. 좋은 코드 구성은 필수적입니다. 생산의 유지 보수성.
# Struttura del progetto
ml-serving/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app e lifecycle
│ ├── models/
│ │ ├── __init__.py
│ │ ├── predictor.py # Wrapper del modello ML
│ │ └── schemas.py # Pydantic schemas
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── predict.py # Endpoint predizione
│ │ └── health.py # Health check endpoints
│ └── middleware/
│ ├── __init__.py
│ └── metrics.py # Prometheus metrics
├── tests/
│ ├── test_predict.py
│ └── test_health.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── locustfile.py
필요한 종속성을 설치해 보겠습니다.
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.10.0
scikit-learn==1.5.2
numpy==1.26.4
pandas==2.2.3
joblib==1.4.2
prometheus-fastapi-instrumentator==7.0.0
prometheus-client==0.21.0
httpx==0.28.0 # per test async
python-multipart==0.0.20
# Installazione
pip install -r requirements.txt
수명주기 관리 기능을 갖춘 FastAPI 앱
모델 서빙의 핵심은 애플리케이션이 시작될 때 모델을 한 번만 로드하는 것입니다. 모든 요청에 는 아닙니다. FastAPI 0.93+에서는 i를 소개합니다. 수명 컨텍스트 관리자, 시작 시 초기화해야 하는 리소스를 관리하는 현대적이고 깔끔한 방법 종료 시 출시됨.
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
import logging
import time
from app.models.predictor import ModelPredictor
from app.routers import predict, health
# Configurazione logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Stato globale dell'applicazione (immutabile dopo init)
class AppState:
def __init__(self):
self.predictor: ModelPredictor | None = None
self.model_load_time: float = 0.0
self.model_version: str = ""
app_state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Gestione lifecycle: load all'avvio, cleanup allo shutdown"""
# Startup
logger.info("Avvio applicazione - caricamento modello...")
start_time = time.time()
try:
app_state.predictor = ModelPredictor(
model_path="models/churn_model.pkl",
scaler_path="models/scaler.pkl"
)
app_state.model_load_time = time.time() - start_time
app_state.model_version = app_state.predictor.get_version()
logger.info(
f"Modello caricato in {app_state.model_load_time:.2f}s "
f"(versione: {app_state.model_version})"
)
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise RuntimeError(f"Impossibile avviare il servizio: {e}")
yield # L'app e in esecuzione
# Shutdown
logger.info("Shutdown applicazione - cleanup risorse...")
app_state.predictor = None
# Inizializzazione FastAPI
app = FastAPI(
title="ML Model Serving API",
description="Production-ready inference API con FastAPI e Uvicorn",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc"
)
# CORS (configura per il tuo ambiente)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In prod: specifica i domini
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Prometheus instrumentation
Instrumentator().instrument(app).expose(app)
# Router
app.include_router(predict.router, prefix="/api/v1", tags=["prediction"])
app.include_router(health.router, tags=["health"])
# Dependency injection dello stato
app.state.app_state = app_state
모델 예측자: ML 모델 래퍼
Il ModelPredictor 그리고 서비스의 핵심. ML 모델을 다음과 같이 캡슐화합니다.
깔끔한 인터페이스, 입력 전처리 및 입력 후처리 처리
출력하고 모니터링에 유용한 메타데이터를 제공합니다.
# app/models/predictor.py
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
import logging
from typing import Any
import hashlib
import time
logger = logging.getLogger(__name__)
class ModelPredictor:
"""Wrapper produzione-ready per modelli scikit-learn.
Responsabilità:
- Caricamento e validazione del modello
- Pre/post processing degli input/output
- Raccolta metriche di performance
"""
def __init__(self, model_path: str, scaler_path: str):
model_file = Path(model_path)
scaler_file = Path(scaler_path)
if not model_file.exists():
raise FileNotFoundError(f"Modello non trovato: {model_path}")
if not scaler_file.exists():
raise FileNotFoundError(f"Scaler non trovato: {scaler_path}")
self._model = joblib.load(model_file)
self._scaler = joblib.load(scaler_file)
self._model_hash = self._compute_hash(model_file)
self._load_timestamp = time.time()
# Feature names attese (definite al training)
self._feature_names = [
"tenure_months", "monthly_charges", "total_charges",
"num_products", "has_phone_service", "has_internet",
"contract_type", "payment_method"
]
logger.info(f"ModelPredictor inizializzato - hash: {self._model_hash[:8]}")
def predict(self, features: dict[str, Any]) -> dict[str, Any]:
"""Predizione singola con timing e validazione."""
start_time = time.perf_counter()
# Pre-processing
df = self._preprocess(features)
# Inference
prediction = self._model.predict(df)[0]
probability = self._model.predict_proba(df)[0].tolist()
inference_time_ms = (time.perf_counter() - start_time) * 1000
return {
"prediction": int(prediction),
"probability": {
"no_churn": round(probability[0], 4),
"churn": round(probability[1], 4)
},
"inference_time_ms": round(inference_time_ms, 2),
"model_version": self.get_version()
}
def predict_batch(
self,
batch: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Predizione batch ottimizzata (una sola chiamata al modello)."""
start_time = time.perf_counter()
# Costruisci DataFrame dall'intero batch
rows = [self._preprocess(item).iloc[0] for item in batch]
df_batch = pd.DataFrame(rows)
# Inference batch (una sola chiamata)
predictions = self._model.predict(df_batch)
probabilities = self._model.predict_proba(df_batch)
inference_time_ms = (time.perf_counter() - start_time) * 1000
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
results.append({
"prediction": int(pred),
"probability": {
"no_churn": round(float(prob[0]), 4),
"churn": round(float(prob[1]), 4)
},
"batch_index": i
})
logger.info(
f"Batch inference: {len(batch)} items in {inference_time_ms:.1f}ms "
f"({inference_time_ms/len(batch):.2f}ms/item)"
)
return results
def _preprocess(self, features: dict[str, Any]) -> pd.DataFrame:
"""Preprocessing input: validazione, encoding, scaling."""
df = pd.DataFrame([features])
# Encoding categorico
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
df["contract_type"] = df["contract_type"].map(contract_map).fillna(0)
df["payment_method"] = df["payment_method"].map(payment_map).fillna(0)
# Seleziona features nell'ordine corretto
df = df[self._feature_names]
# Scaling
df_scaled = self._scaler.transform(df)
return pd.DataFrame(df_scaled, columns=self._feature_names)
def get_version(self) -> str:
return self._model_hash[:12]
def get_metadata(self) -> dict[str, Any]:
return {
"model_hash": self._model_hash[:12],
"load_timestamp": self._load_timestamp,
"feature_names": self._feature_names,
"model_type": type(self._model).__name__
}
@staticmethod
def _compute_hash(file_path: Path) -> str:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
Pydantic 스키마: 입력 유효성 검사
Pydantic v2(FastAPI 0.100+의 기본값)는 재작성 덕분에 초고속 검증을 제공합니다. 러스트에서. 엄격한 스키마를 정의하면 잘못된 입력으로부터 모델을 보호하고 다음을 제공합니다. 자동 API 문서화.
# app/models/schemas.py
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Literal
from enum import Enum
class ContractType(str, Enum):
MONTH_TO_MONTH = "month-to-month"
ONE_YEAR = "one-year"
TWO_YEAR = "two-year"
class PaymentMethod(str, Enum):
ELECTRONIC = "electronic"
MAILED = "mailed"
BANK = "bank"
CREDIT = "credit"
class PredictionRequest(BaseModel):
"""Schema input per predizione churn singola."""
tenure_months: int = Field(
..., ge=0, le=120,
description="Mesi di tenure del cliente"
)
monthly_charges: float = Field(
..., ge=0, le=500,
description="Addebito mensile in EUR"
)
total_charges: float = Field(
..., ge=0,
description="Addebito totale storico in EUR"
)
num_products: int = Field(
..., ge=1, le=10,
description="Numero di prodotti sottoscritti"
)
has_phone_service: bool = Field(
..., description="Il cliente ha servizio telefonico"
)
has_internet: bool = Field(
..., description="Il cliente ha servizio internet"
)
contract_type: ContractType = Field(
..., description="Tipo di contratto"
)
payment_method: PaymentMethod = Field(
..., description="Metodo di pagamento"
)
@model_validator(mode='after')
def validate_total_charges(self) -> 'PredictionRequest':
"""total_charges non può essere minore di monthly_charges."""
if self.total_charges < self.monthly_charges:
raise ValueError(
f"total_charges ({self.total_charges}) non può essere "
f"minore di monthly_charges ({self.monthly_charges})"
)
return self
model_config = {
"json_schema_extra": {
"example": {
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
}
}
}
class PredictionResponse(BaseModel):
"""Schema output per predizione singola."""
prediction: Literal[0, 1]
probability: dict[str, float]
inference_time_ms: float
model_version: str
class BatchPredictionRequest(BaseModel):
"""Schema input per batch prediction (max 100 items)."""
items: list[PredictionRequest] = Field(
..., min_length=1, max_length=100,
description="Lista di richieste da processare in batch"
)
class BatchPredictionResponse(BaseModel):
"""Schema output per batch prediction."""
results: list[dict]
batch_size: int
total_inference_time_ms: float
예측 엔드포인트: 동기식 및 비동기식
우리는 CPU 바인딩된 작업에 대한 올바른 패턴에 따라 예측 엔드포인트를 구현합니다. 추론은 비동기 이벤트 루프를 차단하지 않기 위해 별도의 스레드 풀에서 수행됩니다.
# app/routers/predict.py
from fastapi import APIRouter, Depends, HTTPException, Request
from starlette.concurrency import run_in_threadpool
import logging
import time
from app.models.predictor import ModelPredictor
from app.models.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse
)
from app.middleware.metrics import (
PREDICTION_COUNTER, PREDICTION_LATENCY,
BATCH_SIZE_HISTOGRAM, ERROR_COUNTER
)
logger = logging.getLogger(__name__)
router = APIRouter()
def get_predictor(request: Request) -> ModelPredictor:
"""Dependency injection del predictor."""
predictor = request.app.state.app_state.predictor
if predictor is None:
raise HTTPException(
status_code=503,
detail="Modello non disponibile - servizio in fase di avvio"
)
return predictor
@router.post(
"/predict",
response_model=PredictionResponse,
summary="Predizione singola",
description="Inferenza su un singolo cliente per churn prediction"
)
async def predict_single(
request: PredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> PredictionResponse:
"""
Endpoint di predizione singola.
Usa run_in_threadpool per eseguire l'inference CPU-bound
senza bloccare l'event loop async.
"""
try:
# CORRETTO: esegui task CPU-bound in threadpool
result = await run_in_threadpool(
predictor.predict,
request.model_dump()
)
# Aggiorna metriche Prometheus
PREDICTION_COUNTER.labels(
model_version=result["model_version"],
outcome="success"
).inc()
PREDICTION_LATENCY.observe(result["inference_time_ms"] / 1000)
return PredictionResponse(**result)
except Exception as e:
ERROR_COUNTER.labels(endpoint="predict", error_type=type(e).__name__).inc()
logger.error(f"Errore predizione: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Errore durante l'inference: {str(e)}"
)
@router.post(
"/predict/batch",
response_model=BatchPredictionResponse,
summary="Predizione batch",
description="Inferenza batch ottimizzata (max 100 items per request)"
)
async def predict_batch(
batch_request: BatchPredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> BatchPredictionResponse:
"""
Endpoint batch: una sola chiamata al modello per N items.
Throughput 3-5x superiore rispetto a N chiamate singole.
"""
start_time = time.perf_counter()
batch_size = len(batch_request.items)
try:
items_dicts = [item.model_dump() for item in batch_request.items]
results = await run_in_threadpool(
predictor.predict_batch,
items_dicts
)
total_time_ms = (time.perf_counter() - start_time) * 1000
BATCH_SIZE_HISTOGRAM.observe(batch_size)
PREDICTION_COUNTER.labels(
model_version=predictor.get_version(),
outcome="success_batch"
).inc(batch_size)
return BatchPredictionResponse(
results=results,
batch_size=batch_size,
total_inference_time_ms=round(total_time_ms, 2)
)
except Exception as e:
ERROR_COUNTER.labels(
endpoint="predict_batch",
error_type=type(e).__name__
).inc()
logger.error(f"Errore batch inference: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
상태 확인: 활성 및 준비 상태
Kubernetes(또는 상태 확인이 포함된 Docker Compose) 배포에서 다음을 구별합니다. 활성 프로브 (프로세스가 살아있나요?) e 준비 상태 프로브 (서비스가 트래픽을 수신할 준비가 되어 있습니까?) 올바른 작동을 위해 필수적입니다. 라우팅 및 롤링 배포.
# app/routers/health.py
from fastapi import APIRouter, Request
from pydantic import BaseModel
import time
import psutil
import os
router = APIRouter()
class HealthResponse(BaseModel):
status: str
timestamp: float
uptime_seconds: float
class ReadinessResponse(BaseModel):
status: str
model_loaded: bool
model_version: str
model_load_time_seconds: float
memory_usage_mb: float
cpu_percent: float
_start_time = time.time()
@router.get(
"/health",
response_model=HealthResponse,
summary="Liveness probe",
tags=["health"]
)
async def liveness() -> HealthResponse:
"""
Liveness probe: verifica che il processo sia attivo.
Kubernetes usa questo per decidere se riavviare il pod.
Risponde sempre 200 se il processo e in esecuzione.
"""
return HealthResponse(
status="alive",
timestamp=time.time(),
uptime_seconds=round(time.time() - _start_time, 1)
)
@router.get(
"/health/ready",
response_model=ReadinessResponse,
summary="Readiness probe",
tags=["health"]
)
async def readiness(request: Request) -> ReadinessResponse:
"""
Readiness probe: verifica che il servizio sia pronto a ricevere traffico.
Kubernetes usa questo per il load balancing.
Risponde 503 se il modello non e ancora caricato.
"""
from fastapi import HTTPException
app_state = request.app.state.app_state
model_loaded = app_state.predictor is not None
# Metriche sistema
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent(interval=0.1)
response = ReadinessResponse(
status="ready" if model_loaded else "not_ready",
model_loaded=model_loaded,
model_version=app_state.model_version if model_loaded else "",
model_load_time_seconds=round(app_state.model_load_time, 3),
memory_usage_mb=round(memory_mb, 1),
cpu_percent=round(cpu_percent, 1)
)
if not model_loaded:
raise HTTPException(status_code=503, detail=response.model_dump())
return response
@router.get("/metrics/model", tags=["health"])
async def model_metrics(request: Request):
"""Metadata e statistiche del modello caricato."""
app_state = request.app.state.app_state
if app_state.predictor is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="Modello non disponibile")
return app_state.predictor.get_metadata()
Prometheus 및 Grafana를 사용한 모니터링
프로덕션에서 ML 서비스를 모니터링하는 것은 표준 HTTP 지표를 훨씬 뛰어넘습니다.
추론 지연 시간, 예측 분포, 오류율을 추적하고 싶습니다.
그리고 자원의 사용. 도서관 prometheus-fastapi-instrumentator
기본 HTTP 측정항목을 제공합니다. ML과 관련된 사용자 지정 측정항목을 추가합니다.
# app/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary
import time
# Contatore predizioni per versione modello e outcome
PREDICTION_COUNTER = Counter(
"ml_predictions_total",
"Numero totale di predizioni eseguite",
["model_version", "outcome"]
)
# Latenza inference (in secondi)
PREDICTION_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Durata inference in secondi",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
# Dimensione batch
BATCH_SIZE_HISTOGRAM = Histogram(
"ml_batch_size",
"Dimensione delle richieste batch",
buckets=[1, 5, 10, 25, 50, 100]
)
# Contatore errori
ERROR_COUNTER = Counter(
"ml_errors_total",
"Numero totale di errori",
["endpoint", "error_type"]
)
# Distribuzione predizioni (gauge aggiornato periodicamente)
CHURN_RATE_GAUGE = Gauge(
"ml_churn_rate_rolling",
"Tasso di churn predetto (finestra mobile 1000 predizioni)"
)
# Utilizzo memoria modello
MODEL_MEMORY_GAUGE = Gauge(
"ml_model_memory_bytes",
"Memoria utilizzata dal modello ML"
)
class PredictionTracker:
"""Tracker per statistiche rolling delle predizioni."""
def __init__(self, window_size: int = 1000):
self._window_size = window_size
self._predictions: list[int] = []
def record(self, prediction: int) -> None:
self._predictions.append(prediction)
if len(self._predictions) > self._window_size:
self._predictions = self._predictions[-self._window_size:]
# Aggiorna gauge churn rate
if self._predictions:
churn_rate = sum(self._predictions) / len(self._predictions)
CHURN_RATE_GAUGE.set(churn_rate)
# Istanza globale
prediction_tracker = PredictionTracker()
전체 모니터링 스택을 위한 Docker Compose 설정:
# docker-compose.yml
version: "3.9"
services:
ml-api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/churn_model.pkl
- SCALER_PATH=/app/models/scaler.pkl
- LOG_LEVEL=INFO
volumes:
- ./models:/app/models:ro
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
cpus: "2.0"
prometheus:
image: prom/prometheus:v2.55.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:11.3.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=mlops2025
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- grafana-data:/var/lib/grafana
- ./monitoring/dashboards:/etc/grafana/provisioning/dashboards:ro
depends_on:
- prometheus
volumes:
prometheus-data:
grafana-data:
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: "ml-api"
static_configs:
- targets: ["ml-api:8000"]
metrics_path: "/metrics"
scrape_interval: 10s
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
Dockerfile 다단계 빌드
프로덕션에 최적화된 Dockerfile은 다음을 사용합니다. 다단계 빌드 런타임 종속성과 빌드 종속성을 분리하여 크게 줄입니다. 최종 이미지의 크기(scikit-learn의 경우 ~2GB ~ 400MB)
# Dockerfile
# Stage 1: Builder - installa dipendenze
FROM python:3.12-slim AS builder
WORKDIR /build
# Installa tool di build
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ && \
rm -rf /var/lib/apt/lists/*
# Copia requirements e installa in directory separata
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Runtime - immagine finale minimale
FROM python:3.12-slim AS runtime
# Utente non-root per sicurezza
RUN useradd --create-home --shell /bin/bash mlserving
WORKDIR /app
# Copia dipendenze dal builder
COPY --from=builder /install /usr/local
# Copia il codice applicativo
COPY --chown=mlserving:mlserving app/ ./app/
# Crea directory modelli (i modelli sono montati come volume)
RUN mkdir -p /app/models && chown mlserving:mlserving /app/models
USER mlserving
# Healthcheck integrato
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import httpx; r = httpx.get('http://localhost:8000/health'); exit(0 if r.status_code == 200 else 1)"
# Esposizione porta
EXPOSE 8000
# Avvio con Uvicorn: 4 worker, timeouts produzione
CMD ["uvicorn", "app.main:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "4",
"--timeout-keep-alive", "30",
"--access-log",
"--log-level", "info"]
생산에 Uvicorn 작업자는 몇 명입니까?
경험 법칙 e 2 x CPU 코어 + 1. vCPU가 2개 있는 포드의 경우 작업자 5개를 사용합니다. 경고: 각 작업자는 모델의 복사본을 메모리에 로드합니다. 500MB 모델과 작업자가 4명인 경우 컨테이너에는 약 2GB의 RAM이 필요합니다. 작업자 수와 메모리의 균형 유지 가능합니다. 대형 모델(LLM)의 경우 일괄 처리를 수행하는 작업자 1명이 최선의 선택인 경우가 많습니다.
BentoML: 모델 제공을 위한 전문화된 프레임워크
FastAPI는 일반 서비스에는 탁월하지만, 벤토ML 그리고 디자인된 특히 모델 제공을 위해 FastAPI의 많은 문제를 자동으로 해결합니다. 수동으로 관리해야 함: 동적 일괄 처리, 통합 모델 버전 관리, 실행기 추상화 독립적인 추론 확장 및 자동 Dockerfile 생성 e Kubernetes가 나타납니다.
# bentoml_service.py
import bentoml
import numpy as np
import pandas as pd
from bentoml.io import JSON
from pydantic import BaseModel, Field
from typing import Annotated
# 1. Salva il modello nel BentoML Model Store
# (esegui una sola volta dopo il training)
def save_model_to_store(sklearn_model, scaler):
"""Salva modello e scaler nel BentoML registry locale."""
bento_model = bentoml.sklearn.save_model(
"churn_classifier",
sklearn_model,
signatures={
"predict": {"batchable": True, "batch_dim": 0},
"predict_proba": {"batchable": True, "batch_dim": 0},
},
custom_objects={"scaler": scaler},
metadata={
"framework": "scikit-learn",
"task": "churn_prediction",
"training_date": "2025-01-15",
"metrics": {"auc_roc": 0.89, "f1": 0.82}
}
)
print(f"Modello salvato: {bento_model.tag}")
return bento_model
# 2. Definisci il Runner (inferenza scalabile)
# Il runner e l'astrazione che gestisce il modello ML
churn_runner = bentoml.sklearn.get("churn_classifier:latest").to_runner()
# 3. Definisci i Pydantic schemas
class ChurnRequest(BaseModel):
tenure_months: int = Field(..., ge=0, le=120)
monthly_charges: float = Field(..., ge=0)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
has_phone_service: bool
has_internet: bool
contract_type: str
payment_method: str
class ChurnResponse(BaseModel):
churn_prediction: int
churn_probability: float
model_tag: str
# 4. Definisci il Service BentoML
svc = bentoml.Service(
name="churn-prediction-service",
runners=[churn_runner]
)
@svc.api(
input=JSON(pydantic_model=ChurnRequest),
output=JSON(pydantic_model=ChurnResponse),
route="/predict"
)
async def predict(request: ChurnRequest) -> ChurnResponse:
"""Predizione churn con BentoML - batching automatico."""
# BentoML gestisce il batching automaticamente
# quando batchable=True e configurato nel runner
# Prepara features (stesso preprocessing del training)
features = preprocess(request)
# Chiamata async al runner (BentoML gestisce threading/batching)
prediction = await churn_runner.predict.async_run(features)
probability = await churn_runner.predict_proba.async_run(features)
return ChurnResponse(
churn_prediction=int(prediction[0]),
churn_probability=round(float(probability[0][1]), 4),
model_tag=str(bentoml.sklearn.get("churn_classifier:latest").tag)
)
def preprocess(request: ChurnRequest) -> np.ndarray:
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
return np.array([[
request.tenure_months,
request.monthly_charges,
request.total_charges,
request.num_products,
int(request.has_phone_service),
int(request.has_internet),
contract_map.get(request.contract_type, 0),
payment_map.get(request.payment_method, 0)
]])
세 가지 명령으로 BentoML을 사용하여 배포합니다.
# 1. Build del Bento (artifact deployabile)
bentoml build
# Output:
# Successfully built Bento(tag="churn-prediction-service:a1b2c3d4")
# Bento size: 245MB (modello + codice + deps)
# 2. Genera immagine Docker automaticamente
bentoml containerize churn-prediction-service:latest
# 3. Avvia il container
docker run -p 3000:3000 churn-prediction-service:latest
# Oppure: deploy su BentoCloud (managed)
# bentoml deploy churn-prediction-service:latest --name prod-churn
동적 일괄 처리: 처리량 극대화
Il 동적 일괄 처리 여러 개의 수신 요청을 수집하는 기술 단일 모델 호출로 함께 처리합니다. GPU에서는 특히 그렇습니다. GPU는 대규모 배치의 병렬 작업을 위해 설계되었기 때문에 효과적입니다. CPU에서는 이점이 작지만 고정 오버헤드가 높은 모델의 경우 여전히 중요합니다.
# app/batching/dynamic_batcher.py
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Any
import logging
logger = logging.getLogger(__name__)
@dataclass
class PendingRequest:
"""Una singola richiesta in attesa di essere processata in batch."""
data: dict[str, Any]
future: asyncio.Future
arrival_time: float
class DynamicBatcher:
"""
Dynamic batcher per ML inference.
Raccoglie richieste per max_wait_ms millisecondi (o fino a
max_batch_size richieste) e poi le processa insieme.
Parametri da tuner per il proprio use case:
- max_batch_size: dipende dalla memoria GPU/CPU disponibile
- max_wait_ms: tradeoff tra latenza singola e throughput
"""
def __init__(
self,
predictor,
max_batch_size: int = 32,
max_wait_ms: float = 10.0
):
self._predictor = predictor
self._max_batch_size = max_batch_size
self._max_wait_ms = max_wait_ms
self._queue: deque[PendingRequest] = deque()
self._lock = asyncio.Lock()
self._batch_task: asyncio.Task | None = None
async def predict(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Aggiunge la richiesta alla coda e attende il risultato.
Chiamata concorrente - sicura per uso multi-thread.
"""
loop = asyncio.get_event_loop()
future = loop.create_future()
pending = PendingRequest(
data=data,
future=future,
arrival_time=time.perf_counter()
)
async with self._lock:
self._queue.append(pending)
# Avvia il task di batch processing se non e già in esecuzione
if self._batch_task is None or self._batch_task.done():
self._batch_task = asyncio.create_task(
self._process_batch()
)
return await future
async def _process_batch(self) -> None:
"""Processa un batch di richieste."""
# Attendi max_wait_ms o fino a max_batch_size richieste
await asyncio.sleep(self._max_wait_ms / 1000)
async with self._lock:
if not self._queue:
return
# Prendi fino a max_batch_size richieste dalla coda
batch = []
while self._queue and len(batch) < self._max_batch_size:
batch.append(self._queue.popleft())
if not batch:
return
# Processa il batch (CPU-bound in threadpool)
try:
from starlette.concurrency import run_in_threadpool
items = [req.data for req in batch]
results = await run_in_threadpool(
self._predictor.predict_batch,
items
)
# Distribuisci i risultati ai rispettivi futures
for pending_req, result in zip(batch, results):
if not pending_req.future.done():
pending_req.future.set_result(result)
wait_times = [
(time.perf_counter() - req.arrival_time) * 1000
for req in batch
]
logger.info(
f"Batch processato: {len(batch)} items, "
f"attesa media {sum(wait_times)/len(wait_times):.1f}ms"
)
except Exception as e:
logger.error(f"Errore batch processing: {e}")
for pending_req in batch:
if not pending_req.future.done():
pending_req.future.set_exception(e)
Locust를 사용한 부하 테스트
프로덕션에 들어가기 전에 서비스 성능을 검증하는 것이 필수적입니다. 실제 부하 상태에서. 메뚜기 부하 테스트를 위한 표준 Python 도구, 직관적인 DSL을 사용하여 복잡한 사용자 행동을 시뮬레이션합니다.
# locustfile.py
from locust import HttpUser, task, between
import json
import random
SAMPLE_REQUESTS = [
{
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
},
{
"tenure_months": 60,
"monthly_charges": 45.0,
"total_charges": 2700.0,
"num_products": 2,
"has_phone_service": True,
"has_internet": False,
"contract_type": "two-year",
"payment_method": "bank"
},
]
class MLApiUser(HttpUser):
"""Simula un utente che chiama l'API di predizione."""
wait_time = between(0.1, 0.5) # Attesa tra richieste: 100-500ms
@task(weight=8)
def predict_single(self):
"""80% delle richieste: predizione singola."""
payload = random.choice(SAMPLE_REQUESTS)
with self.client.post(
"/api/v1/predict",
json=payload,
catch_response=True
) as response:
if response.status_code == 200:
data = response.json()
if "prediction" not in data:
response.failure("Response mancante campo 'prediction'")
else:
response.failure(f"Status code: {response.status_code}")
@task(weight=2)
def predict_batch(self):
"""20% delle richieste: batch prediction (10 items)."""
batch_size = random.randint(5, 20)
payload = {
"items": [
random.choice(SAMPLE_REQUESTS)
for _ in range(batch_size)
]
}
with self.client.post(
"/api/v1/predict/batch",
json=payload,
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Batch failed: {response.status_code}")
@task(weight=1)
def health_check(self):
"""Health check periodico."""
self.client.get("/health/ready")
# Avvio load test:
# locust --headless --users 100 --spawn-rate 10 \
# --host http://localhost:8000 --run-time 2m \
# --html report.html
프레임워크 비교: 언제 무엇을 사용할지
제공 프레임워크의 선택은 상황에 따라 다릅니다. 실용적인 가이드는 다음과 같습니다.
| 프레임워크 | 이상적인 사용 사례 | 찬성 | 에 맞서 | 지연 시간(p99) |
|---|---|---|---|---|
| FastAPI + 유비콘 | 맞춤형 API, 마이크로서비스, Python 팀 | 최대의 유연성, 풍부한 생태계, 뛰어난 문서화 | 자동 배치 없음, 수동 모니터링 | 5-20ms |
| 벤토ML | 모델 패키징, ML 중심 팀 | 자동 배치, 통합 모델 저장소, 자동 Docker/K8s gen | 오버헤드 프레임워크, 학습 곡선 | 8-30ms |
| 토치서브 | 생산 중인 PyTorch 모델 | PyTorch에 최적화, TorchScript 지원, 다중 모델 | PyTorch 전용, Java 기반 내부 | 3-15ms |
| Triton 추론 서버 | 높은 처리량 GPU 제공 | 최대 GPU 성능, TensorRT, 멀티 프레임워크 | 복잡성이 높으며 NVIDIA GPU가 필요함 | 1~5ms(GPU) |
| MLflow 모델 | 신속한 프로토타이핑, MLflow 팀 | 기본 MLflow 통합, 구성 없음 | 트래픽이 많고 사용자 정의가 제한된 경우에는 적합하지 않음 | 20-100ms |
중소기업을 위한 권장 사항(예산 < 5K EUR/년)
모델 제공으로 시작하는 대부분의 이탈리아 SME의 경우 스택 FastAPI + Uvicorn + 도커 + 프로메테우스 + 그라파나 그리고 최적의 선택: 100% 오픈 소스이며 ML 프레임워크, 규모에 대한 전문 기술이 필요하지 않습니다. 필요할 때 Kubernetes를 통해 쉽게 사용할 수 있으며 지원을 위한 대규모 커뮤니티가 있습니다. BentoML은 팀에서 관리할 모델이 더 많고 원하는 경우 탐색해 볼 가치가 있습니다. 포장을 자동화합니다. Triton 및 TorchServe는 전용 GPU에만 관련됩니다. 5ms 미만의 대기 시간 요구 사항.
모범 사례 및 안티 패턴
전체 구현을 본 후 중요한 모범 사례를 요약해 보겠습니다. FastAPI를 사용하는 모델의 가장 일반적인 안티 패턴입니다.
절대 피해야 할 안티 패턴
- 각 요청마다 모델을 로드합니다. 로딩에 1~10초 소요 공연을 파괴합니다. 항상 수명 컨텍스트 관리자를 사용하세요.
- run_in_threadpool 없이 async def에서 모델을 호출합니다. 블록 이벤트 루프를 처리하고 서비스를 사실상 단일 스레드로 만듭니다.
- 입력 유효성 검사 없음: 비정상적인 값으로 인해 모델의 모호한 예외. 항상 엄격한 제약 조건을 적용하여 Pydantic을 사용하십시오.
- 상태 확인 준비 없음: Kubernetes가 전송을 시작합니다 모델이 로드되기 전에 트래픽이 발생하여 콜드 스타트 시 500 오류가 발생합니다.
- 핫 패스의 로그가 너무 장황합니다. 각 예측을 기록하세요. INFO 수준에서는 트래픽이 많을 때 자체적으로 병목 현상이 발생할 수 있습니다. 디버그 사용 단일 예측의 경우 INFO, 집계 통계의 경우.
기본 모범 사례
-
API 버전 관리: 항상 접두사를 사용하세요
/api/v1/. 입력의 주요 변경 사항으로 모델을 업데이트하는 경우/api/v2/기존 클라이언트와의 호환성을 위해 v1을 활성 상태로 유지합니다. -
명시적 시간 초과: 추론 시간 초과 구성(예: 5초)
와
asyncio.wait_for()느린 요청이 스레드 풀을 포화시키는 것을 방지합니다. -
회로 차단기: 차단기를 설치하여 정지시키다
오류율이 임계값(예: 50%)을 초과하면 모델에 요청을 보냅니다.
60초 안에). 도서관
pybreaker그리고 좋은 옵션. -
정상 종료: Uvicorn을 다음과 같이 구성하세요.
--timeout-graceful-shutdown 30컨테이너가 종료되기 전에 진행 중인 요청을 완료합니다. -
구조화된 로깅: 미국
structlog또는 JSON 형식 생산 로그용. Elasticsearch/Loki와의 통합이 용이합니다.
서비스 시작
모든 것이 구현되면 개발 및 생산 단계에서 간단하게 서비스를 시작합니다.
# Sviluppo locale (hot reload)
uvicorn app.main:app --reload --port 8000
# Produzione con Uvicorn diretto (senza Docker)
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--timeout-keep-alive 30 \
--access-log \
--log-level info \
--timeout-graceful-shutdown 30
# Con Docker Compose (raccomandato per produzione locale)
docker compose up -d
# Verifica il servizio
curl http://localhost:8000/health/ready
curl -X POST http://localhost:8000/api/v1/predict \
-H "Content-Type: application/json" \
-d '{"tenure_months": 24, "monthly_charges": 65.5, "total_charges": 1572.0, "num_products": 3, "has_phone_service": true, "has_internet": true, "contract_type": "month-to-month", "payment_method": "electronic"}'
# Documentazione API interattiva
# http://localhost:8000/docs (Swagger UI)
# http://localhost:8000/redoc (ReDoc)
# http://localhost:8000/metrics (Prometheus metrics)
# http://localhost:3000 (Grafana dashboard)
결론 및 다음 단계
이 가이드에서는 다음을 사용하여 프로덕션에 즉시 사용할 수 있는 모델 제공 서비스를 구축했습니다. FastAPI 및 Uvicorn: 라이프사이클 관리부터 Prometheus를 사용한 모니터링, 일괄 처리까지 다단계 빌드로 최적화된 동적 Dockerfile입니다. 우리도 봤어요 BentoML을 특화된 대안으로 활용하고 사용 가능한 주요 프레임워크를 비교했습니다. 2025년에.
테스트, 사전 구성된 Grafana 대시보드를 포함한 이 가이드의 전체 코드 및 Kubernetes 매니페스트가 있으며 MLOps 시리즈 GitHub 리포지토리에서 사용할 수 있습니다. FastAPI + Uvicorn + Docker + Prometheus 스택은 대부분을 포괄합니다. 최대 20~30명의 ML 엔지니어로 구성된 팀을 위한 모델 제공 사용 사례 콘텐츠 인프라와 최대의 유연성을 제공합니다.
서빙과 로를 마스터한 후의 자연스러운 다음 단계 스케일링 쿠버네티스에서: Horizon Pod Autoscaler를 사용하여 배포하고 더 많은 것을 관리하세요. 카나리아 릴리스를 포함한 모델 릴리스 및 복잡한 ML 파이프라인 조정 KubeFlow와 함께. 이에 대해서는 시리즈의 다음 기사에서 살펴보겠습니다.
MLOps 시리즈: 관련 기사
- MLOps: 실험에서 프로덕션까지 - ML 수명주기의 기본
- CI/CD가 포함된 ML 파이프라인: GitHub Actions + Docker - 교육 및 배포 자동화
- MLflow를 사용한 실험 추적 - 실험 및 모델 레지스트리 관리
- Kubernetes에서 ML 확장 - 다음 시리즈: 확장 및 오케스트레이션







