PostgreSQL을 사용한 RAG: 문서에서 응답까지
AI 시스템이 문서를 기반으로 질문에 답할 수 있기를 바랐던 적이 있습니까? 사용자 정의 모델을 교육할 필요 없이 귀사에만 적용됩니까? 대답은 그렇습니다 전화 검색 증강 생성(RAG), 가장 아름다운 건축물 중 하나입니다. 현대 AI의 강력하고 실용적인 기술. 그리고 pgVector를 사용하는 PostgreSQL은 최고의 도구 중 하나입니다. 그것을 구현하기 위해.
RAG는 두 가지 보완적인 기능을 결합합니다. 의미 검색 (나를 찾아 신청서와 가장 관련이 있는 서류) 자연어 생성 (해당 문서를 기반으로 일관된 응답을 생성합니다.) 결과는 응답하는 시스템입니다 사전 훈련된 모델에 대한 일반적인 지식이 아닌 데이터에 대한 최신 지식을 바탕으로 합니다.
이 기사에서는 문서 수집부터 완전한 엔드투엔드 RAG 파이프라인을 구축합니다. GPT-4에서 생성된 응답이 포함된 쿼리에 모두 PostgreSQL에서 실행됩니다. 추가 데이터베이스가 없습니다. 벡터 저장소에 대한 외부 서비스가 없습니다.
시리즈 개요
| # | Articolo | 집중하다 |
|---|---|---|
| 1 | pg벡터 | 설치, 운영자, 인덱싱 |
| 2 | 심층적인 임베딩 | 모델, 거리, 세대 |
| 3 | 현재 위치 - PostgreSQL을 사용한 RAG | 엔드투엔드 RAG 파이프라인 |
| 4 | 유사성 검색 | 알고리즘 및 최적화 |
| 5 | HNSW 및 IVFFlat | 고급 인덱싱 전략 |
| 6 | 생산 중인 RAG | 확장성 및 성능 |
무엇을 배울 것인가
- RAG 시스템의 전체 아키텍처: 구성요소 및 데이터 흐름
- 문서 수집 파이프라인: 로드, 구문 분석, 청킹
- pgVector를 사용한 PostgreSQL의 스토리지 전략
- 검색: 쿼리부터 가장 관련성이 높은 청크 선택까지
- 생성: 프롬프트를 구축하고 GPT-4와 통합하는 방법
- 하이브리드 검색: 벡터 검색과 PostgreSQL 전체 텍스트 검색 결합
- RAG 품질 평가: 지표 및 도구
RAG 아키텍처: 작동 방식
RAG 시스템에는 서로 다른 시간에 작동하는 두 가지 주요 단계가 있습니다.
1단계: 수집(오프라인)
한 번 발생합니다(또는 문서 변경에 따라 주기적으로). 프로세스는 다음과 같습니다.
- 짐: 파일 시스템, URL, 데이터베이스, API에서 문서 로드
- 구문 분석: PDF, DOCX, HTML, Markdown에서 텍스트 추출
- 청크: 텍스트를 최적의 크기로 조각으로 분할
- 포함시키다: 각 청크에 대한 임베딩 벡터 생성
- 가게: PostgreSQL에 청크 + 임베딩 + 메타데이터 저장
2단계: 검색 + 생성(온라인, 각 쿼리에 대해)
- 쿼리: 사용자가 자연어로 질문을 합니다.
- 쿼리 삽입: 동일한 모델을 사용하여 질문을 벡터로 변환
- 찾다: PostgreSQL에서 가장 유사한 k개의 청크 찾기
- 문맥: 발견된 청크를 컨텍스트로 조립
- 생성하다: 답변을 얻으려면 LLM에 질문 + 컨텍스트를 보내십시오.
## Flusso RAG Visualizzato
INGESTION (offline):
Documento PDF
|
v
[Parser] -> Testo grezzo
|
v
[Chunker] -> ["chunk 1", "chunk 2", ..., "chunk N"]
|
v
[Embedding Model] -> [[0.023, -0.841, ...], [0.891, 0.234, ...], ...]
|
v
[PostgreSQL + pgvector] -> Memorizzazione permanente
QUERY (online):
Domanda utente: "Come funziona l'indicizzazione HNSW?"
|
v
[Embedding Model] -> [0.045, -0.823, ...] (query vector)
|
v
[PostgreSQL ANN Search] -> Top 5 chunk più simili
|
v
[Prompt Builder] -> "Usa questo contesto: [chunk1, chunk2, ...] Domanda: ..."
|
v
[GPT-4 / Claude] -> "L'indicizzazione HNSW (Hierarchical Navigable Small World) ..."
|
v
Risposta all'utente
프로젝트 설정
중독
# requirements.txt
openai>=1.12.0
psycopg2-binary>=2.9.9
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
pypdf>=3.17.0
python-dotenv>=1.0.0
tiktoken>=0.5.0
# Installazione
pip install -r requirements.txt
데이터베이스 구성
-- Setup iniziale PostgreSQL
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- per full-text search
-- Schema completo per RAG
CREATE TABLE IF NOT EXISTS rag_documents (
id BIGSERIAL PRIMARY KEY,
-- Informazioni sorgente
source_path TEXT NOT NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('pdf', 'txt', 'md', 'html', 'docx')),
source_hash TEXT NOT NULL, -- hash MD5 del file originale
-- Chunk info
chunk_index INTEGER NOT NULL,
chunk_total INTEGER,
-- Contenuto
title TEXT,
content TEXT NOT NULL,
content_length INTEGER GENERATED ALWAYS AS (length(content)) STORED,
-- Embedding
embedding_model TEXT NOT NULL DEFAULT 'text-embedding-3-small',
embedding vector(1536),
-- Metadata
metadata JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
-- Timestamps
ingested_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_path, chunk_index, source_hash)
);
-- Indice HNSW per vector search veloce
CREATE INDEX idx_rag_embedding_hnsw
ON rag_documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Indice GIN per full-text search
CREATE INDEX idx_rag_content_fts
ON rag_documents
USING gin (to_tsvector('english', content));
-- Indice per filtri comuni
CREATE INDEX idx_rag_source_type ON rag_documents (source_type);
CREATE INDEX idx_rag_tags ON rag_documents USING gin (tags);
CREATE INDEX idx_rag_metadata ON rag_documents USING gin (metadata);
문서 수집 파이프라인
Python 프로젝트 구조
rag_system/
├── config.py # Configurazione DB, API keys, parametri
├── ingestion/
│ ├── __init__.py
│ ├── loaders.py # Caricamento documenti da varie sorgenti
│ ├── parsers.py # Parsing PDF, DOCX, HTML, Markdown
│ ├── chunkers.py # Strategie di chunking
│ └── pipeline.py # Pipeline ingestion orchestrator
├── retrieval/
│ ├── __init__.py
│ ├── embedder.py # Generazione embeddings
│ └── searcher.py # Vector search e hybrid search
├── generation/
│ ├── __init__.py
│ ├── prompts.py # Template prompts
│ └── generator.py # Integrazione LLM
├── rag.py # Classe principale RAGSystem
└── main.py # Entry point
config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
# Database
db_host: str = os.getenv("DB_HOST", "localhost")
db_port: int = int(os.getenv("DB_PORT", "5432"))
db_name: str = os.getenv("DB_NAME", "ragdb")
db_user: str = os.getenv("DB_USER", "postgres")
db_password: str = os.getenv("DB_PASSWORD", "")
# OpenAI
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
embedding_model: str = "text-embedding-3-small"
embedding_dim: int = 1536
chat_model: str = "gpt-4o-mini" # cost-effective default
# Chunking
chunk_size: int = 800
chunk_overlap: int = 150
min_chunk_size: int = 100
# Retrieval
top_k: int = 5
similarity_threshold: float = 0.65 # minimum cosine similarity
# Generation
max_context_tokens: int = 8000
temperature: float = 0.1 # low temperature for factual answers
def get_db_url(self) -> str:
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
config = Config()
ingestion/loaders.py - 다중 소스 로드
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import requests
from bs4 import BeautifulSoup
@dataclass
class RawDocument:
content: str
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
def load_text_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
return RawDocument(
content=content,
source_path=path,
source_type="txt",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=p.stem
)
def load_markdown_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
# Estrai titolo dal frontmatter o dalla prima riga H1
title = None
for line in content.split("\n"):
if line.startswith("# "):
title = line[2:].strip()
break
return RawDocument(
content=content,
source_path=path,
source_type="md",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_pdf_file(path: str) -> RawDocument:
from pypdf import PdfReader
reader = PdfReader(path)
pages = []
for page in reader.pages:
pages.append(page.extract_text())
content = "\n\n".join(pages)
return RawDocument(
content=content,
source_path=path,
source_type="pdf",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=Path(path).stem,
metadata={"pages": len(reader.pages)}
)
def load_url(url: str) -> RawDocument:
response = requests.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Rimuovi script, style, nav
for tag in soup(["script", "style", "nav", "header", "footer"]):
tag.decompose()
content = soup.get_text(separator="\n", strip=True)
title = soup.title.string if soup.title else url
return RawDocument(
content=content,
source_path=url,
source_type="html",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_document(source: str) -> RawDocument:
"""Smart loader che sceglie il parser corretto."""
if source.startswith("http"):
return load_url(source)
p = Path(source)
loaders = {
".txt": load_text_file,
".md": load_markdown_file,
".pdf": load_pdf_file,
}
loader = loaders.get(p.suffix.lower())
if not loader:
raise ValueError(f"Tipo file non supportato: {p.suffix}")
return loader(source)
ingestion/chunkers.py - 지능형 청킹
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dataclasses import dataclass
from typing import Optional
@dataclass
class TextChunk:
content: str
chunk_index: int
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class SmartChunker:
"""
Chunker che adatta la strategia al tipo di documento.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Separatori per testo generico
self._text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "],
length_function=len
)
# Separatori per markdown (rispetta la struttura)
self._md_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["## ", "# ", "\n\n", "\n", ". "],
length_function=len
)
def chunk(self, doc) -> list[TextChunk]:
"""Chunk un documento, scegliendo la strategia giusta."""
if doc.source_type == "md":
raw_chunks = self._md_splitter.split_text(doc.content)
else:
raw_chunks = self._text_splitter.split_text(doc.content)
# Filtra chunk troppo piccoli
raw_chunks = [c for c in raw_chunks if len(c.strip()) > 100]
return [
TextChunk(
content=chunk.strip(),
chunk_index=i,
source_path=doc.source_path,
source_type=doc.source_type,
source_hash=doc.source_hash,
title=doc.title,
metadata={
**doc.metadata,
"chunk_total": len(raw_chunks),
"char_count": len(chunk)
}
)
for i, chunk in enumerate(raw_chunks)
]
ingestion/pipeline.py - 기본 조정자
import psycopg2
from psycopg2.extras import execute_values
import json
import time
from .loaders import load_document
from .chunkers import SmartChunker
class IngestionPipeline:
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.chunker = SmartChunker(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap
)
self.conn = psycopg2.connect(config.get_db_url())
def is_already_ingested(self, source_path: str, source_hash: str) -> bool:
"""Controlla se il documento e già nel DB con lo stesso hash (non cambiato)."""
with self.conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM rag_documents WHERE source_path = %s AND source_hash = %s",
(source_path, source_hash)
)
return cur.fetchone()[0] > 0
def ingest(self, source: str, tags: list[str] = None, force: bool = False) -> dict:
"""
Processa un documento e lo inserisce in PostgreSQL.
Ritorna statistiche sull'operazione.
"""
tags = tags or []
start_time = time.time()
# 1. Carica documento
doc = load_document(source)
print(f"Caricato: {source} ({len(doc.content)} chars, hash: {doc.source_hash[:8]})")
# 2. Controlla se già presente (incrementale update)
if not force and self.is_already_ingested(source, doc.source_hash):
print(f" Saltato: documento non modificato")
return {"skipped": True, "source": source}
# 3. Chunking
chunks = self.chunker.chunk(doc)
print(f" Chunking: {len(chunks)} chunk creati")
# 4. Elimina versione precedente (se esiste)
with self.conn.cursor() as cur:
cur.execute("DELETE FROM rag_documents WHERE source_path = %s", (source,))
# 5. Genera embeddings in batch
texts = [c.content for c in chunks]
embeddings = self.embedder.embed_batch(texts)
print(f" Embeddings generati: {len(embeddings)} vettori dim {len(embeddings[0])}")
# 6. Inserisci in PostgreSQL
rows = [
(
c.source_path,
c.source_type,
c.source_hash,
c.chunk_index,
len(chunks), # chunk_total
c.title,
c.content,
self.config.embedding_model,
embeddings[i],
json.dumps(c.metadata),
tags
)
for i, c in enumerate(chunks)
]
with self.conn.cursor() as cur:
execute_values(cur, """
INSERT INTO rag_documents
(source_path, source_type, source_hash, chunk_index, chunk_total,
title, content, embedding_model, embedding, metadata, tags)
VALUES %s
ON CONFLICT (source_path, chunk_index, source_hash) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
updated_at = NOW()
""", rows, template="(%s,%s,%s,%s,%s,%s,%s,%s,%s::vector,%s::jsonb,%s::text[])")
self.conn.commit()
elapsed = time.time() - start_time
stats = {
"source": source,
"chunks": len(chunks),
"embeddings": len(embeddings),
"elapsed_sec": round(elapsed, 2)
}
print(f" Completato in {elapsed:.1f}s - {stats}")
return stats
def ingest_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Ingesta tutti i documenti in una directory."""
from pathlib import Path
extensions = extensions or [".txt", ".md", ".pdf"]
results = []
for path in Path(directory).rglob("*"):
if path.suffix.lower() in extensions:
result = self.ingest(str(path))
results.append(result)
return results
검색: 올바른 청크 찾기
검색/searcher.py
import psycopg2
from dataclasses import dataclass
from typing import Optional
@dataclass
class SearchResult:
id: int
source_path: str
source_type: str
chunk_index: int
title: Optional[str]
content: str
similarity: float
metadata: dict
class HybridSearcher:
"""
Combina vector search (semantica) con full-text search (keyword).
Reciprocal Rank Fusion per merging dei risultati.
"""
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.conn = psycopg2.connect(config.get_db_url())
def vector_search(self, query: str, top_k: int = 10,
source_type: Optional[str] = None,
tags: Optional[list[str]] = None) -> list[SearchResult]:
"""Ricerca semantica con filtri opzionali."""
query_embedding = self.embedder.embed_single(query)
threshold = 1 - self.config.similarity_threshold # converti a cosine distance
# Costruisci query dinamica con filtri opzionali
filters = ["embedding <=> %s::vector < %s"]
params = [query_embedding, threshold]
if source_type:
filters.append("source_type = %s")
params.append(source_type)
if tags:
filters.append("tags && %s::text[]") -- overlap: almeno un tag in comune
params.append(tags)
where_clause = " AND ".join(filters)
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
id, source_path, source_type, chunk_index, title, content,
1 - (embedding <=> %s::vector) AS similarity,
metadata
FROM rag_documents
WHERE {where_clause}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", [query_embedding] + params + [query_embedding, top_k])
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(r[6], 4), metadata=r[7]
)
for r in rows
]
def fulltext_search(self, query: str, top_k: int = 10) -> list[SearchResult]:
"""Full-text search con ts_rank per ranking."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
id, source_path, source_type, chunk_index, title, content,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS rank,
metadata
FROM rag_documents
WHERE to_tsvector('english', content) @@
plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
""", (query, query, top_k))
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(float(r[6]), 4), metadata=r[7]
)
for r in rows
]
def hybrid_search(self, query: str, top_k: int = 5,
vector_weight: float = 0.7) -> list[SearchResult]:
"""
Reciprocal Rank Fusion (RRF) per combinare vector e full-text search.
RRF Score = sum(1 / (k + rank)) per ogni lista di risultati.
"""
k_rrf = 60 # costante RRF standard
# Ottieni entrambi i risultati
vector_results = self.vector_search(query, top_k=top_k * 2)
fts_results = self.fulltext_search(query, top_k=top_k * 2)
# Calcola RRF scores
scores = {}
all_results = {}
for rank, result in enumerate(vector_results):
scores[result.id] = scores.get(result.id, 0) + vector_weight / (k_rrf + rank + 1)
all_results[result.id] = result
fts_weight = 1 - vector_weight
for rank, result in enumerate(fts_results):
scores[result.id] = scores.get(result.id, 0) + fts_weight / (k_rrf + rank + 1)
all_results[result.id] = result
# Ordina per RRF score e prendi top_k
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
final_results = [all_results[id] for id in sorted_ids[:top_k]]
# Aggiorna similarity con RRF score normalizzato
max_score = scores[sorted_ids[0]] if sorted_ids else 1
for result in final_results:
result.similarity = round(scores[result.id] / max_score, 4)
return final_results
생성: 상황에서 응답까지
생성/prompts.py
from string import Template
# System prompt che definisce il comportamento dell'AI
RAG_SYSTEM_PROMPT = """Sei un assistente AI preciso e utile. Rispondi alle domande
basandoti ESCLUSIVAMENTE sui documenti di contesto forniti.
Regole:
1. Usa SOLO le informazioni presenti nel contesto. Non inventare.
2. Se la risposta non e nel contesto, dillo chiaramente.
3. Cita le sorgenti usando [Fonte: nome_file, chunk X] dopo ogni affermazione.
4. Mantieni un tono professionale e conciso.
5. Struttura la risposta in modo chiaro con paragrafi o bullet points se appropriato.
"""
def build_rag_prompt(query: str, context_chunks: list, include_sources: bool = True) -> str:
"""
Costruisce il prompt per l'LLM con il contesto recuperato.
Args:
query: La domanda dell'utente
context_chunks: Lista di SearchResult
include_sources: Se includere le informazioni sulla sorgente
Returns:
Il prompt formattato per l'LLM
"""
if not context_chunks:
return f"Domanda: {query}\n\nNota: Non ho trovato documenti rilevanti nel knowledge base."
# Costruisci il contesto con numerazione e sorgente
context_parts = []
for i, chunk in enumerate(context_chunks, 1):
source_info = f"[Fonte: {chunk.source_path}, chunk {chunk.chunk_index}]" if include_sources else ""
context_parts.append(f"--- Documento {i} {source_info} ---\n{chunk.content}")
context_text = "\n\n".join(context_parts)
return f"""Contesto dai documenti:
{context_text}
---
Domanda dell'utente: {query}
Rispondi basandoti sul contesto fornito."""
생성/generator.py
from openai import OpenAI
from dataclasses import dataclass
from typing import Optional
import tiktoken
from .prompts import RAG_SYSTEM_PROMPT, build_rag_prompt
@dataclass
class RAGResponse:
answer: str
sources: list[dict]
model: str
total_tokens: int
prompt_tokens: int
completion_tokens: int
class RAGGenerator:
def __init__(self, config):
self.config = config
self.client = OpenAI(api_key=config.openai_api_key)
self.tokenizer = tiktoken.encoding_for_model("gpt-4o")
def count_tokens(self, text: str) -> int:
return len(self.tokenizer.encode(text))
def truncate_context(self, chunks: list, max_tokens: int) -> list:
"""
Tronca il contesto per non superare il limite di token.
Mantieni i chunk più rilevanti (gia ordinati per similarità).
"""
selected = []
used_tokens = 0
for chunk in chunks:
chunk_tokens = self.count_tokens(chunk.content)
if used_tokens + chunk_tokens > max_tokens:
break
selected.append(chunk)
used_tokens += chunk_tokens
return selected
def generate(self, query: str, context_chunks: list,
stream: bool = False) -> RAGResponse:
"""
Genera una risposta RAG.
Args:
query: La domanda dell'utente
context_chunks: Chunk recuperati da PostgreSQL
stream: Se True, usa streaming (non implementato qui per semplicità)
"""
# Tronca il contesto se necessario
max_context_tokens = self.config.max_context_tokens
truncated_chunks = self.truncate_context(context_chunks, max_context_tokens)
if len(truncated_chunks) < len(context_chunks):
print(f" Contesto troncato: {len(context_chunks)} -> {len(truncated_chunks)} chunk")
# Costruisci il prompt
user_prompt = build_rag_prompt(query, truncated_chunks)
# Chiama l'LLM
response = self.client.chat.completions.create(
model=self.config.chat_model,
messages=[
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
temperature=self.config.temperature,
max_tokens=1500
)
answer = response.choices[0].message.content
usage = response.usage
# Prepara le sorgenti per la risposta
sources = [
{
"source": chunk.source_path,
"chunk_index": chunk.chunk_index,
"similarity": chunk.similarity,
"excerpt": chunk.content[:200] + "..."
}
for chunk in truncated_chunks
]
return RAGResponse(
answer=answer,
sources=sources,
model=self.config.chat_model,
total_tokens=usage.total_tokens,
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens
)
완전한 RAG 시스템
rag.py - 메인 클래스
from config import config, Config
from ingestion.pipeline import IngestionPipeline
from retrieval.searcher import HybridSearcher
from generation.generator import RAGGenerator
class EmbeddingService:
"""Wrapper per generazione embeddings OpenAI."""
def __init__(self, cfg: Config):
from openai import OpenAI
self.client = OpenAI(api_key=cfg.openai_api_key)
self.model = cfg.embedding_model
def embed_single(self, text: str) -> list[float]:
resp = self.client.embeddings.create(
input=[text.replace("\n", " ")],
model=self.model
)
return resp.data[0].embedding
def embed_batch(self, texts: list[str]) -> list[list[float]]:
cleaned = [t.replace("\n", " ").strip() for t in texts]
resp = self.client.embeddings.create(input=cleaned, model=self.model)
return [item.embedding for item in resp.data]
class RAGSystem:
"""
Sistema RAG completo: ingestion + retrieval + generation.
"""
def __init__(self, cfg: Config = None):
self.config = cfg or config
self.embedder = EmbeddingService(self.config)
self.ingestion = IngestionPipeline(self.config, self.embedder)
self.searcher = HybridSearcher(self.config, self.embedder)
self.generator = RAGGenerator(self.config)
def add_document(self, source: str, tags: list[str] = None) -> dict:
"""Aggiunge un documento al knowledge base."""
return self.ingestion.ingest(source, tags=tags)
def add_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Aggiunge tutti i documenti di una directory."""
return self.ingestion.ingest_directory(directory, extensions)
def ask(self, question: str, use_hybrid: bool = True,
source_type: str = None) -> dict:
"""
Pone una domanda al sistema RAG.
Returns:
dict con answer, sources, usage
"""
# 1. Retrieval
if use_hybrid:
chunks = self.searcher.hybrid_search(question, top_k=self.config.top_k)
else:
chunks = self.searcher.vector_search(
question, top_k=self.config.top_k, source_type=source_type
)
if not chunks:
return {
"answer": "Non ho trovato informazioni rilevanti per rispondere a questa domanda.",
"sources": [],
"retrieval": {"chunks_found": 0}
}
# 2. Generation
response = self.generator.generate(question, chunks)
return {
"answer": response.answer,
"sources": response.sources,
"retrieval": {
"chunks_found": len(chunks),
"top_similarity": chunks[0].similarity if chunks else 0
},
"usage": {
"model": response.model,
"total_tokens": response.total_tokens
}
}
main.py - 시스템 사용량
from rag import RAGSystem
# Inizializza il sistema
rag = RAGSystem()
# --- INGESTION ---
print("=== Aggiungendo documenti al knowledge base ===")
# Aggiungi singoli file
rag.add_document("docs/postgresql_guide.pdf", tags=["postgresql", "database"])
rag.add_document("docs/pgvector_tutorial.md", tags=["pgvector", "vector-search"])
rag.add_document("https://www.postgresql.org/docs/current/", tags=["official-docs"])
# Aggiungi una directory intera
stats = rag.add_directory("docs/", extensions=[".md", ".txt", ".pdf"])
print(f"Ingestati {len(stats)} documenti")
# --- QUERY ---
print("\n=== Interrogando il sistema ===")
questions = [
"Come si installa pgvector su PostgreSQL 16?",
"Qual e la differenza tra HNSW e IVFFlat?",
"Come si ottimizza la memoria per il vector search?",
]
for q in questions:
print(f"\nDomanda: {q}")
print("-" * 60)
result = rag.ask(q)
print(f"Risposta:\n{result['answer']}")
print(f"\nSorgenti utilizzate ({len(result['sources'])}):")
for src in result["sources"]:
print(f" - {src['source']} [similarità: {src['similarity']}]")
print(f"\nToken usati: {result['usage']['total_tokens']}")
하이브리드 검색: PostgreSQL 전체 텍스트 + 벡터
RAG용 PostgreSQL의 가장 큰 장점 중 하나는 검색을 하나의 쿼리로 결합할 수 있다는 것입니다. 고전적인 전체 텍스트 검색을 사용한 의미론(벡터). 이는 특히 유용합니다. 정확한 기술 용어(고유 명칭, 약어, 소프트웨어 버전)가 포함된 쿼리 의미론적 검색만으로는 다음을 완벽하게 포착하지 못할 수 있습니다.
-- Hybrid search in SQL puro: vettore + full-text in una query
WITH vector_search AS (
SELECT id, content, source_path, chunk_index,
1 - (embedding <=> %s::vector) AS vector_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS vector_rank
FROM rag_documents
ORDER BY embedding <=> %s::vector
LIMIT 20
),
fts_search AS (
SELECT id, content, source_path, chunk_index,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS fts_score,
ROW_NUMBER() OVER (
ORDER BY ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) DESC
) AS fts_rank
FROM rag_documents
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', %s)
LIMIT 20
),
-- Reciprocal Rank Fusion
rrf AS (
SELECT
COALESCE(v.id, f.id) AS id,
COALESCE(v.content, f.content) AS content,
COALESCE(v.source_path, f.source_path) AS source_path,
-- RRF score: 0.7 * vector_weight + 0.3 * fts_weight
COALESCE(0.7 / (60 + v.vector_rank), 0) +
COALESCE(0.3 / (60 + f.fts_rank), 0) AS rrf_score
FROM vector_search v
FULL OUTER JOIN fts_search f ON v.id = f.id
)
SELECT id, content, source_path, rrf_score
FROM rrf
ORDER BY rrf_score DESC
LIMIT 5;
RAG 품질 평가
RAG 시스템이 제대로 작동하는지 어떻게 측정합니까? 주요 측정항목은 다음과 같습니다.
| 미터법 | 측정 대상 | 목표 | 계산 방법 |
|---|---|---|---|
| 리콜@K | 올바른 문서는 상위 K개 결과에서 발견됩니다. | > 0.70 | Ground Truth가 포함된 테스트 세트 |
| 정밀@K | 발견된 결과는 실제로 관련성이 있습니다. | > 0.60 | 수동 주석 |
| 답변 신실함 | 검색된 컨텍스트에서 응답이 지원됩니다. | > 0.80 | RAGAS 프레임워크 |
| 답변 관련성 | 답변은 질문에 대한 답변입니다. | > 0.75 | RAGAS 프레임워크 |
| P95 대기 시간 | 95번째 백분위수에서의 응답 시간 | 3초 미만 | 생산 중 모니터링 |
# Valutazione con RAGAS
# pip install ragas
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall
from datasets import Dataset
# Prepara il dataset di test
test_data = {
"question": [
"Come si crea un indice HNSW in pgvector?",
"Qual e il limite di dimensioni per i vettori in pgvector?",
],
"answer": [
# Risposte generate dal tuo sistema RAG
rag.ask("Come si crea un indice HNSW in pgvector?")["answer"],
rag.ask("Qual e il limite di dimensioni per i vettori in pgvector?")["answer"],
],
"contexts": [
# I chunk recuperati per ciascuna domanda
[c["excerpt"] for c in rag.ask("...")["sources"]],
[c["excerpt"] for c in rag.ask("...")["sources"]],
],
"ground_truth": [
"CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)",
"Il limite e 16000 dimensioni per vettori di tipo vector in pgvector 0.7+",
]
}
dataset = Dataset.from_dict(test_data)
results = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_recall])
print(results)
고급 청킹 전략
청킹 품질은 RAG 품질의 가장 중요한 요소 중 하나입니다. 전략 제대로 보정되지 않은 청킹은 최상의 임베딩 모델을 사용하더라도 성능을 저하시킬 수 있습니다. 특정 사용 사례에 대한 고급 전략은 다음과 같습니다.
의미론적 중복을 사용한 청킹
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re
class SemanticChunker:
"""
Chunker che preserva la coerenza semantica dei paragrafi.
A differenza del semplice chunking per caratteri, questo
rispetta i confini di frase e paragrafo.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_by_sentences(self, text: str) -> list[str]:
"""Divide il testo in frasi usando regex."""
# Pattern per fine frase: ., !, ? seguiti da spazio e maiuscola
sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
return [s.strip() for s in sentences if s.strip()]
def create_chunks_with_context(self, text: str) -> list[str]:
"""
Crea chunk con context overlap:
ogni chunk include le ultime N parole del chunk precedente.
"""
sentences = self.split_by_sentences(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
# Se la frase corrente supera da sola il chunk_size, spezzala
if sentence_size > self.chunk_size:
if current_chunk:
chunks.append(" ".join(current_chunk))
# Mantieni overlap: ultime N parole
words = " ".join(current_chunk).split()
overlap_words = words[-30:] # ~150 caratteri overlap
current_chunk = [" ".join(overlap_words)]
current_size = len(" ".join(overlap_words))
# Spezza la frase lunga
splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap
)
for sub in splitter.split_text(sentence):
chunks.append(sub)
continue
# Aggiungi frase al chunk corrente
if current_size + sentence_size + 1 > self.chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
# Overlap: ultime 30 parole del chunk precedente
words = " ".join(current_chunk).split()
overlap_words = words[-30:]
current_chunk = [" ".join(overlap_words), sentence]
current_size = len(" ".join(current_chunk))
else:
current_chunk.append(sentence)
current_size += sentence_size + 1
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
문서 구조별 청크(헤더 기반)
import re
from typing import Generator
def chunk_by_headers(markdown_text: str, max_chunk_size: int = 800) -> Generator:
"""
Chunking che rispetta la struttura gerarchica dei documenti Markdown.
Ogni sezione H2/H3 diventa un contesto separato, preservando il titolo
come header del chunk (fondamentale per la qualità dell'embedding).
"""
# Regex per trovare header Markdown (H1-H4)
header_pattern = re.compile(r'^(#{1,4})\s+(.+)






