RAG in productie: architectuur, schaling en monitoring
Het bouwen van een RAG-prototype dat lokaal werkt, is relatief eenvoudig. Breng het binnen productie, waar het duizenden gelijktijdige vragen moet verwerken, reageert in minder dan twee seconden, behoud van hoge kwaliteit in de loop van de tijd en geen gegevensverlies, en iets heel anders. De afstand tussen "werkt op mijn laptop" en "werkt in productie voor 10.000 gebruikers" is enorm, en veel RAG-projecten mislukken op dit punt.
In dit artikel gaan we in op de echte uitdagingen van RAG in productie: schaalbare architectuur, optimale chunking, herrangschikking, updatebeheer aan het corpus, monitoring met RAG-specifieke statistieken en automatische evaluatie van de kwaliteit met raamwerken zoals RAGAS. Het gaat niet om theorie: elke sectie bevat Uitvoerbare Python-code en patronen getest op echte systemen.
Wat je gaat leren
- Productieklare architectuur van een schaalbaar RAG-systeem
- Geavanceerde chunking-strategieën (recursief, semantisch, zinsvenster)
- Cross-encoder herschikt de pijplijn om de nauwkeurigheid te verbeteren
- Beheer van incrementele vectorcorpusupdates
- Monitoring met RAG-specifieke statistieken (trouw, relevantie, herinnering)
- Automatische evaluatie met RAGAS-framework
- Intelligente caching om de latentie en kosten te optimaliseren
- Foutafhandeling en sierlijke degradatie in de productie
Serieoverzicht
| # | Item | Focus |
|---|---|---|
| 1 | RAG uitgelegd | Funderingen en architectuur |
| 2 | Inbedding en semantisch zoeken | BERT, SBERT, FAISS |
| 3 | Vectordatabase | Qdrant, Dennenappel, Milvus |
| 4 | Hybride ophalen | BM25 + vector zoeken |
| 5 | RAG in productie (u bent hier) | Schalen, monitoren, evalueren |
| 6 | LangChain voor RAG | Geavanceerde raamwerken en patronen |
| 7 | Beheer van contextvensters | Optimaliseer LLM-invoer |
| 8 | Multi-agentsystemen | Orkestratie en coördinatie |
| 9 | Snelle engineering in productie | Sjabloon, versiebeheer, testen |
| 10 | Kennisgrafieken voor AI | Gestructureerde kennis in LLM's |
1. Architectuur van een productieklaar RAG-systeem
Een RAG-systeem in productie is geen eenvoudige sequentiële pijplijn: het is een systeem gedistribueerd met gespecialiseerde componenten, elk met schaalbaarheidsvereisten, verschillende fouttolerantie en monitoring. Begrijp de volledige architectuur en de eerste stap om iets te bouwen dat standhoudt in de productie.
ARCHITETTURA RAG PRODUCTION
┌─────────────────────────────────────────────────────────┐
│ API GATEWAY │
│ (Rate limiting, auth, routing) │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌──────▼──────┐ ┌───────▼────────┐
│ QUERY │ │ INGESTION │
│ SERVICE │ │ SERVICE │
└──────┬──────┘ └───────┬────────┘
│ │
┌───────▼──────┐ ┌───────▼────────┐
│ RETRIEVAL │ │ DOCUMENT │
│ ENGINE │ │ PROCESSOR │
│ ┌─────────┐ │ │ ┌──────────┐ │
│ │Embedding│ │ │ │Chunking │ │
│ │Cache │ │ │ │Embedding │ │
│ └────┬────┘ │ │ │Indexing │ │
│ │ │ │ └──────────┘ │
│ ┌────▼────┐ │ └───────┬────────┘
│ │Vector │ │ │
│ │Search │ │ ┌───────▼────────┐
│ └────┬────┘ │ │ VECTOR DB │
│ │ │ │ (Qdrant/Pine) │
│ ┌────▼────┐ │ └────────────────┘
│ │Reranker │ │
│ └────┬────┘ │
└───────┼──────┘
│
┌───────▼──────┐ ┌────────────────┐
│ GENERATION │ │ CACHE │
│ SERVICE │◄──►│ (Redis/ │
│ (LLM) │ │ Semantic) │
└───────┬──────┘ └────────────────┘
│
┌───────▼──────┐ ┌────────────────┐
│ MONITORING │ │ EVALUATION │
│ SERVICE │ │ SERVICE │
│ (Prometheus)│ │ (RAGAS) │
└──────────────┘ └────────────────┘
1.1 Scheiding van zorgen: inname versus vraag
Het fundamentele patroon in een RAG-productiesysteem is het scheiding tussen het opnameplan en het queryplan. Deze twee paden hebben vereisten heel anders:
Inname versus zoekopdracht: verschillende vereisten
| Maat | Innamepad | Querypad |
|---|---|---|
| Latentie | Niet kritisch (batch) | Kritiek (<2s p95) |
| Doorvoer | Laag-medium (documenten) | Hoog (duizenden vereiste/s) |
| CPU/GPU | Generatie inbedden (GPU) | Query's insluiten + opnieuw rangschikken (GPU) |
| Fouten | Probeer het opnieuw met uitstel | Terugval sierlijk |
| Schalen | Horizontale batch | Horizontaal staatloos |
2. Geavanceerde chunkingstrategieën
Chunking is waarschijnlijk de meest over het hoofd geziene variabele in RAG-systemen, maar toch is dat zo een enorme impact op de uiteindelijke kwaliteit. Een te klein deel verliest context; één te groot introduceert ruis en overschrijdt de context van het inbeddingsmodel.
2.1 Recursieve tekentekstsplitter
De meest effectieve methode voor de meeste gebruikssituaties is de recursieve karaktersplitsing: verdeelt de tekst progressief over scheidingstekens fijner (paragrafen, zinnen, woorden) om de natuurlijke structuur van het document te respecteren.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any
import re
class AdvancedChunker:
"""Chunker avanzato con metadati e strategie multiple"""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 64,
strategy: str = "recursive"
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.strategy = strategy
self.recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
length_function=len,
is_separator_regex=False
)
def chunk_with_metadata(
self,
text: str,
doc_metadata: Dict[str, Any]
) -> List[Dict]:
"""Crea chunks con metadati completi per il retrieval"""
if self.strategy == "recursive":
chunks = self.recursive_splitter.split_text(text)
elif self.strategy == "semantic":
chunks = self._semantic_split(text)
elif self.strategy == "sentence_window":
chunks = self._sentence_window_split(text)
else:
raise ValueError(f"Strategia non supportata: {self.strategy}")
result = []
for i, chunk in enumerate(chunks):
chunk_meta = {
**doc_metadata,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(chunk),
"strategy": self.strategy,
# Snippet per il contesto nella risposta
"prev_chunk": chunks[i-1][:100] if i > 0 else None,
"next_chunk": chunks[i+1][:100] if i < len(chunks)-1 else None,
}
result.append({"text": chunk, "metadata": chunk_meta})
return result
def _sentence_window_split(
self,
text: str,
window_size: int = 3
) -> List[str]:
"""
Sentence Window: indicizza singole frasi ma recupera
le frasi vicine per mantenere il contesto.
Tecnica avanzata che migliora il recall mantenendo la precisione.
"""
# Split in frasi
sentences = re.split(r'(?<=[.!?])\s+', text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
for i, sentence in enumerate(sentences):
# Finestra di contesto: frase centrale + vicine
start = max(0, i - window_size // 2)
end = min(len(sentences), i + window_size // 2 + 1)
window = " ".join(sentences[start:end])
chunks.append(window)
return chunks
def _semantic_split(self, text: str) -> List[str]:
"""
Semantic split: divide in base ai cambi di topic
usando embeddings per individuare boundary semantici.
"""
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
sentences = re.split(r'(?<=[.!?])\s+', text)
if len(sentences) < 3:
return [text]
# Calcola embeddings di ogni frase
embeddings = model.encode(sentences)
# Calcola similarità tra frasi consecutive
similarities = []
for i in range(len(embeddings) - 1):
sim = np.dot(embeddings[i], embeddings[i+1]) / (
np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i+1])
)
similarities.append(sim)
# Trova boundary semantici (bassa similarità = cambio di topic)
threshold = np.mean(similarities) - np.std(similarities)
boundaries = [0]
for i, sim in enumerate(similarities):
if sim < threshold:
boundaries.append(i + 1)
boundaries.append(len(sentences))
# Crea chunks dai segmenti
chunks = []
for i in range(len(boundaries) - 1):
segment = " ".join(sentences[boundaries[i]:boundaries[i+1]])
if len(segment) > self.chunk_size * 2:
# Chunk troppo grande: dividi ulteriormente
sub_chunks = self.recursive_splitter.split_text(segment)
chunks.extend(sub_chunks)
elif segment.strip():
chunks.append(segment)
return chunks
# Esempio di utilizzo
chunker = AdvancedChunker(chunk_size=512, chunk_overlap=64, strategy="recursive")
document = """
Cos'è il RAG?
Il Retrieval-Augmented Generation (RAG) è un'architettura che combina la ricerca su basi
di conoscenza con la generazione di testo degli LLM. Il principio fondamentale è semplice:
invece di lasciare che l'LLM risponda solo con la sua conoscenza interna (soggetta ad
allucinazioni), RAG prima recupera documenti rilevanti e poi li fornisce come contesto.
Come funziona il retrieval?
Il processo di retrieval avviene in due fasi. Prima, i documenti vengono convertiti in
embeddings vettoriali e memorizzati in un vector database. Poi, la query dell'utente viene
anch'essa convertita in un embedding e viene effettuata una ricerca di similarità per
trovare i documenti più rilevanti.
"""
chunks = chunker.chunk_with_metadata(
document,
doc_metadata={"source": "rag-intro.txt", "author": "AI Team"}
)
for chunk in chunks:
print(f"Chunk {chunk['metadata']['chunk_index']}: {len(chunk['text'])} chars")
print(f" {chunk['text'][:100]}...")
2.2 Ouder-kind chunking
Een van de meest effectieve strategieën voor geavanceerde RAG-systemen is de Ouder-kind chunking: kleine stukjes (kinderen) worden geïndexeerd voor de nauwkeurigheid van het ophalen, maar grote stukken (ouder) worden geretourneerd om te leveren voldoende context voor de LLM.
from langchain.storage import InMemoryStore
from langchain.retrievers import ParentDocumentRetriever
from langchain.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Splitter per chunks grandi (parent) - per il contesto
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
# Splitter per chunks piccoli (child) - per l'indicizzazione
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)
# Storage: chunks grandi in memory store, chunks piccoli in vector store
vectorstore = Qdrant.from_documents(
[], # inizialmente vuoto
OpenAIEmbeddings(),
url="http://localhost:6333",
collection_name="rag_child_chunks"
)
docstore = InMemoryStore() # o Redis per persistenza
# Parent-Child Retriever
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Come funziona:
# 1. add_documents() divide i documenti in parent e child chunks
# 2. I child chunks vengono indicizzati nel vector store
# 3. I parent chunks vengono salvati nel docstore con un ID univoco
# 4. I child chunks hanno un metadato "doc_id" che punta al parent
# Retrieval:
# 1. Cerca i child chunks più simili alla query
# 2. Recupera i parent chunks corrispondenti
# Risultato: massima precisione (child) + massimo contesto (parent)
3. Herschikking: verbeter de nauwkeurigheid van het ophalen
Il herrangschikken het is een techniek die de kwaliteit aanzienlijk verbetert van het ophalen door een tweede, nauwkeuriger (maar langzamer) model op de resultaten toe te passen initialen. De typische stroom is: haal 50-100 kandidaten op met snel zoeken naar vectoren, Dan opnieuw ordenen met nauwkeurige cross-encoder, Uiteindelijk haal de top-k's.
3.1 Bi-encoder versus cross-encoder
Het verschil tussen de twee benaderingen is fundamenteel:
- Bi-encoder (ophalen): codeer zoekopdrachten en documenten afzonderlijk in vectoren. Zeer snel op te halen omdat de documenten vooraf zijn berekend, maar minder nauwkeurig omdat de query niet wordt gezien tijdens het coderen van de documenten.
- Cross-encoder (herrangschikking): neemt query + document aaneengeschakeld en produceert een relevantiescore. Veel nauwkeuriger, maar niet zo schaalbaar als ophalen, omdat het elk query-documentpaar in realtime moet verwerken.
from sentence_transformers import CrossEncoder
from typing import List, Tuple
import time
class RerankingRetriever:
"""Retriever con reranking a due stadi"""
def __init__(
self,
bi_encoder, # Sentence Transformer per retrieval veloce
vector_index, # FAISS o vector DB
cross_encoder_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
):
self.bi_encoder = bi_encoder
self.index = vector_index
self.cross_encoder = CrossEncoder(cross_encoder_name)
self.documents = []
def retrieve_and_rerank(
self,
query: str,
initial_k: int = 50,
final_k: int = 5
) -> List[Tuple[str, float]]:
"""
Pipeline completa:
1. Retrieval veloce con bi-encoder (top-50)
2. Reranking preciso con cross-encoder (top-5)
"""
t0 = time.time()
# STAGE 1: Retrieval veloce
query_emb = self.bi_encoder.encode([query], normalize_embeddings=True)
scores, indices = self.index.search(query_emb.astype('float32'), initial_k)
candidates = [
(self.documents[i], float(s))
for s, i in zip(scores[0], indices[0])
if i != -1
]
t1 = time.time()
print(f"Retrieval: {(t1-t0)*1000:.1f}ms - {len(candidates)} candidati")
# STAGE 2: Reranking preciso con cross-encoder
if not candidates:
return []
# Prepara coppie (query, documento) per il cross-encoder
cross_pairs = [(query, doc) for doc, _ in candidates]
cross_scores = self.cross_encoder.predict(cross_pairs)
# Combina e riordina
reranked = sorted(
zip([doc for doc, _ in candidates], cross_scores),
key=lambda x: x[1],
reverse=True
)
t2 = time.time()
print(f"Reranking: {(t2-t1)*1000:.1f}ms")
return reranked[:final_k]
def reciprocal_rank_fusion(
self,
results_list: List[List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Reciprocal Rank Fusion: combina risultati da multiple strategie
di retrieval (es. dense + sparse) in un singolo ranking.
Formula: score = sum(1 / (k + rank_i)) per ogni lista i
"""
doc_scores = {}
for results in results_list:
for rank, (doc, _) in enumerate(results):
if doc not in doc_scores:
doc_scores[doc] = 0.0
doc_scores[doc] += 1.0 / (k + rank + 1)
return sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
Cross-Encoder-modellen voor herrangschikking
| Model | Snelheid | kwaliteit | Aanbevolen gebruik |
|---|---|---|---|
| cross-encoder/ms-marco-MiniLM-L-6-v2 | Hoog | Goed | Productie, latentie belangrijk |
| cross-encoder/ms-marco-electra-base | Gemiddeld | Uitstekend | Goede balans |
| BAAI/bge-reranker-groot | Laag | Uitstekend | Maximale kwaliteit, niet-kritieke latentie |
| Cohere Rerank-API | API's | Uitstekend | Prototyping, budget beschikbaar |
4. Intelligente caching voor latentie en kosten
In een productie-RAG-systeem is een groot percentage van de zoekopdrachten vergelijkbaar identiek (bijvoorbeeld FAQ, veelgestelde vragen). De semantische caching het gaat verder exacte cache (cache-hit alleen bij identieke zoekopdrachten) en hergebruik van resultaten per zoekopdracht semantisch vergelijkbaar, waardoor de LLM-inferentiekosten dramatisch worden verlaagd.
import redis
import numpy as np
import json
import hashlib
from sentence_transformers import SentenceTransformer
from typing import Optional, Tuple
import time
class SemanticCache:
"""Cache semantica che riusa risposte per query simili"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
embedding_model: str = "all-MiniLM-L6-v2",
similarity_threshold: float = 0.95,
ttl_seconds: int = 3600 # 1 ora
):
self.redis = redis.from_url(redis_url)
self.model = SentenceTransformer(embedding_model)
self.threshold = similarity_threshold
self.ttl = ttl_seconds
def _get_cache_key(self, text: str) -> str:
return f"rag_cache:{hashlib.md5(text.encode()).hexdigest()}"
def get(self, query: str) -> Optional[Tuple[str, float]]:
"""
Cerca in cache:
1. Prima cerca match esatto (O(1))
2. Poi cerca match semantico (O(n) - ottimizzabile con FAISS)
Ritorna (risposta, similarity_score) o None
"""
# Cache esatto
exact_key = self._get_cache_key(query)
cached = self.redis.get(exact_key)
if cached:
data = json.loads(cached)
return data['response'], 1.0
# Cache semantico
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
# Ottimizzazione: usa Redis SCAN per iterare sulle chiavi cached
# In produzione, usa un indice FAISS separato per il lookup semantico
best_score = 0.0
best_response = None
for key in self.redis.scan_iter("rag_cache:*"):
cached = self.redis.get(key)
if not cached:
continue
data = json.loads(cached)
cached_emb = np.array(data['embedding'])
sim = float(np.dot(query_emb, cached_emb))
if sim > best_score:
best_score = sim
best_response = data['response']
if best_score >= self.threshold:
return best_response, best_score
return None
def set(self, query: str, response: str):
"""Salva la risposta in cache con l'embedding della query"""
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
key = self._get_cache_key(query)
data = {
'query': query,
'response': response,
'embedding': query_emb.tolist(),
'timestamp': time.time()
}
self.redis.setex(key, self.ttl, json.dumps(data))
def get_stats(self) -> dict:
"""Statistiche del cache"""
keys = list(self.redis.scan_iter("rag_cache:*"))
return {
'total_cached': len(keys),
'memory_bytes': sum(
self.redis.memory_usage(k) or 0 for k in keys[:100]
)
}
# Integrazione nel RAG pipeline
class CachedRAGPipeline:
def __init__(self, rag_pipeline, cache: SemanticCache):
self.rag = rag_pipeline
self.cache = cache
self.hits = 0
self.misses = 0
def query(self, question: str) -> dict:
# 1. Prova cache
cached = self.cache.get(question)
if cached:
response, sim = cached
self.hits += 1
return {
"answer": response,
"cached": True,
"similarity": sim,
"latency_ms": 1 # quasi istantaneo
}
# 2. Full RAG pipeline
t0 = time.time()
response = self.rag.generate(question)
latency = (time.time() - t0) * 1000
# 3. Salva in cache per future query simili
self.cache.set(question, response)
self.misses += 1
return {
"answer": response,
"cached": False,
"similarity": None,
"latency_ms": latency
}
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
5. Monitoring en waarneembaarheid
Een RAG-systeem in de productie moet op alle niveaus waarneembaar zijn. Het is niet genoeg HTTP-latentie bewaken: u moet de kwaliteit van het ophalen, de snelheid, meten hallucinaties, gebruikerstevredenheid en kosten per zoekopdracht.
5.1 Infrastructuurstatistieken
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Metriche infrastrutturali
rag_queries_total = Counter(
'rag_queries_total',
'Numero totale di query RAG',
['status', 'cached']
)
rag_query_duration = Histogram(
'rag_query_duration_seconds',
'Durata delle query RAG',
['component'],
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)
rag_retrieval_chunks = Histogram(
'rag_retrieval_chunks',
'Numero di chunks recuperati per query',
buckets=[1, 3, 5, 10, 20, 50]
)
rag_retrieval_score = Histogram(
'rag_retrieval_score',
'Score di rilevanza del top-1 chunk',
buckets=[0.1, 0.3, 0.5, 0.7, 0.8, 0.9, 0.95, 1.0]
)
rag_cache_hit_rate = Gauge(
'rag_cache_hit_rate',
'Tasso di hit del semantic cache'
)
rag_llm_tokens_total = Counter(
'rag_llm_tokens_total',
'Totale token LLM utilizzati',
['type'] # 'prompt' o 'completion'
)
# Metriche qualità (aggiornate da evaluation service)
rag_faithfulness_score = Gauge(
'rag_faithfulness_score',
'Score medio di faithfulness (risposta supportata da contesto)'
)
rag_answer_relevance = Gauge(
'rag_answer_relevance',
'Score medio di rilevanza della risposta'
)
# Decorator per timing automatico
def track_timing(component: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
rag_queries_total.labels(status='success', cached='false').inc()
return result
except Exception as e:
rag_queries_total.labels(status='error', cached='false').inc()
raise
finally:
duration = time.time() - start
rag_query_duration.labels(component=component).observe(duration)
return wrapper
return decorator
# Esempio utilizzo
class MonitoredRAGService:
@track_timing('full_query')
def query(self, question: str) -> dict:
# Retrieval
with rag_query_duration.labels(component='retrieval').time():
chunks = self.retrieve(question)
rag_retrieval_chunks.observe(len(chunks))
if chunks:
rag_retrieval_score.observe(chunks[0][1]) # score top-1
# Generation
with rag_query_duration.labels(component='generation').time():
response = self.generate(question, chunks)
return response
5.2 Gestructureerde registratie voor RAG
import structlog
import uuid
from contextlib import contextmanager
# Configura structlog per output JSON
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
class TracedRAGPipeline:
"""RAG pipeline con tracing distribuito"""
def query(self, question: str, user_id: str = None) -> dict:
trace_id = str(uuid.uuid4())
log = logger.bind(
trace_id=trace_id,
user_id=user_id,
question_hash=hashlib.md5(question.encode()).hexdigest()[:8]
)
log.info("rag_query_start", question_length=len(question))
# Retrieval
t0 = time.time()
try:
chunks = self.retrieve(question)
retrieval_time = time.time() - t0
log.info(
"rag_retrieval_complete",
chunks_retrieved=len(chunks),
top_score=chunks[0][1] if chunks else 0,
retrieval_ms=retrieval_time * 1000
)
except Exception as e:
log.error("rag_retrieval_error", error=str(e))
raise
# Generation
t1 = time.time()
try:
response = self.generate(question, chunks)
generation_time = time.time() - t1
log.info(
"rag_generation_complete",
response_length=len(response),
generation_ms=generation_time * 1000,
total_ms=(time.time() - t0) * 1000
)
except Exception as e:
log.error("rag_generation_error", error=str(e))
raise
return {
"answer": response,
"trace_id": trace_id,
"chunks_used": len(chunks),
"total_ms": (time.time() - t0) * 1000
}
6. Automatische evaluatie met RAGAS
RAGAS (RAG Assessment) is het meest verspreide beoordelingskader automatische RAG-systemen. Het meet vier fundamentele dimensies van kwaliteit:
- Trouw: Wordt het antwoord ondersteund door de opgehaalde context? (anti-hallucinatie)
- Antwoordrelevantie: Is het antwoord relevant voor de vraag?
- Contextherinnering: Bevat de opgehaalde context de benodigde informatie?
- Contextnauwkeurigheid: Zijn de herstelde brokken allemaal relevant?
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_recall,
context_precision,
answer_correctness
)
from datasets import Dataset
from typing import List, Dict
import pandas as pd
class RAGEvaluator:
"""Valutazione automatica sistema RAG con RAGAS"""
def __init__(self, rag_pipeline):
self.rag = rag_pipeline
def create_evaluation_dataset(
self,
test_questions: List[str],
ground_truths: List[str]
) -> Dataset:
"""
Crea dataset di valutazione processando le domande con il RAG.
Ogni row contiene: domanda, risposta, contesti recuperati, verita.
"""
questions = []
answers = []
contexts = []
for question, gt in zip(test_questions, ground_truths):
# Recupera contesti
chunks = self.rag.retrieve(question, top_k=5)
context_list = [chunk for chunk, _ in chunks]
# Genera risposta
answer = self.rag.generate(question)
questions.append(question)
answers.append(answer)
contexts.append(context_list)
return Dataset.from_dict({
"question": questions,
"answer": answers,
"contexts": contexts,
"ground_truth": ground_truths
})
def evaluate_pipeline(
self,
test_questions: List[str],
ground_truths: List[str]
) -> pd.DataFrame:
"""
Valuta il pipeline RAG e ritorna un report dettagliato.
"""
dataset = self.create_evaluation_dataset(test_questions, ground_truths)
results = evaluate(
dataset,
metrics=[
faithfulness, # 0-1: risposta supportata da contesto?
answer_relevancy, # 0-1: risposta pertinente alla domanda?
context_recall, # 0-1: contesto copre la ground truth?
context_precision, # 0-1: contesto è tutto pertinente?
answer_correctness # 0-1: risposta corretta vs ground truth?
]
)
# Report
df = results.to_pandas()
print("\n=== RAGAS Evaluation Report ===")
print(f"Faithfulness: {df['faithfulness'].mean():.3f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.3f}")
print(f"Context Recall: {df['context_recall'].mean():.3f}")
print(f"Context Precision:{df['context_precision'].mean():.3f}")
print(f"Answer Correct.: {df['answer_correctness'].mean():.3f}")
# Identifica casi problematici
low_faith = df[df['faithfulness'] < 0.5]
if len(low_faith) > 0:
print(f"\nATTENZIONE: {len(low_faith)} domande con faithfulness bassa:")
for _, row in low_faith.iterrows():
print(f" Q: {row['question'][:80]}...")
return df
def continuous_evaluation(self, sample_rate: float = 0.05):
"""
Valutazione continua in produzione: campiona il 5% delle query
e le valuta automaticamente per rilevare degradazione.
"""
import random
def evaluate_sample(question: str, answer: str, contexts: List[str]):
if random.random() > sample_rate:
return
# Stima qualità senza ground truth usando LLM-as-judge
from openai import OpenAI
client = OpenAI()
judge_prompt = f"""Valuta la qualità di questa risposta RAG.
Domanda: {question}
Contesti recuperati: {" | ".join(contexts[:2])}
Risposta generata: {answer}
Valuta su scala 1-5:
1. La risposta è supportata dai contesti? (faithfulness)
2. La risposta è pertinente alla domanda? (relevance)
Rispondi SOLO con JSON: {"faithfulness": X, "relevance": X}"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": judge_prompt}],
temperature=0
)
try:
import json
scores = json.loads(response.choices[0].message.content)
# Aggiorna metriche Prometheus
rag_faithfulness_score.set(scores['faithfulness'] / 5.0)
rag_answer_relevance.set(scores['relevance'] / 5.0)
except:
pass
return evaluate_sample
# Test set di esempio
test_questions = [
"Cos'è il RAG e come funziona?",
"Qual è la differenza tra BERT e Sentence Transformers?",
"Come si sceglie un vector database per la produzione?"
]
ground_truths = [
"RAG (Retrieval-Augmented Generation) combina la ricerca su basi di conoscenza con la generazione LLM per ridurre le allucinazioni.",
"BERT produce embeddings contestuali per token, mentre Sentence Transformers è ottimizzato per produrre embeddings a livello di frase per la similarity search.",
"La scelta dipende da scala, requisiti di latenza, budget e se serve hosting gestito o self-hosted."
]
7. Beheer van Corpusupdates
Een RAG-systeem in productie moet een corpus beheren dat in de loop van de tijd verandert: nieuwe documenten worden toegevoegd, verouderde documenten worden verwijderd, bestaande documenten worden verwijderd bijgewerkt. Dit is het probleem met corpusbeheer.
from qdrant_client import QdrantClient
from qdrant_client.models import (
PointStruct, UpdateStatus, Filter, FieldCondition, MatchValue
)
from sentence_transformers import SentenceTransformer
import hashlib
import time
from typing import List, Dict, Optional
class IncrementalRAGIndex:
"""Gestione aggiornamenti incrementali del corpus RAG"""
def __init__(
self,
collection_name: str = "rag_corpus",
embedding_model: str = "all-MiniLM-L6-v2"
):
self.client = QdrantClient(url="http://localhost:6333")
self.model = SentenceTransformer(embedding_model)
self.collection = collection_name
def _doc_hash(self, text: str) -> str:
"""Hash deterministico per deduplicazione"""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def upsert_document(
self,
doc_id: str,
text: str,
metadata: Dict,
chunk_size: int = 512
) -> int:
"""
Upsert (insert o update) di un documento.
Se il documento è già presente e non modificato, skippa.
"""
# Calcola hash per deduplication
content_hash = self._doc_hash(text)
# Controlla se già presente e non modificato
existing = self.client.scroll(
collection_name=self.collection,
scroll_filter=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
),
limit=1
)
if existing[0] and existing[0][0].payload.get('content_hash') == content_hash:
return 0 # Nessun aggiornamento necessario
# Rimuovi vecchi chunks del documento
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
# Crea nuovi chunks
words = text.split()
chunk_words = chunk_size // 5 # ~5 chars/word
chunks = [
' '.join(words[i:i+chunk_words])
for i in range(0, len(words), chunk_words - 10) # 10 word overlap
if words[i:i+chunk_words]
]
if not chunks:
return 0
# Genera embeddings
embeddings = self.model.encode(chunks, normalize_embeddings=True)
# Crea points
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
point_id = int(hashlib.md5(f"{doc_id}_{i}".encode()).hexdigest()[:8], 16)
points.append(PointStruct(
id=point_id,
vector=embedding.tolist(),
payload={
**metadata,
"doc_id": doc_id,
"chunk_index": i,
"content_hash": content_hash,
"text": chunk,
"updated_at": time.time()
}
))
# Upsert in Qdrant
self.client.upsert(
collection_name=self.collection,
points=points
)
return len(chunks)
def delete_document(self, doc_id: str):
"""Rimuove tutti i chunks di un documento"""
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
def get_corpus_stats(self) -> Dict:
"""Statistiche sul corpus"""
info = self.client.get_collection(self.collection)
return {
"total_chunks": info.points_count,
"index_status": info.status,
"vectors_config": str(info.config.params.vectors)
}
8. Beste praktijken en antipatronen in de productie
RAG-productieklare checklist
- Chunking: gebruik tokens van chunkgrootte 400-600 met een overlap van 10-15%; test verschillende strategieën op uw specifieke corpus
- Herrangschikking: implementeer altijd een cross-encoder voor zoekopdrachten waarbij precisie van cruciaal belang is; accepteert extra latentie (100-300 ms)
- Caching: semantische cache met drempelwaarde 0,92-0,97 verlaagt LLM-kosten met 30-60% op FAQ-achtig corpus
- Toezicht: volg de betrouwbaarheid, antwoordrelevantie, ophaallatentie en LLM-tokenkosten per zoekopdracht
- Evaluatie: onderhoud een gouden testset (100-200 vragen met grondwaarheid) en evalueer bij elke implementatie
- Terugval: als bij het ophalen niets relevants wordt gevonden (topscore < 0,5), verklaar dan "Ik weet het niet" in plaats van te hallucineren
- Versiebeheer: versie-insluitingssjablonen en opnieuw indexeren wanneer u wijzigt; houd indexen parallel tijdens de migratie
Antipatronen die u tijdens de productie moet vermijden
- Geen kwaliteitsbewaking: Alleen latentie en uptime monitoren is niet voldoende. De RAG kan technisch gezien "werken", maar geeft verkeerde antwoorden.
- Corpus niet bijgewerkt: een RAG-systeem op verouderde documentatie is erger dan geen RAG: het reageert vol vertrouwen op onjuiste informatie.
- Vaste top-k: Pas het aantal opgehaalde chunks aan aan de lengte van de query. Complexe zoekopdrachten vereisen meer context.
- Insluitingslatentie negeren: Het genereren van de insluiting van een query duurt 10-50 ms. Vermenigvuldig dit met 1000 req/s en dit wordt het knelpunt.
- LLM als absolute rechter: het generatieve model kan zelfs met RAG uit zijn context "hallucineren". Implementeer vangrails voor reacties.
Conclusies
Om een RAG-systeem in productie te nemen is veel meer nodig dan alleen een pijpleiding opeenvolgend. We hebben gezien hoe productieklare architectuur plannen scheidt opname en vragen, hoe geavanceerde chunking rechtstreeks van invloed is op de kwaliteit, hoe cross-encoder herrangschikking de nauwkeurigheid van het ophalen verbetert, en hoe Met monitoring met RAGAS kunt u de kwaliteit in de loop van de tijd volgen.
De belangrijkste punten:
- Scheid altijd het opname- en querypad: ze hebben radicaal verschillende vereisten
- Investeer in chunking: het is de meest impactvolle en vaak over het hoofd geziene variabele
- Implementeer cross-encoder-herschikking voor gebruiksscenario's waarbij nauwkeurigheid van cruciaal belang is
- Gebruik semantische caching om de kosten en latentie bij repetitieve zoekopdrachten te verminderen
- Meet de betrouwbaarheid en beantwoord de relevantie, niet alleen latentie en uptime
- Onderhoud een gouden testset en evalueer deze bij elke implementatie met RAGAS
In het volgende artikel zullen we het verkennen LangChain voor RAG: het raamwerk meest populair voor het bouwen van LLM-applicaties, met een focus op geavanceerde patronen zoals conversationele RAG, multi-hop ophalen en toolcalling.
De serie gaat verder
- Artikel 1: RAG uitgelegd - Grondbeginselen
- Artikel 2: Inbedding en semantisch zoeken
- Artikel 3: Vectordatabase - Qdrant versus Pinecone
- Artikel 4: Hybride ophalen: BM25 + Vector
- Artikel 5: RAG in productie (huidig)
- Artikel 6: LangChain voor RAG
Leer meer met: pgvector voor RAG op PostgreSQL e MLOps: Model Serving in productie.







