02 - CI/CD を使用した ML パイプライン: GitHub アクション + Docker
シリーズの最初の記事では、ML プロジェクトの 85% が本番環境に到達しない理由について説明しました。 そして、MLOps がこの問題をどのように解決するかについて説明します。モノリシックなノートブックをパイプラインに変えました モジュール式で構成可能。今こそ次のステップに進むときです。 全体を自動化する CI/CD を使用したパイプライン、コード、データ、または構成への変更を防止するため、 モデルのトレーニング、検証、デプロイメントを自動的にトリガーします。
この記事では、以下を使用して機械学習用の完全な CI/CD パイプラインを構築します。 GitHub アクション オーケストレーターとして、そして ドッカー のランタイムとして 実行。理論に限定せず、分類子を使用して実用的なプロジェクトを作成します。 マルチステージの Dockerfile、YAML ワークフロー、データ検証、モデル レジストリを備えた感情の分析 そして自動展開。
何を学ぶか
- ML 用 CI/CD が従来のソフトウェア CI/CD と異なる理由
- エンドツーエンドの ML パイプラインのアーキテクチャを設計する方法
- トレーニングとサービス用に最適化されたマルチステージ Dockerfile を作成する方法
- ML 用の完全な GitHub Actions ワークフローを作成する方法
- データ検証、モデル レジストリ、自動展開を統合する方法
- パイプラインで DVC を使用してデータのバージョン管理を管理する方法
- ML 固有のテスト (ユニット、統合、スモーク) を実装する方法
- デプロイ後にモデルを監視する方法
- キャッシュと自己ホスト型ランナーでコストを最適化する方法
- チームに適した CI/CD ツールを選択する方法
ML と異なるものに CI/CD を使用する理由
従来のソフトウェア開発の世界にいた人は、単に適用するだけだと考えるかもしれません。 同じ CI/CD プラクティスを ML プロジェクトに適用します。実際には、機械学習は複雑さをもたらします 特定のアプローチを必要とするユニークなもの。根本的な違いはソフトウェアにあります。 従来、CI/CD は 1 つのアーティファクト (コード) のみを管理しますが、ML では複数のアーティファクトを管理する必要があります。 コード、データ、モデルの 3 つを一度に.
ML の 3 つの成果物
従来のソフトウェアでは、コードが変更されなければ、出力も変更されません。 ML では、たとえ 同じコードでも、データを変更すると異なるモデルが生成されます。これはつまり、 CI/CD パイプラインは 3 つの独立した次元を追跡し、検証する必要があります。
| サイズ | 従来のCI/CD | ML 用の CI/CD |
|---|---|---|
| コード | Git プッシュ トリガーのビルドとテスト | Git プッシュ トリガーのトレーニング + 評価 |
| データ | 適用できない | 新しいデータが再トレーニングをトリガーする |
| モデル | 適用できない | 新しいモデルには検証とプロモーションが必要です |
| 構成 | 機能フラグ、環境変数 | ハイパーパラメータ、機能セット、メトリックしきい値 |
| 環境 | OS+ライブラリ | OS + ライブラリ + GPU ドライバー + CUDA バージョン |
| 検証 | 合否テスト | しきい値を上回る/下回るメトリクス + 本番環境のモデルとの比較 |
| 導入 | デプロイまたはロールバック | 段階的な導入 + A/B テスト + ドリフトの監視 |
継続的なトレーニング: 重要な概念
ML 用の CI/CD では、従来のソフトウェアには存在しない概念が導入されています。 の 継続的トレーニング (CT)。継続的インテグレーションと継続的だけでなく、 導入では、CT は次の場合にモデルが自動的に再トレーニングされるようにします。
- 新しいデータが到着します: データセットは新しい観測値で更新されます
- コードを変更します。 前処理またはアルゴリズムが変更される
- これらはメトリクスを劣化させます。 監視によりデータのドリフトやパフォーマンスの低下が検出される
- タイマーが期限切れになる: スケジュールされた再トレーニング (例: 毎週) がアクティブ化されている
一般的なエラー: CT なしの CI/CD
多くのチームは ML コードの CI/CD を実装していますが、継続的トレーニングのことを忘れています。 その結果、モデルは一度デプロイされ、その後は更新されなくなります。 本番環境のデータが相違するにつれて、時間の経過とともに静かに劣化します トレーニングデータから。 CT のないパイプラインは、メンテナンスのない自動車のようなものです。 壊れるまで動作します。
ML パイプライン アーキテクチャ
コードを記述する前に、パイプラインの完全なアーキテクチャを設計します。あらゆるフェーズ 特定の入力と出力があり、1 つのフェーズで障害が発生すると、後続のフェーズがブロックされます。これ 「フェイルファスト」アプローチにより、検証されたモデルのみが本番環境に到達します。
+------------------+ +------------------+ +------------------+
| DATA INGESTION |---->| PREPROCESSING |---->| TRAINING |
| | | | | |
| - Pull dati DVC | | - Pulizia | | - Train modello |
| - Validazione | | - Feature eng. | | - Log metriche |
| - Schema check | | - Split train/ | | - Log parametri |
| | | test/val | | - Salva artefatti|
+------------------+ +------------------+ +------------------+
| |
| (trigger: dati |
| nuovi/schedule) v
| +------------------+
| | EVALUATION |
| | |
| | - Metriche |
| | - Confronto con |
| | produzione |
| | - Gate: soglie |
| +------------------+
| |
| (se metriche > threshold)
| v
+------------------+ +------------------+ +------------------+
| MONITORING |<----| SMOKE TEST |<----| DEPLOYMENT |
| | | | | |
| - Health check | | - Test endpoint | | - Push registry |
| - Drift detect. | | - Predizione | | - Stage/Prod |
| - Alert | | di prova | | - Rollback ready |
| - Trigger retrain| | - Latenza check | | |
+------------------+ +------------------+ +------------------+
各ブロックは、GitHub Actions ワークフローのステップに対応します。さあ見てみましょう Docker によるコンテナ化から始まる各フェーズの実装方法を詳しく説明します。
機械学習用の Docker
Docker は、ML における最もイライラする問題の 1 つを解決します。 「私のマシンで動作します」。 トレーニングおよびサービス環境をコンテナ化することで、コードが確実に データ サイエンティストのラップトップ、CI/CD ランナーなど、どこで実行しても同じ結果が得られます。 そして生産中。 ML の場合、Docker には特別な注意が必要です。イメージは、 サイズが非常に大きいため (科学ライブラリ + GPU ドライバー)、ビルドが遅くなる可能性があります。
ML のベースイメージ
イメージ ベースの選択は、サイズと互換性にとって重要です。ここにオプションがあります 主なものとそれらをいつ使用するか。
| ベースイメージ | サイズ | 使用 | いつ選択するか |
|---|---|---|---|
| Python:3.11-スリム | ~120MB | CPUのトレーニング/サービング | Scikit-learn モデル、XGBoost、軽量サービス |
| Python:3.11-本の虫 | ~900MB | ビルドツールを使用したトレーニング | C/C++ コンパイルが必要な依存関係 |
| nvidia/cuda:12.1-ランタイム | ~3.5GB | GPU推論 | 深層学習モデルの提供 |
| nvidia/cuda:12.1-devel | ~5.2GB | GPUトレーニング | CUDA を使用した PyTorch/TensorFlow トレーニング |
| pytorch/pytorch:2.1.0-cuda12.1 | ~6GB | PyTorch のトレーニング/サービス提供 | 手動の CUDA セットアップを避けたい PyTorch プロジェクト |
トレーニングとサービス提供のための Dockerfile マルチステージ
多段階パターンは ML の基本です。 2 つの別々のステージを使用すると、次のようになります。 完全なビルド環境 (コンパイラとビルド ツールを含む) と最終イメージ 必要なランタイムのみが含まれる合理化されたものです。これにより画像のサイズが縮小されます 最終的には60%まで。
# ============================================
# Stage 1: Builder - installa dipendenze
# ============================================
FROM python:3.11-slim AS builder
WORKDIR /build
# Installa build tools necessari per compilare dipendenze native
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Copia e installa dipendenze in un virtual environment
COPY requirements.txt .
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# ============================================
# Stage 2: Training - esegue il training
# ============================================
FROM python:3.11-slim AS trainer
WORKDIR /app
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia codice sorgente
COPY src/ ./src/
COPY config/ ./config/
COPY train.py .
COPY evaluate.py .
# Entrypoint per training
ENTRYPOINT ["python", "train.py"]
# ============================================
# Stage 3: Serving - API di produzione
# ============================================
FROM python:3.11-slim AS serving
WORKDIR /app
# Utente non-root per sicurezza
RUN useradd --create-home appuser
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia solo il codice necessario per serving
COPY src/serving/ ./src/serving/
COPY src/preprocessing/ ./src/preprocessing/
# Healthcheck endpoint
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Switch a utente non-root
USER appuser
# Porta di default
EXPOSE 8000
# Entrypoint per serving
ENTRYPOINT ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000"]
ML にマルチステージを使用する理由
- 安全性: 提供イメージにはコンパイラやビルド ツールが含まれていません
- サイズ: サービングステージははるかに軽量です (~300 MB 対 ~1.2 GB)
- キャッシュ: レイヤーキャッシュを利用することで、コードよりも依存関係の変更頻度が低くなります。
- 柔軟性: トレーニングステージのみ、またはサービスステージのみを構築できます
レイヤーキャッシュの最適化
Dockerfile 内の COPY ステートメントの順序はキャッシュにとって重要です。 Python の依存関係
それらはめったに変更されませんが、ソースコードは頻繁に変更されます。まずコピーすることで、
requirements.txt 次にコードを作成すると、依存関係の再インストールが回避されます。
コードを変更するたびに。
# Dati e modelli (gestiti da DVC, non da Docker)
data/
models/
*.pkl
*.h5
*.pt
# Ambiente di sviluppo
.venv/
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/
# Git e CI
.git/
.github/
.dvc/cache/
# IDE e editor
.vscode/
.idea/
*.swp
# Documentazione
docs/
*.md
LICENSE
GPU をサポートする Docker
深層学習モデルをトレーニングするには、コンテナーでの GPU サポートが必要です。 Docker のサポート NVIDIA Container Toolkit を介した NVIDIA GPU。セットアップにはNVIDIAドライバーが必要です ホスト上とインストールされているツールキット。
# Base image con CUDA runtime
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04 AS gpu-trainer
WORKDIR /app
# Installa Python e dipendenze di sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
python3.11 \
python3.11-venv \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Virtual environment
RUN python3.11 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Dipendenze PyTorch con CUDA
COPY requirements-gpu.txt .
RUN pip install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/
COPY train.py .
# Variabili ambiente per CUDA
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENTRYPOINT ["python", "train.py"]
# Build dell'immagine GPU
docker build -f Dockerfile.gpu -t ml-trainer:gpu .
# Esecuzione con accesso GPU
docker run --gpus all \
-v $(pwd)/data:/app/data \
-v $(pwd)/models:/app/models \
ml-trainer:gpu \
--config config/training.yaml
機械学習のための GitHub アクション
GitHub Actions は、自動化されたワークフローを実行する GitHub に統合された CI/CD サービスです。 イベント (プッシュ、プル リクエスト、スケジュール、手動ディスパッチ) への応答。 ML の場合、次のことを実現します。 大きな利点: Git リポジトリとのネイティブ統合、アクションを備えたマーケットプレイス 事前定義された認証情報のシークレット管理と最大 2,000 分/月の無料利用 パブリックリポジトリの場合。
ML ワークフロー構造
ML 用の GitHub Actions ワークフローには、複数の一致するジョブという特定の構造があります。 ジョブと実行条件間の明示的な依存関係を備えたパイプライン ステージへの移行 モデルのメトリクスに基づきます。
name: ML Pipeline - Train, Evaluate, Deploy
on:
# Trigger su push al branch main (codice o config)
push:
branches: [main]
paths:
- 'src/**'
- 'config/**'
- 'requirements.txt'
- 'train.py'
- 'evaluate.py'
# Trigger schedulato per retraining periodico
schedule:
- cron: '0 6 * * 1' # Ogni lunedi alle 6:00 UTC
# Trigger manuale con parametri
workflow_dispatch:
inputs:
force_deploy:
description: 'Forza il deployment anche se le metriche non migliorano'
required: false
default: 'false'
type: choice
options:
- 'true'
- 'false'
training_config:
description: 'File di configurazione per il training'
required: false
default: 'config/training.yaml'
env:
PYTHON_VERSION: '3.11'
DOCKER_REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/ml-model
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
jobs:
# ============================================
# Job 1: Data Validation
# ============================================
data-validation:
name: Validate Data Quality
runs-on: ubuntu-latest
outputs:
data_valid: ${{ steps.validate.outputs.valid }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC
uses: iterative/setup-dvc@v2
- name: Pull data from DVC
run: dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Validate data quality
id: validate
run: |
python -m src.data.validate_data \
--data-path data/raw/reviews.csv \
--schema-path config/data_schema.yaml
echo "valid=true" >> $GITHUB_OUTPUT
# ============================================
# Job 2: Model Training
# ============================================
training:
name: Train Model
needs: data-validation
if: needs.data-validation.outputs.data_valid == 'true'
runs-on: ubuntu-latest
outputs:
model_version: ${{ steps.train.outputs.model_version }}
run_id: ${{ steps.train.outputs.run_id }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC and pull data
run: |
pip install dvc[s3]
dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Train model
id: train
run: |
python train.py \
--config ${{ github.event.inputs.training_config || 'config/training.yaml' }} \
--output-dir models/
echo "model_version=$(cat models/version.txt)" >> $GITHUB_OUTPUT
echo "run_id=$(cat models/run_id.txt)" >> $GITHUB_OUTPUT
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
MLFLOW_TRACKING_USERNAME: ${{ secrets.MLFLOW_USERNAME }}
MLFLOW_TRACKING_PASSWORD: ${{ secrets.MLFLOW_PASSWORD }}
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: trained-model
path: models/
retention-days: 30
# ============================================
# Job 3: Model Evaluation
# ============================================
evaluation:
name: Evaluate Model
needs: training
runs-on: ubuntu-latest
outputs:
metrics_pass: ${{ steps.evaluate.outputs.metrics_pass }}
accuracy: ${{ steps.evaluate.outputs.accuracy }}
f1_score: ${{ steps.evaluate.outputs.f1_score }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Setup DVC and pull test data
run: |
pip install dvc[s3]
dvc pull data/test/
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Evaluate model
id: evaluate
run: |
python evaluate.py \
--model-path models/model.pkl \
--test-data data/test/reviews_test.csv \
--thresholds config/thresholds.yaml \
--output metrics/report.json
echo "accuracy=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"accuracy\"])')" >> $GITHUB_OUTPUT
echo "f1_score=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"f1_score\"])')" >> $GITHUB_OUTPUT
echo "metrics_pass=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"pass\"])')" >> $GITHUB_OUTPUT
- name: Post metrics as PR comment
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const metrics = JSON.parse(fs.readFileSync('metrics/report.json'));
const body = `## Model Evaluation Results
| Metric | Value | Threshold | Status |
|--------|-------|-----------|--------|
| Accuracy | ${metrics.accuracy.toFixed(4)} | ${metrics.thresholds.accuracy} | ${metrics.accuracy >= metrics.thresholds.accuracy ? 'PASS' : 'FAIL'} |
| F1 Score | ${metrics.f1_score.toFixed(4)} | ${metrics.thresholds.f1_score} | ${metrics.f1_score >= metrics.thresholds.f1_score ? 'PASS' : 'FAIL'} |
| AUC-ROC | ${metrics.auc_roc.toFixed(4)} | ${metrics.thresholds.auc_roc} | ${metrics.auc_roc >= metrics.thresholds.auc_roc ? 'PASS' : 'FAIL'} |`;
github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: body
});
- name: Upload evaluation report
uses: actions/upload-artifact@v4
with:
name: evaluation-report
path: metrics/
# ============================================
# Job 4: Build and Push Docker Image
# ============================================
build-image:
name: Build Docker Image
needs: [evaluation]
if: |
needs.evaluation.outputs.metrics_pass == 'true' ||
github.event.inputs.force_deploy == 'true'
runs-on: ubuntu-latest
outputs:
image_tag: ${{ steps.meta.outputs.tags }}
steps:
- uses: actions/checkout@v4
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha,prefix=
type=raw,value=latest
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.DOCKER_REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
target: serving
push: true
tags: ${{ steps.meta.outputs.tags }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ============================================
# Job 5: Deploy to Staging + Smoke Test
# ============================================
deploy:
name: Deploy and Smoke Test
needs: [build-image, training]
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
echo "Deploying model version ${{ needs.training.outputs.model_version }}"
# Qui il comando di deploy reale (kubectl, docker-compose, etc.)
- name: Smoke test
run: |
# Attendi che il servizio sia pronto
for i in $(seq 1 30); do
if curl -sf http://staging:8000/health; then
echo "Service is ready"
break
fi
echo "Waiting for service... attempt $i"
sleep 5
done
# Test di predizione
RESPONSE=$(curl -sf -X POST http://staging:8000/predict \
-H "Content-Type: application/json" \
-d '{"text": "This product is amazing, I love it!"}')
echo "Prediction response: $RESPONSE"
# Verifica che la risposta sia valida
echo "$RESPONSE" | python -c "
import sys, json
data = json.load(sys.stdin)
assert 'prediction' in data, 'Missing prediction field'
assert 'confidence' in data, 'Missing confidence field'
assert data['confidence'] > 0.5, 'Low confidence'
print('Smoke test PASSED')
"
パイプラインにおける秘密とセキュリティ
資格情報を YAML ワークフローに直接入力しないでください。常に GitHub シークレットを使用する AWS キー、MLflow トークン、Docker レジストリ認証情報などの場合 機密情報。シークレットを設定する 設定 > シークレットと変数 > アクション GitHub リポジトリ内。
プロジェクト例: 感情分類子
すべてを具体的なプロジェクトにまとめてみましょう。感情分類器を構築します 完全な CI/CD パイプラインを備えた製品レビュー用。このプロジェクトでは scikit-learn を使用します。 シンプルですが、アーキテクチャは PyTorch モデルまたは TensorFlow モデルに同様に適用されます。
プロジェクトの構造
sentiment-classifier/
src/
data/
__init__.py
preprocessing.py # Pulizia e trasformazione testi
validate_data.py # Validazione schema e qualità
models/
__init__.py
trainer.py # Training del classificatore
serving/
__init__.py
app.py # FastAPI application
schemas.py # Pydantic schemas
monitoring/
__init__.py
health.py # Health checks
tests/
test_preprocessing.py # Unit test preprocessing
test_trainer.py # Unit test training
test_api.py # Integration test API
config/
training.yaml # Configurazione training
thresholds.yaml # Soglie metriche
data_schema.yaml # Schema validazione dati
train.py # Entrypoint training
evaluate.py # Entrypoint evaluation
Dockerfile # Multi-stage build
requirements.txt # Dipendenze Python
.github/
workflows/
ml-pipeline.yml # Pipeline CI/CD
.dvc/ # Configurazione DVC
dvc.yaml # Pipeline DVC
dvc.lock # Lock file DVC
トレーニングスクリプト
"""Script principale di training per il classificatore di sentiment."""
import argparse
import yaml
import mlflow
import mlflow.sklearn
from pathlib import Path
from datetime import datetime
from src.data.preprocessing import load_and_preprocess_data, split_dataset
from src.models.trainer import create_pipeline, train_model
def parse_args():
"""Parse degli argomenti da riga di comando."""
parser = argparse.ArgumentParser(
description="Train sentiment classifier"
)
parser.add_argument(
"--config",
type=str,
default="config/training.yaml",
help="Path al file di configurazione"
)
parser.add_argument(
"--output-dir",
type=str,
default="models/",
help="Directory per salvare il modello"
)
parser.add_argument(
"--experiment-name",
type=str,
default="sentiment-classifier",
help="Nome dell'esperimento MLflow"
)
return parser.parse_args()
def main():
"""Esegue la pipeline di training completa."""
args = parse_args()
# 1. Carica configurazione
with open(args.config) as f:
config = yaml.safe_load(f)
# 2. Setup MLflow
mlflow.set_experiment(args.experiment_name)
with mlflow.start_run(run_name=f"train-{datetime.now().strftime('%Y%m%d-%H%M%S')}") as run:
# 3. Log dei parametri
mlflow.log_params(config["model"])
mlflow.log_param("data_path", config["data"]["train_path"])
mlflow.log_param("test_size", config["data"]["test_size"])
# 4. Caricamento e preprocessing dati
print("[1/5] Caricamento e preprocessing dati...")
X, y = load_and_preprocess_data(config["data"]["train_path"])
# 5. Split dataset
print("[2/5] Split train/validation...")
X_train, X_val, y_train, y_val = split_dataset(
X, y,
test_size=config["data"]["test_size"],
random_state=config["data"]["random_state"]
)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
# 6. Crea pipeline di preprocessing + modello
print("[3/5] Creazione pipeline ML...")
pipeline = create_pipeline(config["model"])
# 7. Training
print("[4/5] Training in corso...")
trained_pipeline = train_model(pipeline, X_train, y_train)
# 8. Valutazione su validation set
print("[5/5] Valutazione...")
from src.models.trainer import evaluate_model
metrics = evaluate_model(trained_pipeline, X_val, y_val)
# 9. Log metriche in MLflow
mlflow.log_metrics(metrics)
# 10. Salva modello
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
import joblib
model_path = output_dir / "model.pkl"
joblib.dump(trained_pipeline, model_path)
# Log modello in MLflow con signature
from mlflow.models.signature import infer_signature
signature = infer_signature(X_train[:5], trained_pipeline.predict(X_train[:5]))
mlflow.sklearn.log_model(
trained_pipeline,
"model",
signature=signature,
registered_model_name="sentiment-classifier"
)
# Salva versione e run_id per il CI/CD
(output_dir / "version.txt").write_text(run.info.run_id[:8])
(output_dir / "run_id.txt").write_text(run.info.run_id)
print(f"\nTraining completato!")
print(f" Run ID: {run.info.run_id}")
for name, value in metrics.items():
print(f" {name}: {value:.4f}")
if __name__ == "__main__":
main()
前処理モジュール
"""Modulo per il preprocessing dei dati testuali."""
import re
import pandas as pd
from typing import Tuple
from sklearn.model_selection import train_test_split
def clean_text(text: str) -> str:
"""Pulisce un singolo testo rimuovendo HTML, caratteri speciali e spazi extra."""
if not isinstance(text, str):
return ""
# Rimuovi tag HTML
text = re.sub(r'<[^>]+>', '', text)
# Rimuovi URL
text = re.sub(r'http\S+|www\.\S+', '', text)
# Rimuovi caratteri speciali (mantieni lettere, numeri, spazi)
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
# Normalizza spazi
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def load_and_preprocess_data(data_path: str) -> Tuple[pd.Series, pd.Series]:
"""Carica e preprocess il dataset di recensioni."""
df = pd.read_csv(data_path)
# Validazione colonne richieste
required_cols = ["review_text", "sentiment"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(f"Colonne mancanti nel dataset: {missing}")
# Rimuovi righe con valori mancanti
df = df.dropna(subset=required_cols)
# Pulisci testi
df["clean_text"] = df["review_text"].apply(clean_text)
# Rimuovi testi vuoti dopo pulizia
df = df[df["clean_text"].str.len() > 0]
return df["clean_text"], df["sentiment"]
def split_dataset(
X: pd.Series,
y: pd.Series,
test_size: float = 0.2,
random_state: int = 42
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
"""Split stratificato del dataset."""
return train_test_split(
X, y,
test_size=test_size,
random_state=random_state,
stratify=y
)
トレーニングモジュール
"""Modulo per la creazione, il training e la valutazione del modello."""
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
accuracy_score, f1_score, precision_score,
recall_score, roc_auc_score
)
from typing import Dict, Any
import pandas as pd
MODELS = {
"logistic_regression": LogisticRegression,
"random_forest": RandomForestClassifier,
}
def create_pipeline(model_config: Dict[str, Any]) -> Pipeline:
"""Crea una pipeline sklearn con TF-IDF + classificatore."""
algorithm = model_config.get("algorithm", "logistic_regression")
model_class = MODELS.get(algorithm)
if model_class is None:
raise ValueError(
f"Algoritmo non supportato: {algorithm}. "
f"Supportati: {list(MODELS.keys())}"
)
# Parametri specifici del modello
model_params = {
k: v for k, v in model_config.items()
if k not in ("algorithm", "tfidf")
}
# Parametri TF-IDF
tfidf_params = model_config.get("tfidf", {})
return Pipeline([
("tfidf", TfidfVectorizer(
max_features=tfidf_params.get("max_features", 10000),
ngram_range=tuple(tfidf_params.get("ngram_range", [1, 2])),
min_df=tfidf_params.get("min_df", 2),
max_df=tfidf_params.get("max_df", 0.95),
)),
("classifier", model_class(**model_params)),
])
def train_model(
pipeline: Pipeline,
X_train: pd.Series,
y_train: pd.Series
) -> Pipeline:
"""Addestra la pipeline sul training set."""
pipeline.fit(X_train, y_train)
return pipeline
def evaluate_model(
pipeline: Pipeline,
X_test: pd.Series,
y_test: pd.Series
) -> Dict[str, float]:
"""Valuta il modello e restituisce tutte le metriche."""
y_pred = pipeline.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred, average="weighted"),
"precision": precision_score(y_test, y_pred, average="weighted"),
"recall": recall_score(y_test, y_pred, average="weighted"),
}
# AUC-ROC solo per classificazione binaria
if len(set(y_test)) == 2:
y_proba = pipeline.predict_proba(X_test)[:, 1]
metrics["auc_roc"] = roc_auc_score(y_test, y_proba)
return metrics
評価スクリプト
"""Script di valutazione del modello con confronto soglie."""
import argparse
import json
import yaml
import joblib
import pandas as pd
from pathlib import Path
from src.data.preprocessing import clean_text
from src.models.trainer import evaluate_model
def parse_args():
parser = argparse.ArgumentParser(description="Evaluate trained model")
parser.add_argument("--model-path", required=True, help="Path al modello .pkl")
parser.add_argument("--test-data", required=True, help="Path ai dati di test")
parser.add_argument("--thresholds", required=True, help="Path al file soglie YAML")
parser.add_argument("--output", required=True, help="Path per il report JSON")
return parser.parse_args()
def main():
args = parse_args()
# 1. Carica modello e dati
pipeline = joblib.load(args.model_path)
df = pd.read_csv(args.test_data)
df["clean_text"] = df["review_text"].apply(clean_text)
X_test = df["clean_text"]
y_test = df["sentiment"]
# 2. Valuta
metrics = evaluate_model(pipeline, X_test, y_test)
# 3. Confronta con soglie
with open(args.thresholds) as f:
thresholds = yaml.safe_load(f)["thresholds"]
all_pass = True
results = {}
for metric_name, threshold_value in thresholds.items():
actual = metrics.get(metric_name, 0.0)
passed = actual >= threshold_value
if not passed:
all_pass = False
results[metric_name] = {
"value": actual,
"threshold": threshold_value,
"pass": passed
}
# 4. Genera report
report = {
**metrics,
"thresholds": thresholds,
"details": results,
"pass": all_pass
}
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
# 5. Stampa risultati
print("\n=== Evaluation Report ===")
for name, detail in results.items():
status = "PASS" if detail["pass"] else "FAIL"
print(f" {name}: {detail['value']:.4f} (threshold: {detail['threshold']}) [{status}]")
print(f"\nOverall: {'PASS' if all_pass else 'FAIL'}")
# Exit code non-zero se le metriche non passano
if not all_pass:
print("\nWARNING: Le metriche non raggiungono le soglie minime!")
# Non usiamo exit(1) perchè il workflow legge l'output
if __name__ == "__main__":
main()
トレーニングとしきい値の構成
# Configurazione pipeline di training
data:
train_path: "data/raw/reviews.csv"
test_size: 0.2
random_state: 42
model:
algorithm: "logistic_regression"
max_iter: 1000
C: 1.0
random_state: 42
tfidf:
max_features: 15000
ngram_range: [1, 2]
min_df: 3
max_df: 0.9
mlflow:
experiment_name: "sentiment-classifier"
registered_model_name: "sentiment-classifier"
# Soglie minime per approvare il deployment
thresholds:
accuracy: 0.85
f1_score: 0.83
precision: 0.80
recall: 0.80
auc_roc: 0.90
# Confronto con modello in produzione
comparison:
# Il nuovo modello deve migliorare almeno dello 0.5%
min_improvement: 0.005
# Metriche su cui e richiesto il miglioramento
compare_metrics:
- f1_score
- auc_roc
ファイル要件
# Core ML
scikit-learn==1.4.0
pandas==2.2.0
numpy==1.26.3
# NLP preprocessing
nltk==3.8.1
# Experiment tracking
mlflow==2.10.0
# Model serving
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
# Data validation
pandera==0.18.0
great-expectations==0.18.8
# Data versioning
dvc[s3]==3.42.0
# Configuration
pyyaml==6.0.1
python-dotenv==1.0.1
# Serialization
joblib==1.3.2
# Testing
pytest==7.4.4
httpx==0.26.0
パイプラインでのデータ検証
モデルをトレーニングする前に、データが有効であることを確認する必要があります。 破損したデータでトレーニングされたモデルは、予測できない結果を生成します。検証日 そしてパイプラインの最初のゲート: データがチェックに合格しなかった場合、トレーニングは それは残らない。
"""Validazione della qualità dei dati con Pandera."""
import argparse
import sys
import yaml
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema
def build_schema(schema_config: dict) -> DataFrameSchema:
"""Costruisce uno schema Pandera dalla configurazione YAML."""
columns = {}
for col_name, col_spec in schema_config["columns"].items():
checks = []
if "min_length" in col_spec:
checks.append(Check.str_length(min_value=col_spec["min_length"]))
if "max_length" in col_spec:
checks.append(Check.str_length(max_value=col_spec["max_length"]))
if "allowed_values" in col_spec:
checks.append(Check.isin(col_spec["allowed_values"]))
if "min_value" in col_spec:
checks.append(Check.greater_than_or_equal_to(col_spec["min_value"]))
if "max_value" in col_spec:
checks.append(Check.less_than_or_equal_to(col_spec["max_value"]))
columns[col_name] = Column(
dtype=col_spec.get("dtype", "object"),
nullable=col_spec.get("nullable", False),
checks=checks if checks else None
)
return DataFrameSchema(
columns=columns,
coerce=True,
strict=schema_config.get("strict", False)
)
def validate_data(data_path: str, schema_path: str) -> bool:
"""Valida il dataset contro lo schema definito."""
# Carica schema
with open(schema_path) as f:
schema_config = yaml.safe_load(f)
schema = build_schema(schema_config)
# Carica dati
df = pd.read_csv(data_path)
# Controlla dimensione minima
min_rows = schema_config.get("min_rows", 100)
if len(df) < min_rows:
print(f"FAIL: Dataset ha {len(df)} righe, minimo richiesto: {min_rows}")
return False
# Controlla duplicati
max_duplicates_pct = schema_config.get("max_duplicates_pct", 0.05)
duplicates_pct = df.duplicated().mean()
if duplicates_pct > max_duplicates_pct:
print(f"FAIL: {duplicates_pct:.1%} duplicati (max: {max_duplicates_pct:.1%})")
return False
# Valida schema
try:
schema.validate(df, lazy=True)
print(f"PASS: Dataset valido ({len(df)} righe, {len(df.columns)} colonne)")
return True
except pa.errors.SchemaErrors as e:
print(f"FAIL: Schema validation errors:")
print(e.failure_cases.head(20))
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", required=True)
parser.add_argument("--schema-path", required=True)
args = parser.parse_args()
valid = validate_data(args.data_path, args.schema_path)
sys.exit(0 if valid else 1)
# Schema per il dataset di recensioni
min_rows: 1000
max_duplicates_pct: 0.05
strict: false
columns:
review_text:
dtype: "object"
nullable: false
min_length: 10
max_length: 5000
sentiment:
dtype: "int64"
nullable: false
allowed_values: [0, 1]
rating:
dtype: "float64"
nullable: true
min_value: 1.0
max_value: 5.0
review_date:
dtype: "object"
nullable: true
パイプライン内の DVC によるデータのバージョニング
データは Git には大きすぎますが、バージョン管理して同期する必要があります
CI/CD パイプライン内。 DVC (データ バージョン コントロール) この問題を解決します:
データをリモート ストレージ (S3、GCS、Azure BLOB) に保存し、Git のみで追跡する
メタデータ (ハッシュ、サイズ)。 GitHub アクションで使用できるもの dvc pull のために
現在のコミットに関連付けられているデータのバージョンを正確にダウンロードします。
stages:
prepare:
cmd: python -m src.data.preprocessing --config config/training.yaml
deps:
- src/data/preprocessing.py
- config/training.yaml
- data/raw/reviews.csv
outs:
- data/processed/train.csv
- data/processed/test.csv
train:
cmd: python train.py --config config/training.yaml
deps:
- train.py
- src/models/trainer.py
- data/processed/train.csv
- config/training.yaml
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: >-
python evaluate.py
--model-path models/model.pkl
--test-data data/processed/test.csv
--thresholds config/thresholds.yaml
--output metrics/eval_metrics.json
deps:
- evaluate.py
- models/model.pkl
- data/processed/test.csv
- config/thresholds.yaml
metrics:
- metrics/eval_metrics.json:
cache: false
# Inizializza DVC
dvc init
# Configura storage remoto (S3 in questo esempio)
dvc remote add -d myremote s3://my-ml-data-bucket/sentiment-classifier
dvc remote modify myremote region eu-west-1
# Traccia il dataset
dvc add data/raw/reviews.csv
# Committa i file DVC in Git
git add data/raw/reviews.csv.dvc data/raw/.gitignore dvc.yaml dvc.lock
git commit -m "feat: add DVC tracking for training data"
# Push dei dati su S3
dvc push
DVC + GitHub アクション: 仕組み
- Git トラック ファイル
.dvc(メタデータ: SHA256 ハッシュ、サイズ) - 実際のデータは S3 (または GCS、Azure Blob、Google Drive) に存在します。
- GitHub Actions ワークフローが実行されます
dvc pullデータをダウンロードするには - S3 認証情報は GitHub シークレット経由で渡されます
- 各 Git コミットはデータの正確なバージョンに対応します
MLflow を使用したモデル レジストリ
モデル レジストリは、モデルのライフサイクルを管理するコンポーネントです。 トレーニング。トレーニングされた各モデルは、名前、バージョン、および ステータス (ステージング、本番、アーカイブ)。 CI/CD パイプラインはレジストリと対話します。 検証のしきい値を超えるモデルをプロモートします。
"""Script per la promozione del modello nel registry MLflow."""
import mlflow
from mlflow.tracking import MlflowClient
def promote_model(
model_name: str,
run_id: str,
target_stage: str = "Production"
) -> None:
"""Promuove un modello a Production nel registry."""
client = MlflowClient()
# Cerca la versione del modello associata al run
model_versions = client.search_model_versions(
f"name='{model_name}'"
)
target_version = None
for mv in model_versions:
if mv.run_id == run_id:
target_version = mv.version
break
if target_version is None:
raise ValueError(
f"Nessun modello trovato per run_id={run_id}"
)
# Archivia il modello attualmente in Production
for mv in model_versions:
if mv.current_stage == "Production":
client.transition_model_version_stage(
name=model_name,
version=mv.version,
stage="Archived",
archive_existing_versions=False
)
print(f"Archiviato modello v{mv.version} (precedente Production)")
# Promuovi il nuovo modello
client.transition_model_version_stage(
name=model_name,
version=target_version,
stage=target_stage
)
print(f"Promosso modello v{target_version} a {target_stage}")
def load_production_model(model_name: str):
"""Carica il modello attualmente in Production."""
model_uri = f"models:/{model_name}/Production"
return mlflow.sklearn.load_model(model_uri)
モデルの署名と入力例
モデル署名は、モデルの入力と出力のパターンを文書化します。 これは、文書化と自動検証の両方として機能します。 異なるスキーマで入力を渡そうとすると、MLflow は明確なエラーをスローします。
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
# Definisci la signature esplicita
input_schema = Schema([
ColSpec("string", "text")
])
output_schema = Schema([
ColSpec("long", "prediction")
])
signature = ModelSignature(
inputs=input_schema,
outputs=output_schema
)
# Esempio di input per documentazione
input_example = {
"text": "This product is excellent, highly recommended!"
}
# Registra il modello con signature e esempio
mlflow.sklearn.log_model(
sk_model=trained_pipeline,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="sentiment-classifier"
)
ML パイプラインでのテスト
ML のテストは、従来のソフトウェア テストよりも複雑です。チェックするだけでは不十分です コードが「機能する」こと: データの品質、コードの正確さをテストする必要があります。 前処理、トレーニングの安定性、API の動作の提供。の CI/CD パイプラインは 3 レベルのテストを実行します。
前処理の単体テスト
"""Unit test per il modulo di preprocessing."""
import pytest
import pandas as pd
from src.data.preprocessing import clean_text, load_and_preprocess_data
class TestCleanText:
"""Test per la funzione clean_text."""
def test_removes_html_tags(self):
assert clean_text("<p>Hello</p>") == "hello"
def test_removes_urls(self):
assert clean_text("Visit http://example.com for info") == "visit for info"
def test_removes_special_characters(self):
assert clean_text("Hello!!! World???") == "hello world"
def test_normalizes_whitespace(self):
assert clean_text("Hello World") == "hello world"
def test_lowercases(self):
assert clean_text("HELLO WORLD") == "hello world"
def test_empty_string(self):
assert clean_text("") == ""
def test_none_input(self):
assert clean_text(None) == ""
def test_numeric_preserved(self):
assert clean_text("Rating 5 out of 10") == "rating 5 out of 10"
class TestLoadAndPreprocess:
"""Test per il caricamento e preprocessing dei dati."""
def test_missing_columns_raises(self, tmp_path):
"""Verifica che colonne mancanti generino un errore."""
df = pd.DataFrame({"wrong_col": ["text"]})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
with pytest.raises(ValueError, match="Colonne mancanti"):
load_and_preprocess_data(str(csv_path))
def test_drops_na_rows(self, tmp_path):
"""Verifica che le righe con NA vengano rimosse."""
df = pd.DataFrame({
"review_text": ["Good product", None, "Bad product"],
"sentiment": [1, 0, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert len(X) == 2
def test_output_types(self, tmp_path):
"""Verifica i tipi di output."""
df = pd.DataFrame({
"review_text": ["Great product", "Terrible service"],
"sentiment": [1, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert isinstance(X, pd.Series)
assert isinstance(y, pd.Series)
パイプラインの統合テスト
"""Integration test per il training e la valutazione del modello."""
import pytest
import pandas as pd
from src.models.trainer import create_pipeline, train_model, evaluate_model
@pytest.fixture
def sample_data():
"""Crea un dataset di test sintetico."""
texts = [
"amazing product love it", "terrible waste of money",
"great quality recommended", "horrible experience never again",
"excellent value for price", "poor quality disappointed",
"best purchase ever made", "worst product i bought",
"fantastic results happy", "awful terrible regret buying",
] * 10 # Ripetuto per avere abbastanza dati
sentiments = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 10
return pd.Series(texts), pd.Series(sentiments)
class TestPipeline:
"""Test per la pipeline di training."""
def test_create_pipeline_logistic(self):
"""Verifica creazione pipeline con LogisticRegression."""
config = {"algorithm": "logistic_regression", "max_iter": 100}
pipeline = create_pipeline(config)
assert len(pipeline.steps) == 2
def test_create_pipeline_invalid_algorithm(self):
"""Verifica errore con algoritmo non supportato."""
with pytest.raises(ValueError, match="non supportato"):
create_pipeline({"algorithm": "invalid_algo"})
def test_train_and_evaluate(self, sample_data):
"""Test end-to-end: training + evaluation."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
pipeline = create_pipeline(config)
trained = train_model(pipeline, X[:80], y[:80])
metrics = evaluate_model(trained, X[80:], y[80:])
assert "accuracy" in metrics
assert "f1_score" in metrics
assert 0.0 <= metrics["accuracy"] <= 1.0
assert 0.0 <= metrics["f1_score"] <= 1.0
def test_model_deterministic(self, sample_data):
"""Verifica che il training sia deterministico con seed fisso."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
p1 = train_model(create_pipeline(config), X[:80], y[:80])
p2 = train_model(create_pipeline(config), X[:80], y[:80])
m1 = evaluate_model(p1, X[80:], y[80:])
m2 = evaluate_model(p2, X[80:], y[80:])
assert m1["accuracy"] == m2["accuracy"]
給仕用の煙テスト
"""Smoke test per l'API di serving FastAPI."""
import pytest
from httpx import AsyncClient, ASGITransport
from src.serving.app import app
@pytest.fixture
def client():
"""Client HTTP per testare l'API."""
transport = ASGITransport(app=app)
return AsyncClient(transport=transport, base_url="http://test")
@pytest.mark.asyncio
async def test_health_endpoint(client):
"""Verifica che l'endpoint /health risponda 200."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_predict_positive(client):
"""Verifica predizione per testo positivo."""
response = await client.post(
"/predict",
json={"text": "This product is amazing, I love it!"}
)
assert response.status_code == 200
data = response.json()
assert "prediction" in data
assert "confidence" in data
assert data["confidence"] > 0.0
@pytest.mark.asyncio
async def test_predict_empty_text(client):
"""Verifica errore con testo vuoto."""
response = await client.post(
"/predict",
json={"text": ""}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_predict_batch(client):
"""Verifica predizione batch."""
response = await client.post(
"/predict/batch",
json={"texts": [
"Great product",
"Terrible experience"
]}
)
assert response.status_code == 200
data = response.json()
assert len(data["predictions"]) == 2
# Aggiungere questo job prima del training nel workflow
tests:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest-asyncio pytest-cov
- name: Run unit tests
run: pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
with:
file: coverage.xml
FastAPI による API の提供
トレーニングされたモデルには、REST API 経由でアクセスできる必要があります。 FastAPI と選択肢 Python での ML 提供に最適です。また、高速で、自動入力検証機能を備えています。 Pydantic は OpenAPI ドキュメントを自動的に生成します。
"""API di serving per il classificatore di sentiment."""
import os
import time
import joblib
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from src.serving.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse,
HealthResponse
)
from src.data.preprocessing import clean_text
# Variabili globali per il modello
model_pipeline = None
model_version = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
global model_pipeline, model_version
model_path = os.getenv("MODEL_PATH", "models/model.pkl")
if not Path(model_path).exists():
raise RuntimeError(f"Modello non trovato: {model_path}")
model_pipeline = joblib.load(model_path)
model_version = os.getenv("MODEL_VERSION", "unknown")
print(f"Modello caricato: v{model_version}")
yield
model_pipeline = None
app = FastAPI(
title="Sentiment Classifier API",
version="1.0.0",
lifespan=lifespan
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check dell'API."""
return HealthResponse(
status="healthy" if model_pipeline is not None else "unhealthy",
model_version=model_version or "not loaded"
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Predizione singola."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned = clean_text(request.text)
if not cleaned:
raise HTTPException(status_code=422, detail="Text is empty after cleaning")
prediction = model_pipeline.predict([cleaned])[0]
probabilities = model_pipeline.predict_proba([cleaned])[0]
confidence = float(max(probabilities))
latency_ms = (time.time() - start) * 1000
return PredictionResponse(
prediction=int(prediction),
confidence=confidence,
label="positive" if prediction == 1 else "negative",
latency_ms=round(latency_ms, 2)
)
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch su più testi."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned_texts = [clean_text(t) for t in request.texts]
predictions = model_pipeline.predict(cleaned_texts)
probabilities = model_pipeline.predict_proba(cleaned_texts)
latency_ms = (time.time() - start) * 1000
results = []
for i, text in enumerate(request.texts):
results.append(PredictionResponse(
prediction=int(predictions[i]),
confidence=float(max(probabilities[i])),
label="positive" if predictions[i] == 1 else "negative",
latency_ms=0
))
return BatchPredictionResponse(
predictions=results,
total_latency_ms=round(latency_ms, 2)
)
"""Pydantic schemas per l'API di serving."""
from pydantic import BaseModel, Field
from typing import List
class PredictionRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=5000,
description="Testo della recensione")
class PredictionResponse(BaseModel):
prediction: int = Field(..., description="0=negativo, 1=positivo")
confidence: float = Field(..., ge=0.0, le=1.0,
description="Confidenza della predizione")
label: str = Field(..., description="Label leggibile")
latency_ms: float = Field(..., description="Latenza in millisecondi")
class BatchPredictionRequest(BaseModel):
texts: List[str] = Field(..., min_length=1, max_length=100,
description="Lista di testi")
class BatchPredictionResponse(BaseModel):
predictions: List[PredictionResponse]
total_latency_ms: float
class HealthResponse(BaseModel):
status: str
model_version: str
導入後のモニタリング
デプロイはパイプラインの終わりではなく、最も重要なフェーズの始まりです。 本番環境でのモニタリング。 ML モデルは、次の場合に静かに劣化する可能性があります。 実際のデータはトレーニング データとは異なります。パイプラインにはヘルスチェックが含まれている必要があります 継続的な予測ログと自動再トレーニング トリガー。
"""Monitoring post-deployment per il modello ML."""
import time
import logging
from datetime import datetime, timedelta
from collections import deque
from typing import Dict, Optional
from dataclasses import dataclass, field
@dataclass
class PredictionLog:
"""Log di una singola predizione."""
timestamp: datetime
input_text: str
prediction: int
confidence: float
latency_ms: float
class ModelMonitor:
"""Monitora le performance del modello in produzione."""
def __init__(
self,
window_size: int = 1000,
min_confidence_threshold: float = 0.6,
max_latency_ms: float = 500.0,
drift_check_interval: int = 100
):
self.window_size = window_size
self.min_confidence = min_confidence_threshold
self.max_latency = max_latency_ms
self.drift_check_interval = drift_check_interval
self.predictions: deque = deque(maxlen=window_size)
self.alert_callbacks = []
self.prediction_count = 0
self.logger = logging.getLogger("model_monitor")
def log_prediction(self, log: PredictionLog) -> None:
"""Registra una predizione e verifica le metriche."""
self.predictions.append(log)
self.prediction_count += 1
# Check latenza
if log.latency_ms > self.max_latency:
self._alert(
"HIGH_LATENCY",
f"Latenza {log.latency_ms:.0f}ms supera soglia {self.max_latency}ms"
)
# Check confidenza bassa
if log.confidence < self.min_confidence:
self._alert(
"LOW_CONFIDENCE",
f"Confidenza {log.confidence:.2f} sotto soglia {self.min_confidence}"
)
# Check periodico per drift
if self.prediction_count % self.drift_check_interval == 0:
self._check_distribution_drift()
def get_metrics(self) -> Dict:
"""Restituisce le metriche correnti della finestra."""
if not self.predictions:
return {"status": "no_data"}
recent = list(self.predictions)
confidences = [p.confidence for p in recent]
latencies = [p.latency_ms for p in recent]
predictions = [p.prediction for p in recent]
positive_rate = sum(1 for p in predictions if p == 1) / len(predictions)
return {
"total_predictions": self.prediction_count,
"window_size": len(recent),
"avg_confidence": sum(confidences) / len(confidences),
"min_confidence": min(confidences),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"positive_rate": positive_rate,
"low_confidence_pct": sum(
1 for c in confidences if c < self.min_confidence
) / len(confidences),
}
def _check_distribution_drift(self) -> None:
"""Verifica se la distribuzione delle predizioni e cambiata."""
if len(self.predictions) < self.window_size:
return
recent = list(self.predictions)
half = len(recent) // 2
first_half = [p.prediction for p in recent[:half]]
second_half = [p.prediction for p in recent[half:]]
rate_first = sum(first_half) / len(first_half)
rate_second = sum(second_half) / len(second_half)
# Se la distribuzione cambia più del 15%, segnala drift
if abs(rate_first - rate_second) > 0.15:
self._alert(
"DISTRIBUTION_DRIFT",
f"Positive rate cambiato: {rate_first:.2f} -> {rate_second:.2f}"
)
def _alert(self, alert_type: str, message: str) -> None:
"""Invia un alert."""
self.logger.warning(f"[{alert_type}] {message}")
for callback in self.alert_callbacks:
callback(alert_type, message)
監視すべき主要な指標
- レイテンシー (p50、p95、p99): APIの応答時間
- スループット: 1 秒あたりの予測数
- 予測分布: ポジティブ/ネガティブ比率の変化
- 中程度の信頼度: 減少はモデルが「不確実」であることを示します
- エラー率: HTTPエラー率5xx
- 日付のずれ: 本番データとトレーニングデータの間の相違
コストと最適化
GitHub Actions では、パブリック リポジトリの場合は 2,000 分/月、リポジトリの場合は 500 分が無料で提供されます。 プライベートリポジトリ(無料プラン)。 ML トレーニングでは、これらの時間をすぐに消費してしまう可能性があります。 最適化する方法は次のとおりです。
キャッシュ戦略
# Cache delle dipendenze Python
- name: Cache pip packages
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
# Cache del dataset DVC (evita download ripetuti)
- name: Cache DVC data
uses: actions/cache@v4
with:
path: |
data/
.dvc/cache/
key: dvc-${{ hashFiles('data/*.dvc', 'dvc.lock') }}
restore-keys: |
dvc-
# Cache della Docker layer
- name: Build with cache
uses: docker/build-push-action@v5
with:
context: .
cache-from: type=gha
cache-to: type=gha,mode=max
GPU を備えたセルフホスト ランナー
GPU トレーニングの場合、GitHub でホストされているランナーだけでは十分ではありません (GPU がありません)。解決策 GPU を備えたマシン上のセルフホスト ランナー。これにより、1分あたりのコストも削減されます GitHub アクションによる。
# 1. Sulla macchina con GPU, scarica il runner
mkdir actions-runner && cd actions-runner
curl -o actions-runner.tar.gz -L \
https://github.com/actions/runner/releases/download/v2.311.0/actions-runner-linux-x64-2.311.0.tar.gz
tar xzf actions-runner.tar.gz
# 2. Configura il runner
./config.sh --url https://github.com/YOUR_ORG/YOUR_REPO \
--token YOUR_TOKEN \
--labels gpu,cuda12,ml-training
# 3. Installa come servizio
sudo ./svc.sh install
sudo ./svc.sh start
training:
name: Train Model (GPU)
runs-on: [self-hosted, gpu, cuda12]
# Il job viene eseguito sulla macchina con GPU
steps:
- uses: actions/checkout@v4
- name: Train with GPU
run: |
nvidia-smi # Verifica GPU disponibile
python train.py --config config/training-gpu.yaml --device cuda
シナリオのコスト見積もり
| シナリオ | 分/月 | GitHub アクションのコスト | 総コスト |
|---|---|---|---|
| プロトタイプ (手動トレーニング、scikit-learn) | ~200 | 無料(フリープラン) | ~0 ユーロ/月 |
| PMI (毎週のトレーニング、中型モデル) | ~800 | ~12ユーロ/月 | ~50 ユーロ/月 (S3 を含む) |
| スケールアップ(日々のトレーニング、ディープラーニング) | ~3,000 | ~48ユーロ/月 | ~200 ユーロ/月 (クラウド GPU を使用) |
| エンタープライズ (マルチモデル、継続的トレーニング) | ~10,000以上 | 自己ホスト型ランナー | ~500ユーロ以上/月 |
ML 用の CI/CD ツールの比較
GitHub Actions が唯一のオプションではありません。各ツールには、それに応じた特有の利点があります コンテキストの。ここでは、選択に役立つ実用的な比較を示します。
| 特性 | GitHub アクション | GitLab CI | ジェンキンス | アルゴのワークフロー |
|---|---|---|---|---|
| 設定 | ゼロ(一体型) | ゼロ(一体型) | 専用サーバー | Kubernetes クラスター |
| GPUのサポート | 自己ホスト型ランナー | 自己ホスト型ランナー | NVIDIA プラグイン | ネイティブ (K8s GPU) |
| コスト(小規模チーム) | 無料/低料金 | 無料/低料金 | サーバーコスト | K8s クラスターのコスト |
| 平行度 | 良い (マトリックス) | 良い | 最適 | 優れた (DAG) |
| コードとしてのパイプライン | YAML | YAML | グルービー/YAML | YAML/Python SDK |
| ML エコシステム | 広大な市場 | 良い | プラグイン | クラウドネイティブ |
| 学習曲線 | 低い | 低い | 平均 | 高い |
| に最適 | 中小規模のチーム、GitHub リポジトリ | GitLab 上のチーム、自己管理 | エンタープライズ、オンプレミス | チーム K8、複雑なパイプライン |
どれを選びますか?
- 始まりとプロトタイプ: GitHub Actions - ゼロセットアップ、ネイティブ統合、パブリックリポジトリは無料
- GitLab のチーム: GitLab CI - GitLab とのネイティブ統合、優れたレジストリ コンテナ
- エンタープライズオンプレミス: Jenkins - 最大限の柔軟性、成熟したプラグイン エコシステム
- K8 を使用したクラウドネイティブ チーム: Argo ワークフロー - DAG パイプライン、ネイティブ スケーリング、複雑な ML に最適
年間 5,000 ユーロ未満でセットアップを完了
完全な ML CI/CD パイプラインを実装したい SMB の場合、次のことが可能です。 オープンソース ツールとクラウド サービスを使用して年間 5,000 ユーロ未満に抑える 無料または低コストのレベルで。おすすめのスタックはこちらです。
| 成分 | ツール | 年間コスト | 注意事項 |
|---|---|---|---|
| リポジトリ + CI/CD | GitHub (チーム) | ~400ユーロ | 3,000 分/月 アクションを含む |
| データストレージ | AWS S3 | ~120ユーロ | ~500 GBのデータセット、転送を含む |
| 実験の追跡 | MLflow (自己ホスト型) | ~0ユーロ | オープンソース、クラウド VM 上にデプロイ |
| モデルレジストリ | MLflow モデル レジストリ | ~0ユーロ | MLflow に含まれる |
| コンテナレジストリ | GitHub コンテナ レジストリ | ~0ユーロ | GitHub に付属 |
| ホスティングモデル | クラウド VM (e2-medium) | ~500ユーロ | FastAPI + MLflow サーバーを提供する場合 |
| データのバージョン管理 | DVC | ~0ユーロ | オープンソース、ストレージはすでに上記にカウントされています |
| 監視 | プロメテウス + グラファナ | ~0ユーロ | オープンソース、同じ VM 上 |
| データの検証 | パンデラ / グレート・エクスペクテーション | ~0ユーロ | オープンソース |
| GPU トレーニング (不定期) | クラウド GPU スポット インスタンス | ~600ユーロ | ~50 時間/月 T4 スポット |
推定合計額: ~1,620 ユーロ/年 - 予算は 5,000 ユーロを大幅に下回ります。 スケーリングのためのマージンを持たせます。最大のコストは GPU トレーニングです: モデルを使用する場合 クラシック (scikit-learn、XGBoost) では、GPU コストがゼロになります。
隠れたコストに注意
工具の費用はほんの一部です。最も重要なコストとチーム時間は次のとおりです。 初期パイプラインのセットアップには、経験豊富なエンジニアの場合約 2 ~ 4 週間かかります。 継続的なメンテナンスには週に約 2 ~ 4 時間かかります。のコストも計算します クラウド サービス間のデータ転送 (送信)。大規模なデータセットでは急速に増加する可能性があります。
結論と次のステップ
この記事では、機械学習用の完全な CI/CD パイプラインを構築しました。 マルチステージの Dockerfile から、データ検証、トレーニングを含む GitHub Actions ワークフローまで 評価と自動展開。覚えておくべき重要な概念は次のとおりです。
- ML の CI/CD は 3 つのアーティファクトを処理します (コード、データ、モデル) と継続的トレーニングの導入
- 多段階Docker 最適化されたイメージの個別のビルド、トレーニング、提供
- GitHub アクション 依存ジョブと条件付きジョブを使用してパイプライン全体を調整します
- データの検証 そして最初のゲート: 破損したデータにより使用できないモデルが生成される
- モデルレジストリ テンプレートのリリースとプロモーションを管理します
- ML テスト ユニット、統合、スモークテストの 3 つのレベルをカバーします。
- 監視 導入後、ドリフトと劣化を検出するために重要
- コストは管理可能です: オープンソース ツールを使用すると、年間 2,000 ユーロ未満に抑えられます
私たちが構築したパイプラインは、 成熟度モデルのレベル 2 Google MLOps: トレーニングと導入を完全に自動化します。 統合された検証と監視。これを踏まえて次の記事で さらに深く掘り下げていきます MLフロー 高度な実験追跡用のモデル レジストリ そしてアーティファクトの管理。
シリーズロードマップ
- 第1条: MLOps: 実験から実稼働へ (完了)
- 第2条: CI/CD を使用した ML パイプライン: GitHub Actions + Docker (この記事)
- 第3条: MLflow の詳細 - 実験の追跡とモデル レジストリ
- 第4条: DVC - ML のデータのバージョニング
- 第5条: FastAPI と Docker で提供されるスケーラブルなモデル
- 第6条: ML 用 Kubernetes: オーケストレーションとスケーリング
- 第7条: 高度な監視: データドリフトと明らかに AI
- 第8条: 本番環境での ML モデルの A/B テスト
- 第9条: ガバナンス、コンプライアンス、責任ある ML
- 第10条: ケーススタディ: エンドツーエンドの MLOps パイプライン







