ハイブリッド検索: BM25 とベクトル検索を組み合わせたプロダクション RAG 用
埋め込みによるセマンティック検索は、RAG システムで情報を取得する方法に革命をもたらしました。 しかし、それは運用環境で定期的に現れる根本的な制限を隠しています: ユーザーが検索した場合 「GPT-4 幻覚率ベンチマーク 2024 年第 3 四半期」、埋め込みモデルはドキュメントを意味的に検索します。 「言語パターンの幻覚」の概念に近いが、文書を復元できない可能性がある 特定のテキスト文字列を含むものそのもの。一方、キーワード検索では、正確に検索されます。 しかし、彼は「LLM 事実問題」が概念的に同一であることを知りません。
Il ハイブリッド検索 まさにこの緊張を解くために誕生しました。研究を組み合わせる スパース (BM25 およびバリアント) とデンス検索 (ベクトル検索) を組み合わせた、両方とも正確なシステムが得られます。 完全一致の場合は、意味論的な理解に堅牢です。最近の研究によると ハイブリッド システムにより検索品質が向上するということ 単一の方法と比較して 48% BEIR および MTEB ベンチマークでは、技術的なクエリ、固有名詞で特に大きな向上が見られます。 そして専門用語。
この記事は、BM25 からのハイブリッド検索アーキテクチャに関する技術的な詳細です。その仕組みについて説明します。 内部的には、融合手法(相互ランク融合、加重融合)、再ランキングへ クロスエンコーダーを使用し、Qdrant を使用した実用的な実装と NDCG/MRR メトリクスを使用した評価まで。 目標は、機能する検索パイプラインを構築および最適化するツールを提供することです。 ベンチマークだけでなく実稼働環境でも。
何を学ぶか
- セマンティックのみの検索の限界と BM25 が 2025 年になっても重要であり続ける理由
- BM25 アルゴリズム: 項周波数飽和、IDF 重み付け、長さ正規化
- ハイブリッド検索アーキテクチャ: スパースとデンスを並行して実行
- 逆順位融合 (RRF): パラメーター k の公式、実装、および調整
- 加重スコアの融合: 貢献度の正規化とバランス調整
- クロスエンコーダーの再ランキング: いつ使用するか、およびレイテンシーと精度を最適化する方法
- Qdrant スパース ベクトルとクエリ API を使用した実装
- 評価指標: ハイブリッド検索の NDCG@k、MRR、Precision@k
- キャッシュ、モニタリング、段階的な最適化を備えた本番パイプライン
セマンティックのみの研究の限界
密なベクトルを使用した意味検索は、テキストの潜在的な意味を捕捉するのに強力です。 しかし、実際の運用クエリで明らかになる構造的な脆弱性があります。 これらの制限を理解することは、ハイブリッド検索が必要な理由を理解するための第一歩です 本格的な RAG システムでは単なるオプションではありません。
主な問題は研究者が何と呼んでいるかということです 語彙の不一致: 埋め込みモデルは一般的なテキスト分布に基づいてトレーニングされており、常にキャプチャするとは限りません。 特定の技術用語、頭字語、製品名、ソフトウェア バージョン、または 識別コード。埋め込みモデルは、「MSMARCO-v2.1」がデータセットを参照していることを認識しません。 特定の、または「CVE-2024-4577」が PHP の重大な脆弱性であることを示します。 そのドメインで微調整されます。
セマンティック検索が失敗した場合
- バージョン番号を使用したクエリ: 「Python 3.12 asyncio.TaskGroup」と「Python 非同期パターン」の比較
- 一意の識別子: CVE ID、注文番号、税コード、ISBN
- 珍しい頭字語: ドメイン用語、会社の頭字語、規制コード
- 珍しい名前: 人名、中小企業、地理的位置
- 非常に短いクエリ: 1 ~ 2 個のトークンでは、埋め込みはあまり区別できません
- 最近の専門用語: 知識が遮断されたモデルは新しい用語を知りません
2番目の問題は、 スコア校正: 密ベクトルの類似性スコア (通常、範囲 [-1, 1] のコサイン類似度または無制限の内積) にはセマンティクスがありません 絶対的な。スコアが 0.85 の文書は、スコアが 0.82 の文書より必ずしも関連性が高いとは限りません。 異なるコンテキスト。これにより、異なるシステムのスコアを比較したり組み合わせたりすることが困難になります。 適切な正規化がなければ。
最後に、セマンティック検索には次のような問題があります。 セマンティックドリフト あいまいなクエリの場合: プログラミングのコンテキストにおける「Java」のようなクエリは、「Java Island」に関するドキュメントを取得する可能性があります。 ドキュメントのコンテキストが埋め込みモデルにとって十分に明確ではない場合、特に 非常に短い、または文脈から切り離されたテキストの塊。
BM25: 基本アルゴリズムの刷新
BM25(ベストマッチ25)は1990年代に開発され今も残るランキング機能で、2025年には キーワード調査に最も効果的な情報検索アルゴリズムの 1 つ。理解する 内部の仕組みは、正しく使用するためにも、その理由を理解するためにも必要です。 セマンティック検索を非常にうまく補完します。
BM25 は、次の 2 つの主要なメカニズムで TF-IDF を拡張します。 項周波数飽和 e 長さの正規化。ドキュメントをスコアリングするための完全な公式 D は、用語 {q1, ..., qn} を含むクエリ Q に関して次のとおりです。
# Formula BM25 (pseudocodice matematico)
# score(D, Q) = sommatoria per qi in Q di:
# IDF(qi) * (TF(qi, D) * (k1 + 1)) / (TF(qi, D) + k1 * (1 - b + b * |D| / avgdl))
#
# Dove:
# IDF(qi) = log((N - df_i + 0.5) / (df_i + 0.5) + 1)
# TF(qi, D) = frequenza del termine qi nel documento D
# |D| = lunghezza del documento D in termini
# avgdl = lunghezza media dei documenti nella collection
# N = numero totale di documenti
# df_i = numero di documenti che contengono qi
# k1 = parametro di saturazione TF (default: 1.2-2.0)
# b = parametro di length normalization (default: 0.75)
# Implementazione Python con rank_bm25
from rank_bm25 import BM25Okapi
import nltk
from nltk.tokenize import word_tokenize
class BM25Retriever:
def __init__(self, corpus: list[str], k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
# Tokenizzazione e lowercase
self.tokenized_corpus = [
word_tokenize(doc.lower()) for doc in corpus
]
self.bm25 = BM25Okapi(self.tokenized_corpus, k1=k1, b=b)
self.corpus = corpus
def retrieve(self, query: str, top_k: int = 20) -> list[dict]:
tokenized_query = word_tokenize(query.lower())
scores = self.bm25.get_scores(tokenized_query)
# Crea lista di (index, score) ordinata per score decrescente
ranked = sorted(
enumerate(scores),
key=lambda x: x[1],
reverse=True
)[:top_k]
return [
{"doc_id": idx, "text": self.corpus[idx], "score": score}
for idx, score in ranked
if score > 0 # Filtra documenti senza match
]
# Uso pratico
corpus = [
"BM25 is a ranking function used in information retrieval",
"Vector search uses dense embeddings for semantic similarity",
"Hybrid search combines BM25 and vector search for better recall",
"Python asyncio enables concurrent programming",
]
retriever = BM25Retriever(corpus, k1=1.5, b=0.75)
results = retriever.retrieve("BM25 hybrid search retrieval", top_k=3)
for r in results:
print(f"Score: {r['score']:.4f} | Text: {r['text'][:60]}...")
パラメータ k1 項周波数の飽和を制御します: 低い k1 (0.5)、 用語の出現回数が 1 回と 2 回の違いは、10 回と 100 回の違いとほぼ同じくらい重要です 出来事; k1 (2.0) が高いと、TF は高周波に対しても影響を与え続けます。 パラメータ b 長いドキュメントにどの程度のペナルティを与えるかを制御します: b=0 を無効にします 長さの正規化、b=1 は完全に正規化します。
BM25 の見落とされがちな側面は、そのスコアが 上位に制限されていない: 検索語が多数出現する関連性の高い文書のスコアは 10、50、または 100 になります。 コーパスによって異なります。これにより、コサイン類似度スコアとの直接的な互換性の問題が発生します。 [-1, 1] にある密ベクトルの数。ハイブリッド フュージョンでは、この矛盾に対処する必要があります。
ハイブリッド検索アーキテクチャ: それらを組み合わせる方法
ハイブリッド検索システムの基本アーキテクチャは、疎検索と密検索を並行して実行します。 個別のインデックス作成で結果をマージしてから、結果をユーザー (または RAG コンテキストの LLM) に返します。 融合が発生する可能性のあるアーキテクチャ上のポイントは 3 つあり、さまざまなトレードオフがあります。
- 早期融合 (取得前): ドキュメントはベクトルで表現されます インデックスを作成する前に、スパース機能とデンス機能を組み合わせたハイブリッド。例: SPLADE、ColBERT モード 端から端まで。インデックス作成の点ではコストが高くなりますが、より一貫性があります。
- 後期融合(回復後): 2 つのレトリバーはインデックスに対して独立して動作します 分離され、結果はランキング レベルでマージされます。これは最も一般的で柔軟なアプローチです。 コンポーネントを個別に更新できます。
- ステージの再ランキング: 別のモデル (クロスエンコーダー) が結果を並べ替えます。 後期融合により融合。レイテンシは追加されますが、precision@k は大幅に向上します。
# Architettura base hybrid retrieval con late fusion
from typing import Protocol
import asyncio
class Retriever(Protocol):
async def search(self, query: str, top_k: int) -> list[dict]:
"""Ritorna lista di {'doc_id': str, 'text': str, 'score': float}"""
...
class HybridRetriever:
def __init__(
self,
sparse_retriever: Retriever,
dense_retriever: Retriever,
fusion_method: str = "rrf", # "rrf" | "weighted" | "dbsf"
sparse_weight: float = 0.4,
dense_weight: float = 0.6,
top_k_per_retriever: int = 50, # Recupera più docs per la fusione
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.fusion_method = fusion_method
self.sparse_weight = sparse_weight
self.dense_weight = dense_weight
self.top_k_per_retriever = top_k_per_retriever
async def search(self, query: str, final_top_k: int = 10) -> list[dict]:
# Esecuzione parallela dei due retriever
sparse_results, dense_results = await asyncio.gather(
self.sparse.search(query, self.top_k_per_retriever),
self.dense.search(query, self.top_k_per_retriever)
)
if self.fusion_method == "rrf":
return self._rrf_fusion(sparse_results, dense_results, final_top_k)
elif self.fusion_method == "weighted":
return self._weighted_fusion(sparse_results, dense_results, final_top_k)
else:
raise ValueError(f"Unknown fusion method: {self.fusion_method}")
def _rrf_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
def _weighted_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
相互ランク融合 (RRF)
RRF は、そのシンプルさと堅牢性により、ハイブリッド検索で最も使用される融合アルゴリズムです。 そしてスコアスケールからの独立性。元々はコーマック、クラーク、ブッチャーによって提案されました。 2009 年に、各文書にその位置のみに基づいてスコアを割り当てます。 各レトリバーのランキングリストではスコアの絶対値を完全に無視しています。
リスト L1、L2、...、Lm に出現するドキュメント D をスコアリングするための RRF 式は次のとおりです。
RRF(D) = i=1..m の合計: 1 / (k + Rank_i(D))
ここで、k はリストの先頭にあるドキュメントの影響を弱める定数 (通常は 60) です。 D がリスト i に現れない場合、その寄与は 0 です。 パラメーター k=60 は経験的に決定されました。通常の値の範囲は 10 ~ 100 です。
# Implementazione completa RRF
from collections import defaultdict
def reciprocal_rank_fusion(
result_lists: list[list[dict]],
k: int = 60,
id_field: str = "doc_id"
) -> list[dict]:
"""
Fonde N liste di risultati usando Reciprocal Rank Fusion.
Args:
result_lists: Lista di liste, ognuna ordinata per relevance decrescente
k: Costante di smorzamento (default 60, range consigliato 10-100)
id_field: Campo usato come identificatore univoco del documento
Returns:
Lista fusa ordinata per RRF score decrescente
"""
rrf_scores = defaultdict(float)
doc_registry = {} # Mappa doc_id -> doc completo
for result_list in result_lists:
for rank, doc in enumerate(result_list, start=1):
doc_id = doc[id_field]
# Formula RRF: 1 / (k + rank)
rrf_scores[doc_id] += 1.0 / (k + rank)
# Salva il documento (usa l'ultimo trovato se compare più volte)
if doc_id not in doc_registry:
doc_registry[doc_id] = doc
# Ordina per RRF score decrescente
sorted_docs = sorted(
rrf_scores.items(),
key=lambda x: x[1],
reverse=True
)
# Costruisce la lista finale con score
return [
{**doc_registry[doc_id], "rrf_score": score}
for doc_id, score in sorted_docs
]
# Esempio di utilizzo
sparse_results = [
{"doc_id": "doc_A", "text": "BM25 text...", "score": 12.5},
{"doc_id": "doc_B", "text": "...", "score": 8.3},
{"doc_id": "doc_C", "text": "...", "score": 5.1},
]
dense_results = [
{"doc_id": "doc_C", "text": "...", "score": 0.92}, # Doc C primo in dense
{"doc_id": "doc_A", "text": "BM25 text...", "score": 0.88},
{"doc_id": "doc_D", "text": "...", "score": 0.85}, # Solo in dense
]
fused = reciprocal_rank_fusion(
[sparse_results, dense_results],
k=60
)
# RRF scores:
# doc_A: 1/(60+1) + 1/(60+2) = 0.01639 + 0.01613 = 0.03252
# doc_C: 1/(60+3) + 1/(60+1) = 0.01587 + 0.01639 = 0.03226
# doc_B: 1/(60+2) = 0.01613
# doc_D: 1/(60+3) = 0.01587
for doc in fused:
print(f"{doc['doc_id']}: RRF={doc['rrf_score']:.5f}")
RRF のパワーは次のとおりです。 外れ値をスコアする堅牢性:関係ないよ BM25 が最初の文書に 100 のスコアを割り当て、2 番目の文書に 50 のスコアを割り当て、類似性スコアは 0.99 と 0.97 であるとします。 重要なのは相対的な位置だけです。これは、次の場合に特に適しています。 2 匹のレトリバーの得点スケールはまったく異なります。
パラメータ k 高い位置にあるドキュメントにどれだけの重みが与えられるかに影響します 低い位置の人に比べて。 k=60 の場合、ランク 1 のドキュメントは 1/61 = 0.0164 になります。 ランク 60 のものは 1/120 = 0.0083 になります。最初のものの価値は最後のものの 2 倍未満です。 k=10 の場合、最初 (1/11 = 0.091) は 60 番目 (1/70 = 0.014) のほぼ 7 倍の価値があります。 ランキングに加えて「勝者総取り」。ほとんどの場合、k=60 が適切な開始点です。
正規化による加重スコアの融合
重み付けされたフュージョンでは、ランクではなく絶対スコアを組み合わせて、どの程度のスコアを制御できるようにします。 各レトリバーに与える体重。主な問題は、 スコアの正規化: BM25 とコサインの類似性は完全に異なるスケールに存在するため、直接組み合わせることができます。 (「bm25_score * 0.4 +dens_score * 0.6」) は正規化しないと意味がありません。
# Weighted fusion con normalizzazione Min-Max e normalizzazione Z-score
import numpy as np
from typing import Optional
def min_max_normalize(scores: list[float]) -> list[float]:
"""Normalizza scores in [0, 1] usando Min-Max scaling."""
if not scores:
return []
min_val = min(scores)
max_val = max(scores)
if max_val == min_val:
return [1.0] * len(scores) # Tutti uguali -> tutti 1.0
return [(s - min_val) / (max_val - min_val) for s in scores]
def dbsf_normalize(scores: list[float]) -> list[float]:
"""
Distribution-Based Score Fusion (DBSF) normalization.
Usa mean e std per normalizzazione più robusta agli outlier.
"""
if not scores:
return []
mean = np.mean(scores)
std = np.std(scores)
if std == 0:
return [0.5] * len(scores)
# Clamp tra 0 e 1 dopo la trasformazione
normalized = [(s - mean) / (3 * std) + 0.5 for s in scores]
return [max(0.0, min(1.0, n)) for n in normalized]
def weighted_fusion(
sparse_results: list[dict],
dense_results: list[dict],
sparse_weight: float = 0.3,
dense_weight: float = 0.7,
normalization: str = "minmax", # "minmax" | "dbsf"
top_k: Optional[int] = None,
id_field: str = "doc_id"
) -> list[dict]:
"""
Combina risultati sparse e dense con weighted fusion normalizzata.
"""
# Costruisce dizionari per lookup veloce
sparse_map = {d[id_field]: d for d in sparse_results}
dense_map = {d[id_field]: d for d in dense_results}
# Tutti i doc_ids unici
all_ids = set(sparse_map.keys()) | set(dense_map.keys())
# Normalizza gli score di ciascun retriever
normalize_fn = min_max_normalize if normalization == "minmax" else dbsf_normalize
if sparse_results:
sparse_scores_norm = dict(zip(
[d[id_field] for d in sparse_results],
normalize_fn([d["score"] for d in sparse_results])
))
else:
sparse_scores_norm = {}
if dense_results:
dense_scores_norm = dict(zip(
[d[id_field] for d in dense_results],
normalize_fn([d["score"] for d in dense_results])
))
else:
dense_scores_norm = {}
# Calcola score combinato
fused_docs = []
for doc_id in all_ids:
sparse_score = sparse_scores_norm.get(doc_id, 0.0)
dense_score = dense_scores_norm.get(doc_id, 0.0)
combined_score = sparse_weight * sparse_score + dense_weight * dense_score
# Prende il doc dal retriever che lo ha trovato
doc = sparse_map.get(doc_id) or dense_map.get(doc_id)
fused_docs.append({
**doc,
"combined_score": combined_score,
"sparse_score_norm": sparse_score,
"dense_score_norm": dense_score,
})
# Ordina per combined score
fused_docs.sort(key=lambda x: x["combined_score"], reverse=True)
return fused_docs[:top_k] if top_k else fused_docs
# Quando usare weighted vs RRF:
# - RRF: quando i retriever hanno scale molto diverse, come punto di partenza
# - Weighted + DBSF: quando vuoi controllare il bilanciamento sparse/dense
# basandoti su metriche di evaluation del tuo specifico dataset
# - Weighted + MinMax: più semplice, sensibile agli outlier di score
クロスエンコーダーによる再ランキング
融合 (RRF または重み付け) により、推定された関連性の順に並べられた候補のリストが生成されます。 ただし、BM25 とデンス レトリバーの両方が使用します。 バイエンコーダー: クエリとドキュメントが来ます 個別にエンコードされ、類似性が事後的に計算されます。これは効率的ですが漏れがあります クエリとドキュメント間のきめ細かい対話。
I クロスエンコーダ モデルを通じてクエリとドキュメントを一緒に処理します トランスフォーマーにより、セルフアテンションメカニズムが直接インタラクションをキャプチャできるようになります。 クエリトークンとドキュメントトークンの間。結果は関連性スコアです 精度は大幅に向上しますが、計算コストは数値に比例します。 評価するペア (クエリ、ドキュメント) の数。
# Cross-encoder re-ranking con sentence-transformers
from sentence_transformers import CrossEncoder
import time
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class CrossEncoderReranker:
"""
Re-ranker basato su cross-encoder per la fase di precision refinement.
Usa modello ms-marco per ranking di rilevanza query-documento.
"""
def __init__(
self,
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
max_length: int = 512,
batch_size: int = 32,
device: Optional[str] = None, # None = auto-detect GPU/CPU
):
self.model = CrossEncoder(
model_name,
max_length=max_length,
device=device
)
self.batch_size = batch_size
logger.info(f"CrossEncoder loaded: {model_name}")
def rerank(
self,
query: str,
documents: list[dict],
text_field: str = "text",
top_k: Optional[int] = None,
) -> list[dict]:
"""
Re-ordina i documenti usando il cross-encoder.
Args:
query: Query originale dell'utente
documents: Lista di documenti da ri-ordinare (output del hybrid retriever)
text_field: Campo del documento che contiene il testo
top_k: Ritorna solo i top_k più rilevanti
Returns:
Documenti ri-ordinati con campo "rerank_score" aggiunto
"""
if not documents:
return []
start_time = time.time()
# Crea coppie (query, document_text) per il cross-encoder
query_doc_pairs = [
(query, doc[text_field]) for doc in documents
]
# Inferenza in batch per efficienza
scores = self.model.predict(
query_doc_pairs,
batch_size=self.batch_size,
show_progress_bar=False,
)
elapsed = time.time() - start_time
logger.debug(
f"Cross-encoder scored {len(documents)} docs in {elapsed:.3f}s "
f"({elapsed/len(documents)*1000:.1f}ms/doc)"
)
# Aggiunge score e ri-ordina
reranked = [
{**doc, "rerank_score": float(score)}
for doc, score in zip(documents, scores)
]
reranked.sort(key=lambda x: x["rerank_score"], reverse=True)
return reranked[:top_k] if top_k else reranked
# Pipeline completa: Hybrid Retrieval + Cross-Encoder Reranking
class RAGRetrievalPipeline:
def __init__(
self,
hybrid_retriever: HybridRetriever,
reranker: CrossEncoderReranker,
retrieval_top_k: int = 50, # Recupera molti per il reranker
final_top_k: int = 5, # Top-K finale per il LLM context
):
self.hybrid_retriever = hybrid_retriever
self.reranker = reranker
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
async def retrieve_for_llm(self, query: str) -> list[dict]:
"""
Pipeline completa: hybrid retrieval -> cross-encoder reranking.
Ottimizzato per massimizzare precision@5 (i 5 docs passati al LLM).
"""
# Step 1: Hybrid retrieval con largo top_k per il reranker
candidates = await self.hybrid_retriever.search(
query, final_top_k=self.retrieval_top_k
)
if not candidates:
return []
# Step 2: Re-ranking con cross-encoder
# Il reranker opera su self.retrieval_top_k docs, tipicamente 20-50
reranked = self.reranker.rerank(
query=query,
documents=candidates,
top_k=self.final_top_k
)
return reranked
# Performance tipica (GPU T4):
# - Hybrid retrieval (BM25 + HNSW): ~10-20ms
# - Cross-encoder reranking (20 docs): ~80-120ms
# - Cross-encoder reranking (50 docs): ~200-350ms
# Totale pipeline: ~100-370ms a seconda del top_k del reranker
クロスエンコーダーの推奨モデル (2025)
- クロスエンコーダー/ms-marco-MiniLM-L-6-v2: 最適な速度と精度のバランス。 MSマルコのMAP 0.82。 GPU で最大 12 ミリ秒/ドキュメント。生産に最適です。
- クロスエンコーダー/ms-marco-MiniLM-L-12-v2: より正確ですが、最大 2 倍遅くなります。優先度の高いクエリの場合。
- BAAI/bge-reranker-v2-m3: 多言語に対応しており、イタリア語にも優れています。最大 8192 個のトークンをサポートします。イタリア語でRAGにおすすめ。
- Cohere再ランクAPI: マネージド ソリューション、最大 50 ミリ秒の遅延、優れた精度。クエリごとのコスト。迅速な概念実証に最適です。
- ジナ リランカー v2: オープンソース、8192 トークン コンテキスト、技術文書に優れています。
Qdrant スパース + デンス ベクトルを使用した実装
Qdrant は、ハイブリッド検索をネイティブにサポートします。 クエリAPI スパースベクトルとメカニズムを使って プリフェッチ。ソリューションとは異なります スパースとデンスに個別のシステムが必要な場合、Qdrant は両方を 1 つで処理します コレクションを作成し、アーキテクチャを大幅に簡素化します。
# Qdrant Hybrid Search: setup e query completa
from qdrant_client import QdrantClient
from qdrant_client import models
from fastembed import TextEmbedding, SparseTextEmbedding
import numpy as np
# Inizializzazione client e modelli di embedding
client = QdrantClient("localhost", port=6333)
# Modello dense: all-MiniLM-L6-v2 (384 dim, veloce)
dense_model = TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")
# Modello sparse: BM25 tramite FastEmbed
sparse_model = SparseTextEmbedding("Qdrant/bm25")
COLLECTION_NAME = "hybrid_rag_collection"
def create_hybrid_collection():
"""Crea collection con supporto sparse + dense vectors."""
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config={
"dense": models.VectorParams(
size=384, # Dimensione all-MiniLM-L6-v2
distance=models.Distance.COSINE,
on_disk=False, # In-memory per latenza bassa
)
},
sparse_vectors_config={
"sparse": models.SparseVectorParams(
modifier=models.Modifier.IDF, # BM25-style IDF weighting
)
},
optimizers_config=models.OptimizersConfigDiff(
indexing_threshold=20000, # Inizia HNSW dopo 20k vettori
),
)
print(f"Collection '{COLLECTION_NAME}' creata con sparse + dense support")
def index_documents(documents: list[dict]):
"""
Indicizza documenti con vettori dense e sparse.
Args:
documents: Lista di {'id': str, 'text': str, 'metadata': dict}
"""
texts = [doc["text"] for doc in documents]
# Genera dense embeddings in batch
dense_embeddings = list(dense_model.embed(texts))
# Genera sparse embeddings (BM25) in batch
sparse_embeddings = list(sparse_model.embed(texts))
# Prepara i points per Qdrant
points = []
for i, doc in enumerate(documents):
sparse_emb = sparse_embeddings[i]
points.append(
models.PointStruct(
id=i, # Usa ID numerico o UUID
payload={
"text": doc["text"],
**doc.get("metadata", {})
},
vector={
"dense": dense_embeddings[i].tolist(),
"sparse": models.SparseVector(
indices=sparse_emb.indices.tolist(),
values=sparse_emb.values.tolist(),
)
}
)
)
# Upsert in batch
client.upsert(
collection_name=COLLECTION_NAME,
points=points,
wait=True
)
print(f"Indicizzati {len(documents)} documenti")
def hybrid_search_qdrant(
query: str,
top_k: int = 10,
prefetch_k: int = 50,
fusion: str = "rrf", # "rrf" | "dbsf"
) -> list[dict]:
"""
Esegue hybrid search con Qdrant Query API.
Usa il meccanismo di prefetch: recupera top_k*5 candidati da ogni
retriever, poi li fonde con RRF o DBSF.
"""
# Genera query embeddings
query_dense = list(dense_model.embed([query]))[0].tolist()
query_sparse_emb = list(sparse_model.embed([query]))[0]
query_sparse = models.SparseVector(
indices=query_sparse_emb.indices.tolist(),
values=query_sparse_emb.values.tolist(),
)
# Fusion method
fusion_model = (
models.Fusion.RRF if fusion == "rrf"
else models.Fusion.DBSF
)
# Query API con prefetch (hybrid search nativo Qdrant)
results = client.query_points(
collection_name=COLLECTION_NAME,
prefetch=[
# Prefetch BM25 sparse
models.Prefetch(
query=query_sparse,
using="sparse",
limit=prefetch_k,
),
# Prefetch dense semantic
models.Prefetch(
query=query_dense,
using="dense",
limit=prefetch_k,
),
],
query=models.FusionQuery(fusion=fusion_model),
limit=top_k,
with_payload=True,
)
return [
{
"doc_id": str(point.id),
"text": point.payload.get("text", ""),
"score": point.score,
"payload": point.payload
}
for point in results.points
]
# Esempio di utilizzo completo
if __name__ == "__main__":
# Crea la collection
create_hybrid_collection()
# Indicizza documenti di esempio
sample_docs = [
{"id": "1", "text": "Qdrant is a vector database optimized for ANN search with filtering", "metadata": {"category": "vectordb"}},
{"id": "2", "text": "BM25 algorithm for information retrieval with term frequency saturation", "metadata": {"category": "ir"}},
{"id": "3", "text": "Hybrid search combines sparse BM25 and dense vector embeddings", "metadata": {"category": "hybrid"}},
{"id": "4", "text": "Reciprocal Rank Fusion merges multiple ranked lists into a single ranking", "metadata": {"category": "fusion"}},
]
index_documents(sample_docs)
# Esegui query ibrida
query = "BM25 hybrid vector search fusion"
results = hybrid_search_qdrant(query, top_k=3, prefetch_k=20, fusion="rrf")
print(f"\nRisultati per: '{query}'")
for r in results:
print(f" Score: {r['score']:.4f} | {r['text'][:70]}...")
評価: NDCG、MRR、Precision@k
評価フレームワークなしでハイブリッド検索システムを構築し、測定なしで構築します。 パラメータ (RRF の k、疎/密重み、リランカーしきい値) を最適化する前に、 グラウンド トゥルースと定義されたメトリクスを備えたテスト データセットが必要です。最も重要な 3 つの指標 検索の場合、それらは NDCG、MRR、および Precision@k です。
# Framework di evaluation per hybrid retrieval
import numpy as np
from typing import Optional
def ndcg_at_k(
retrieved_ids: list[str],
relevant_ids: list[str],
k: int,
relevance_grades: Optional[dict] = None
) -> float:
"""
Normalized Discounted Cumulative Gain @k.
Misura la qualità del ranking considerando la posizione dei documenti rilevanti.
Valori in [0, 1], 1 = ranking perfetto.
Args:
retrieved_ids: Lista di doc_id nell'ordine recuperato
relevant_ids: Lista di doc_id rilevanti (ground truth)
k: Cut-off
relevance_grades: Dizionario {doc_id: grade} per rilevanza graduata (opzionale)
"""
relevant_set = set(relevant_ids)
top_k = retrieved_ids[:k]
# DCG: Discounted Cumulative Gain
dcg = 0.0
for i, doc_id in enumerate(top_k):
if relevance_grades:
grade = relevance_grades.get(doc_id, 0)
else:
grade = 1.0 if doc_id in relevant_set else 0.0
# Formula: rel_i / log2(i + 2) (i 0-indexed, log2(2) per i=0)
dcg += grade / np.log2(i + 2)
# IDCG: Ideal DCG (ranking perfetto)
if relevance_grades:
ideal_grades = sorted(
[relevance_grades.get(rid, 0) for rid in relevant_ids],
reverse=True
)[:k]
else:
ideal_grades = [1.0] * min(len(relevant_ids), k)
idcg = sum(
grade / np.log2(i + 2)
for i, grade in enumerate(ideal_grades)
)
return dcg / idcg if idcg > 0 else 0.0
def mrr(retrieved_results: list[list[str]], relevant_results: list[list[str]]) -> float:
"""
Mean Reciprocal Rank.
Media del reciproco del rank del primo risultato rilevante.
"""
reciprocal_ranks = []
for retrieved, relevant in zip(retrieved_results, relevant_results):
relevant_set = set(relevant)
rr = 0.0
for rank, doc_id in enumerate(retrieved, start=1):
if doc_id in relevant_set:
rr = 1.0 / rank
break
reciprocal_ranks.append(rr)
return np.mean(reciprocal_ranks)
def evaluate_retriever(
retriever_fn, # Funzione che prende query e ritorna lista di doc_id
test_queries: list[dict], # Lista di {'query': str, 'relevant_ids': list[str]}
k_values: list[int] = [1, 3, 5, 10]
) -> dict:
"""
Valuta un retriever su un test set con più metriche.
"""
all_retrieved = []
all_relevant = []
ndcg_scores = {k: [] for k in k_values}
for item in test_queries:
query = item["query"]
relevant_ids = item["relevant_ids"]
# Recupera risultati
results = retriever_fn(query, top_k=max(k_values))
retrieved_ids = [r["doc_id"] for r in results]
all_retrieved.append(retrieved_ids)
all_relevant.append(relevant_ids)
# Calcola NDCG per ogni k
for k in k_values:
ndcg = ndcg_at_k(retrieved_ids, relevant_ids, k)
ndcg_scores[k].append(ndcg)
# Aggrega metriche
metrics = {
"MRR": mrr(all_retrieved, all_relevant),
}
for k in k_values:
metrics[f"NDCG@{k}"] = np.mean(ndcg_scores[k])
return metrics
# Esempio: confronto sparse-only vs dense-only vs hybrid
def run_ablation_study(test_queries, sparse_retriever, dense_retriever, hybrid_retriever):
print("=== Ablation Study: Retrieval Methods ===\n")
for name, retriever in [
("BM25 only", sparse_retriever.retrieve),
("Dense only", dense_retriever.search),
("Hybrid RRF", hybrid_retriever.search),
]:
metrics = evaluate_retriever(retriever, test_queries)
print(f"{name}:")
for metric, value in metrics.items():
print(f" {metric}: {value:.4f}")
print()
生産パイプライン
本番環境のハイブリッド取得パイプラインは、それだけではなく、より多くの側面を処理する必要があります。 正しさ: レイテンシ (一般的な SLA: 500 ミリ秒未満で p95)、 キャッシング 頻繁なクエリの場合、 監視 長期にわたる品質の向上 優雅な劣化 コンポーネントの 1 つがダウンしたとき。
# Pipeline di produzione con caching, monitoring e fallback
import asyncio
import hashlib
import json
import time
from functools import lru_cache
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ProductionHybridPipeline:
"""
Pipeline hybrid retrieval production-ready con:
- Cache query risultati (TTL configurabile)
- Metriche latenza per monitoring
- Fallback a dense-only se sparse non disponibile
- Circuit breaker per reranker
"""
def __init__(
self,
sparse_retriever,
dense_retriever,
reranker: Optional[CrossEncoderReranker] = None,
cache_ttl_seconds: int = 300,
retrieval_top_k: int = 50,
final_top_k: int = 5,
reranker_enabled: bool = True,
reranker_timeout_ms: float = 500,
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.reranker = reranker
self.cache: dict = {}
self.cache_ttl = cache_ttl_seconds
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
self.reranker_enabled = reranker_enabled
self.reranker_timeout_ms = reranker_timeout_ms
# Metriche (in produzione: Prometheus o simili)
self.metrics = {
"total_queries": 0,
"cache_hits": 0,
"reranker_timeouts": 0,
"sparse_failures": 0,
"avg_latency_ms": 0.0,
}
def _cache_key(self, query: str) -> str:
"""Hash deterministico della query per cache key."""
return hashlib.sha256(query.encode()).hexdigest()[:16]
def _get_cached(self, cache_key: str) -> Optional[list[dict]]:
"""Recupera dalla cache se non scaduta."""
if cache_key in self.cache:
cached_at, results = self.cache[cache_key]
if time.time() - cached_at < self.cache_ttl:
return results
del self.cache[cache_key]
return None
async def search(
self,
query: str,
use_cache: bool = True,
force_rerank: bool = False,
) -> dict:
"""
Esegue la pipeline completa con caching e monitoring.
Returns:
{
'results': list[dict],
'latency_ms': float,
'cache_hit': bool,
'reranked': bool,
'fallback_mode': bool,
}
"""
start = time.time()
self.metrics["total_queries"] += 1
cache_key = self._cache_key(query)
# Controlla cache
if use_cache:
cached = self._get_cached(cache_key)
if cached:
self.metrics["cache_hits"] += 1
return {
"results": cached,
"latency_ms": (time.time() - start) * 1000,
"cache_hit": True,
"reranked": False,
"fallback_mode": False,
}
fallback_mode = False
# Hybrid retrieval con fallback
try:
sparse_task = asyncio.create_task(
self.sparse.search(query, self.retrieval_top_k)
)
dense_task = asyncio.create_task(
self.dense.search(query, self.retrieval_top_k)
)
sparse_results, dense_results = await asyncio.gather(
sparse_task, dense_task
)
candidates = reciprocal_rank_fusion(
[sparse_results, dense_results], k=60
)[:self.retrieval_top_k]
except Exception as e:
logger.warning(f"Sparse retriever failed: {e}. Falling back to dense-only.")
self.metrics["sparse_failures"] += 1
fallback_mode = True
candidates = await self.dense.search(query, self.retrieval_top_k)
# Re-ranking opzionale con timeout
reranked = False
if self.reranker and (self.reranker_enabled or force_rerank) and candidates:
try:
rerank_start = time.time()
final_results = self.reranker.rerank(
query=query,
documents=candidates[:20], # Max 20 per contenere latenza
top_k=self.final_top_k,
)
rerank_elapsed = (time.time() - rerank_start) * 1000
if rerank_elapsed > self.reranker_timeout_ms:
logger.warning(f"Reranker slow: {rerank_elapsed:.0f}ms")
self.metrics["reranker_timeouts"] += 1
reranked = True
except Exception as e:
logger.error(f"Reranker failed: {e}. Using hybrid results.")
final_results = candidates[:self.final_top_k]
else:
final_results = candidates[:self.final_top_k]
# Aggiorna cache e metriche
self.cache[cache_key] = (time.time(), final_results)
latency = (time.time() - start) * 1000
self.metrics["avg_latency_ms"] = (
self.metrics["avg_latency_ms"] * 0.95 + latency * 0.05 # EMA
)
return {
"results": final_results,
"latency_ms": latency,
"cache_hit": False,
"reranked": reranked,
"fallback_mode": fallback_mode,
}
ベストプラクティスとアンチパターン
効果的なハイブリッド検索システムを構築するには、いくつかのよくある落とし穴を回避する必要があります これらは実稼働環境で出現し、基本的なチュートリアルでは明らかではありません。
ハイブリッド検索のベスト プラクティス
- RRF k=60 から開始します。 これは経験的に最も堅牢なデフォルトです。 NDCG メトリクスでベースラインを確立した後でのみ、他の値を試してください。
- レトリバーの場合は top_k >= 3x Final_top_k: 最終的なトップ5をご希望の場合は、 フュージョンに十分な材料を与えるために、各レトリバーから少なくとも 15 個を回収します。
- 一貫したトークン化: BM25 および埋め込みモデルは使用する必要があります 一貫性のために同じ前処理パイプライン (小文字、ストップワード、ステミング) を使用します。
- 最大 20 ~ 50 個のドキュメントのクロスエンコーダー: 50 名を超える候補者が獲得 追加される遅延コストに比べれば、精度の低下はわずかです。
- 疎と密を個別に評価します。 統合前は、 各コンポーネントのメトリクスを測定します。デンス リトリーバーがすでに 90% NDCG@5 にある場合、 ハイブリッドは特定のデータセットに価値を追加しない可能性があります。
- 正規化されたクエリレベルのキャッシュ: クエリの小文字とトリミング キャッシュヒット率を最大化するためにハッシュする前に。
避けるべきアンチパターン
- 正規化されていないスコアを組み合わせる: 「BM25_score + cosine_score」なし 正規化により、最大スケールのリトリーバー (ほぼ常に BM25) が優勢な結果が生成されます。
- すべての検索結果に対してリランカーを使用します。 200 個のドキュメントを再ランク付けする 2 ~ 3 秒の遅延が追加されます。リランカーは常に 20 ~ 50 人の候補者に制限してください。
- チャンクの品質を無視する: ハイブリッド取得ではチャンクが解決されない 形式が不十分(短すぎる、コンセプトの途中で切れている)。インデックス作成の品質 それが基本的な前提条件です。
- テストセットを使用せずに最適化します。 スパース/デンス重みまたは RRF の k を変更する テスト データセットで測定しないと、主観的な印象に過剰適合してしまいます。
- フォールバックを処理しない: BM25 インデックスがオフラインになった場合、システムは次のことを行う必要があります。 正常に高密度のみに低下し、クラッシュしません。
ハイブリッド検索だけでは不十分な場合
ハイブリッド検索は多くの問題を解決しますが、すべてではありません。実装後のメトリクスの場合 取得の量がまだ不十分な場合は、次の高度なパスを検討してください。
- HyDE (仮説的なドキュメント埋め込み): LLM は仮説的な答えを生成します これは取得者のクエリとして使用されます。意味の想起を改善する 抽象的なクエリまたは不適切な形式のクエリ。
- クエリの拡張: LLM を使用してクエリ バリアント (同義語、再定式化) を生成する すべての検索を実行し、その結果を RRF とマージします。
- 剣: 代わりに「スマートな」スパース ベクトルを生成する学習済みスパース モデル 純粋な用語の頻度。 BM25 よりも正確ですが、ML 推論が必要です。
- コルバート/コルパリ: クエリ内の各トークンを比較する遅延対話モデル 各ドキュメントトークン。クロスエンコーダよりも優れた精度と取得遅延 (再ランキングなし)。
- グラフRAG: キャプチャナレッジグラフによるベクトル検索の強化 エンティティ間の構造化された関係。マルチホップ推論が必要な質問に最適です。
結論
ハイブリッド取得は、今日の実稼働 RAG システムの標準戦略です 厳密な専門用語から漠然とした概念的な質問まで、異種クエリに取り組みます。 BM25 + デンスと RRF の組み合わせにより、すでに非常に堅牢なベースラインが提供されます。 クロスエンコーダの再ランキングにより、エンコーダで達成するのが難しいレベルの精度が得られます。 単一のアプローチ。
実装を成功させるための鍵は、操作の順序にあります。最初にビルドします。 ドメインの実際のグラウンド トゥルースを含むテスト セット。BM25 e 用に個別のベースラインを確立します。 密度が高い場合は、融合を実験してデルタを測定します。具体的なメトリクス (NDCG@5、MRR) のみを使用する場合 リランカーを追加することが、ユースケースで 200 ミリ秒のレイテンシを追加する価値があるかどうかがわかります。
次のステップ
- 続けて LangChain RAG パイプライン: 文書から応答まで この取得者を LLM を使用した完全なパイプラインに統合します。
- 法律 本番環境の RAG: 監視、評価、最適化 運用環境における完全な評価および監視フレームワークを実現します。
- 探検する 埋め込みとベクトル検索: BERT と文変換器 ドメインに最適な高密度モデルの選択について詳しくは、こちらをご覧ください。
- 考慮する pgvector と PostgreSQL AI ハイブリッド検索を既存の PostgreSQL データベースに直接実装したい場合。
リソースと参考資料
- Qdrant ハイブリッド検索ドキュメント - クエリ API とスパース ベクトル
- Cormack、Clarke、Buettcher (2009) - 「相互ランク融合はコンドルセおよび個別のランク学習方法よりも優れています」
- BEIR ベンチマーク - 異種検索ベンチマーク
- センテンストランスフォーマー/クロスエンコーダーのドキュメント
- MTEB (大規模テキスト埋め込みベンチマーク) - リーダーボード 2025







