本番環境の RAG: アーキテクチャ、スケーリング、モニタリング
ローカルで動作する RAG プロトタイプの構築は比較的簡単です。持ち込んでください 実稼働環境では、数千の同時クエリを処理する必要があり、2 回未満で応答します 秒、長期間にわたって高品質を維持し、データを失わないこととは、まったく異なるものです。 「私のラップトップで動作する」と「10,000 ユーザーの本番環境で動作する」との間の距離 は巨大であり、多くの RAG プロジェクトはこの段階で失敗します。
この記事では、実際の課題について取り上げます。 実稼働中の RAG: スケーラブルなアーキテクチャ、最適なチャンキング、再ランキング、更新管理 RAG 固有のメトリクスによるモニタリングとコーパスの自動評価 RAGAS のようなフレームワークで品質を向上させます。理論に関するものではありません。各セクションには次の内容が含まれます。 実行可能な Python コードとパターンは実際のシステムでテストされています。
何を学ぶか
- スケーラブルな RAG システムの運用対応アーキテクチャ
- 高度なチャンキング戦略 (再帰的、セマンティック、センテンス ウィンドウ)
- 精度を向上させるためのクロスエンコーダ再ランキングパイプライン
- 増分ベクトル コーパス更新の管理
- RAG 固有の指標 (忠実度、関連性、再現率) によるモニタリング
- RAGASフレームワークによる自動評価
- インテリジェントなキャッシュによりレイテンシとコストを最適化
- 本番環境でのエラー処理と正常な機能低下
シリーズ概要
| # | アイテム | 集中 |
|---|---|---|
| 1 | RAGの説明 | 基礎と建築 |
| 2 | 埋め込みとセマンティック検索 | バート、スバート、フェイス |
| 3 | ベクターデータベース | クドラント、松ぼっくり、ミルバス |
| 4 | ハイブリッド検索 | BM25 + ベクトル検索 |
| 5 | 本番環境の RAG (ここにいます) | スケーリング、モニタリング、評価 |
| 6 | RAG 用の LangChain | 高度なフレームワークとパターン |
| 7 | コンテキストウィンドウの管理 | LLM入力の最適化 |
| 8 | マルチエージェントシステム | オーケストレーションと調整 |
| 9 | 生産における迅速なエンジニアリング | テンプレート、バージョン管理、テスト |
| 10 | AI のナレッジ グラフ | LLM における構造化された知識 |
1. 本番環境に対応した RAG システムのアーキテクチャ
本番環境の RAG システムは、単純なシーケンシャル パイプラインではありません。 それぞれがスケーラビリティ要件を備えた特殊なコンポーネントで分散され、 フォールトトレランスとモニタリングが異なります。完全なアーキテクチャと 実稼働環境に耐えられるものを構築するための最初のステップ。
ARCHITETTURA RAG PRODUCTION
┌─────────────────────────────────────────────────────────┐
│ API GATEWAY │
│ (Rate limiting, auth, routing) │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌──────▼──────┐ ┌───────▼────────┐
│ QUERY │ │ INGESTION │
│ SERVICE │ │ SERVICE │
└──────┬──────┘ └───────┬────────┘
│ │
┌───────▼──────┐ ┌───────▼────────┐
│ RETRIEVAL │ │ DOCUMENT │
│ ENGINE │ │ PROCESSOR │
│ ┌─────────┐ │ │ ┌──────────┐ │
│ │Embedding│ │ │ │Chunking │ │
│ │Cache │ │ │ │Embedding │ │
│ └────┬────┘ │ │ │Indexing │ │
│ │ │ │ └──────────┘ │
│ ┌────▼────┐ │ └───────┬────────┘
│ │Vector │ │ │
│ │Search │ │ ┌───────▼────────┐
│ └────┬────┘ │ │ VECTOR DB │
│ │ │ │ (Qdrant/Pine) │
│ ┌────▼────┐ │ └────────────────┘
│ │Reranker │ │
│ └────┬────┘ │
└───────┼──────┘
│
┌───────▼──────┐ ┌────────────────┐
│ GENERATION │ │ CACHE │
│ SERVICE │◄──►│ (Redis/ │
│ (LLM) │ │ Semantic) │
└───────┬──────┘ └────────────────┘
│
┌───────▼──────┐ ┌────────────────┐
│ MONITORING │ │ EVALUATION │
│ SERVICE │ │ SERVICE │
│ (Prometheus)│ │ (RAGAS) │
└──────────────┘ └────────────────┘
1.1 関心事の分離: 取り込みとクエリ
RAG プロダクション システムの基本的なパターンは次のとおりです。 間の分離 取り込みプランとクエリプラン。これら 2 つのパスには要件があります とても違う:
インジェストとクエリ: 異なる要件
| サイズ | 取り込みパス | クエリパス |
|---|---|---|
| レイテンシ | 重要ではない (バッチ) | 批判 (<2 秒 p95) |
| スループット | 低~中(文書) | 高 (数千リクエスト/秒) |
| CPU/GPU | エンベディング生成 (GPU) | クエリの埋め込み + 再ランク付け (GPU) |
| エラー | バックオフを使用して再試行します | グレースフルフォールバック |
| スケーリング | 水平バッチ | 水平ステートレス |
2. 高度なチャンク化戦略
チャンク化はおそらく RAG システムで最も見落とされている変数ですが、 最終的な品質に大きな影響を与えます。小さすぎるチャンクはコンテキストを失います。 1つ 大きすぎるとノイズが発生し、埋め込みモデルのコンテキストを超えます。
2.1 再帰的文字テキスト分割器
ほとんどのユースケースで最も効果的な方法は、 再帰的な文字分割: テキストを区切り記号で段階的に分割します 文書の自然な構造を尊重するために、より細かく (段落、文、単語) ます。
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any
import re
class AdvancedChunker:
"""Chunker avanzato con metadati e strategie multiple"""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 64,
strategy: str = "recursive"
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.strategy = strategy
self.recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
length_function=len,
is_separator_regex=False
)
def chunk_with_metadata(
self,
text: str,
doc_metadata: Dict[str, Any]
) -> List[Dict]:
"""Crea chunks con metadati completi per il retrieval"""
if self.strategy == "recursive":
chunks = self.recursive_splitter.split_text(text)
elif self.strategy == "semantic":
chunks = self._semantic_split(text)
elif self.strategy == "sentence_window":
chunks = self._sentence_window_split(text)
else:
raise ValueError(f"Strategia non supportata: {self.strategy}")
result = []
for i, chunk in enumerate(chunks):
chunk_meta = {
**doc_metadata,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(chunk),
"strategy": self.strategy,
# Snippet per il contesto nella risposta
"prev_chunk": chunks[i-1][:100] if i > 0 else None,
"next_chunk": chunks[i+1][:100] if i < len(chunks)-1 else None,
}
result.append({"text": chunk, "metadata": chunk_meta})
return result
def _sentence_window_split(
self,
text: str,
window_size: int = 3
) -> List[str]:
"""
Sentence Window: indicizza singole frasi ma recupera
le frasi vicine per mantenere il contesto.
Tecnica avanzata che migliora il recall mantenendo la precisione.
"""
# Split in frasi
sentences = re.split(r'(?<=[.!?])\s+', text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
for i, sentence in enumerate(sentences):
# Finestra di contesto: frase centrale + vicine
start = max(0, i - window_size // 2)
end = min(len(sentences), i + window_size // 2 + 1)
window = " ".join(sentences[start:end])
chunks.append(window)
return chunks
def _semantic_split(self, text: str) -> List[str]:
"""
Semantic split: divide in base ai cambi di topic
usando embeddings per individuare boundary semantici.
"""
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
sentences = re.split(r'(?<=[.!?])\s+', text)
if len(sentences) < 3:
return [text]
# Calcola embeddings di ogni frase
embeddings = model.encode(sentences)
# Calcola similarità tra frasi consecutive
similarities = []
for i in range(len(embeddings) - 1):
sim = np.dot(embeddings[i], embeddings[i+1]) / (
np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i+1])
)
similarities.append(sim)
# Trova boundary semantici (bassa similarità = cambio di topic)
threshold = np.mean(similarities) - np.std(similarities)
boundaries = [0]
for i, sim in enumerate(similarities):
if sim < threshold:
boundaries.append(i + 1)
boundaries.append(len(sentences))
# Crea chunks dai segmenti
chunks = []
for i in range(len(boundaries) - 1):
segment = " ".join(sentences[boundaries[i]:boundaries[i+1]])
if len(segment) > self.chunk_size * 2:
# Chunk troppo grande: dividi ulteriormente
sub_chunks = self.recursive_splitter.split_text(segment)
chunks.extend(sub_chunks)
elif segment.strip():
chunks.append(segment)
return chunks
# Esempio di utilizzo
chunker = AdvancedChunker(chunk_size=512, chunk_overlap=64, strategy="recursive")
document = """
Cos'è il RAG?
Il Retrieval-Augmented Generation (RAG) è un'architettura che combina la ricerca su basi
di conoscenza con la generazione di testo degli LLM. Il principio fondamentale è semplice:
invece di lasciare che l'LLM risponda solo con la sua conoscenza interna (soggetta ad
allucinazioni), RAG prima recupera documenti rilevanti e poi li fornisce come contesto.
Come funziona il retrieval?
Il processo di retrieval avviene in due fasi. Prima, i documenti vengono convertiti in
embeddings vettoriali e memorizzati in un vector database. Poi, la query dell'utente viene
anch'essa convertita in un embedding e viene effettuata una ricerca di similarità per
trovare i documenti più rilevanti.
"""
chunks = chunker.chunk_with_metadata(
document,
doc_metadata={"source": "rag-intro.txt", "author": "AI Team"}
)
for chunk in chunks:
print(f"Chunk {chunk['metadata']['chunk_index']}: {len(chunk['text'])} chars")
print(f" {chunk['text'][:100]}...")
2.2 親子チャンク化
高度な RAG システムにとって最も効果的な戦略の 1 つは、 親子チャンク化: 小さなチャンク (子) にインデックスが付けられます。 取得の精度は向上しますが、提供するために大きなチャンク (親) が返されます。 LLM に十分なコンテキスト。
from langchain.storage import InMemoryStore
from langchain.retrievers import ParentDocumentRetriever
from langchain.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Splitter per chunks grandi (parent) - per il contesto
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
# Splitter per chunks piccoli (child) - per l'indicizzazione
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)
# Storage: chunks grandi in memory store, chunks piccoli in vector store
vectorstore = Qdrant.from_documents(
[], # inizialmente vuoto
OpenAIEmbeddings(),
url="http://localhost:6333",
collection_name="rag_child_chunks"
)
docstore = InMemoryStore() # o Redis per persistenza
# Parent-Child Retriever
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Come funziona:
# 1. add_documents() divide i documenti in parent e child chunks
# 2. I child chunks vengono indicizzati nel vector store
# 3. I parent chunks vengono salvati nel docstore con un ID univoco
# 4. I child chunks hanno un metadato "doc_id" che punta al parent
# Retrieval:
# 1. Cerca i child chunks più simili alla query
# 2. Recupera i parent chunks corrispondenti
# Risultato: massima precisione (child) + massimo contesto (parent)
3. 再ランキング: 検索精度の向上
Il 再ランキング 品質を大幅に向上させる技術です 2 番目のより正確な (しかし遅い) モデルを結果に適用することによる取得の効率化 イニシャル。典型的なフローは次のとおりです。 高速ベクトル検索で 50 ~ 100 の候補を取得、 それから 正確なクロスエンコーダーによる並べ替え、 最後に トップKを獲得する.
3.1 バイエンコーダーとクロスエンコーダー
2 つのアプローチの基本的な違いは次のとおりです。
- バイエンコーダー (取得): クエリとドキュメントを別々にベクトルにエンコードします。ドキュメントは事前に計算されるため、取得は非常に高速ですが、ドキュメントのエンコード中にクエリが認識されないため、精度は低くなります。
- クロスエンコーダー (再ランキング): クエリとドキュメントを連結して取得し、関連性スコアを生成します。検索よりもはるかに正確ですが、クエリとドキュメントの各ペアをリアルタイムで処理する必要があるため、スケーラビリティは検索ほど高くありません。
from sentence_transformers import CrossEncoder
from typing import List, Tuple
import time
class RerankingRetriever:
"""Retriever con reranking a due stadi"""
def __init__(
self,
bi_encoder, # Sentence Transformer per retrieval veloce
vector_index, # FAISS o vector DB
cross_encoder_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
):
self.bi_encoder = bi_encoder
self.index = vector_index
self.cross_encoder = CrossEncoder(cross_encoder_name)
self.documents = []
def retrieve_and_rerank(
self,
query: str,
initial_k: int = 50,
final_k: int = 5
) -> List[Tuple[str, float]]:
"""
Pipeline completa:
1. Retrieval veloce con bi-encoder (top-50)
2. Reranking preciso con cross-encoder (top-5)
"""
t0 = time.time()
# STAGE 1: Retrieval veloce
query_emb = self.bi_encoder.encode([query], normalize_embeddings=True)
scores, indices = self.index.search(query_emb.astype('float32'), initial_k)
candidates = [
(self.documents[i], float(s))
for s, i in zip(scores[0], indices[0])
if i != -1
]
t1 = time.time()
print(f"Retrieval: {(t1-t0)*1000:.1f}ms - {len(candidates)} candidati")
# STAGE 2: Reranking preciso con cross-encoder
if not candidates:
return []
# Prepara coppie (query, documento) per il cross-encoder
cross_pairs = [(query, doc) for doc, _ in candidates]
cross_scores = self.cross_encoder.predict(cross_pairs)
# Combina e riordina
reranked = sorted(
zip([doc for doc, _ in candidates], cross_scores),
key=lambda x: x[1],
reverse=True
)
t2 = time.time()
print(f"Reranking: {(t2-t1)*1000:.1f}ms")
return reranked[:final_k]
def reciprocal_rank_fusion(
self,
results_list: List[List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Reciprocal Rank Fusion: combina risultati da multiple strategie
di retrieval (es. dense + sparse) in un singolo ranking.
Formula: score = sum(1 / (k + rank_i)) per ogni lista i
"""
doc_scores = {}
for results in results_list:
for rank, (doc, _) in enumerate(results):
if doc not in doc_scores:
doc_scores[doc] = 0.0
doc_scores[doc] += 1.0 / (k + rank + 1)
return sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
再ランキング用のクロスエンコーダー モデル
| モデル | スピード | 品質 | 推奨される使用方法 |
|---|---|---|---|
| クロスエンコーダー/ms-marco-MiniLM-L-6-v2 | 高い | 良い | 本番環境、レイテンシが重要 |
| クロスエンコーダー/ms-marco-electra-base | 平均 | 素晴らしい | バランスが良い |
| BAAI/bge-reranker-large | 低い | 素晴らしい | 最大の品質、重大ではない遅延 |
| Cohere再ランクAPI | API | 素晴らしい | 試作、予算あり |
4. レイテンシーとコストを考慮したインテリジェントなキャッシュ
実稼働 RAG システムでは、大部分のクエリが類似しているか、 同一です (FAQ、よくある質問など)。の セマンティックキャッシング それはさらに進みます 正確なキャッシュ (同一のクエリでのみキャッシュヒット) とクエリごとの結果の再利用 意味的には類似しているため、LLM 推論コストが大幅に削減されます。
import redis
import numpy as np
import json
import hashlib
from sentence_transformers import SentenceTransformer
from typing import Optional, Tuple
import time
class SemanticCache:
"""Cache semantica che riusa risposte per query simili"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
embedding_model: str = "all-MiniLM-L6-v2",
similarity_threshold: float = 0.95,
ttl_seconds: int = 3600 # 1 ora
):
self.redis = redis.from_url(redis_url)
self.model = SentenceTransformer(embedding_model)
self.threshold = similarity_threshold
self.ttl = ttl_seconds
def _get_cache_key(self, text: str) -> str:
return f"rag_cache:{hashlib.md5(text.encode()).hexdigest()}"
def get(self, query: str) -> Optional[Tuple[str, float]]:
"""
Cerca in cache:
1. Prima cerca match esatto (O(1))
2. Poi cerca match semantico (O(n) - ottimizzabile con FAISS)
Ritorna (risposta, similarity_score) o None
"""
# Cache esatto
exact_key = self._get_cache_key(query)
cached = self.redis.get(exact_key)
if cached:
data = json.loads(cached)
return data['response'], 1.0
# Cache semantico
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
# Ottimizzazione: usa Redis SCAN per iterare sulle chiavi cached
# In produzione, usa un indice FAISS separato per il lookup semantico
best_score = 0.0
best_response = None
for key in self.redis.scan_iter("rag_cache:*"):
cached = self.redis.get(key)
if not cached:
continue
data = json.loads(cached)
cached_emb = np.array(data['embedding'])
sim = float(np.dot(query_emb, cached_emb))
if sim > best_score:
best_score = sim
best_response = data['response']
if best_score >= self.threshold:
return best_response, best_score
return None
def set(self, query: str, response: str):
"""Salva la risposta in cache con l'embedding della query"""
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
key = self._get_cache_key(query)
data = {
'query': query,
'response': response,
'embedding': query_emb.tolist(),
'timestamp': time.time()
}
self.redis.setex(key, self.ttl, json.dumps(data))
def get_stats(self) -> dict:
"""Statistiche del cache"""
keys = list(self.redis.scan_iter("rag_cache:*"))
return {
'total_cached': len(keys),
'memory_bytes': sum(
self.redis.memory_usage(k) or 0 for k in keys[:100]
)
}
# Integrazione nel RAG pipeline
class CachedRAGPipeline:
def __init__(self, rag_pipeline, cache: SemanticCache):
self.rag = rag_pipeline
self.cache = cache
self.hits = 0
self.misses = 0
def query(self, question: str) -> dict:
# 1. Prova cache
cached = self.cache.get(question)
if cached:
response, sim = cached
self.hits += 1
return {
"answer": response,
"cached": True,
"similarity": sim,
"latency_ms": 1 # quasi istantaneo
}
# 2. Full RAG pipeline
t0 = time.time()
response = self.rag.generate(question)
latency = (time.time() - t0) * 1000
# 3. Salva in cache per future query simili
self.cache.set(question, response)
self.misses += 1
return {
"answer": response,
"cached": False,
"similarity": None,
"latency_ms": latency
}
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
5. 監視と可観測性
実稼働環境の RAG システムは、すべてのレベルで監視可能である必要があります。十分ではありません HTTP 遅延を監視する: 取得の品質、速度を測定する必要があります。 幻覚、ユーザーの満足度、クエリごとのコスト。
5.1 インフラストラクチャのメトリクス
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Metriche infrastrutturali
rag_queries_total = Counter(
'rag_queries_total',
'Numero totale di query RAG',
['status', 'cached']
)
rag_query_duration = Histogram(
'rag_query_duration_seconds',
'Durata delle query RAG',
['component'],
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)
rag_retrieval_chunks = Histogram(
'rag_retrieval_chunks',
'Numero di chunks recuperati per query',
buckets=[1, 3, 5, 10, 20, 50]
)
rag_retrieval_score = Histogram(
'rag_retrieval_score',
'Score di rilevanza del top-1 chunk',
buckets=[0.1, 0.3, 0.5, 0.7, 0.8, 0.9, 0.95, 1.0]
)
rag_cache_hit_rate = Gauge(
'rag_cache_hit_rate',
'Tasso di hit del semantic cache'
)
rag_llm_tokens_total = Counter(
'rag_llm_tokens_total',
'Totale token LLM utilizzati',
['type'] # 'prompt' o 'completion'
)
# Metriche qualità (aggiornate da evaluation service)
rag_faithfulness_score = Gauge(
'rag_faithfulness_score',
'Score medio di faithfulness (risposta supportata da contesto)'
)
rag_answer_relevance = Gauge(
'rag_answer_relevance',
'Score medio di rilevanza della risposta'
)
# Decorator per timing automatico
def track_timing(component: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
rag_queries_total.labels(status='success', cached='false').inc()
return result
except Exception as e:
rag_queries_total.labels(status='error', cached='false').inc()
raise
finally:
duration = time.time() - start
rag_query_duration.labels(component=component).observe(duration)
return wrapper
return decorator
# Esempio utilizzo
class MonitoredRAGService:
@track_timing('full_query')
def query(self, question: str) -> dict:
# Retrieval
with rag_query_duration.labels(component='retrieval').time():
chunks = self.retrieve(question)
rag_retrieval_chunks.observe(len(chunks))
if chunks:
rag_retrieval_score.observe(chunks[0][1]) # score top-1
# Generation
with rag_query_duration.labels(component='generation').time():
response = self.generate(question, chunks)
return response
5.2 RAG の構造化ロギング
import structlog
import uuid
from contextlib import contextmanager
# Configura structlog per output JSON
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
class TracedRAGPipeline:
"""RAG pipeline con tracing distribuito"""
def query(self, question: str, user_id: str = None) -> dict:
trace_id = str(uuid.uuid4())
log = logger.bind(
trace_id=trace_id,
user_id=user_id,
question_hash=hashlib.md5(question.encode()).hexdigest()[:8]
)
log.info("rag_query_start", question_length=len(question))
# Retrieval
t0 = time.time()
try:
chunks = self.retrieve(question)
retrieval_time = time.time() - t0
log.info(
"rag_retrieval_complete",
chunks_retrieved=len(chunks),
top_score=chunks[0][1] if chunks else 0,
retrieval_ms=retrieval_time * 1000
)
except Exception as e:
log.error("rag_retrieval_error", error=str(e))
raise
# Generation
t1 = time.time()
try:
response = self.generate(question, chunks)
generation_time = time.time() - t1
log.info(
"rag_generation_complete",
response_length=len(response),
generation_ms=generation_time * 1000,
total_ms=(time.time() - t0) * 1000
)
except Exception as e:
log.error("rag_generation_error", error=str(e))
raise
return {
"answer": response,
"trace_id": trace_id,
"chunks_used": len(chunks),
"total_ms": (time.time() - t0) * 1000
}
6. RAGASによる自動評価
ラガス (RAG 評価) は最も普及している評価フレームワークです 自動 RAG システム。品質の 4 つの基本的な側面を測定します。
- 忠実: 応答は取得されたコンテキストによってサポートされていますか? (幻覚防止)
- 回答の関連性: 答えは質問に関連していますか?
- コンテキストリコール: 取得したコンテキストには必要な情報が含まれていますか?
- コンテキストの正確さ: 回復されたチャンクはすべて関連していますか?
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_recall,
context_precision,
answer_correctness
)
from datasets import Dataset
from typing import List, Dict
import pandas as pd
class RAGEvaluator:
"""Valutazione automatica sistema RAG con RAGAS"""
def __init__(self, rag_pipeline):
self.rag = rag_pipeline
def create_evaluation_dataset(
self,
test_questions: List[str],
ground_truths: List[str]
) -> Dataset:
"""
Crea dataset di valutazione processando le domande con il RAG.
Ogni row contiene: domanda, risposta, contesti recuperati, verita.
"""
questions = []
answers = []
contexts = []
for question, gt in zip(test_questions, ground_truths):
# Recupera contesti
chunks = self.rag.retrieve(question, top_k=5)
context_list = [chunk for chunk, _ in chunks]
# Genera risposta
answer = self.rag.generate(question)
questions.append(question)
answers.append(answer)
contexts.append(context_list)
return Dataset.from_dict({
"question": questions,
"answer": answers,
"contexts": contexts,
"ground_truth": ground_truths
})
def evaluate_pipeline(
self,
test_questions: List[str],
ground_truths: List[str]
) -> pd.DataFrame:
"""
Valuta il pipeline RAG e ritorna un report dettagliato.
"""
dataset = self.create_evaluation_dataset(test_questions, ground_truths)
results = evaluate(
dataset,
metrics=[
faithfulness, # 0-1: risposta supportata da contesto?
answer_relevancy, # 0-1: risposta pertinente alla domanda?
context_recall, # 0-1: contesto copre la ground truth?
context_precision, # 0-1: contesto è tutto pertinente?
answer_correctness # 0-1: risposta corretta vs ground truth?
]
)
# Report
df = results.to_pandas()
print("\n=== RAGAS Evaluation Report ===")
print(f"Faithfulness: {df['faithfulness'].mean():.3f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.3f}")
print(f"Context Recall: {df['context_recall'].mean():.3f}")
print(f"Context Precision:{df['context_precision'].mean():.3f}")
print(f"Answer Correct.: {df['answer_correctness'].mean():.3f}")
# Identifica casi problematici
low_faith = df[df['faithfulness'] < 0.5]
if len(low_faith) > 0:
print(f"\nATTENZIONE: {len(low_faith)} domande con faithfulness bassa:")
for _, row in low_faith.iterrows():
print(f" Q: {row['question'][:80]}...")
return df
def continuous_evaluation(self, sample_rate: float = 0.05):
"""
Valutazione continua in produzione: campiona il 5% delle query
e le valuta automaticamente per rilevare degradazione.
"""
import random
def evaluate_sample(question: str, answer: str, contexts: List[str]):
if random.random() > sample_rate:
return
# Stima qualità senza ground truth usando LLM-as-judge
from openai import OpenAI
client = OpenAI()
judge_prompt = f"""Valuta la qualità di questa risposta RAG.
Domanda: {question}
Contesti recuperati: {" | ".join(contexts[:2])}
Risposta generata: {answer}
Valuta su scala 1-5:
1. La risposta è supportata dai contesti? (faithfulness)
2. La risposta è pertinente alla domanda? (relevance)
Rispondi SOLO con JSON: {"faithfulness": X, "relevance": X}"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": judge_prompt}],
temperature=0
)
try:
import json
scores = json.loads(response.choices[0].message.content)
# Aggiorna metriche Prometheus
rag_faithfulness_score.set(scores['faithfulness'] / 5.0)
rag_answer_relevance.set(scores['relevance'] / 5.0)
except:
pass
return evaluate_sample
# Test set di esempio
test_questions = [
"Cos'è il RAG e come funziona?",
"Qual è la differenza tra BERT e Sentence Transformers?",
"Come si sceglie un vector database per la produzione?"
]
ground_truths = [
"RAG (Retrieval-Augmented Generation) combina la ricerca su basi di conoscenza con la generazione LLM per ridurre le allucinazioni.",
"BERT produce embeddings contestuali per token, mentre Sentence Transformers è ottimizzato per produrre embeddings a livello di frase per la similarity search.",
"La scelta dipende da scala, requisiti di latenza, budget e se serve hosting gestito o self-hosted."
]
7. コーパス更新の管理
本番環境の RAG システムは、時間の経過とともに変化するコーパス、つまり新しいドキュメントを管理する必要があります。 追加される、古いドキュメントが削除される、既存のドキュメントが削除される 更新されました。これが問題です コーパス管理.
from qdrant_client import QdrantClient
from qdrant_client.models import (
PointStruct, UpdateStatus, Filter, FieldCondition, MatchValue
)
from sentence_transformers import SentenceTransformer
import hashlib
import time
from typing import List, Dict, Optional
class IncrementalRAGIndex:
"""Gestione aggiornamenti incrementali del corpus RAG"""
def __init__(
self,
collection_name: str = "rag_corpus",
embedding_model: str = "all-MiniLM-L6-v2"
):
self.client = QdrantClient(url="http://localhost:6333")
self.model = SentenceTransformer(embedding_model)
self.collection = collection_name
def _doc_hash(self, text: str) -> str:
"""Hash deterministico per deduplicazione"""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def upsert_document(
self,
doc_id: str,
text: str,
metadata: Dict,
chunk_size: int = 512
) -> int:
"""
Upsert (insert o update) di un documento.
Se il documento è già presente e non modificato, skippa.
"""
# Calcola hash per deduplication
content_hash = self._doc_hash(text)
# Controlla se già presente e non modificato
existing = self.client.scroll(
collection_name=self.collection,
scroll_filter=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
),
limit=1
)
if existing[0] and existing[0][0].payload.get('content_hash') == content_hash:
return 0 # Nessun aggiornamento necessario
# Rimuovi vecchi chunks del documento
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
# Crea nuovi chunks
words = text.split()
chunk_words = chunk_size // 5 # ~5 chars/word
chunks = [
' '.join(words[i:i+chunk_words])
for i in range(0, len(words), chunk_words - 10) # 10 word overlap
if words[i:i+chunk_words]
]
if not chunks:
return 0
# Genera embeddings
embeddings = self.model.encode(chunks, normalize_embeddings=True)
# Crea points
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
point_id = int(hashlib.md5(f"{doc_id}_{i}".encode()).hexdigest()[:8], 16)
points.append(PointStruct(
id=point_id,
vector=embedding.tolist(),
payload={
**metadata,
"doc_id": doc_id,
"chunk_index": i,
"content_hash": content_hash,
"text": chunk,
"updated_at": time.time()
}
))
# Upsert in Qdrant
self.client.upsert(
collection_name=self.collection,
points=points
)
return len(chunks)
def delete_document(self, doc_id: str):
"""Rimuove tutti i chunks di un documento"""
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
def get_corpus_stats(self) -> Dict:
"""Statistiche sul corpus"""
info = self.client.get_collection(self.collection)
return {
"total_chunks": info.points_count,
"index_status": info.status,
"vectors_config": str(info.config.params.vectors)
}
8. 本番環境におけるベストプラクティスとアンチパターン
RAG 本番対応チェックリスト
- チャンク化: チャンク サイズ 400 ~ 600 のトークンを 10 ~ 15% の重複で使用します。特定のコーパスでさまざまな戦略をテストする
- 再ランキング: 精度が重要なクエリには常にクロスエンコーダを実装します。追加の遅延 (100 ~ 300 ミリ秒) を受け入れます
- キャッシング: しきい値 0.92 ~ 0.97 のセマンティック キャッシュにより、FAQ のようなコーパスで LLM コストが 30 ~ 60% 削減されます。
- 監視: 忠実度、回答の関連性、取得待ち時間、クエリごとの LLM トークン コストを追跡します。
- 評価: ゴールデン テスト セット (グラウンド トゥルースを含む 100 ~ 200 の質問) を維持し、展開ごとに評価する
- フォールバック: 検索で関連性のあるものが何も見つからなかった場合 (最高スコア < 0.5)、幻覚を見る代わりに「わかりません」と宣言します。
- バージョン管理: 埋め込みテンプレートのバージョンを変更し、変更時にインデックスを再作成します。移行中にインデックスの並列性を維持する
本番環境で避けるべきアンチパターン
- 品質監視なし: 遅延と稼働時間を監視するだけでは十分ではありません。 RAG は技術的には「機能」しますが、間違った答えを返す可能性があります。
- コーパスが更新されていません: 古いドキュメントを使用する RAG システムは、RAG を使用しないよりも悪く、誤った情報にも自信を持って応答します。
- 固定トップ k: 取得されるチャンクの数をクエリの長さに合わせて調整します。複雑なクエリにはより多くのコンテキストが必要です。
- 埋め込みレイテンシを無視する: クエリの埋め込みの生成には 10 ~ 50 ミリ秒かかります。 1000 req/s を掛けると、ボトルネックになります。
- 絶対的な判断者としてのLLM: 生成モデルは、RAG であってもコンテキストを無視して「幻覚」を起こす可能性があります。応答のためのガードレールを実装します。
結論
Bringing a RAG system to production requires much more than just a pipeline 順次。 We've seen how production-ready architecture separates plans ingestion and queries, how advanced chunking directly impacts quality, how cross-encoder reranking improves retrieval accuracy, and how monitoring with RAGAS allows you to track quality over time.
重要なポイント:
- 取り込みパスとクエリ パスを常に分離します - それぞれの要件は根本的に異なります
- チャンキングへの投資: チャンキングは最も影響力があり、見落とされがちな変数です
- 精度が重要なユースケース向けにクロスエンコーダーの再ランキングを実装する
- セマンティック キャッシュを使用して、反復的なクエリのコストと待機時間を削減します。
- 遅延や稼働時間だけでなく、忠実性と回答の関連性を測定します
- RAGAS を使用してゴールデン テスト セットを維持し、展開ごとに評価します
次の記事では、詳しく見ていきます RAG 用の LangChain: フレームワーク LLM アプリケーションの構築に最も一般的で、次のような高度なパターンに重点を置いています。 会話型 RAG、マルチホップ検索、ツール呼び出し。
シリーズは続く
- 記事 1: RAG の説明 - 基礎
- 第 2 条: 埋め込みとセマンティック検索
- 記事 3: ベクター データベース - Qdrant と松ぼっくり
- 第 4 条: ハイブリッド検索: BM25 + Vector
- 第 5 条: 運用中の RAG (現在)
- 第 6 条: RAG 用の LangChain
さらに詳しく: RAG 用の pgvector PostgreSQL 上で e MLOps: モデルの提供 生産中.







