ベンチマークと最適化: 48GB GPU から 8GB RTX まで
あなたにはモデルがいます。 80GB A100で動作します。ただし、24GB RTX 3090 にデプロイする必要があります。 RTX 4060 8GB ラップトップ、あるいは Raspberry Pi 上でも使用できます。いくらかどうやってわかりますか FP32 から INT4 になると精度が低下しますか?フラッシュアテンションを使用するとどれくらいスピードが上がりますか? 定量化する価値はあるのでしょうか、それとも蒸留したほうが良いのでしょうか?勾配チェックポイント設定によりどのくらいのメモリが節約されますか?
体系的なベンチマークがなければ、これらの質問は答えられないままになり、最終的には 直感または公開されている構成のベンチマークに基づいて次善の選択を行う 自分のものとは違う。このシリーズの最終記事では、フレームワークを構築します。 測定するための包括的なベンチマーク すべてのサイズ パフォーマンスの向上: 記憶力、 レイテンシー、スループット、精度、消費電力。
次に、シリーズで見られたすべてのテクニック (量子化、 枝刈り、蒸留、フラッシュ アテンション、グラジエント チェックポイント、混合精度 — 48 GB を必要とするモデルから 8 GB で動作するモデルに移行する方法を示します。 品質に関して支払う金額を正確に示す指標を使用します。
何を学ぶか
- DL モデルの体系的なベンチマーク フレームワーク
- VRAM、レイテンシ、スループット、FLOP を正確に測定
- 混合精度トレーニング: FP16 vs BF16 vs FP32
- フラッシュ アテンション 2/3: どれだけ節約し、いつ使用するか
- 勾配チェックポインティング: メモリとコンピューティングのトレードオフ
- グラジエント累積: 実質的に大きなバッチサイズ
- Torch.compile とランタイムの最適化
- KV キャッシュ: LLM 自己回帰推論の最適化
- 体系的な比較: すべてのテクニックを比較
- 意思決定のガイダンス: どのシナリオに対してどの最適化を行うか
体系的なベンチマークフレームワーク
最適化する前に、正確に測定する必要があります。ベンチマークフレームワーク 専門的な測定: ピーク VRAM 使用量、平均レイテンシーおよび P95、スループット (トークン/秒または画像/秒)、 特定のタスクにおける FLOP、エネルギー消費、および精度。鍵となるのは、 再現性: 実行間で 10% 異なるベンチマークは役に立ちません。
import torch
import torch.nn as nn
import time
import numpy as np
from dataclasses import dataclass, asdict
from typing import Optional, Callable
import gc
# ============================================================
# DATACLASS PER RISULTATI BENCHMARK
# ============================================================
@dataclass
class BenchmarkResult:
"""Risultati completi di un benchmark."""
name: str
# Memoria
vram_allocated_mb: float
vram_reserved_mb: float
vram_peak_mb: float
# Velocita
latency_ms_mean: float
latency_ms_p50: float
latency_ms_p95: float
latency_ms_p99: float
throughput_per_sec: float
# Modello
params_total: int
params_trainable: int
model_size_mb: float
# Opzionali
accuracy: Optional[float] = None
flops_total: Optional[float] = None
power_watts: Optional[float] = None
def print_summary(self):
print(f"\n=== {self.name} ===")
print(f" VRAM: {self.vram_peak_mb:.0f} MB peak, {self.vram_allocated_mb:.0f} MB alloc")
print(f" Latenza: {self.latency_ms_mean:.1f}ms mean, "
f"{self.latency_ms_p95:.1f}ms p95, {self.latency_ms_p99:.1f}ms p99")
print(f" Throughput: {self.throughput_per_sec:.1f}/s")
print(f" Parametri: {self.params_total:,} ({self.model_size_mb:.1f} MB)")
if self.accuracy:
print(f" Accuratezza: {self.accuracy:.4f}")
# ============================================================
# CLASSE PRINCIPALE DI BENCHMARKING
# ============================================================
class DeepLearningBenchmark:
def __init__(self, device: str = "cuda"):
self.device = device
self.results = []
def _count_params(self, model: nn.Module) -> tuple:
total = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
return total, trainable
def _model_size_mb(self, model: nn.Module) -> float:
total_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
return total_bytes / (1024 ** 2)
def _reset_memory(self):
"""Reset GPU memory per benchmark pulito."""
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
def benchmark_inference(
self,
name: str,
model: nn.Module,
input_fn: Callable[[], tuple],
n_warmup: int = 10,
n_runs: int = 100,
batch_size: int = 1
) -> BenchmarkResult:
"""
Benchmark completo di inferenza.
input_fn: funzione che restituisce input per il modello
"""
model = model.to(self.device).eval()
self._reset_memory()
# Warmup
with torch.no_grad():
for _ in range(n_warmup):
inputs = input_fn()
if isinstance(inputs, dict):
model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
model(inputs.to(self.device))
# Misura memoria post-warmup
if torch.cuda.is_available():
mem_alloc = torch.cuda.memory_allocated() / (1024**2)
mem_reserved = torch.cuda.memory_reserved() / (1024**2)
# Benchmark vero
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
for _ in range(n_runs):
inputs = input_fn()
t0 = time.perf_counter()
with torch.no_grad():
if isinstance(inputs, dict):
_ = model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
_ = model(inputs.to(self.device))
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
if torch.cuda.is_available():
mem_peak = torch.cuda.max_memory_allocated() / (1024**2)
else:
mem_alloc = mem_reserved = mem_peak = 0.0
latencies = np.array(latencies)
total_params, trainable_params = self._count_params(model)
result = BenchmarkResult(
name=name,
vram_allocated_mb=mem_alloc,
vram_reserved_mb=mem_reserved,
vram_peak_mb=mem_peak,
latency_ms_mean=float(np.mean(latencies)),
latency_ms_p50=float(np.percentile(latencies, 50)),
latency_ms_p95=float(np.percentile(latencies, 95)),
latency_ms_p99=float(np.percentile(latencies, 99)),
throughput_per_sec=1000 / np.mean(latencies) * batch_size,
params_total=total_params,
params_trainable=trainable_params,
model_size_mb=self._model_size_mb(model)
)
result.print_summary()
self.results.append(result)
return result
def benchmark_training_step(
self,
name: str,
model: nn.Module,
optimizer: torch.optim.Optimizer,
loss_fn: Callable,
input_fn: Callable,
n_steps: int = 50
) -> dict:
"""Benchmark di un singolo step di training."""
model = model.to(self.device).train()
self._reset_memory()
latencies = []
for step in range(n_steps):
inputs, labels = input_fn()
inputs = inputs.to(self.device)
labels = labels.to(self.device)
t0 = time.perf_counter()
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_fn(outputs, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return {
"name": name,
"vram_peak_mb": torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0,
"step_ms_mean": float(np.mean(latencies[5:])), # Skip warmup
"step_ms_p95": float(np.percentile(latencies[5:], 95))
}
def compare_results(self) -> None:
"""Stampa tabella comparativa di tutti i risultati."""
if not self.results:
print("Nessun risultato disponibile.")
return
baseline = self.results[0]
print(f"\n{'Config':<30} {'VRAM (MB)':>12} {'Latency (ms)':>14} {'Throughput':>12} {'Speedup':>10}")
print("-" * 82)
for r in self.results:
speedup = baseline.latency_ms_mean / r.latency_ms_mean
print(f"{r.name:<30} {r.vram_peak_mb:>12.0f} {r.latency_ms_mean:>14.2f} "
f"{r.throughput_per_sec:>12.1f} {speedup:>10.2f}x")
# Uso:
bench = DeepLearningBenchmark(device="cuda" if torch.cuda.is_available() else "cpu")
print("Framework di benchmarking inizializzato")
混合精度: FP32 vs FP16 vs BF16
Il 混合精度トレーニング そして最初に有効にする最適化:
構成にかかるオーバーヘッドがゼロ、メモリが 2 倍節約され、多くの場合ハードウェアで 2 ~ 3 倍のスピードアップが実現します。
アンペア+。 torch.autocast どの操作を実行するかを自動的に管理します
精度が低下します。
FP16 と BF16 およびバイナリ形式の主な違い: FP16 には指数に 5 ビットがあります 仮数部は 10 (6e-5 ~ 6.5e4 の範囲)、BF16 は指数部に 8 ビット、仮数部に 7 ビットを持ちます。 仮数 (FP32 と同じ範囲、1.2e-38 ~ 3.4e38)。 BF16 以降はさらに安定しました 大きな勾配でオーバーフロー/アンダーフローが発生しないため、トレーニングに最適です。
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler
# ============================================================
# CONFRONTO FP32 vs FP16 vs BF16
# ============================================================
def train_step_fp32(model, optimizer, imgs, labels, criterion):
"""Training step standard FP32."""
optimizer.zero_grad()
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
optimizer.step()
return loss.item()
def train_step_fp16(model, optimizer, imgs, labels, criterion, scaler: GradScaler):
"""
Training step con AMP FP16.
GradScaler necessario: FP16 ha range limitato, loss scaling evita underflow.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.float16):
output = model(imgs)
loss = criterion(output, labels)
# Scala la loss per evitare underflow in FP16
scaler.scale(loss).backward()
# Decomprime gradienti prima di clip
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
# Aggiorna pesi (salta se ci sono NaN/Inf nei gradienti)
scaler.step(optimizer)
scaler.update()
return loss.item()
def train_step_bf16(model, optimizer, imgs, labels, criterion):
"""
Training step con BF16.
BF16 NON richiede GradScaler: ha range dinamico uguale a FP32.
Disponibile su: A100, RTX 3000+, Apple M-series.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
return loss.item()
# Benchmark comparativo
from torchvision import models
import time, gc
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def compare_precisions(model_fn=models.resnet50, n_steps=100,
batch_size=32, img_size=224):
"""Confronta FP32, FP16, BF16 per training e inferenza."""
criterion = nn.CrossEntropyLoss()
configs = [
("FP32", torch.float32, False),
("FP16", torch.float16, True), # Richiede GradScaler
("BF16", torch.bfloat16, False) # No GradScaler
]
results = {}
for name, dtype, use_scaler in configs:
model = model_fn(pretrained=False).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler() if use_scaler else None
# Reset memory stats
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
timings = []
for step in range(n_steps):
imgs = torch.randn(batch_size, 3, img_size, img_size, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
t0 = time.perf_counter()
with torch.autocast(device_type="cuda", dtype=dtype, enabled=(dtype != torch.float32)):
out = model(imgs)
loss = criterion(out, labels)
if scaler:
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
timings.append((time.perf_counter() - t0) * 1000)
vram_peak = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {
"vram_mb": round(vram_peak, 1),
"step_ms": round(np.mean(timings[10:]), 2),
"throughput_imgs_s": round(batch_size * 1000 / np.mean(timings[10:]), 1)
}
print(f"{name}: VRAM={vram_peak:.0f}MB, {np.mean(timings[10:]):.1f}ms/step, "
f"{batch_size*1000/np.mean(timings[10:]):.0f} img/s")
return results
# Risultati tipici ResNet-50 BS=32 su RTX 4090:
# FP32: VRAM=6200MB, 95ms/step, 336 img/s
# FP16: VRAM=3100MB, 41ms/step, 780 img/s (2x velocità, 50% VRAM)
# BF16: VRAM=3100MB, 38ms/step, 842 img/s (2.2x velocità, 50% VRAM)
フラッシュアテンション: ルールを変える最適化
フラッシュアテンション (Dao et al., 2022)、おそらく最も影響力のある最適化 近年のトランスフォーマーに関しては。注意の計算を再定式化すると、 IOバウンド対応: HBM で完全なアテンション マトリックスを具体化する代わりに (メモリ内で O(n^2) の複雑さを持つ)、SRAM 内に留まりながらブロック アテンションを計算します。 結果: メモリ内の複雑さは O(n^2) ではなく O(n) になり、長いシーケンスでは 2 ~ 4 倍の速度が向上します。
Flash Attendance 2 (2023) では、GPU での並列処理がさらに向上し、 FP16 FLOPS の理論上の使用量の 72%。 Flash アテンション 3 (2024) でサポートが追加されました FP8 および Hopper 固有の最適化により、FA2 と比較して最大 2 倍のスピードアップが実現します。
import torch
import torch.nn as nn
import torch.nn.functional as F
import time, math
# ============================================================
# FLASH ATTENTION vs STANDARD ATTENTION: CONFRONTO
# ============================================================
def standard_attention(q, k, v, scale=None):
"""
Attention standard: materializza la matrice NxN completa in GPU memory.
Complessità memoria: O(N^2 * d_head)
"""
if scale is None:
scale = q.size(-1) ** -0.5
# [B, heads, N, N] - questa matrice può essere ENORME per seq lunghe!
attn = torch.softmax((q @ k.transpose(-2, -1)) * scale, dim=-1)
return attn @ v
def flash_attention_native(q, k, v):
"""
Flash Attention tramite PyTorch 2.0+ scaled_dot_product_attention.
Sceglie automaticamente l'implementazione ottimale:
- FlashAttention-2 se disponibile (CUDA Ampere+)
- Memory-efficient attention (xFormers) come fallback
- Standard attention come ultimo fallback
"""
# Automaticamente ottimizzato da PyTorch
return F.scaled_dot_product_attention(q, k, v, is_causal=False)
def benchmark_attention_implementations(
batch_size=4, n_heads=12, seq_lengths=[512, 1024, 2048, 4096, 8192],
d_head=64, device="cuda"
):
"""
Confronta Standard vs Flash Attention su diverse lunghezze di sequenza.
"""
print(f"{'Seq Len':>10} | {'Standard (ms)':>15} | {'Flash (ms)':>12} | "
f"{'Speedup':>10} | {'VRAM Std (MB)':>15} | {'VRAM Flash (MB)':>15}")
print("-" * 90)
for seq_len in seq_lengths:
q = torch.randn(batch_size, n_heads, seq_len, d_head, device=device, dtype=torch.float16)
k = torch.randn_like(q)
v = torch.randn_like(q)
# Warmup
for _ in range(5):
standard_attention(q, k, v)
flash_attention_native(q, k, v)
# Benchmark Standard
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_std = standard_attention(q, k, v)
torch.cuda.synchronize()
std_ms = (time.perf_counter() - t0) / 20 * 1000
vram_std = torch.cuda.max_memory_allocated() / (1024**2)
# Benchmark Flash Attention
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_flash = flash_attention_native(q, k, v)
torch.cuda.synchronize()
flash_ms = (time.perf_counter() - t0) / 20 * 1000
vram_flash = torch.cuda.max_memory_allocated() / (1024**2)
speedup = std_ms / flash_ms
print(f"{seq_len:>10} | {std_ms:>15.2f} | {flash_ms:>12.2f} | "
f"{speedup:>10.2f}x | {vram_std:>15.0f} | {vram_flash:>15.0f}")
# Risultati tipici su RTX 4090 (FP16, B=4, heads=12, d_head=64):
# Seq Len | Standard (ms) | Flash (ms) | Speedup | VRAM Std (MB) | VRAM Flash (MB)
# -----------------------------------------------------------------------------------
# 512 | 0.82 | 0.31 | 2.65x | 48 | 12
# 1024 | 2.45 | 0.58 | 4.22x | 192 | 24
# 2048 | 9.12 | 1.12 | 8.14x | 768 | 48
# 4096 | 35.80 | 2.21 | 16.20x | 3072 | 96
# 8192 | 144.20 | 4.38 | 32.92x | 12288 | 192
# Flash Attention scala LINEARMENTE: a seq=8192 usa 64x meno VRAM!
勾配チェックポインティングと勾配累積
トレーニング中に VRAM がボトルネックになる場合は、2 つの補完的なテクニックを使用します ハードウェアをアップグレードせずに、より大きなバッチをトレーニングできるようになります。
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential
import gc
# ============================================================
# GRADIENT CHECKPOINTING
# ============================================================
# Idea: invece di salvare tutte le attivazioni intermedie per il backward pass,
# le ricalcola al momento (tradeoff: +33% compute, -50-70% memoria)
class CheckpointedTransformerBlock(nn.Module):
"""Transformer block con gradient checkpointing."""
def __init__(self, d_model=768, n_heads=12):
super().__init__()
self.attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
self.norm1 = nn.LayerNorm(d_model)
self.ff = nn.Sequential(
nn.Linear(d_model, d_model * 4), nn.GELU(),
nn.Linear(d_model * 4, d_model)
)
self.norm2 = nn.LayerNorm(d_model)
def _attn_block(self, x):
attn_out, _ = self.attn(x, x, x)
return self.norm1(x + attn_out)
def _ff_block(self, x):
return self.norm2(x + self.ff(x))
def forward(self, x):
# Gradient checkpointing: ogni sotto-modulo viene ricalcolato
# durante il backward invece di essere salvato
x = torch.utils.checkpoint.checkpoint(self._attn_block, x, use_reentrant=False)
x = torch.utils.checkpoint.checkpoint(self._ff_block, x, use_reentrant=False)
return x
def enable_gradient_checkpointing_hf(model):
"""Abilita gradient checkpointing su modelli HuggingFace."""
model.gradient_checkpointing_enable()
print(f"Gradient checkpointing abilitato su {type(model).__name__}")
# Benchmark Gradient Checkpointing
def compare_checkpointing(seq_len=2048, batch_size=8, d_model=768,
n_layers=12, n_heads=12, device="cuda"):
"""Confronta training con e senza gradient checkpointing."""
class SimpleTransformer(nn.Module):
def __init__(self, use_checkpoint=False):
super().__init__()
self.use_checkpoint = use_checkpoint
self.blocks = nn.ModuleList([
CheckpointedTransformerBlock(d_model, n_heads) if use_checkpoint
else CheckpointedTransformerBlock(d_model, n_heads)
for _ in range(n_layers)
])
self.head = nn.Linear(d_model, 1000)
def forward(self, x):
for block in self.blocks:
if self.use_checkpoint:
x = torch.utils.checkpoint.checkpoint(block, x, use_reentrant=False)
else:
x = block(x)
return self.head(x[:, 0])
results = {}
for use_ckpt in [False, True]:
name = "con checkpointing" if use_ckpt else "senza checkpointing"
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
model = SimpleTransformer(use_checkpoint=use_ckpt).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
x = torch.randn(batch_size, seq_len, d_model, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
# Forward + backward
torch.cuda.synchronize() if torch.cuda.is_available() else None
t0 = time.perf_counter()
for _ in range(10):
optimizer.zero_grad()
out = model(x)
loss = nn.CrossEntropyLoss()(out, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
elapsed = (time.perf_counter() - t0) / 10 * 1000
vram = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {"vram_mb": round(vram, 1), "step_ms": round(elapsed, 1)}
print(f"{name}: VRAM={vram:.0f}MB, Step={elapsed:.1f}ms")
return results
# Risultati tipici (Transformer 12 layer, seq=2048, BS=8, RTX 3090):
# Senza checkpointing: VRAM=18.4GB, Step=285ms
# Con checkpointing: VRAM= 7.8GB, Step=378ms (-58% VRAM, +33% compute)
# ============================================================
# GRADIENT ACCUMULATION
# ============================================================
def train_with_gradient_accumulation(
model, optimizer, train_loader, criterion,
accumulation_steps: int = 4,
device: str = "cuda"
):
"""
Gradient accumulation: simula batch_size * accumulation_steps
con la memoria di batch_size.
Utile quando il batch_size reale e troppo piccolo per convergenza ottimale.
"""
model = model.to(device).train()
optimizer.zero_grad()
for step, (imgs, labels) in enumerate(train_loader):
imgs, labels = imgs.to(device), labels.to(device)
# Forward pass
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
# Dividi loss per accumulation steps (mantiene la scala corretta)
loss = criterion(output, labels) / accumulation_steps
loss.backward()
# Aggiorna i pesi ogni N step
if (step + 1) % accumulation_steps == 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
optimizer.zero_grad()
effective_batch = imgs.size(0) * accumulation_steps
print(f"Step {(step+1)//accumulation_steps} | "
f"Effective batch: {effective_batch} | Loss: {loss.item()*accumulation_steps:.4f}")
torch.compile: グラフの最適化
トーチ.コンパイル (PyTorch 2.0+) モデルを最適化されたカーネルにコンパイルします Triton またはその他のバックエンド経由。適用する最も簡単な最適化は 1 つだけです コード行により、推論が 1.5 ~ 2.5 倍高速化される可能性があります。
import torch
from torchvision import models
import time, numpy as np
def benchmark_torch_compile():
device = "cuda" if torch.cuda.is_available() else "cpu"
# ============================================================
# MODALITA DI COMPILAZIONE
# ============================================================
# "default": Bilanciamento compile time / speedup
# "reduce-overhead": Minimizza overhead, ottimale per piccoli batch
# "max-autotune": Massima velocità (compile time molto più lungo, ~5-10 min)
# "inductor": Backend default (usa Triton su CUDA, C++ su CPU)
model_fp32 = models.resnet50(pretrained=False).to(device).eval()
# Compilazione eager (default)
model_compiled_default = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
# Compilazione per massima velocità
model_compiled_max = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="max-autotune",
fullgraph=True # Evita graph breaks per massimo speedup
)
x = torch.randn(32, 3, 224, 224, device=device)
def time_model(model, x, n=100):
"""Benchmark con warmup."""
# Warmup (specialmente importante per torch.compile)
with torch.no_grad():
for _ in range(20):
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
with torch.no_grad():
for _ in range(n):
t0 = time.perf_counter()
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return np.mean(latencies)
ms_eager = time_model(model_fp32, x)
ms_default = time_model(model_compiled_default, x)
# ms_max = time_model(model_compiled_max, x) # Richiede molto tempo di compile
print(f"Eager (FP32): {ms_eager:.2f} ms")
print(f"Compiled default: {ms_default:.2f} ms ({ms_eager/ms_default:.2f}x speedup)")
# Con BF16 + compile: effetto moltiplicativo
model_bf16_compiled = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
x_bf16 = x.to(torch.bfloat16)
model_bf16_compiled = model_bf16_compiled.to(torch.bfloat16)
ms_bf16_compiled = time_model(model_bf16_compiled, x_bf16)
print(f"BF16 + Compiled: {ms_bf16_compiled:.2f} ms ({ms_eager/ms_bf16_compiled:.2f}x speedup)")
# Risultati tipici RTX 4090:
# Eager FP32: 12.4 ms/step (BS=32)
# Compiled default: 7.8 ms/step (1.59x)
# BF16 + Compiled: 5.1 ms/step (2.43x)
benchmark_torch_compile()
KV キャッシュ: LLM 自己回帰推論の最適化
自己回帰モデルでは、生成された各トークンは、すべてのトークンが注目されるまで待機する必要があります。 以前のトークン。最適化を行わないと、キー (K) と値 (V) が再計算されます。 各ステップで、n 個のトークンのシーケンスに対して複雑さ O(n^2) になります。の KVキャッシュ 各ステップの後に各層の K と V を節約し、コストを削減します O(n^2) から O(n) までの世代。
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
# ============================================================
# TRANSFORMER CON KV CACHE
# ============================================================
class CachedMultiHeadAttention(nn.Module):
"""
Multi-head attention con KV cache per generazione autogressiva.
Il cache evita di ricalcolare K, V per token passati.
"""
def __init__(self, d_model: int, n_heads: int):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.scale = self.d_head ** -0.5
self.q_proj = nn.Linear(d_model, d_model, bias=False)
self.k_proj = nn.Linear(d_model, d_model, bias=False)
self.v_proj = nn.Linear(d_model, d_model, bias=False)
self.out_proj = nn.Linear(d_model, d_model, bias=False)
def forward(
self,
x: torch.Tensor, # [B, seq_len, d_model]
kv_cache: Optional[Tuple] = None # (K_cache, V_cache) o None
) -> Tuple[torch.Tensor, Tuple]:
B, T, D = x.shape
# Proietta Q, K, V
q = self.q_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
k = self.k_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
v = self.v_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
# Concatena con cache esistente
if kv_cache is not None:
k_cache, v_cache = kv_cache
k = torch.cat([k_cache, k], dim=2) # [B, heads, T_total, d_head]
v = torch.cat([v_cache, v], dim=2)
# Attention (Flash Attention automatica con PyTorch 2.0+)
out = F.scaled_dot_product_attention(q, k, v, is_causal=(kv_cache is None))
out = out.transpose(1, 2).contiguous().view(B, T, D)
return self.out_proj(out), (k, v) # Ritorna output + nuovo cache
class CachedTransformerDecoder(nn.Module):
"""Decoder Transformer con KV cache per generazione efficiente."""
def __init__(self, vocab_size: int, d_model: int = 512,
n_heads: int = 8, n_layers: int = 6):
super().__init__()
self.embed = nn.Embedding(vocab_size, d_model)
self.pos_embed = nn.Embedding(2048, d_model)
self.layers = nn.ModuleList([
CachedMultiHeadAttention(d_model, n_heads)
for _ in range(n_layers)
])
self.norms = nn.ModuleList([nn.LayerNorm(d_model) for _ in range(n_layers)])
self.head = nn.Linear(d_model, vocab_size)
self.n_layers = n_layers
@torch.no_grad()
def generate(
self,
input_ids: torch.Tensor, # [B, seq_len]
max_new_tokens: int = 100,
temperature: float = 1.0
) -> torch.Tensor:
"""
Generazione autogressiva con KV cache.
Ogni step utilizza il cache dei token precedenti.
"""
B, T = input_ids.shape
device = input_ids.device
# Processa il prompt (prefill)
x = self.embed(input_ids)
positions = torch.arange(T, device=device).unsqueeze(0)
x = x + self.pos_embed(positions)
# Inizializza cache per ogni layer
kv_caches = [None] * self.n_layers
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x = x + attn_out
# Generazione token per token (usando il cache)
generated = []
for step in range(max_new_tokens):
# Solo l'ultimo token come query
last_token = input_ids[:, -1:] if step == 0 else new_token
x_new = self.embed(last_token)
pos = torch.tensor([[T + step]], device=device)
x_new = x_new + self.pos_embed(pos)
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x_new)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x_new = x_new + attn_out
# Campiona prossimo token
logits = self.head(x_new[:, -1, :]) / temperature
new_token = torch.multinomial(torch.softmax(logits, -1), 1)
generated.append(new_token)
return torch.cat(generated, dim=1)
# Benchmark KV cache vs no cache
def benchmark_generation(model, vocab_size=32000, seq_len=128,
max_new=50, device="cuda"):
model = model.to(device).eval()
input_ids = torch.randint(0, vocab_size, (1, seq_len), device=device)
# Con KV cache (normale)
t0 = time.perf_counter()
with torch.no_grad():
output = model.generate(input_ids, max_new_tokens=max_new)
t_cached = (time.perf_counter() - t0) * 1000
tokens_per_sec = max_new / (t_cached / 1000)
print(f"Con KV Cache: {t_cached:.1f}ms totale, {tokens_per_sec:.1f} token/s")
体系的な比較: 48GB から 8GB RTX まで
シリーズで見られたすべての最適化を段階的に適用して要約します。 基本モデルに変換し、精度/メモリ/速度のトレードオフを示します。
完全な比較: RTX 3090 (24GB) 上の Llama-3.1-8B
| 構成 | VRAM | スループット | ヘラスワッグ | 困惑 | 注意事項 |
|---|---|---|---|---|---|
| BF16ベースライン | 16.0GB | 38トン/秒 | 82.1% | 6.14 | 参考ベンチマーク |
| +フラッシュアテンション2 | 14.2GB | 52トン/秒 | 82.1% | 6.14 | -11% VRAM、+37% 速度 |
| + トーチ.コンパイル | 14.2GB | 68トン/秒 | 82.1% | 6.14 | フラッシュ アテンション +31% |
| INT8 (ビットサンドバイト) | 8.5GB | 35トン/秒 | 81.8% | 6.21 | -47% VRAM、-0.3% acc |
| INT4 NF4 (bnb) | 4.9GB | 42トン/秒 | 81.2% | 6.47 | -69% VRAM、-0.9% acc |
| GPTQ INT4 | 4.8GB | 55トン/秒 | 81.5% | 6.39 | -70% VRAM、-0.6% acc |
| AWQ INT4 | 4.7GB | 52トン/秒 | 81.6% | 6.35 | -71% VRAM、-0.5% acc |
| GGUF Q4_K_M (CPU) | 0 VRAM (5 GB RAM) | 18トン/秒 | 81.3% | 6.42 | GPUは必要ありません |
RTX 3090 (24GB VRAM) での概算値。スループットはバッチ = 1、シーケンス = 512 で測定されました。
意思決定ガイド: どのシナリオにどの最適化を行うか
# ALBERO DECISIONALE PER OTTIMIZZAZIONE DL
def recommend_optimization(
vram_available_gb: float,
task: str, # "training" | "inference" | "edge"
accuracy_critical: bool,
hardware: str # "server_gpu" | "consumer_gpu" | "cpu" | "edge"
) -> dict:
"""
Raccomanda le ottimizzazioni più appropriate per il proprio scenario.
"""
recommendations = []
priority = []
# === SEMPRE DA FARE (zero o quasi zero costo) ===
priority.append("1. Mixed Precision (BF16/FP16): abilita SEMPRE su GPU Ampere+")
priority.append("2. Flash Attention: abilita se seq_len > 512")
priority.append("3. torch.compile: abilita se PyTorch 2.0+, +30-50% speedup inference")
priority.append("4. KV Cache: abilita SEMPRE per LLM autoregressive generation")
if task == "training":
if vram_available_gb < 24:
priority.append("5. Gradient Checkpointing: -50% VRAM, +33% compute")
priority.append("6. Gradient Accumulation: simula batch più grandi")
if hardware in ["consumer_gpu", "edge"]:
priority.append("7. QLoRA: fine-tuning con INT4 + LoRA su GPU consumer")
if task in ["inference", "edge"]:
if not accuracy_critical:
if hardware == "server_gpu":
priority.append("5. GPTQ INT4: massimo throughput su GPU NVIDIA")
elif hardware in ["consumer_gpu", "cpu"]:
priority.append("5. AWQ INT4 o GGUF Q4_K_M: per hardware eterogeneo")
elif hardware == "edge":
priority.append("5. GGUF Q3_K_M o Q4_K_M: per Raspberry Pi / embedded")
else:
priority.append("5. INT8 (bitsandbytes): minima perdita di accuratezza")
if vram_available_gb < 16:
priority.append("6. ONNX Export: riduzione overhead runtime +20-40%")
priority.append("7. Considera distillazione verso modello più piccolo")
print("=== RACCOMANDAZIONI OTTIMIZZAZIONE ===")
for p in priority:
print(f" {p}")
return {"priorities": priority}
# Esempi:
print("--- Scenario 1: Fine-tuning su RTX 4080 (16GB) ---")
recommend_optimization(16, "training", True, "consumer_gpu")
print("\n--- Scenario 2: Inferenza su Raspberry Pi ---")
recommend_optimization(0, "inference", False, "edge")
print("\n--- Scenario 3: Produzione su A100 (80GB) ---")
recommend_optimization(80, "inference", True, "server_gpu")
最適化の概要: 予想される影響
| 技術 | VRAMの保存 | 高速化 | アクセス損失 | 複雑 |
|---|---|---|---|---|
| 混合精度 BF16 | -50% | 2~3倍 | 0% | 低(1行) |
| フラッシュアテンション2 | -50-90% | 2~8倍 | 0% | 低(1行) |
| トーチ.コンパイル | 0% | 1.5~2.5倍 | 0% | 低(1行) |
| KVキャッシュ | +VRAM | 10~50倍の世代 | 0% | 低い |
| 勾配チェックポイント | -50-70% | -0.7x | 0% | 低い |
| INT8量子化 | -50% | 0.9~1.1倍 | 0~0.5% | 低い |
| INT4 GPTQ/AWQ | -75% | 1.3~1.8倍 | 0.5~1.5% | 平均 |
| 蒸留 | -70-90% | 5~20倍 | 5~15% | 高い |
| 構造化された枝刈り | -30-70% | 2~5倍 | 2-10% | 高い |
シリーズの結論
シリーズ全体を通しました 高度なディープラーニングとエッジ展開: Transformers のアテンション メカニズムから LoRA による微調整、GPTQ 量子化まで 構造化プルーニング、蒸留からビジョントランスフォーマー、NAS からエッジ展開まで Raspberry Pi と Jetson を使用して、Ollama からこの最終ベンチマークまで。
中心となる明確なメッセージは、単一の「最良の」テクニックは存在しないということです。最適な選択 利用可能なハードウェア、精度要件、ターゲットのレイテンシ、 運営コスト。しかし、この記事で紹介した体系的なベンチマーク フレームワークを使用すると、 できます 測定 の代わりに 推測、情報に基づいた意思決定を行います。
2026 年の傾向は明らかです。モデルはエッジに向かって進んでいます。 Gartner 2027 は次のように予測しています。 SLM は、使用時にクラウド LLM を 3 倍上回るパフォーマンスを発揮します。このシリーズのテクニック — 量子化、 蒸留、エッジ展開、オラマ — これらは学術分野ではなく、スキルです これは、今後数年間で AI を使って仕事をしたいと考えている人にとって、基本的なものです。
シリーズの概要: 高度なディープラーニング
- 第 1 条: 変圧器におけるアテンション メカニズム
- 記事 2: LoRA と QLoRA による微調整
- 第3条:量子化 GPTQ、AWQ、INT8
- 第 4 条: 知識の蒸留
- 第5条: ニューラルネットワークの枝刈り
- 第6条:ビジョントランスフォーマー(ViT)
- 第 7 条: ニューラル アーキテクチャの検索
- 第 8 条: エッジデバイス上のディープラーニング
- 第 9 条: オラマおよび LLM の敷地
- 第 10 条 (本): ベンチマークと最適化
関連シリーズ: MLOps | コンピュータビジョン | AIエンジニアリング







