프로덕션 중인 RAG: 아키텍처, 확장 및 모니터링
로컬에서 작동하는 RAG 프로토타입을 구축하는 것은 비교적 간단합니다. 가져오세요 수천 개의 동시 쿼리를 처리해야 하는 프로덕션에서는 2분 이내에 응답합니다. 초, 시간이 지나도 고품질을 유지하고 데이터 손실이 없는 완전히 다른 것입니다. "내 노트북에서 작동"과 "10,000명의 사용자를 위한 프로덕션에서 작동" 사이의 거리 규모가 크며 많은 RAG 프로젝트가 바로 이 단계에서 실패합니다.
이 기사에서는 실제 과제를 다룹니다. 생산 중인 RAG: 확장 가능한 아키텍처, 최적의 청크, 순위 재지정, 업데이트 관리 RAG 관련 측정항목으로 모니터링하고 자동 평가를 수행합니다. RAGAS와 같은 프레임워크를 사용하면 품질이 향상됩니다. 이론에 관한 것이 아닙니다. 각 섹션에는 다음이 포함됩니다. 실제 시스템에서 테스트된 실행 가능한 Python 코드 및 패턴입니다.
무엇을 배울 것인가
- 확장 가능한 RAG 시스템의 프로덕션 지원 아키텍처
- 고급 청킹 전략(재귀적, 의미론적, 문장 창)
- 정확도 향상을 위한 크로스 인코더 재순위 파이프라인
- 증분 벡터 말뭉치 업데이트 관리
- RAG 관련 측정항목(충실성, 관련성, 재현율)을 사용한 모니터링
- RAGAS 프레임워크를 사용한 자동 평가
- 지연 시간과 비용을 최적화하는 지능형 캐싱
- 프로덕션의 오류 처리 및 정상적인 성능 저하
시리즈 개요
| # | Articolo | 집중하다 |
|---|---|---|
| 1 | RAG 설명 | 기초와 건축 |
| 2 | 임베딩 및 의미 검색 | 버트, SBERT, FAISS |
| 3 | 벡터 데이터베이스 | Qdrant, 솔방울, Milvus |
| 4 | 하이브리드 검색 | BM25 + 벡터 검색 |
| 5 | 생산 중인 RAG(현재 위치) | 확장, 모니터링, 평가 |
| 6 | RAG용 LangChain | 고급 프레임워크 및 패턴 |
| 7 | 컨텍스트 창 관리 | LLM 입력 최적화 |
| 8 | 다중 에이전트 시스템 | 오케스트레이션 및 조정 |
| 9 | 생산 현장의 신속한 엔지니어링 | 템플릿, 버전 관리, 테스트 |
| 10 | AI를 위한 지식 그래프 | LLM의 구조화된 지식 |
1. 생산 준비가 완료된 RAG 시스템의 아키텍처
생산 중인 RAG 시스템은 단순한 순차 파이프라인이 아닙니다. 각각의 확장성 요구 사항을 충족하는 특수 구성 요소와 함께 배포됩니다. 다양한 내결함성 및 모니터링. 전체 아키텍처와 프로덕션에서 지속 가능한 무언가를 구축하는 첫 번째 단계입니다.
ARCHITETTURA RAG PRODUCTION
┌─────────────────────────────────────────────────────────┐
│ API GATEWAY │
│ (Rate limiting, auth, routing) │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌──────▼──────┐ ┌───────▼────────┐
│ QUERY │ │ INGESTION │
│ SERVICE │ │ SERVICE │
└──────┬──────┘ └───────┬────────┘
│ │
┌───────▼──────┐ ┌───────▼────────┐
│ RETRIEVAL │ │ DOCUMENT │
│ ENGINE │ │ PROCESSOR │
│ ┌─────────┐ │ │ ┌──────────┐ │
│ │Embedding│ │ │ │Chunking │ │
│ │Cache │ │ │ │Embedding │ │
│ └────┬────┘ │ │ │Indexing │ │
│ │ │ │ └──────────┘ │
│ ┌────▼────┐ │ └───────┬────────┘
│ │Vector │ │ │
│ │Search │ │ ┌───────▼────────┐
│ └────┬────┘ │ │ VECTOR DB │
│ │ │ │ (Qdrant/Pine) │
│ ┌────▼────┐ │ └────────────────┘
│ │Reranker │ │
│ └────┬────┘ │
└───────┼──────┘
│
┌───────▼──────┐ ┌────────────────┐
│ GENERATION │ │ CACHE │
│ SERVICE │◄──►│ (Redis/ │
│ (LLM) │ │ Semantic) │
└───────┬──────┘ └────────────────┘
│
┌───────▼──────┐ ┌────────────────┐
│ MONITORING │ │ EVALUATION │
│ SERVICE │ │ SERVICE │
│ (Prometheus)│ │ (RAGAS) │
└──────────────┘ └────────────────┘
1.1 우려 사항의 분리: 수집과 쿼리
RAG 생산 시스템의 기본 패턴은 사이의 분리 수집 계획 및 쿼리 계획. 이 두 경로에는 요구 사항이 있습니다. 매우 다르다:
수집과 쿼리: 다양한 요구 사항
| 크기 | 수집 경로 | 쿼리 경로 |
|---|---|---|
| 숨어 있음 | 중요하지 않음(일괄) | 비판(<2s p95) |
| 처리량 | 낮음(문서) | 높음(천 요청/초) |
| CPU/GPU | 임베딩 생성(GPU) | 쿼리 포함 + 순위 재지정(GPU) |
| 오류 | 백오프로 재시도 | 대체 우아함 |
| 스케일링 | 수평 배치 | 수평적 무상태 |
2. 고급 청킹 전략
청킹은 아마도 RAG 시스템에서 가장 간과되는 변수일 것입니다. 최종 품질에 큰 영향을 미칩니다. 너무 작은 청크는 컨텍스트를 잃습니다. 하나 너무 크면 노이즈가 발생하고 임베딩 모델의 컨텍스트를 초과합니다.
2.1 재귀 문자 텍스트 분할기
대부분의 사용 사례에 가장 효과적인 방법은 재귀 문자 분할: 구분 기호의 텍스트를 점진적으로 나눕니다. 문서의 자연스러운 구조를 존중하기 위해 더 세밀한(단락, 문장, 단어) 작업을 수행합니다.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any
import re
class AdvancedChunker:
"""Chunker avanzato con metadati e strategie multiple"""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 64,
strategy: str = "recursive"
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.strategy = strategy
self.recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
length_function=len,
is_separator_regex=False
)
def chunk_with_metadata(
self,
text: str,
doc_metadata: Dict[str, Any]
) -> List[Dict]:
"""Crea chunks con metadati completi per il retrieval"""
if self.strategy == "recursive":
chunks = self.recursive_splitter.split_text(text)
elif self.strategy == "semantic":
chunks = self._semantic_split(text)
elif self.strategy == "sentence_window":
chunks = self._sentence_window_split(text)
else:
raise ValueError(f"Strategia non supportata: {self.strategy}")
result = []
for i, chunk in enumerate(chunks):
chunk_meta = {
**doc_metadata,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(chunk),
"strategy": self.strategy,
# Snippet per il contesto nella risposta
"prev_chunk": chunks[i-1][:100] if i > 0 else None,
"next_chunk": chunks[i+1][:100] if i < len(chunks)-1 else None,
}
result.append({"text": chunk, "metadata": chunk_meta})
return result
def _sentence_window_split(
self,
text: str,
window_size: int = 3
) -> List[str]:
"""
Sentence Window: indicizza singole frasi ma recupera
le frasi vicine per mantenere il contesto.
Tecnica avanzata che migliora il recall mantenendo la precisione.
"""
# Split in frasi
sentences = re.split(r'(?<=[.!?])\s+', text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
for i, sentence in enumerate(sentences):
# Finestra di contesto: frase centrale + vicine
start = max(0, i - window_size // 2)
end = min(len(sentences), i + window_size // 2 + 1)
window = " ".join(sentences[start:end])
chunks.append(window)
return chunks
def _semantic_split(self, text: str) -> List[str]:
"""
Semantic split: divide in base ai cambi di topic
usando embeddings per individuare boundary semantici.
"""
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
sentences = re.split(r'(?<=[.!?])\s+', text)
if len(sentences) < 3:
return [text]
# Calcola embeddings di ogni frase
embeddings = model.encode(sentences)
# Calcola similarità tra frasi consecutive
similarities = []
for i in range(len(embeddings) - 1):
sim = np.dot(embeddings[i], embeddings[i+1]) / (
np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i+1])
)
similarities.append(sim)
# Trova boundary semantici (bassa similarità = cambio di topic)
threshold = np.mean(similarities) - np.std(similarities)
boundaries = [0]
for i, sim in enumerate(similarities):
if sim < threshold:
boundaries.append(i + 1)
boundaries.append(len(sentences))
# Crea chunks dai segmenti
chunks = []
for i in range(len(boundaries) - 1):
segment = " ".join(sentences[boundaries[i]:boundaries[i+1]])
if len(segment) > self.chunk_size * 2:
# Chunk troppo grande: dividi ulteriormente
sub_chunks = self.recursive_splitter.split_text(segment)
chunks.extend(sub_chunks)
elif segment.strip():
chunks.append(segment)
return chunks
# Esempio di utilizzo
chunker = AdvancedChunker(chunk_size=512, chunk_overlap=64, strategy="recursive")
document = """
Cos'è il RAG?
Il Retrieval-Augmented Generation (RAG) è un'architettura che combina la ricerca su basi
di conoscenza con la generazione di testo degli LLM. Il principio fondamentale è semplice:
invece di lasciare che l'LLM risponda solo con la sua conoscenza interna (soggetta ad
allucinazioni), RAG prima recupera documenti rilevanti e poi li fornisce come contesto.
Come funziona il retrieval?
Il processo di retrieval avviene in due fasi. Prima, i documenti vengono convertiti in
embeddings vettoriali e memorizzati in un vector database. Poi, la query dell'utente viene
anch'essa convertita in un embedding e viene effettuata una ricerca di similarità per
trovare i documenti più rilevanti.
"""
chunks = chunker.chunk_with_metadata(
document,
doc_metadata={"source": "rag-intro.txt", "author": "AI Team"}
)
for chunk in chunks:
print(f"Chunk {chunk['metadata']['chunk_index']}: {len(chunk['text'])} chars")
print(f" {chunk['text'][:100]}...")
2.2 부모-자식 청킹
고급 RAG 시스템의 가장 효과적인 전략 중 하나는 부모-자식 청킹: 작은 청크(하위)가 인덱싱됩니다. 검색 정밀도는 높지만 제공하기 위해 큰 청크(상위)가 반환됩니다. LLM에 대한 충분한 맥락.
from langchain.storage import InMemoryStore
from langchain.retrievers import ParentDocumentRetriever
from langchain.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Splitter per chunks grandi (parent) - per il contesto
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
# Splitter per chunks piccoli (child) - per l'indicizzazione
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)
# Storage: chunks grandi in memory store, chunks piccoli in vector store
vectorstore = Qdrant.from_documents(
[], # inizialmente vuoto
OpenAIEmbeddings(),
url="http://localhost:6333",
collection_name="rag_child_chunks"
)
docstore = InMemoryStore() # o Redis per persistenza
# Parent-Child Retriever
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Come funziona:
# 1. add_documents() divide i documenti in parent e child chunks
# 2. I child chunks vengono indicizzati nel vector store
# 3. I parent chunks vengono salvati nel docstore con un ID univoco
# 4. I child chunks hanno un metadato "doc_id" che punta al parent
# Retrieval:
# 1. Cerca i child chunks più simili alla query
# 2. Recupera i parent chunks corrispondenti
# Risultato: massima precisione (child) + massimo contesto (parent)
3. 순위 재지정: 검색 정확도 향상
Il 순위 재지정 품질을 대폭 향상시키는 기술입니다 결과에 더 정확한(그러나 느린) 두 번째 모델을 적용하여 검색 이니셜. 일반적인 흐름은 다음과 같습니다. 빠른 벡터 검색으로 50-100명의 후보 검색, 그 다음에 정확한 크로스 인코더로 재정렬, 결국 탑케이를 얻으세요.
3.1 바이-인코더와 크로스-인코더
두 접근 방식의 차이점은 근본적입니다.
- 이중 인코더(검색): 쿼리와 문서를 별도로 벡터로 인코딩합니다. 문서가 미리 계산되어 검색이 매우 빠르지만, 문서를 인코딩하는 동안 쿼리를 볼 수 없기 때문에 정확도가 떨어집니다.
- 크로스 인코더(재순위): 쿼리와 문서를 연결하여 관련성 점수를 생성합니다. 훨씬 더 정확하지만 각 쿼리-문서 쌍을 실시간으로 처리해야 하기 때문에 검색만큼 확장성이 없습니다.
from sentence_transformers import CrossEncoder
from typing import List, Tuple
import time
class RerankingRetriever:
"""Retriever con reranking a due stadi"""
def __init__(
self,
bi_encoder, # Sentence Transformer per retrieval veloce
vector_index, # FAISS o vector DB
cross_encoder_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
):
self.bi_encoder = bi_encoder
self.index = vector_index
self.cross_encoder = CrossEncoder(cross_encoder_name)
self.documents = []
def retrieve_and_rerank(
self,
query: str,
initial_k: int = 50,
final_k: int = 5
) -> List[Tuple[str, float]]:
"""
Pipeline completa:
1. Retrieval veloce con bi-encoder (top-50)
2. Reranking preciso con cross-encoder (top-5)
"""
t0 = time.time()
# STAGE 1: Retrieval veloce
query_emb = self.bi_encoder.encode([query], normalize_embeddings=True)
scores, indices = self.index.search(query_emb.astype('float32'), initial_k)
candidates = [
(self.documents[i], float(s))
for s, i in zip(scores[0], indices[0])
if i != -1
]
t1 = time.time()
print(f"Retrieval: {(t1-t0)*1000:.1f}ms - {len(candidates)} candidati")
# STAGE 2: Reranking preciso con cross-encoder
if not candidates:
return []
# Prepara coppie (query, documento) per il cross-encoder
cross_pairs = [(query, doc) for doc, _ in candidates]
cross_scores = self.cross_encoder.predict(cross_pairs)
# Combina e riordina
reranked = sorted(
zip([doc for doc, _ in candidates], cross_scores),
key=lambda x: x[1],
reverse=True
)
t2 = time.time()
print(f"Reranking: {(t2-t1)*1000:.1f}ms")
return reranked[:final_k]
def reciprocal_rank_fusion(
self,
results_list: List[List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Reciprocal Rank Fusion: combina risultati da multiple strategie
di retrieval (es. dense + sparse) in un singolo ranking.
Formula: score = sum(1 / (k + rank_i)) per ogni lista i
"""
doc_scores = {}
for results in results_list:
for rank, (doc, _) in enumerate(results):
if doc not in doc_scores:
doc_scores[doc] = 0.0
doc_scores[doc] += 1.0 / (k + rank + 1)
return sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
순위 재지정을 위한 크로스 인코더 모델
| 모델 | 속도 | 품질 | 권장 용도 |
|---|---|---|---|
| 크로스 인코더/ms-marco-MiniLM-L-6-v2 | 높은 | 좋은 | 생산, 지연 시간 중요 |
| 크로스 인코더/ms-marco-electra-base | 평균 | 훌륭한 | 좋은 균형 |
| BAAI/bge-reranker-대형 | 낮은 | 훌륭한 | 최대 품질, 중요하지 않은 지연 시간 |
| Cohere 재순위 API | 아피스 | 훌륭한 | 프로토타입 제작, 예산 사용 가능 |
4. 지연 시간 및 비용을 위한 지능형 캐싱
프로덕션 RAG 시스템에서는 많은 비율의 쿼리가 유사하거나 동일합니다(예: FAQ, 자주 묻는 질문). 그만큼 의미론적 캐싱 더 나아간다 정확한 캐시(동일한 쿼리에서만 캐시 적중) 및 쿼리당 결과 재사용 의미상 유사하여 LLM 추론 비용을 대폭 절감합니다.
import redis
import numpy as np
import json
import hashlib
from sentence_transformers import SentenceTransformer
from typing import Optional, Tuple
import time
class SemanticCache:
"""Cache semantica che riusa risposte per query simili"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
embedding_model: str = "all-MiniLM-L6-v2",
similarity_threshold: float = 0.95,
ttl_seconds: int = 3600 # 1 ora
):
self.redis = redis.from_url(redis_url)
self.model = SentenceTransformer(embedding_model)
self.threshold = similarity_threshold
self.ttl = ttl_seconds
def _get_cache_key(self, text: str) -> str:
return f"rag_cache:{hashlib.md5(text.encode()).hexdigest()}"
def get(self, query: str) -> Optional[Tuple[str, float]]:
"""
Cerca in cache:
1. Prima cerca match esatto (O(1))
2. Poi cerca match semantico (O(n) - ottimizzabile con FAISS)
Ritorna (risposta, similarity_score) o None
"""
# Cache esatto
exact_key = self._get_cache_key(query)
cached = self.redis.get(exact_key)
if cached:
data = json.loads(cached)
return data['response'], 1.0
# Cache semantico
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
# Ottimizzazione: usa Redis SCAN per iterare sulle chiavi cached
# In produzione, usa un indice FAISS separato per il lookup semantico
best_score = 0.0
best_response = None
for key in self.redis.scan_iter("rag_cache:*"):
cached = self.redis.get(key)
if not cached:
continue
data = json.loads(cached)
cached_emb = np.array(data['embedding'])
sim = float(np.dot(query_emb, cached_emb))
if sim > best_score:
best_score = sim
best_response = data['response']
if best_score >= self.threshold:
return best_response, best_score
return None
def set(self, query: str, response: str):
"""Salva la risposta in cache con l'embedding della query"""
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
key = self._get_cache_key(query)
data = {
'query': query,
'response': response,
'embedding': query_emb.tolist(),
'timestamp': time.time()
}
self.redis.setex(key, self.ttl, json.dumps(data))
def get_stats(self) -> dict:
"""Statistiche del cache"""
keys = list(self.redis.scan_iter("rag_cache:*"))
return {
'total_cached': len(keys),
'memory_bytes': sum(
self.redis.memory_usage(k) or 0 for k in keys[:100]
)
}
# Integrazione nel RAG pipeline
class CachedRAGPipeline:
def __init__(self, rag_pipeline, cache: SemanticCache):
self.rag = rag_pipeline
self.cache = cache
self.hits = 0
self.misses = 0
def query(self, question: str) -> dict:
# 1. Prova cache
cached = self.cache.get(question)
if cached:
response, sim = cached
self.hits += 1
return {
"answer": response,
"cached": True,
"similarity": sim,
"latency_ms": 1 # quasi istantaneo
}
# 2. Full RAG pipeline
t0 = time.time()
response = self.rag.generate(question)
latency = (time.time() - t0) * 1000
# 3. Salva in cache per future query simili
self.cache.set(question, response)
self.misses += 1
return {
"answer": response,
"cached": False,
"similarity": None,
"latency_ms": latency
}
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
5. 모니터링 및 관찰 가능성
생산 중인 RAG 시스템은 모든 수준에서 관찰 가능해야 합니다. 충분하지 않다 HTTP 대기 시간 모니터링: 검색 품질, 속도를 측정해야 합니다. 환각, 사용자 만족도 및 쿼리당 비용.
5.1 인프라 지표
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Metriche infrastrutturali
rag_queries_total = Counter(
'rag_queries_total',
'Numero totale di query RAG',
['status', 'cached']
)
rag_query_duration = Histogram(
'rag_query_duration_seconds',
'Durata delle query RAG',
['component'],
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)
rag_retrieval_chunks = Histogram(
'rag_retrieval_chunks',
'Numero di chunks recuperati per query',
buckets=[1, 3, 5, 10, 20, 50]
)
rag_retrieval_score = Histogram(
'rag_retrieval_score',
'Score di rilevanza del top-1 chunk',
buckets=[0.1, 0.3, 0.5, 0.7, 0.8, 0.9, 0.95, 1.0]
)
rag_cache_hit_rate = Gauge(
'rag_cache_hit_rate',
'Tasso di hit del semantic cache'
)
rag_llm_tokens_total = Counter(
'rag_llm_tokens_total',
'Totale token LLM utilizzati',
['type'] # 'prompt' o 'completion'
)
# Metriche qualità (aggiornate da evaluation service)
rag_faithfulness_score = Gauge(
'rag_faithfulness_score',
'Score medio di faithfulness (risposta supportata da contesto)'
)
rag_answer_relevance = Gauge(
'rag_answer_relevance',
'Score medio di rilevanza della risposta'
)
# Decorator per timing automatico
def track_timing(component: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
rag_queries_total.labels(status='success', cached='false').inc()
return result
except Exception as e:
rag_queries_total.labels(status='error', cached='false').inc()
raise
finally:
duration = time.time() - start
rag_query_duration.labels(component=component).observe(duration)
return wrapper
return decorator
# Esempio utilizzo
class MonitoredRAGService:
@track_timing('full_query')
def query(self, question: str) -> dict:
# Retrieval
with rag_query_duration.labels(component='retrieval').time():
chunks = self.retrieve(question)
rag_retrieval_chunks.observe(len(chunks))
if chunks:
rag_retrieval_score.observe(chunks[0][1]) # score top-1
# Generation
with rag_query_duration.labels(component='generation').time():
response = self.generate(question, chunks)
return response
5.2 RAG에 대한 구조적 로깅
import structlog
import uuid
from contextlib import contextmanager
# Configura structlog per output JSON
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
class TracedRAGPipeline:
"""RAG pipeline con tracing distribuito"""
def query(self, question: str, user_id: str = None) -> dict:
trace_id = str(uuid.uuid4())
log = logger.bind(
trace_id=trace_id,
user_id=user_id,
question_hash=hashlib.md5(question.encode()).hexdigest()[:8]
)
log.info("rag_query_start", question_length=len(question))
# Retrieval
t0 = time.time()
try:
chunks = self.retrieve(question)
retrieval_time = time.time() - t0
log.info(
"rag_retrieval_complete",
chunks_retrieved=len(chunks),
top_score=chunks[0][1] if chunks else 0,
retrieval_ms=retrieval_time * 1000
)
except Exception as e:
log.error("rag_retrieval_error", error=str(e))
raise
# Generation
t1 = time.time()
try:
response = self.generate(question, chunks)
generation_time = time.time() - t1
log.info(
"rag_generation_complete",
response_length=len(response),
generation_ms=generation_time * 1000,
total_ms=(time.time() - t0) * 1000
)
except Exception as e:
log.error("rag_generation_error", error=str(e))
raise
return {
"answer": response,
"trace_id": trace_id,
"chunks_used": len(chunks),
"total_ms": (time.time() - t0) * 1000
}
6. RAGAS를 통한 자동 평가
라가스 (RAG 평가)는 평가를 위한 가장 널리 사용되는 프레임워크입니다. 자동 RAG 시스템. 이는 품질의 네 가지 기본 차원을 측정합니다.
- 충실: 검색된 컨텍스트에서 응답이 지원됩니까? (환각 방지)
- 답변 관련성: 답변이 질문과 관련이 있나요?
- 상황 회상: 검색된 컨텍스트에 필요한 정보가 포함되어 있습니까?
- 상황 정확도: 복구된 청크는 모두 관련이 있나요?
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_recall,
context_precision,
answer_correctness
)
from datasets import Dataset
from typing import List, Dict
import pandas as pd
class RAGEvaluator:
"""Valutazione automatica sistema RAG con RAGAS"""
def __init__(self, rag_pipeline):
self.rag = rag_pipeline
def create_evaluation_dataset(
self,
test_questions: List[str],
ground_truths: List[str]
) -> Dataset:
"""
Crea dataset di valutazione processando le domande con il RAG.
Ogni row contiene: domanda, risposta, contesti recuperati, verita.
"""
questions = []
answers = []
contexts = []
for question, gt in zip(test_questions, ground_truths):
# Recupera contesti
chunks = self.rag.retrieve(question, top_k=5)
context_list = [chunk for chunk, _ in chunks]
# Genera risposta
answer = self.rag.generate(question)
questions.append(question)
answers.append(answer)
contexts.append(context_list)
return Dataset.from_dict({
"question": questions,
"answer": answers,
"contexts": contexts,
"ground_truth": ground_truths
})
def evaluate_pipeline(
self,
test_questions: List[str],
ground_truths: List[str]
) -> pd.DataFrame:
"""
Valuta il pipeline RAG e ritorna un report dettagliato.
"""
dataset = self.create_evaluation_dataset(test_questions, ground_truths)
results = evaluate(
dataset,
metrics=[
faithfulness, # 0-1: risposta supportata da contesto?
answer_relevancy, # 0-1: risposta pertinente alla domanda?
context_recall, # 0-1: contesto copre la ground truth?
context_precision, # 0-1: contesto è tutto pertinente?
answer_correctness # 0-1: risposta corretta vs ground truth?
]
)
# Report
df = results.to_pandas()
print("\n=== RAGAS Evaluation Report ===")
print(f"Faithfulness: {df['faithfulness'].mean():.3f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.3f}")
print(f"Context Recall: {df['context_recall'].mean():.3f}")
print(f"Context Precision:{df['context_precision'].mean():.3f}")
print(f"Answer Correct.: {df['answer_correctness'].mean():.3f}")
# Identifica casi problematici
low_faith = df[df['faithfulness'] < 0.5]
if len(low_faith) > 0:
print(f"\nATTENZIONE: {len(low_faith)} domande con faithfulness bassa:")
for _, row in low_faith.iterrows():
print(f" Q: {row['question'][:80]}...")
return df
def continuous_evaluation(self, sample_rate: float = 0.05):
"""
Valutazione continua in produzione: campiona il 5% delle query
e le valuta automaticamente per rilevare degradazione.
"""
import random
def evaluate_sample(question: str, answer: str, contexts: List[str]):
if random.random() > sample_rate:
return
# Stima qualità senza ground truth usando LLM-as-judge
from openai import OpenAI
client = OpenAI()
judge_prompt = f"""Valuta la qualità di questa risposta RAG.
Domanda: {question}
Contesti recuperati: {" | ".join(contexts[:2])}
Risposta generata: {answer}
Valuta su scala 1-5:
1. La risposta è supportata dai contesti? (faithfulness)
2. La risposta è pertinente alla domanda? (relevance)
Rispondi SOLO con JSON: {"faithfulness": X, "relevance": X}"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": judge_prompt}],
temperature=0
)
try:
import json
scores = json.loads(response.choices[0].message.content)
# Aggiorna metriche Prometheus
rag_faithfulness_score.set(scores['faithfulness'] / 5.0)
rag_answer_relevance.set(scores['relevance'] / 5.0)
except:
pass
return evaluate_sample
# Test set di esempio
test_questions = [
"Cos'è il RAG e come funziona?",
"Qual è la differenza tra BERT e Sentence Transformers?",
"Come si sceglie un vector database per la produzione?"
]
ground_truths = [
"RAG (Retrieval-Augmented Generation) combina la ricerca su basi di conoscenza con la generazione LLM per ridurre le allucinazioni.",
"BERT produce embeddings contestuali per token, mentre Sentence Transformers è ottimizzato per produrre embeddings a livello di frase per la similarity search.",
"La scelta dipende da scala, requisiti di latenza, budget e se serve hosting gestito o self-hosted."
]
7. 코퍼스 업데이트 관리
프로덕션 중인 RAG 시스템은 시간이 지남에 따라 변경되는 코퍼스(새 문서)를 관리해야 합니다. 추가됨, 오래된 문서는 제거됨, 기존 문서는 삭제됨 업데이트되었습니다. 이것이 문제이다 말뭉치 관리.
from qdrant_client import QdrantClient
from qdrant_client.models import (
PointStruct, UpdateStatus, Filter, FieldCondition, MatchValue
)
from sentence_transformers import SentenceTransformer
import hashlib
import time
from typing import List, Dict, Optional
class IncrementalRAGIndex:
"""Gestione aggiornamenti incrementali del corpus RAG"""
def __init__(
self,
collection_name: str = "rag_corpus",
embedding_model: str = "all-MiniLM-L6-v2"
):
self.client = QdrantClient(url="http://localhost:6333")
self.model = SentenceTransformer(embedding_model)
self.collection = collection_name
def _doc_hash(self, text: str) -> str:
"""Hash deterministico per deduplicazione"""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def upsert_document(
self,
doc_id: str,
text: str,
metadata: Dict,
chunk_size: int = 512
) -> int:
"""
Upsert (insert o update) di un documento.
Se il documento è già presente e non modificato, skippa.
"""
# Calcola hash per deduplication
content_hash = self._doc_hash(text)
# Controlla se già presente e non modificato
existing = self.client.scroll(
collection_name=self.collection,
scroll_filter=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
),
limit=1
)
if existing[0] and existing[0][0].payload.get('content_hash') == content_hash:
return 0 # Nessun aggiornamento necessario
# Rimuovi vecchi chunks del documento
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
# Crea nuovi chunks
words = text.split()
chunk_words = chunk_size // 5 # ~5 chars/word
chunks = [
' '.join(words[i:i+chunk_words])
for i in range(0, len(words), chunk_words - 10) # 10 word overlap
if words[i:i+chunk_words]
]
if not chunks:
return 0
# Genera embeddings
embeddings = self.model.encode(chunks, normalize_embeddings=True)
# Crea points
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
point_id = int(hashlib.md5(f"{doc_id}_{i}".encode()).hexdigest()[:8], 16)
points.append(PointStruct(
id=point_id,
vector=embedding.tolist(),
payload={
**metadata,
"doc_id": doc_id,
"chunk_index": i,
"content_hash": content_hash,
"text": chunk,
"updated_at": time.time()
}
))
# Upsert in Qdrant
self.client.upsert(
collection_name=self.collection,
points=points
)
return len(chunks)
def delete_document(self, doc_id: str):
"""Rimuove tutti i chunks di un documento"""
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
def get_corpus_stats(self) -> Dict:
"""Statistiche sul corpus"""
info = self.client.get_collection(self.collection)
return {
"total_chunks": info.points_count,
"index_status": info.status,
"vectors_config": str(info.config.params.vectors)
}
8. 프로덕션의 모범 사례 및 안티 패턴
RAG 생산 준비 체크리스트
- 청킹: 10-15% 중복되는 청크 크기 400-600 토큰을 사용합니다. 특정 말뭉치에 대해 다양한 전략을 테스트하세요.
- 순위 재지정: 정밀도가 중요한 쿼리에는 항상 크로스 인코더를 구현하세요. 추가 대기 시간(100-300ms) 허용
- 캐싱: 임계값 0.92-0.97의 의미론적 캐시는 FAQ와 유사한 코퍼스에서 LLM 비용을 30-60% 줄입니다.
- 모니터링: 충실도 추적, 답변 관련성, 검색 대기 시간, 쿼리당 LLM 토큰 비용
- 평가: 골든 테스트 세트(실제 정보가 포함된 100~200개의 질문)를 유지하고 각 배포 시 평가합니다.
- 대체: 검색 결과 관련성이 없는 경우(최고 점수 < 0.5) 환각 대신 "모르겠어요"라고 선언합니다.
- 버전 관리: 버전 포함 템플릿 및 변경 시 다시 색인화 마이그레이션 중에 인덱스를 병렬로 유지
프로덕션에서 피해야 할 안티패턴
- 품질 모니터링 없음: 대기 시간과 가동 시간만 모니터링하는 것만으로는 충분하지 않습니다. RAG는 기술적으로 "작동"할 수 있지만 잘못된 답변을 제공합니다.
- 코퍼스가 업데이트되지 않았습니다. 오래된 문서에 대한 RAG 시스템은 RAG가 없는 것보다 나쁩니다. 잘못된 정보에 자신있게 대응합니다.
- 고정된 top-k: 검색된 청크 수를 쿼리 길이에 맞게 조정합니다. 복잡한 쿼리에는 더 많은 컨텍스트가 필요합니다.
- 임베딩 대기 시간 무시: 쿼리 임베딩을 생성하는 데 10~50ms가 걸립니다. 1000 req/s를 곱하면 병목 현상이 발생합니다.
- 절대 판사로서의 LLM: 생성 모델은 RAG를 사용하더라도 맥락에서 벗어나 "환각"할 수 있습니다. 대응을 위한 가드레일을 구현합니다.
결론
RAG 시스템을 프로덕션 환경으로 가져오려면 파이프라인 그 이상이 필요합니다. 순차적. 우리는 프로덕션 지원 아키텍처가 어떻게 계획을 분리하는지 살펴보았습니다. 수집 및 쿼리, 고급 청킹이 품질에 직접적인 영향을 미치는 방식, 크로스 인코더 재순위가 검색 정확도를 향상시키는 방법 및 방법 RAGAS를 사용하여 모니터링하면 시간 경과에 따른 품질을 추적할 수 있습니다.
핵심 포인트:
- 항상 별도의 수집 경로와 쿼리 경로 - 요구 사항이 근본적으로 다릅니다.
- 청킹에 투자하세요. 가장 영향력이 크지만 종종 간과되는 변수입니다.
- 정확성이 중요한 사용 사례를 위해 크로스 인코더 재순위 구현
- 의미론적 캐싱을 사용하여 반복 쿼리의 비용과 대기 시간을 줄입니다.
- 대기 시간과 가동 시간뿐만 아니라 충실도를 측정하고 관련성에 대한 답변을 제공합니다.
- RAGAS를 사용하여 골든 테스트 세트를 유지하고 각 배포에서 평가합니다.
다음 기사에서는 살펴보겠습니다. RAG용 LangChain: 프레임워크 다음과 같은 고급 패턴에 중점을 두고 LLM 애플리케이션을 구축하는 데 가장 널리 사용됩니다. 대화형 RAG, 다중 홉 검색 및 도구 호출.
시리즈는 계속됩니다
- 기사 1: RAG 설명 - 기본 사항
- 기사 2: 임베딩 및 의미 검색
- 기사 3: 벡터 데이터베이스 - Qdrant 대 Pinecone
- 기사 4: 하이브리드 검색: BM25 + 벡터
- 조항 5: 생산 중인 RAG(현재)
- 기사 6: RAG용 LangChain
자세히 알아보기: RAG용 pg벡터 PostgreSQL에서 e MLOps: 모델 제공 생산 중.







