本番環境の RAG: PostgreSQL を使用したスケーラブルなアーキテクチャ
ラップトップ上で動作する RAG プロトタイプを構築するのは比較的簡単です。持ち込んでください 実稼働環境では、数百人の同時ユーザーにサービスを提供し、1 秒未満の高い遅延を実現 制御された可用性とコスト、そしてまったく異なるエンジニアリングの課題。
このシリーズの最終記事では、実稼働 RAG アーキテクチャを構築します。 PostgreSQL: 接続プーリング PgBouncerを使用すると、 返信を読む ベクトル検索専用、 パーティショニング 10 億のベクトル データセットの場合、 キャッシング Redisによるアプリケーションレベルでの監視、PrometheusとGrafanaによる監視、 ダウンタイムゼロの更新のための Kubernetes 導入戦略。
哲学を守りながら 「Postgresを使用するだけ」 2026 年: ベクトルなし 別のデータベース、Pinecone、Qdrant はありません。 PostgreSQL と pgvector が適切に構成されており、 実際のケースの 90% で、コスト/パフォーマンスの点で専門の競合他社に勝ります。
シリーズ概要
| # | アイテム | 集中 |
|---|---|---|
| 1 | ベクター | インストール、オペレーター、インデックス作成 |
| 2 | 埋め込みの詳細 | モデル、距離、世代 |
| 3 | PostgreSQL を使用した RAG | エンドツーエンドの RAG パイプライン |
| 4 | 類似性検索 | アルゴリズムと最適化 |
| 5 | HNSW および IVFFlat | 高度なインデックス作成戦略 |
| 6 | あなたはここにいます - 本番環境の RAG | スケーラビリティとパフォーマンス |
何を学ぶか
- PostgreSQL 上の RAG エンタープライズのリファレンス アーキテクチャ
- PgBouncer による接続プーリング: その理由と構成方法
- ベクトル検索専用のリードレプリカ
- 100 万/10 億のベクトル データセットのパーティショニング
- 繰り返されるクエリのための Redis によるアプリケーションレベルのキャッシュ
- モニタリング: 重要なメトリクス、Grafana ダッシュボード、アラート
- Kubernetes へのデプロイメント: ステートフルセット、PVC、ローリング アップデート
- マルチテナント: さまざまな顧客からデータを分離します。
- 実稼働向けの PostgreSQL 構成のチューニング
- 自動バキュームとベクターインデックスのメンテナンス
本番環境における RAG のリファレンス アーキテクチャ
## Architettura RAG Produzione su PostgreSQL
┌─────────────────────────────────────┐
│ Load Balancer │
│ (nginx / AWS ALB) │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ RAG API Service │
│ (FastAPI, K8s Deployment) │
│ Replicas: 3-10 pod │
└──┬─────────────────────┬────────────┘
│ │
┌────────▼────────┐ ┌────────▼────────┐
│ Redis Cache │ │ Embedding API │
│ (query cache, │ │ (OpenAI / local │
│ emb cache) │ │ Sentence Tr.) │
└─────────────────┘ └─────────────────┘
│
┌─────────────▼───────────────────────────┐
│ PgBouncer │
│ Connection Pool │
│ (transaction mode, pool_size=100) │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ PostgreSQL Primary │
│ (write: ingestion, updates) │
│ shared_buffers=32GB, m=16 │
└──────────────┬──────────────────────────┘
│ Streaming Replication
┌──────────────▼──────────────────────────┐
│ PostgreSQL Read Replica │
│ (read: vector search, RAG queries) │
│ Dedicated for ANN queries │
│ ef_search tuned per use case │
└─────────────────────────────────────────┘
PgBouncer による接続プーリング
PostgreSQL は接続ごとにプロセス (フォーク) を作成します。 200 人の同時ユーザーが実行されている場合 クエリ ベクターでは、各接続はプロセスのためだけに約 5 ~ 10MB の RAM を使用します。 Pgバウンサー データベースへの実際の接続のプールを維持し、アプリケーション要求をキューに入れます。 PostgreSQL 接続の数が数百から数十に減ります。
PgBouncer のインストールと構成
# pgbouncer.ini - Configurazione per RAG workload
[databases]
# Database principale per ingestion (write)
ragdb_write = host=postgres-primary port=5432 dbname=ragdb user=raguser
# Read replica per vector search (read)
ragdb_read = host=postgres-replica port=5432 dbname=ragdb user=raguser
[pgbouncer]
# Pool mode: transaction e il più efficiente per query brevi
# session: la connessione e riservata per tutta la sessione
# transaction: la connessione e rilasciata dopo ogni transazione (OTTIMALE per RAG)
# statement: rilascio dopo ogni statement (non compatibile con transazioni multi-statement)
pool_mode = transaction
# Numero massimo di connessioni per database verso PostgreSQL
max_db_connections = 50
# Connessioni per pool (per user+database combination)
default_pool_size = 25
# Limite totale connessioni client verso PgBouncer
max_client_conn = 1000
# Timeout e resilienza
query_wait_timeout = 30 # secondi prima di errore "query too long"
server_idle_timeout = 600 # chiudi connessioni idle dopo 10 minuti
client_idle_timeout = 0 # no timeout per i client (gestito dall'app)
# Autenticazione
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
# Monitoring
stats_period = 60
logfile = /var/log/pgbouncer/pgbouncer.log
pidfile = /var/run/pgbouncer/pgbouncer.pid
listen_addr = 0.0.0.0
listen_port = 5432
Python で接続プーリングを使用して使用する
import psycopg2.pool
from contextlib import contextmanager
import threading
class DatabasePool:
"""
Thread-safe connection pool verso PgBouncer.
Usa connessioni separate per write (primary) e read (replica).
"""
def __init__(self, write_dsn: str, read_dsn: str,
min_conn: int = 2, max_conn: int = 20):
self._write_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=min_conn, maxconn=max_conn, dsn=write_dsn
)
self._read_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=min_conn, maxconn=max_conn, dsn=read_dsn
)
self._lock = threading.Lock()
@contextmanager
def get_write_conn(self):
"""Connessione per operazioni di scrittura (primary)."""
conn = self._write_pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._write_pool.putconn(conn)
@contextmanager
def get_read_conn(self):
"""Connessione per operazioni di lettura (read replica)."""
conn = self._read_pool.getconn()
try:
# Imposta ef_search ottimale per vector queries
with conn.cursor() as cur:
cur.execute("SET hnsw.ef_search = 60")
yield conn
finally:
conn.rollback() # rollback per rilasciare lock
self._read_pool.putconn(conn)
# Inizializzazione del pool (una volta all'avvio dell'applicazione)
db_pool = DatabasePool(
write_dsn="host=pgbouncer port=5432 dbname=ragdb_write user=raguser password=xxx",
read_dsn="host=pgbouncer port=5432 dbname=ragdb_read user=raguser password=xxx"
)
# Utilizzo nelle API:
async def search_endpoint(query: str):
with db_pool.get_read_conn() as conn:
results = semantic_search(conn, query)
return results
async def ingest_endpoint(document_path: str):
with db_pool.get_write_conn() as conn:
ingest_document(conn, document_path)
PgBouncer モニタリング
-- Connettiti a PgBouncer sulla porta di admin (default: 6432)
-- psql -h localhost -p 6432 -U pgbouncer pgbouncer
-- Statistiche pool in tempo reale
SHOW POOLS;
-- Output:
-- database | user | cl_active | cl_waiting | sv_active | sv_idle | maxwait
-- ragdb_read | raguser | 23 | 0 | 23 | 2 | 0
-- ragdb_write| raguser | 5 | 0 | 5 | 0 | 0
-- Statistiche aggregate per database
SHOW STATS;
-- total_query_count | total_query_time | avg_query | avg_wait
-- 1248532 | 1832743.2 | 1.46 | 0.02
-- Lista connessioni client attuali
SHOW CLIENTS;
-- Lista connessioni server attuali verso PostgreSQL
SHOW SERVERS;
-- Alert: se cl_waiting > 0 per più di 5 secondi, il pool e saturo
-- Soluzione: aumenta default_pool_size o max_db_connections
ベクトル検索用のリードレプリカ
ベクトル クエリ (ANN 検索) は CPU とメモリを大量に消費しますが、新しいデータは必要ありません。 それらを 1 つに分離します 専用の返信を読む 操作のためにプライマリを解放します 取り込み (INSERT) の処理を軽減し、両方のワークロードのレイテンシを低く保ちます。
ストリーミング レプリケーションを使用したリードレプリカのセットアップ
# Sul PRIMARY server
# postgresql.conf
wal_level = replica # abilita WAL per replication
max_wal_senders = 5 # max 5 replica connesse
wal_keep_size = '1GB' # mantieni almeno 1GB di WAL per le replica
# pg_hba.conf - permetti replica connection
host replication repuser replica-host/32 md5
# Crea utente di replication
CREATE USER repuser REPLICATION LOGIN PASSWORD 'strongpassword';
# ===========================
# Sul REPLICA server
# ===========================
# Clona il primary:
pg_basebackup -h primary-host -U repuser -D /var/lib/postgresql/16/main -P -Xs -R
# Il flag -R crea automaticamente standby.signal e i parametri di recovery
# postgresql.conf sulla replica - ottimizzato per vector search:
hot_standby = on # permette letture sulla replica
shared_buffers = '32GB' # abbondante per l'indice HNSW
effective_cache_size = '96GB' # hint al planner sulla memoria totale
max_parallel_workers_per_gather = 4 # query vector parallele
hnsw.ef_search = 60 # parametro globale per la replica
random_page_cost = 1.1 # SSD: costo basso per random I/O
effective_io_concurrency = 200 # per SSD NVMe
レプリケーションの監視
-- Sul primary: stato delle repliche connesse
SELECT
client_addr AS "Indirizzo replica",
application_name AS "Nome replica",
state AS "Stato",
sent_lsn AS "WAL inviato",
write_lsn AS "WAL scritto",
flush_lsn AS "WAL flushed",
replay_lsn AS "WAL replicato",
-- Lag in bytes:
sent_lsn - replay_lsn AS "Lag bytes",
-- Lag in tempo (approssimato):
now() - write_lag AS "Write lag",
now() - replay_lag AS "Replay lag"
FROM pg_stat_replication;
-- Sulla replica: verifica che sta replicando
SELECT now() - pg_last_xact_replay_timestamp() AS "Replica lag seconds";
-- Alert: se replica lag > 30 secondi, possibile problema di replication
-- Verifica pg_stat_wal_receiver sulla replica:
SELECT status, received_lsn, last_msg_receipt_time, latest_end_lsn
FROM pg_stat_wal_receiver;
巨大なデータセットのテーブル分割
何千万ものベクトルがあると、1 つのテーブルを管理するのが難しくなります。 の パーティショニング テーブルを物理的に別々の部分に分割します。 より高速なクエリ (パーティション プルーニング) ときめ細かいメンテナンスが可能になります。
データによるパーティショニング (時系列 RAG)
-- Partitioning per mese: ottimo per documenti con timestamp (news, email, log)
CREATE TABLE documents_partitioned (
id BIGSERIAL,
source_path TEXT NOT NULL,
source_type TEXT,
content TEXT NOT NULL,
embedding vector(1536),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (id, created_at) -- la colonna di partizione deve essere nella PK
) PARTITION BY RANGE (created_at);
-- Crea partizioni per gli ultimi 12 mesi
DO $
DECLARE
d DATE;
BEGIN
FOR i IN 0..11 LOOP
d := DATE_TRUNC('month', NOW()) - (i || ' months')::INTERVAL;
EXECUTE format(
'CREATE TABLE documents_%s PARTITION OF documents_partitioned
FOR VALUES FROM (%L) TO (%L)',
TO_CHAR(d, 'YYYY_MM'),
d,
d + INTERVAL '1 month'
);
-- Indice HNSW su ogni partizione
EXECUTE format(
'CREATE INDEX ON documents_%s USING hnsw (embedding vector_cosine_ops) WITH (m=16, ef_construction=64)',
TO_CHAR(d, 'YYYY_MM')
);
RAISE NOTICE 'Partizione creata: documents_%', TO_CHAR(d, 'YYYY_MM');
END LOOP;
END $;
-- Partition pruning automatico (PostgreSQL usa solo le partizioni rilevanti):
EXPLAIN (ANALYZE)
SELECT id, content, embedding <=> query_vec AS dist
FROM documents_partitioned
WHERE created_at >= '2026-01-01' AND created_at < '2026-02-01' -- solo partizione Gennaio 2026
ORDER BY embedding <=> query_vec
LIMIT 10;
-- Output: scansiona solo documents_2026_01, ignora le altre 11 partizioni
テナントごとのパーティショニング (マルチテナント)
-- Partitioning per tenant_id: isola i dati di ogni cliente
CREATE TABLE documents_multitenant (
id BIGSERIAL,
tenant_id TEXT NOT NULL,
content TEXT NOT NULL,
embedding vector(1536),
metadata JSONB DEFAULT '{}',
PRIMARY KEY (id, tenant_id)
) PARTITION BY LIST (tenant_id);
-- Crea partizione per ogni tenant
CREATE TABLE docs_tenant_acme PARTITION OF documents_multitenant FOR VALUES IN ('acme');
CREATE TABLE docs_tenant_globex PARTITION OF documents_multitenant FOR VALUES IN ('globex');
CREATE TABLE docs_tenant_initech PARTITION OF documents_multitenant FOR VALUES IN ('initech');
-- Default partition per nuovi tenant
CREATE TABLE docs_tenant_default PARTITION OF documents_multitenant DEFAULT;
-- Indice HNSW per ogni tenant
CREATE INDEX ON docs_tenant_acme USING hnsw (embedding vector_cosine_ops) WITH (m=16, ef_construction=64);
CREATE INDEX ON docs_tenant_globex USING hnsw (embedding vector_cosine_ops) WITH (m=16, ef_construction=64);
CREATE INDEX ON docs_tenant_initech USING hnsw (embedding vector_cosine_ops) WITH (m=16, ef_construction=64);
-- Query con tenant isolation automatica (partition pruning):
SELECT id, content, embedding <=> query_vec AS dist
FROM documents_multitenant
WHERE tenant_id = 'acme' -- cerca SOLO nella partizione di acme
ORDER BY embedding <=> query_vec
LIMIT 5;
-- Row Level Security (RLS) come secondo livello di sicurezza:
ALTER TABLE documents_multitenant ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON documents_multitenant
USING (tenant_id = current_setting('app.current_tenant'));
-- In Python, impostare il tenant context:
with conn.cursor() as cur:
cur.execute("SET app.current_tenant = %s", (tenant_id,))
# Tutte le query successive vedono solo i documenti del tenant corrente
Redis を使用したキャッシュ
多くの RAG クエリは、同じ質問、同じコンテキストを繰り返します。 2 レベルのキャッシュを実装する - 埋め込みキャッシュ (同じクエリの埋め込みの再計算を回避します) e 結果キャッシュ (同一のクエリに対してベクトル検索を再実行することは避けてください) - レイテンシーを 90% 削減し、OpenAI コストを 70% 削減できます。
import redis
import json
import hashlib
from typing import Optional
class RAGCache:
"""
Cache a due livelli:
1. Embedding cache: per i vettori delle query
2. Result cache: per i risultati completi RAG
"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.r = redis.from_url(redis_url, decode_responses=True)
# TTL per diversi tipi di cache
self.embedding_ttl = 3600 * 24 * 7 # 7 giorni (embedding stabili)
self.result_ttl = 3600 # 1 ora (risultati possono invecchiare)
def _cache_key(self, prefix: str, *args) -> str:
"""Genera una chiave cache deterministica."""
content = json.dumps(args, sort_keys=True)
hash_val = hashlib.sha256(content.encode()).hexdigest()[:16]
return f"rag:{prefix}:{hash_val}"
# === EMBEDDING CACHE ===
def get_embedding(self, text: str) -> Optional[list[float]]:
key = self._cache_key("emb", text)
cached = self.r.get(key)
if cached:
return json.loads(cached)
return None
def set_embedding(self, text: str, embedding: list[float]) -> None:
key = self._cache_key("emb", text)
self.r.setex(key, self.embedding_ttl, json.dumps(embedding))
# === RESULT CACHE ===
def get_rag_result(self, query: str, filters: dict = None) -> Optional[dict]:
key = self._cache_key("result", query, filters or {})
cached = self.r.get(key)
if cached:
result = json.loads(cached)
result["from_cache"] = True
return result
return None
def set_rag_result(self, query: str, result: dict, filters: dict = None) -> None:
key = self._cache_key("result", query, filters or {})
# Non cachare risposte con fonti vuote o errori
if result.get("sources") and result.get("answer"):
self.r.setex(key, self.result_ttl, json.dumps(result))
# === CACHE STATISTICS ===
def get_stats(self) -> dict:
info = self.r.info("stats")
total = info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0)
hit_rate = info["keyspace_hits"] / total if total > 0 else 0
return {
"hit_rate": round(hit_rate, 3),
"total_queries": total,
"memory_used": self.r.info("memory")["used_memory_human"],
"rag_keys": self.r.dbsize()
}
# Integrazione nel RAG System
cache = RAGCache("redis://redis:6379")
def cached_embed(text: str, client) -> list[float]:
"""Genera o recupera dal cache l'embedding di un testo."""
cached = cache.get_embedding(text)
if cached:
return cached
embedding = client.embeddings.create(
input=[text], model="text-embedding-3-small"
).data[0].embedding
cache.set_embedding(text, embedding)
return embedding
def cached_rag_query(rag_system, question: str, filters: dict = None) -> dict:
"""Esegui una query RAG con caching del risultato."""
# Controlla result cache
cached = cache.get_rag_result(question, filters)
if cached:
return cached
# Esegui la query completa
result = rag_system.ask(question, filters=filters)
# Salva in cache
cache.set_rag_result(question, result, filters)
return result
FastAPI: 非同期による RAG の提供
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel
import asyncio
from concurrent.futures import ThreadPoolExecutor
app = FastAPI(title="RAG API", version="1.0.0")
executor = ThreadPoolExecutor(max_workers=10)
class QueryRequest(BaseModel):
question: str
top_k: int = 5
source_type: str = None
use_hybrid: bool = True
tenant_id: str = None
class QueryResponse(BaseModel):
answer: str
sources: list[dict]
from_cache: bool = False
retrieval_ms: float
generation_ms: float
@app.post("/api/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
"""
Query endpoint con async processing e caching.
"""
import time
# 1. Controlla cache
cached = cache.get_rag_result(request.question, {"source_type": request.source_type})
if cached:
return QueryResponse(
answer=cached["answer"],
sources=cached["sources"],
from_cache=True,
retrieval_ms=0,
generation_ms=0
)
# 2. Embedding della query (con cache)
t0 = time.time()
# Esegui in thread per non bloccare l'event loop
query_vec = await asyncio.get_event_loop().run_in_executor(
executor,
lambda: cached_embed(request.question, openai_client)
)
# 3. Vector search su PostgreSQL
async with db_pool.get_read_conn() as conn:
if request.tenant_id:
# Imposta tenant context per RLS
async with conn.cursor() as cur:
await cur.execute("SET app.current_tenant = %s", (request.tenant_id,))
chunks = await run_hybrid_search(conn, query_vec, request.top_k)
retrieval_ms = (time.time() - t0) * 1000
# 4. Generazione risposta
t1 = time.time()
response = await asyncio.get_event_loop().run_in_executor(
executor,
lambda: rag_generator.generate(request.question, chunks)
)
generation_ms = (time.time() - t1) * 1000
result = {
"answer": response.answer,
"sources": response.sources
}
cache.set_rag_result(request.question, result, {"source_type": request.source_type})
return QueryResponse(
answer=response.answer,
sources=response.sources,
from_cache=False,
retrieval_ms=round(retrieval_ms, 1),
generation_ms=round(generation_ms, 1)
)
@app.get("/api/health")
async def health():
return {"status": "ok", "cache": cache.get_stats()}
モニタリング: 重要な指標
Python を使用した Prometheus メトリクス
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Metriche applicative
rag_queries_total = Counter(
"rag_queries_total",
"Total number of RAG queries",
["status", "from_cache"]
)
rag_query_duration_seconds = Histogram(
"rag_query_duration_seconds",
"RAG query duration in seconds",
["phase"], # "retrieval", "generation", "total"
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
rag_retrieval_chunks_count = Histogram(
"rag_retrieval_chunks_count",
"Number of chunks retrieved per query",
buckets=[1, 2, 3, 5, 10, 20]
)
rag_retrieval_top_similarity = Gauge(
"rag_retrieval_top_similarity",
"Top similarity score of last retrieval"
)
# Metriche PostgreSQL (via pg_stat_activity)
db_active_connections = Gauge(
"db_active_connections",
"Active database connections",
["pool"] # "write", "read"
)
def track_rag_query(func):
"""Decorator per tracciare le metriche delle query RAG."""
def wrapper(*args, **kwargs):
import time
start = time.time()
try:
result = func(*args, **kwargs)
rag_queries_total.labels(
status="success",
from_cache=str(result.get("from_cache", False))
).inc()
return result
except Exception as e:
rag_queries_total.labels(status="error", from_cache="false").inc()
raise
finally:
rag_query_duration_seconds.labels("total").observe(time.time() - start)
return wrapper
# Avvia server Prometheus (porta 9090)
start_http_server(9090)
Grafana ダッシュボード: PostgreSQL メトリクスの SQL クエリ
-- Query per Grafana dashboard: metriche PostgreSQL vettoriali
-- 1. Latenza query per percentile (ultimi 5 minuti)
SELECT
percentile_cont(0.5) WITHIN GROUP (ORDER BY total_time) AS p50_ms,
percentile_cont(0.95) WITHIN GROUP (ORDER BY total_time) AS p95_ms,
percentile_cont(0.99) WITHIN GROUP (ORDER BY total_time) AS p99_ms,
COUNT(*) AS n_queries
FROM pg_stat_statements
WHERE query LIKE '%embedding%<=%>%' -- query che usano vector search
AND calls > 0;
-- 2. Cache hit ratio degli indici vettoriali
SELECT
indexrelname AS index_name,
idx_blks_read AS disk_reads,
idx_blks_hit AS cache_hits,
ROUND(idx_blks_hit::numeric / NULLIF(idx_blks_read + idx_blks_hit, 0) * 100, 2)
AS cache_hit_pct
FROM pg_statio_user_indexes
WHERE indexrelname LIKE '%hnsw%' OR indexrelname LIKE '%ivfflat%';
-- 3. Connessioni attive per stato
SELECT
state,
COUNT(*) AS connections,
MAX(now() - state_change) AS max_wait_time
FROM pg_stat_activity
WHERE datname = 'ragdb'
GROUP BY state;
-- 4. Tuple inserite/lette per secondo (indicatore di carico)
SELECT
relname,
n_tup_ins + n_tup_upd + n_tup_del AS writes_per_snapshot,
seq_tup_read + idx_tup_fetch AS reads_per_snapshot
FROM pg_stat_user_tables
WHERE relname LIKE '%document%';
RAG 本番用の PostgreSQL 構成
デフォルトの PostgreSQL 構成は、ワークロード ベクトルに対して最適化されていません。ここにあります
変更する重要なパラメータ postgresql.conf 本番環境の RAG システムの場合
32GB 以上の RAM を搭載:
# postgresql.conf - Configurazione produzione per RAG su PostgreSQL 16
# Sistema: 64GB RAM, 16 CPU, SSD NVMe
# === MEMORIA ===
shared_buffers = '16GB' # 25% della RAM totale
effective_cache_size = '48GB' # 75% della RAM (hint al planner, non memoria allocata)
work_mem = '64MB' # per sort/hash nelle query (per connessione!)
maintenance_work_mem = '2GB' # per CREATE INDEX HNSW, VACUUM, pg_dump
max_worker_processes = 16
max_parallel_workers = 8
max_parallel_workers_per_gather = 4
# === STORAGE / I/O ===
random_page_cost = 1.1 # SSD: abbassa da 4.0 a 1.1 per favorire index scans
effective_io_concurrency = 200 # NVMe: può gestire molte richieste parallele
checkpoint_completion_target = 0.9 # distribuisce i checkpoint nel tempo
wal_buffers = '64MB' # default -1 (auto) è di solito sufficiente
# === AUTOVACUUM (CRITICO per tabelle vector con molti INSERT/DELETE) ===
autovacuum_vacuum_cost_delay = '2ms' # più aggressivo del default 20ms
autovacuum_vacuum_scale_factor = 0.05 # trigger a 5% di righe morte (default 20%)
autovacuum_analyze_scale_factor = 0.02 # trigger analyze a 2% di righe nuove
autovacuum_vacuum_cost_limit = 400 # più operazioni per autovacuum worker
# === LOGGING ===
log_min_duration_statement = 1000 # logga query > 1 secondo
log_lock_waits = on
track_io_timing = on # necessario per EXPLAIN (BUFFERS) accurato
# === ESTENSIONI ===
shared_preload_libraries = 'pg_stat_statements,vector'
# === pgvector SPECIFIC ===
# hnsw.ef_search: impostalo a livello di sessione nel codice applicativo
# Non impostarlo globalmente qui: ogni use case ha il suo valore ottimale
ベクトルインデックスの自動バキュームとメンテナンス
埋め込みテーブルでは、RAG パイプライン中に頻繁に UPDATE と DELETE が発生します。 (ドキュメントの更新、再チャンク化、古いドキュメントの削除)。自動バキュームなし 正しく設定されていると、「テーブルの肥大化」によりベクターのパフォーマンスが低下します。
-- Verifica bloat della tabella documenti
SELECT
schemaname,
tablename,
n_live_tup AS righe_vive,
n_dead_tup AS righe_morte,
ROUND(n_dead_tup::numeric / NULLIF(n_live_tup + n_dead_tup, 0) * 100, 2) AS pct_morte,
last_vacuum,
last_autovacuum,
last_analyze
FROM pg_stat_user_tables
WHERE tablename LIKE '%document%';
-- Allarme: pct_morte > 5% significa che serve un VACUUM manuale
-- VACUUM manuale per recuperare spazio (non blocca letture):
VACUUM (VERBOSE, ANALYZE) documents;
-- VACUUM FULL per recuperare spazio fisico (BLOCCA scritture - fare in finestra manutenzione):
VACUUM FULL documents;
-- Imposta autovacuum aggressivo per la tabella documenti
ALTER TABLE documents SET (
autovacuum_vacuum_scale_factor = 0.02, -- 2% righe morte = trigger vacuum
autovacuum_analyze_scale_factor = 0.01, -- 1% nuove righe = trigger analyze
autovacuum_vacuum_cost_delay = '2ms'
);
-- Verifica stato degli indici HNSW dopo molti INSERT/DELETE
-- L'indice HNSW non si degrada con insert, ma lo spazio non viene recuperato dopo DELETE
SELECT
indexname,
pg_size_pretty(pg_relation_size(indexrelid)) AS index_size,
pg_size_pretty(pg_total_relation_size(indexrelid)) AS total_size
FROM pg_stat_user_indexes
WHERE tablename = 'documents'
ORDER BY pg_relation_size(indexrelid) DESC;
-- Ricostruzione indice senza downtime (per recuperare spazio dopo molti DELETE):
REINDEX INDEX CONCURRENTLY documents_hnsw_idx;
Kubernetes へのデプロイメント
PVC を使用した PostgreSQL の StatefulSet
## kubernetes/postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres-vector
namespace: rag-production
spec:
serviceName: postgres-vector
replicas: 2 # primary + 1 replica
selector:
matchLabels:
app: postgres-vector
template:
metadata:
labels:
app: postgres-vector
spec:
containers:
- name: postgres
image: pgvector/pgvector:pg16
env:
- name: POSTGRES_DB
value: ragdb
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: postgres-secret
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: postgres-secret
key: password
- name: POSTGRES_SHARED_BUFFERS
value: "8GB"
- name: POSTGRES_MAINTENANCE_WORK_MEM
value: "2GB"
resources:
requests:
memory: "32Gi"
cpu: "8"
limits:
memory: "64Gi"
cpu: "16"
volumeMounts:
- name: postgres-data
mountPath: /var/lib/postgresql/data
- name: postgres-config
mountPath: /etc/postgresql/custom.conf
subPath: custom.conf
volumeClaimTemplates:
- metadata:
name: postgres-data
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: fast-ssd # NVMe SSD per performance ottimale
resources:
requests:
storage: 500Gi # 500GB per indici HNSW grandi
ローリング アップデートによる RAG API の展開
## kubernetes/rag-api-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: rag-api
namespace: rag-production
spec:
replicas: 5 # 5 pod in parallelo
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 2 # crea 2 pod nuovi prima di eliminare quelli vecchi
maxUnavailable: 0 # zero downtime update
selector:
matchLabels:
app: rag-api
template:
spec:
containers:
- name: rag-api
image: your-registry/rag-api:latest
ports:
- containerPort: 8000
env:
- name: DB_WRITE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: write-url
- name: DB_READ_URL
valueFrom:
secretKeyRef:
name: db-secret
key: read-url
- name: REDIS_URL
value: "redis://redis-service:6379"
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: openai-secret
key: api-key
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
livenessProbe:
httpGet:
path: /api/health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /api/health
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
コストと専用ソリューションとの比較
| 解決 | 100 万キャリア/月 | 1,000 万クエリ/月 | レイテンシ p95 | 注意事項 |
|---|---|---|---|---|
| PostgreSQL + pgvector (セルフホスト型) | 200~400 ドル (EC2/GKE) | 含まれています | 20~80ミリ秒 | 固定費、トータルコントロール |
| 松ぼっくり(管理済み) | 70~700ドル | 0.04 ~ 0.08 ドル/1000 クエリ | 10~40ミリ秒 | 変動費、管理性の低下 |
| 超塩基ベクトル | 25~200ドル | 含まれています | 30~100ミリ秒 | マネージド PostgreSQL + pgvector |
| Qdrant (自己ホスト型) | 150~300ドル | 含まれています | 5~20ミリ秒 | より高速かつ追加のインフラストラクチャ |
| pgvector (ベンチマーク 2025) | 松ぼっくりより最大 28 倍高速 | 16 分の 1 のコストで | 直接比較 | ベンチマーク.vector.dev |
ベクトル検索に PostgreSQL を使用すべきではない場合
- 何十億もの通信事業者: 500M-1B を超えるベクトルでは、ネイティブ ベクトル量子化のおかげで、Qdrant または Weaviate のメモリ効率が向上します。
- ミリ秒未満のレイテンシー: 5 ミリ秒未満の p99 が必要な場合は、特殊なインメモリ ベクトル データベース (Qdrant クラウド、Pinecone サーバーレス) がより適切です。
- DBA のいないチーム: PostgreSQL のスキルがない場合は、Pinecone のようなマネージド サービスを使用すると、運用の複雑さが軽減されます。
- マルチモーダル検索: 埋め込みサイズが異なる画像 + テキスト + オーディオには、PostgreSQL がネイティブに処理できないアーキテクチャが必要です。
RAG のバックアップと災害復旧ナレッジベース
RAG ナレッジ ベースには、元のドキュメントとそのベクトル埋め込みの両方が含まれています。 バックアップには両方を含める必要がありますが、最適な戦略はデータセットのサイズによって異なります。 そして更新の頻度:
# Strategia di backup per knowledge base RAG su PostgreSQL
# 1. Backup logico completo (pg_dump):
# Adatto per database fino a ~50GB. Include tutti i dati, indici, schema.
pg_dump -h postgres-primary \
-U raguser \
-d ragdb \
--format=custom \ # formato binario compresso
--compress=9 \ # massima compressione
--no-privileges \ # escludi permessi utente
--file=ragdb_backup_$(date +%Y%m%d_%H%M%S).dump
# Restore:
pg_restore -h postgres-new \
-U raguser \
-d ragdb \
--jobs=8 \ # restore parallelo (usa 8 CPU)
ragdb_backup.dump
# 2. Backup fisico (pg_basebackup) per DB grandi:
# Più veloce ma richiede più spazio. Usa WAL per point-in-time recovery.
pg_basebackup -h postgres-primary \
-U repuser \
-D /backup/ragdb_base_$(date +%Y%m%d) \
--format=tar \
--compress=9 \
--wal-method=stream \ # include WAL durante il backup
--checkpoint=fast
# 3. Solo embedding (se i documenti originali sono altrove):
# Esporta solo la tabella dei documenti con embedding
psql -h postgres-primary -U raguser ragdb -c \
"COPY (SELECT id, source_path, chunk_index, content, embedding::text, metadata, ingested_at
FROM rag_documents) TO STDOUT CSV HEADER" \
| gzip > rag_embeddings_$(date +%Y%m%d).csv.gz
# Stima dimensione backup:
# 1M vettori 1536-dim in float32 = 6GB solo embedding
# + contenuto testo: ~0.5GB per 1M chunk da 800 char
# Totale atteso: ~7GB per 1M documenti chunked
警告: 何か問題が発生したとき
-- Alert queries per Prometheus/Grafana alerting
-- Esegui ogni 60 secondi come Prometheus exporter
-- 1. Alert: latenza p99 vettori > 500ms (ultime 5 minuti)
SELECT
CASE
WHEN percentile_cont(0.99) WITHIN GROUP (ORDER BY total_time) > 500
THEN 'ALERT'
ELSE 'OK'
END AS status,
percentile_cont(0.99) WITHIN GROUP (ORDER BY total_time) AS p99_ms
FROM pg_stat_statements
WHERE query LIKE '%embedding%<=%>%'
AND calls > 0;
-- 2. Alert: replica lag > 30 secondi
SELECT
CASE
WHEN EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) > 30
THEN 'ALERT'
ELSE 'OK'
END AS replica_status,
EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag_seconds;
-- 3. Alert: troppi dead tuples (bloat > 10%)
SELECT
tablename,
CASE
WHEN n_dead_tup::float / NULLIF(n_live_tup + n_dead_tup, 0) > 0.1
THEN 'ALERT'
ELSE 'OK'
END AS bloat_status,
ROUND(n_dead_tup::numeric / NULLIF(n_live_tup + n_dead_tup, 0) * 100, 2) AS dead_pct
FROM pg_stat_user_tables
WHERE tablename LIKE '%document%';
-- 4. Alert: cache hit ratio indice vettoriale < 90%
SELECT
indexrelname,
CASE
WHEN idx_blks_hit::float / NULLIF(idx_blks_read + idx_blks_hit, 0) < 0.9
THEN 'ALERT'
ELSE 'OK'
END AS cache_status,
ROUND(idx_blks_hit::numeric / NULLIF(idx_blks_read + idx_blks_hit, 0) * 100, 2) AS hit_pct
FROM pg_statio_user_indexes
WHERE indexrelname LIKE '%hnsw%';
-- 5. Alert: connessioni vicine al limite
SELECT
CASE
WHEN COUNT(*) > (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') * 0.85
THEN 'ALERT'
ELSE 'OK'
END AS conn_status,
COUNT(*) AS active_connections
FROM pg_stat_activity;
生産チェックリスト
-- CHECKLIST PRE-DEPLOY per RAG su PostgreSQL
-- 1. Verifica estensioni installate
SELECT extname, extversion FROM pg_extension WHERE extname IN ('vector', 'pg_stat_statements');
-- 2. Verifica indici vettoriali presenti e dimensioni
SELECT
tablename, indexname,
pg_size_pretty(pg_relation_size(indexrelid)) AS size
FROM pg_stat_user_indexes
WHERE indexname LIKE '%hnsw%' OR indexname LIKE '%ivfflat%';
-- 3. Verifica configurazione memoria (per prod con 32GB RAM)
SHOW shared_buffers; -- deve essere >= 8GB
SHOW work_mem; -- deve essere >= 16MB
SHOW maintenance_work_mem; -- deve essere >= 512MB
-- 4. Verifica autovacuum attivo sulla tabella documenti
SELECT autovacuum_enabled, autovacuum_vacuum_scale_factor
FROM pg_settings
WHERE name = 'autovacuum';
-- 5. Test connessione PgBouncer
-- psql -h pgbouncer-host -p 5432 -U raguser ragdb_read -c "SELECT 1"
-- 6. Test Redis cache
-- redis-cli -h redis-host PING -- deve rispondere PONG
-- 7. Benchmark prestazioni prima del go-live
-- target: p95 < 100ms per vector search, p99 < 500ms
-- 8. Verifica replica lag
SELECT now() - pg_last_xact_replay_timestamp() AS lag;
-- target: lag < 5 secondi in operazioni normali
アーキテクチャの比較: PostgreSQL と専用ベクター DB
| 待ってます | PostgreSQL + pgvector | 松ぼっくり / クドラント / ウィービエイト |
|---|---|---|
| インフラストラクチャの複雑さ | 低: システムが 1 つだけ | 高: 追加のシステムを管理する必要がある |
| 1,000 万クエリあたりのコスト | $200-400/月 (固定サーバー) | $400-800/月 (クエリごとに異なります) |
| p95 レイテンシ (100 万キャリア) | 20 ~ 80 ミリ秒 (HNSW に最適化) | 5 ~ 40 ミリ秒 (多くの場合メモリ内) |
| 最大のスケーラビリティ | ~500M-1B 実用キャリア | 数十億の通信事業者 |
| 統合された全文検索 | はい (ts_tsvector、GIN) | いいえ (別途 Elasticsearch が必要) |
| ACIDトランザクション | はい(ネイティブ) | 限定的または不在 |
| チームのスキル要件 | PostgreSQL DBA | クライアントAPIのみ |
シリーズ概要: PostgreSQL AI
私たちは、PostgreSQL を基盤として使用するために、テクノロジー スタック全体を協力して検討しました。 AI アプリケーション:
- pgvector: セットアップ、ベクトル演算子、最初の HNSW インデックス
- 埋め込み: モデル (OpenAI、Sentence Transformers)、距離、チャンキング
- ラグ: 完全な取り込み、取得、生成パイプライン
- 類似性検索: HNSW と IVFFlat、ANN アルゴリズム、MMR
- 高度なインデックス作成: 最適なパラメータ、モニタリング、ダウンタイムなしの再構築
- 生産: PgBouncer、リードレプリカ、パーティショニング、K8、モニタリング
中心的なメッセージは 2026 年のメッセージのままです。 「Postgresを使用するだけ」。あなたがそうであれば すでに PostgreSQL を使用している (そして大多数のアプリケーションが PostgreSQL を使用している) 場合は、すでにすべての機能が備わっています。 洗練され、スケーラブルでコスト効率の高い RAG システムを構築するために必要です。追加しないでください ベクトル数が 500M を超えるか、 極端なレイテンシ要件。
リソースとドキュメント
- pgvector GitHub: github.com/pgvector/pgvector
- MTEBリーダーボード:huggingface.co/spaces/mteb/leaderboard
- RAGAS フレームワーク: github.com/explodinggradients/ragas
- ANN ベンチマーク: ann-benchmarks.com
- PgBouncer ドキュメント: pgbouncer.org/config.html
- 関連する AI エンジニアリング シリーズ: RAG エンタープライズ アーキテクチャ
- 関連データ&AIビジネスシリーズ: AI用データインフラストラクチャ







