PostgreSQL を使用した RAG: ドキュメントからレスポンスまで
AI システムが文書に基づいて質問に答えてくれたらいいのにと思ったことはありませんか カスタム モデルをトレーニングする必要がなく、あなたの会社に固有のものですか?答えは「はい」 電話する 検索拡張生成 (RAG)、そしてそれは最も美しい建築の一つです 現代の AI の強力かつ実用的なテクノロジー。 PostgreSQL と pgvector は最高のツールの 1 つです それを実装するために。
RAG は、2 つの補完的な機能を組み合わせます。 セマンティック検索 (私を見つけてください アプリケーションに最も関連する書類)と 自然言語の生成 (それらの文書に基づいて一貫した応答を作成します)。その結果、応答するシステムが生まれます。 事前トレーニングされたモデルに関する一般的な知識ではなく、データに関する最新の知識が必要です。
この記事では、ドキュメントの取り込みから完全なエンドツーエンドの RAG パイプラインを構築します。 GPT-4 によって生成された応答を含むクエリへの応答はすべて PostgreSQL 上で行われます。追加のデータベースは必要ありません。 ベクター ストア用の外部サービスはありません。
シリーズ概要
| # | アイテム | 集中 |
|---|---|---|
| 1 | ベクター | インストール、オペレーター、インデックス作成 |
| 2 | 埋め込みの詳細 | モデル、距離、世代 |
| 3 | 現在位置 - PostgreSQL を使用した RAG | エンドツーエンドの RAG パイプライン |
| 4 | 類似性検索 | アルゴリズムと最適化 |
| 5 | HNSW および IVFFlat | 高度なインデックス作成戦略 |
| 6 | 本番環境の RAG | スケーラビリティとパフォーマンス |
何を学ぶか
- RAG システムの完全なアーキテクチャ: コンポーネントとデータ フロー
- ドキュメント取り込みパイプライン: 読み込み、解析、チャンク化
- pgvector を使用した PostgreSQL のストレージ戦略
- 取得: クエリから最も関連性の高いチャンクの選択まで
- 生成: プロンプトを作成して GPT-4 と統合する方法
- ハイブリッド検索: ベクトル検索と PostgreSQL 全文検索を組み合わせます。
- RAG 品質評価: 指標とツール
RAG アーキテクチャ: その仕組み
RAG システムには、異なる時間に動作する 2 つの主要なフェーズがあります。
フェーズ 1: 取り込み (オフライン)
1 回 (またはドキュメントの変更に応じて定期的に) 発生します。プロセスは次のとおりです。
- 負荷: ファイルシステム、URL、データベース、APIからドキュメントをロード
- 解析: PDF、DOCX、HTML、Markdown からテキストを抽出
- チャンク: テキストを最適なサイズの断片に分割する
- 埋め込む: 各チャンクの埋め込みベクトルを生成する
- 店: チャンク + 埋め込み + メタデータを PostgreSQL に保存
フェーズ 2: 取得 + 生成 (オンライン、クエリごと)
- クエリ: ユーザーが自然言語で質問する
- 埋め込みクエリ: 同じモデルを使用して質問をベクトルに変換します
- 検索: PostgreSQL で最も類似した k 個のチャンクを見つける
- コンテクスト: 見つかったチャンクをコンテキストとして組み立てます
- 生成する: 質問とコンテキストを LLM に送信して回答を取得します
## Flusso RAG Visualizzato
INGESTION (offline):
Documento PDF
|
v
[Parser] -> Testo grezzo
|
v
[Chunker] -> ["chunk 1", "chunk 2", ..., "chunk N"]
|
v
[Embedding Model] -> [[0.023, -0.841, ...], [0.891, 0.234, ...], ...]
|
v
[PostgreSQL + pgvector] -> Memorizzazione permanente
QUERY (online):
Domanda utente: "Come funziona l'indicizzazione HNSW?"
|
v
[Embedding Model] -> [0.045, -0.823, ...] (query vector)
|
v
[PostgreSQL ANN Search] -> Top 5 chunk più simili
|
v
[Prompt Builder] -> "Usa questo contesto: [chunk1, chunk2, ...] Domanda: ..."
|
v
[GPT-4 / Claude] -> "L'indicizzazione HNSW (Hierarchical Navigable Small World) ..."
|
v
Risposta all'utente
プロジェクトのセットアップ
依存症
# requirements.txt
openai>=1.12.0
psycopg2-binary>=2.9.9
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
pypdf>=3.17.0
python-dotenv>=1.0.0
tiktoken>=0.5.0
# Installazione
pip install -r requirements.txt
データベース構成
-- Setup iniziale PostgreSQL
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- per full-text search
-- Schema completo per RAG
CREATE TABLE IF NOT EXISTS rag_documents (
id BIGSERIAL PRIMARY KEY,
-- Informazioni sorgente
source_path TEXT NOT NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('pdf', 'txt', 'md', 'html', 'docx')),
source_hash TEXT NOT NULL, -- hash MD5 del file originale
-- Chunk info
chunk_index INTEGER NOT NULL,
chunk_total INTEGER,
-- Contenuto
title TEXT,
content TEXT NOT NULL,
content_length INTEGER GENERATED ALWAYS AS (length(content)) STORED,
-- Embedding
embedding_model TEXT NOT NULL DEFAULT 'text-embedding-3-small',
embedding vector(1536),
-- Metadata
metadata JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
-- Timestamps
ingested_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_path, chunk_index, source_hash)
);
-- Indice HNSW per vector search veloce
CREATE INDEX idx_rag_embedding_hnsw
ON rag_documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Indice GIN per full-text search
CREATE INDEX idx_rag_content_fts
ON rag_documents
USING gin (to_tsvector('english', content));
-- Indice per filtri comuni
CREATE INDEX idx_rag_source_type ON rag_documents (source_type);
CREATE INDEX idx_rag_tags ON rag_documents USING gin (tags);
CREATE INDEX idx_rag_metadata ON rag_documents USING gin (metadata);
ドキュメント取り込みパイプライン
Python プロジェクトの構造
rag_system/
├── config.py # Configurazione DB, API keys, parametri
├── ingestion/
│ ├── __init__.py
│ ├── loaders.py # Caricamento documenti da varie sorgenti
│ ├── parsers.py # Parsing PDF, DOCX, HTML, Markdown
│ ├── chunkers.py # Strategie di chunking
│ └── pipeline.py # Pipeline ingestion orchestrator
├── retrieval/
│ ├── __init__.py
│ ├── embedder.py # Generazione embeddings
│ └── searcher.py # Vector search e hybrid search
├── generation/
│ ├── __init__.py
│ ├── prompts.py # Template prompts
│ └── generator.py # Integrazione LLM
├── rag.py # Classe principale RAGSystem
└── main.py # Entry point
config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
# Database
db_host: str = os.getenv("DB_HOST", "localhost")
db_port: int = int(os.getenv("DB_PORT", "5432"))
db_name: str = os.getenv("DB_NAME", "ragdb")
db_user: str = os.getenv("DB_USER", "postgres")
db_password: str = os.getenv("DB_PASSWORD", "")
# OpenAI
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
embedding_model: str = "text-embedding-3-small"
embedding_dim: int = 1536
chat_model: str = "gpt-4o-mini" # cost-effective default
# Chunking
chunk_size: int = 800
chunk_overlap: int = 150
min_chunk_size: int = 100
# Retrieval
top_k: int = 5
similarity_threshold: float = 0.65 # minimum cosine similarity
# Generation
max_context_tokens: int = 8000
temperature: float = 0.1 # low temperature for factual answers
def get_db_url(self) -> str:
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
config = Config()
ingestion/loaders.py - マルチソースの読み込み
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import requests
from bs4 import BeautifulSoup
@dataclass
class RawDocument:
content: str
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
def load_text_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
return RawDocument(
content=content,
source_path=path,
source_type="txt",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=p.stem
)
def load_markdown_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
# Estrai titolo dal frontmatter o dalla prima riga H1
title = None
for line in content.split("\n"):
if line.startswith("# "):
title = line[2:].strip()
break
return RawDocument(
content=content,
source_path=path,
source_type="md",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_pdf_file(path: str) -> RawDocument:
from pypdf import PdfReader
reader = PdfReader(path)
pages = []
for page in reader.pages:
pages.append(page.extract_text())
content = "\n\n".join(pages)
return RawDocument(
content=content,
source_path=path,
source_type="pdf",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=Path(path).stem,
metadata={"pages": len(reader.pages)}
)
def load_url(url: str) -> RawDocument:
response = requests.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Rimuovi script, style, nav
for tag in soup(["script", "style", "nav", "header", "footer"]):
tag.decompose()
content = soup.get_text(separator="\n", strip=True)
title = soup.title.string if soup.title else url
return RawDocument(
content=content,
source_path=url,
source_type="html",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_document(source: str) -> RawDocument:
"""Smart loader che sceglie il parser corretto."""
if source.startswith("http"):
return load_url(source)
p = Path(source)
loaders = {
".txt": load_text_file,
".md": load_markdown_file,
".pdf": load_pdf_file,
}
loader = loaders.get(p.suffix.lower())
if not loader:
raise ValueError(f"Tipo file non supportato: {p.suffix}")
return loader(source)
ingestion/chunkers.py - インテリジェントなチャンキング
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dataclasses import dataclass
from typing import Optional
@dataclass
class TextChunk:
content: str
chunk_index: int
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class SmartChunker:
"""
Chunker che adatta la strategia al tipo di documento.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Separatori per testo generico
self._text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "],
length_function=len
)
# Separatori per markdown (rispetta la struttura)
self._md_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["## ", "# ", "\n\n", "\n", ". "],
length_function=len
)
def chunk(self, doc) -> list[TextChunk]:
"""Chunk un documento, scegliendo la strategia giusta."""
if doc.source_type == "md":
raw_chunks = self._md_splitter.split_text(doc.content)
else:
raw_chunks = self._text_splitter.split_text(doc.content)
# Filtra chunk troppo piccoli
raw_chunks = [c for c in raw_chunks if len(c.strip()) > 100]
return [
TextChunk(
content=chunk.strip(),
chunk_index=i,
source_path=doc.source_path,
source_type=doc.source_type,
source_hash=doc.source_hash,
title=doc.title,
metadata={
**doc.metadata,
"chunk_total": len(raw_chunks),
"char_count": len(chunk)
}
)
for i, chunk in enumerate(raw_chunks)
]
ingestion/pipeline.py - メイン オーケストレーター
import psycopg2
from psycopg2.extras import execute_values
import json
import time
from .loaders import load_document
from .chunkers import SmartChunker
class IngestionPipeline:
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.chunker = SmartChunker(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap
)
self.conn = psycopg2.connect(config.get_db_url())
def is_already_ingested(self, source_path: str, source_hash: str) -> bool:
"""Controlla se il documento e già nel DB con lo stesso hash (non cambiato)."""
with self.conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM rag_documents WHERE source_path = %s AND source_hash = %s",
(source_path, source_hash)
)
return cur.fetchone()[0] > 0
def ingest(self, source: str, tags: list[str] = None, force: bool = False) -> dict:
"""
Processa un documento e lo inserisce in PostgreSQL.
Ritorna statistiche sull'operazione.
"""
tags = tags or []
start_time = time.time()
# 1. Carica documento
doc = load_document(source)
print(f"Caricato: {source} ({len(doc.content)} chars, hash: {doc.source_hash[:8]})")
# 2. Controlla se già presente (incrementale update)
if not force and self.is_already_ingested(source, doc.source_hash):
print(f" Saltato: documento non modificato")
return {"skipped": True, "source": source}
# 3. Chunking
chunks = self.chunker.chunk(doc)
print(f" Chunking: {len(chunks)} chunk creati")
# 4. Elimina versione precedente (se esiste)
with self.conn.cursor() as cur:
cur.execute("DELETE FROM rag_documents WHERE source_path = %s", (source,))
# 5. Genera embeddings in batch
texts = [c.content for c in chunks]
embeddings = self.embedder.embed_batch(texts)
print(f" Embeddings generati: {len(embeddings)} vettori dim {len(embeddings[0])}")
# 6. Inserisci in PostgreSQL
rows = [
(
c.source_path,
c.source_type,
c.source_hash,
c.chunk_index,
len(chunks), # chunk_total
c.title,
c.content,
self.config.embedding_model,
embeddings[i],
json.dumps(c.metadata),
tags
)
for i, c in enumerate(chunks)
]
with self.conn.cursor() as cur:
execute_values(cur, """
INSERT INTO rag_documents
(source_path, source_type, source_hash, chunk_index, chunk_total,
title, content, embedding_model, embedding, metadata, tags)
VALUES %s
ON CONFLICT (source_path, chunk_index, source_hash) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
updated_at = NOW()
""", rows, template="(%s,%s,%s,%s,%s,%s,%s,%s,%s::vector,%s::jsonb,%s::text[])")
self.conn.commit()
elapsed = time.time() - start_time
stats = {
"source": source,
"chunks": len(chunks),
"embeddings": len(embeddings),
"elapsed_sec": round(elapsed, 2)
}
print(f" Completato in {elapsed:.1f}s - {stats}")
return stats
def ingest_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Ingesta tutti i documenti in una directory."""
from pathlib import Path
extensions = extensions or [".txt", ".md", ".pdf"]
results = []
for path in Path(directory).rglob("*"):
if path.suffix.lower() in extensions:
result = self.ingest(str(path))
results.append(result)
return results
取得: 適切なチャンクを見つける
検索/searcher.py
import psycopg2
from dataclasses import dataclass
from typing import Optional
@dataclass
class SearchResult:
id: int
source_path: str
source_type: str
chunk_index: int
title: Optional[str]
content: str
similarity: float
metadata: dict
class HybridSearcher:
"""
Combina vector search (semantica) con full-text search (keyword).
Reciprocal Rank Fusion per merging dei risultati.
"""
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.conn = psycopg2.connect(config.get_db_url())
def vector_search(self, query: str, top_k: int = 10,
source_type: Optional[str] = None,
tags: Optional[list[str]] = None) -> list[SearchResult]:
"""Ricerca semantica con filtri opzionali."""
query_embedding = self.embedder.embed_single(query)
threshold = 1 - self.config.similarity_threshold # converti a cosine distance
# Costruisci query dinamica con filtri opzionali
filters = ["embedding <=> %s::vector < %s"]
params = [query_embedding, threshold]
if source_type:
filters.append("source_type = %s")
params.append(source_type)
if tags:
filters.append("tags && %s::text[]") -- overlap: almeno un tag in comune
params.append(tags)
where_clause = " AND ".join(filters)
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
id, source_path, source_type, chunk_index, title, content,
1 - (embedding <=> %s::vector) AS similarity,
metadata
FROM rag_documents
WHERE {where_clause}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", [query_embedding] + params + [query_embedding, top_k])
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(r[6], 4), metadata=r[7]
)
for r in rows
]
def fulltext_search(self, query: str, top_k: int = 10) -> list[SearchResult]:
"""Full-text search con ts_rank per ranking."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
id, source_path, source_type, chunk_index, title, content,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS rank,
metadata
FROM rag_documents
WHERE to_tsvector('english', content) @@
plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
""", (query, query, top_k))
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(float(r[6]), 4), metadata=r[7]
)
for r in rows
]
def hybrid_search(self, query: str, top_k: int = 5,
vector_weight: float = 0.7) -> list[SearchResult]:
"""
Reciprocal Rank Fusion (RRF) per combinare vector e full-text search.
RRF Score = sum(1 / (k + rank)) per ogni lista di risultati.
"""
k_rrf = 60 # costante RRF standard
# Ottieni entrambi i risultati
vector_results = self.vector_search(query, top_k=top_k * 2)
fts_results = self.fulltext_search(query, top_k=top_k * 2)
# Calcola RRF scores
scores = {}
all_results = {}
for rank, result in enumerate(vector_results):
scores[result.id] = scores.get(result.id, 0) + vector_weight / (k_rrf + rank + 1)
all_results[result.id] = result
fts_weight = 1 - vector_weight
for rank, result in enumerate(fts_results):
scores[result.id] = scores.get(result.id, 0) + fts_weight / (k_rrf + rank + 1)
all_results[result.id] = result
# Ordina per RRF score e prendi top_k
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
final_results = [all_results[id] for id in sorted_ids[:top_k]]
# Aggiorna similarity con RRF score normalizzato
max_score = scores[sorted_ids[0]] if sorted_ids else 1
for result in final_results:
result.similarity = round(scores[result.id] / max_score, 4)
return final_results
生成: コンテキストから応答まで
生成/プロンプト.py
from string import Template
# System prompt che definisce il comportamento dell'AI
RAG_SYSTEM_PROMPT = """Sei un assistente AI preciso e utile. Rispondi alle domande
basandoti ESCLUSIVAMENTE sui documenti di contesto forniti.
Regole:
1. Usa SOLO le informazioni presenti nel contesto. Non inventare.
2. Se la risposta non e nel contesto, dillo chiaramente.
3. Cita le sorgenti usando [Fonte: nome_file, chunk X] dopo ogni affermazione.
4. Mantieni un tono professionale e conciso.
5. Struttura la risposta in modo chiaro con paragrafi o bullet points se appropriato.
"""
def build_rag_prompt(query: str, context_chunks: list, include_sources: bool = True) -> str:
"""
Costruisce il prompt per l'LLM con il contesto recuperato.
Args:
query: La domanda dell'utente
context_chunks: Lista di SearchResult
include_sources: Se includere le informazioni sulla sorgente
Returns:
Il prompt formattato per l'LLM
"""
if not context_chunks:
return f"Domanda: {query}\n\nNota: Non ho trovato documenti rilevanti nel knowledge base."
# Costruisci il contesto con numerazione e sorgente
context_parts = []
for i, chunk in enumerate(context_chunks, 1):
source_info = f"[Fonte: {chunk.source_path}, chunk {chunk.chunk_index}]" if include_sources else ""
context_parts.append(f"--- Documento {i} {source_info} ---\n{chunk.content}")
context_text = "\n\n".join(context_parts)
return f"""Contesto dai documenti:
{context_text}
---
Domanda dell'utente: {query}
Rispondi basandoti sul contesto fornito."""
生成/generator.py
from openai import OpenAI
from dataclasses import dataclass
from typing import Optional
import tiktoken
from .prompts import RAG_SYSTEM_PROMPT, build_rag_prompt
@dataclass
class RAGResponse:
answer: str
sources: list[dict]
model: str
total_tokens: int
prompt_tokens: int
completion_tokens: int
class RAGGenerator:
def __init__(self, config):
self.config = config
self.client = OpenAI(api_key=config.openai_api_key)
self.tokenizer = tiktoken.encoding_for_model("gpt-4o")
def count_tokens(self, text: str) -> int:
return len(self.tokenizer.encode(text))
def truncate_context(self, chunks: list, max_tokens: int) -> list:
"""
Tronca il contesto per non superare il limite di token.
Mantieni i chunk più rilevanti (gia ordinati per similarità).
"""
selected = []
used_tokens = 0
for chunk in chunks:
chunk_tokens = self.count_tokens(chunk.content)
if used_tokens + chunk_tokens > max_tokens:
break
selected.append(chunk)
used_tokens += chunk_tokens
return selected
def generate(self, query: str, context_chunks: list,
stream: bool = False) -> RAGResponse:
"""
Genera una risposta RAG.
Args:
query: La domanda dell'utente
context_chunks: Chunk recuperati da PostgreSQL
stream: Se True, usa streaming (non implementato qui per semplicità)
"""
# Tronca il contesto se necessario
max_context_tokens = self.config.max_context_tokens
truncated_chunks = self.truncate_context(context_chunks, max_context_tokens)
if len(truncated_chunks) < len(context_chunks):
print(f" Contesto troncato: {len(context_chunks)} -> {len(truncated_chunks)} chunk")
# Costruisci il prompt
user_prompt = build_rag_prompt(query, truncated_chunks)
# Chiama l'LLM
response = self.client.chat.completions.create(
model=self.config.chat_model,
messages=[
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
temperature=self.config.temperature,
max_tokens=1500
)
answer = response.choices[0].message.content
usage = response.usage
# Prepara le sorgenti per la risposta
sources = [
{
"source": chunk.source_path,
"chunk_index": chunk.chunk_index,
"similarity": chunk.similarity,
"excerpt": chunk.content[:200] + "..."
}
for chunk in truncated_chunks
]
return RAGResponse(
answer=answer,
sources=sources,
model=self.config.chat_model,
total_tokens=usage.total_tokens,
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens
)
完全な RAG システム
rag.py - メインクラス
from config import config, Config
from ingestion.pipeline import IngestionPipeline
from retrieval.searcher import HybridSearcher
from generation.generator import RAGGenerator
class EmbeddingService:
"""Wrapper per generazione embeddings OpenAI."""
def __init__(self, cfg: Config):
from openai import OpenAI
self.client = OpenAI(api_key=cfg.openai_api_key)
self.model = cfg.embedding_model
def embed_single(self, text: str) -> list[float]:
resp = self.client.embeddings.create(
input=[text.replace("\n", " ")],
model=self.model
)
return resp.data[0].embedding
def embed_batch(self, texts: list[str]) -> list[list[float]]:
cleaned = [t.replace("\n", " ").strip() for t in texts]
resp = self.client.embeddings.create(input=cleaned, model=self.model)
return [item.embedding for item in resp.data]
class RAGSystem:
"""
Sistema RAG completo: ingestion + retrieval + generation.
"""
def __init__(self, cfg: Config = None):
self.config = cfg or config
self.embedder = EmbeddingService(self.config)
self.ingestion = IngestionPipeline(self.config, self.embedder)
self.searcher = HybridSearcher(self.config, self.embedder)
self.generator = RAGGenerator(self.config)
def add_document(self, source: str, tags: list[str] = None) -> dict:
"""Aggiunge un documento al knowledge base."""
return self.ingestion.ingest(source, tags=tags)
def add_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Aggiunge tutti i documenti di una directory."""
return self.ingestion.ingest_directory(directory, extensions)
def ask(self, question: str, use_hybrid: bool = True,
source_type: str = None) -> dict:
"""
Pone una domanda al sistema RAG.
Returns:
dict con answer, sources, usage
"""
# 1. Retrieval
if use_hybrid:
chunks = self.searcher.hybrid_search(question, top_k=self.config.top_k)
else:
chunks = self.searcher.vector_search(
question, top_k=self.config.top_k, source_type=source_type
)
if not chunks:
return {
"answer": "Non ho trovato informazioni rilevanti per rispondere a questa domanda.",
"sources": [],
"retrieval": {"chunks_found": 0}
}
# 2. Generation
response = self.generator.generate(question, chunks)
return {
"answer": response.answer,
"sources": response.sources,
"retrieval": {
"chunks_found": len(chunks),
"top_similarity": chunks[0].similarity if chunks else 0
},
"usage": {
"model": response.model,
"total_tokens": response.total_tokens
}
}
main.py - システムの使用法
from rag import RAGSystem
# Inizializza il sistema
rag = RAGSystem()
# --- INGESTION ---
print("=== Aggiungendo documenti al knowledge base ===")
# Aggiungi singoli file
rag.add_document("docs/postgresql_guide.pdf", tags=["postgresql", "database"])
rag.add_document("docs/pgvector_tutorial.md", tags=["pgvector", "vector-search"])
rag.add_document("https://www.postgresql.org/docs/current/", tags=["official-docs"])
# Aggiungi una directory intera
stats = rag.add_directory("docs/", extensions=[".md", ".txt", ".pdf"])
print(f"Ingestati {len(stats)} documenti")
# --- QUERY ---
print("\n=== Interrogando il sistema ===")
questions = [
"Come si installa pgvector su PostgreSQL 16?",
"Qual e la differenza tra HNSW e IVFFlat?",
"Come si ottimizza la memoria per il vector search?",
]
for q in questions:
print(f"\nDomanda: {q}")
print("-" * 60)
result = rag.ask(q)
print(f"Risposta:\n{result['answer']}")
print(f"\nSorgenti utilizzate ({len(result['sources'])}):")
for src in result["sources"]:
print(f" - {src['source']} [similarità: {src['similarity']}]")
print(f"\nToken usati: {result['usage']['total_tokens']}")
ハイブリッド検索: PostgreSQL フルテキスト + ベクトル
PostgreSQL for RAG の大きな利点の 1 つは、検索を 1 つのクエリに結合できることです。 従来の全文検索を使用したセマンティクス (ベクトル)。これは特に次の場合に役立ちます。 正確な技術用語 (固有名、頭字語、ソフトウェア バージョン) を含むクエリ。 セマンティック検索だけでは、以下を完全に捕捉できない場合があります。
-- Hybrid search in SQL puro: vettore + full-text in una query
WITH vector_search AS (
SELECT id, content, source_path, chunk_index,
1 - (embedding <=> %s::vector) AS vector_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS vector_rank
FROM rag_documents
ORDER BY embedding <=> %s::vector
LIMIT 20
),
fts_search AS (
SELECT id, content, source_path, chunk_index,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS fts_score,
ROW_NUMBER() OVER (
ORDER BY ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) DESC
) AS fts_rank
FROM rag_documents
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', %s)
LIMIT 20
),
-- Reciprocal Rank Fusion
rrf AS (
SELECT
COALESCE(v.id, f.id) AS id,
COALESCE(v.content, f.content) AS content,
COALESCE(v.source_path, f.source_path) AS source_path,
-- RRF score: 0.7 * vector_weight + 0.3 * fts_weight
COALESCE(0.7 / (60 + v.vector_rank), 0) +
COALESCE(0.3 / (60 + f.fts_rank), 0) AS rrf_score
FROM vector_search v
FULL OUTER JOIN fts_search f ON v.id = f.id
)
SELECT id, content, source_path, rrf_score
FROM rrf
ORDER BY rrf_score DESC
LIMIT 5;
RAGの品質評価
RAG システムが適切に機能しているかどうかをどのように測定しますか?主な指標は次のとおりです。
| メトリック | 測定内容 | ターゲット | 計算方法 |
|---|---|---|---|
| リコール@K | 適切なドキュメントは上位 K 件の結果から見つかります | > 0.70 | グラウンドトゥルースを備えたテストセット |
| プレシジョン@K | 見つかった結果は確かに関連性があります | > 0.60 | 手動注釈 |
| 忠実に答える | 応答は取得されたコンテキストによってサポートされます | > 0.80 | RAGAS フレームワーク |
| 回答の関連性 | 答えは尋ねられた質問に答えます | > 0.75 | RAGAS フレームワーク |
| P95 レイテンシー | 95 パーセンタイルでの応答時間 | < 3 秒 | 本番環境でのモニタリング |
# Valutazione con RAGAS
# pip install ragas
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall
from datasets import Dataset
# Prepara il dataset di test
test_data = {
"question": [
"Come si crea un indice HNSW in pgvector?",
"Qual e il limite di dimensioni per i vettori in pgvector?",
],
"answer": [
# Risposte generate dal tuo sistema RAG
rag.ask("Come si crea un indice HNSW in pgvector?")["answer"],
rag.ask("Qual e il limite di dimensioni per i vettori in pgvector?")["answer"],
],
"contexts": [
# I chunk recuperati per ciascuna domanda
[c["excerpt"] for c in rag.ask("...")["sources"]],
[c["excerpt"] for c in rag.ask("...")["sources"]],
],
"ground_truth": [
"CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)",
"Il limite e 16000 dimensioni per vettori di tipo vector in pgvector 0.7+",
]
}
dataset = Dataset.from_dict(test_data)
results = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_recall])
print(results)
高度なチャンキング戦略
チャンク品質は、RAG 品質にとって最も重要な要素の 1 つです。戦略 チャンキングが適切に調整されていないと、最適な埋め込みモデルであってもパフォーマンスが低下する可能性があります。 特定の使用例に対する高度な戦略は次のとおりです。
セマンティックオーバーラップを伴うチャンク化
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re
class SemanticChunker:
"""
Chunker che preserva la coerenza semantica dei paragrafi.
A differenza del semplice chunking per caratteri, questo
rispetta i confini di frase e paragrafo.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_by_sentences(self, text: str) -> list[str]:
"""Divide il testo in frasi usando regex."""
# Pattern per fine frase: ., !, ? seguiti da spazio e maiuscola
sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
return [s.strip() for s in sentences if s.strip()]
def create_chunks_with_context(self, text: str) -> list[str]:
"""
Crea chunk con context overlap:
ogni chunk include le ultime N parole del chunk precedente.
"""
sentences = self.split_by_sentences(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
# Se la frase corrente supera da sola il chunk_size, spezzala
if sentence_size > self.chunk_size:
if current_chunk:
chunks.append(" ".join(current_chunk))
# Mantieni overlap: ultime N parole
words = " ".join(current_chunk).split()
overlap_words = words[-30:] # ~150 caratteri overlap
current_chunk = [" ".join(overlap_words)]
current_size = len(" ".join(overlap_words))
# Spezza la frase lunga
splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap
)
for sub in splitter.split_text(sentence):
chunks.append(sub)
continue
# Aggiungi frase al chunk corrente
if current_size + sentence_size + 1 > self.chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
# Overlap: ultime 30 parole del chunk precedente
words = " ".join(current_chunk).split()
overlap_words = words[-30:]
current_chunk = [" ".join(overlap_words), sentence]
current_size = len(" ".join(current_chunk))
else:
current_chunk.append(sentence)
current_size += sentence_size + 1
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
ドキュメント構造によるチャンク化 (ヘッダーベース)
import re
from typing import Generator
def chunk_by_headers(markdown_text: str, max_chunk_size: int = 800) -> Generator:
"""
Chunking che rispetta la struttura gerarchica dei documenti Markdown.
Ogni sezione H2/H3 diventa un contesto separato, preservando il titolo
come header del chunk (fondamentale per la qualità dell'embedding).
"""
# Regex per trovare header Markdown (H1-H4)
header_pattern = re.compile(r'^(#{1,4})\s+(.+)






