はじめに:自律的な研究体制の構築
AI エージェントに関するシリーズの 13 回目の記事では、理論から実践へと移行します。 完全なシステム: 自律的研究アシスタント 3人の専門エージェントで構成されています 構造化され検証された研究レポートを作成するために協力する人々。このケーススタディはそれらすべてを統合しています これまでの 12 回の記事で検討した概念: マルチエージェント オーケストレーション、共有メモリ、 高度なツールの呼び出し、テスト、セキュリティ、FinOps、および展開。
私たちが直面している問題は具体的であり、企業の世界に広く蔓延しています。アナリストは情報を収集する必要があります。 ウェブ上の複数のソースから情報を収集し、その信頼性を検証し、関連する洞察を抽出し、 構造化されたレポートを作成します。このプロセスは手作業で行われるため、何時間もの作業が必要です。と 適切に設計されたマルチエージェント システムにより、レベルを維持しながら時間を数分に短縮できます。 高品質で追跡可能です。
私たちのシステムは、パターンを通じて調整された 3 つのエージェント アーキテクチャに基づいています。 一連 e 渡す オーケストレーションに関する記事で分析しました。各エージェントには役割があります 正確な専用ツールと明確に定義された入出力コントラクト。コミュニケーションは以下を通じて行われます 永続的な共有状態を維持し、システムは RAG (検索拡張生成) を統合して情報を強化します。 内部文書による分析。
この記事で学べること
- シーケンシャル パターンとハンドオフ パターンを使用した自律検索のためのマルチエージェント アーキテクチャを設計する方法
- 3 つの専門エージェント (リサーチャー、アナリスト、エディター) を完全に実装
- RAG と Pinecone の統合により、内部ドキュメントによる分析が強化されます
- ナレッジグラフによるエージェント間の共有メモリの管理
- マルチエージェント システムのエラー処理とフォールバック戦略
- API 経由でシステムを公開するための Docker Compose と FastAPI を使用したデプロイメント
- 運用環境におけるシステムのパフォーマンス、精度、コストの指標
システムアーキテクチャ
Research Assistant のアーキテクチャは、連続したパイプラインに配置された 3 つのエージェントに基づいています。 各エージェントは前のエージェントの出力を受け取り、それを独自の専用ツールで処理して、 次の出力のための構造化された出力。エージェント間で共有状態によりコンテキストが維持される 完全な調査により、各エージェントがフェーズから収集された情報にアクセスできるようになります。 前例。
マルチエージェントのシステムフロー
| 段階 | エージェント | 入力 | 出力 | ツール |
|---|---|---|---|---|
| 1 | 研究者 | ユーザークエリ + パラメータ | 要約付きで検証されたソース | Web 検索、URL スクレイパー、ソース検証ツール |
| 2 | アナリスト | 検証済みの情報源 | 信頼スコアを伴う調査結果 | 相互参照、キー抽出、矛盾検出 |
| 3 | エディタ | 構造化された調査結果 | フォーマットされた最終レポート | テンプレート エンジン、引用フォーマッタ、エクスポート ジェネレータ |
オーケストレーション グラフ
システムはオーケストレーションに LangGraph を使用します。メイン グラフはシーケンシャル フローを定義します エラーケースと説明の要求を管理する条件付きエッジを備えた 3 つのエージェント間。 たとえば、データが収集された場合、アナリストは研究者に追加のソースを検索するよう要求できます。 不十分または矛盾しています。
+------------------+
| User Query |
+--------+---------+
|
v
+--------+---------+
| RESEARCHER |
| (Web Search, |
| URL Scraping, |
| Validation) |
+--------+---------+
|
Fonti validate
|
v
+--------+---------+
+---->| ANALYST |
| | (Cross-Ref, |
| | Extraction, |
| | Fact-Check) |
| +--------+---------+
| |
Richiesta| Findings + Score
fonti | |
aggiuntive v
| +--------+---------+
+-----| ROUTER |
| (Score > 0.7?) |
+--------+---------+
|
Score OK
|
v
+--------+---------+
| EDITOR |
| (Template, |
| Citations, |
| Export) |
+--------+---------+
|
v
+--------+---------+
| Report Finale |
+------------------+
共有状態の定義
共有状態はエージェント間の通信の中心です。データの構造を定義します。
これらはグラフ内を流れるため、タイプの安全性とトレーサビリティが確保されます。私たちは、
TypedDict 各状態フィールドを明示的に定義します。
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage
class Source(TypedDict):
url: str
title: str
content: str
credibility_score: float
domain: str
extraction_date: str
class Finding(TypedDict):
claim: str
evidence: List[str]
source_urls: List[str]
confidence: float
category: str
contradictions: Optional[List[str]]
class ResearchState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
query: str
max_sources: int
sources: List[Source]
findings: List[Finding]
overall_confidence: float
report: Optional[str]
report_format: str
errors: List[str]
iteration_count: int
needs_more_sources: bool
エージェント 1: 研究者
研究者は情報源の収集を担当するエージェントです。ユーザーのクエリを受け取り、 検索サブクエリに分割し、Tavily API 経由で Web を検索し、ダウンロードします。 見つかったページの内容を分析し、各情報源の信頼性を検証します。 出力は、関連するコンテンツの概要を含む検証済みのソースのリストです。
ツールの定義
Researcher には 3 つの特殊なツールがあります: 1 つは Web リサーチ用、もう 1 つはスクレイピング用です。 ページの内容の説明と、情報源の信頼性を検証するためのものです。
from langchain_core.tools import tool
from tavily import TavilyClient
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import json
tavily_client = TavilyClient(api_key="tvly-...")
@tool
def web_search(query: str, max_results: int = 5) -> str:
"""Cerca informazioni sul web per una query specifica.
Args:
query: La query di ricerca da eseguire
max_results: Numero massimo di risultati (default: 5)
Returns:
JSON con i risultati della ricerca inclusi URL, titolo e snippet
"""
response = tavily_client.search(
query=query,
max_results=max_results,
search_depth="advanced",
include_raw_content=True,
include_domains=["arxiv.org", "github.com", "medium.com",
"techcrunch.com", "reuters.com"]
)
results = []
for r in response.get("results", []):
results.append({
"url": r["url"],
"title": r["title"],
"snippet": r["content"][:500],
"raw_content": r.get("raw_content", "")[:2000],
"score": r.get("score", 0.0)
})
return json.dumps(results, indent=2)
@tool
def scrape_url(url: str) -> str:
"""Scarica e analizza il contenuto di una pagina web.
Args:
url: L'URL della pagina da analizzare
Returns:
Il testo estratto dalla pagina (max 3000 caratteri)
"""
try:
headers = {"User-Agent": "ResearchBot/1.0"}
response = httpx.get(url, headers=headers, timeout=15.0,
follow_redirects=True)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator="\n", strip=True)
return text[:3000]
except Exception as e:
return f"Errore durante lo scraping di {url}: {str(e)}"
@tool
def validate_source(url: str, title: str, content_snippet: str) -> str:
"""Valida la credibilita di una fonte basandosi su dominio e contenuto.
Args:
url: L'URL della fonte
title: Il titolo della pagina
content_snippet: Un estratto del contenuto
Returns:
JSON con il punteggio di credibilita e la motivazione
"""
domain = urlparse(url).netloc.lower()
high_credibility = ["arxiv.org", "nature.com", "science.org",
"ieee.org", "acm.org", "gov", ".edu"]
medium_credibility = ["github.com", "medium.com", "techcrunch.com",
"reuters.com", "bloomberg.com"]
score = 0.5
reasons = []
for hc in high_credibility:
if hc in domain:
score = 0.9
reasons.append(f"Dominio accademico/istituzionale: {domain}")
break
else:
for mc in medium_credibility:
if mc in domain:
score = 0.7
reasons.append(f"Dominio tecnico riconosciuto: {domain}")
break
if len(content_snippet) > 200:
score += 0.05
reasons.append("Contenuto sostanziale presente")
if title and len(title) > 10:
score += 0.02
reasons.append("Titolo descrittivo presente")
score = min(score, 1.0)
return json.dumps({
"url": url,
"credibility_score": round(score, 2),
"reasons": reasons,
"domain": domain
})
エージェントリサーチャーの定義
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
researcher_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
researcher_system_prompt = """Sei un Research Agent specializzato nella
raccolta di fonti attendibili dal web.
OBIETTIVO: Data una query di ricerca, trova e valida fonti di alta qualità.
PROCESSO:
1. Analizza la query e identifica 2-3 sotto-query specifiche
2. Per ogni sotto-query, usa web_search per trovare risultati
3. Per i risultati più promettenti, usa scrape_url per ottenere il
contenuto completo
4. Usa validate_source per verificare la credibilita di ogni fonte
5. Restituisci SOLO le fonti con credibility_score >= 0.6
OUTPUT: Produci un JSON con la lista delle fonti validate, ciascuna con:
- url, title, content (summary del contenuto rilevante)
- credibility_score, domain, extraction_date
NON inventare informazioni. Se non trovi fonti sufficienti, segnalalo."""
researcher_tools = [web_search, scrape_url, validate_source]
researcher_agent = create_react_agent(
model=researcher_llm,
tools=researcher_tools,
prompt=researcher_system_prompt
)
エージェント 2: アナリスト
アナリストは、研究者によって検証された情報源を受け取り、それらを徹底的に分析します。彼の目標 主要な調査結果を抽出し、相互参照を通じてソース間の一貫性を検証し、 矛盾があれば特定します。出力は、構造化された結果のリストです。 信頼度スコアは、それをサポートする情報源の数と質に基づいています。
アナリストツール
from collections import Counter
import re
@tool
def cross_reference_check(claim: str, sources_json: str) -> str:
"""Verifica un'affermazione incrociando multiple fonti.
Args:
claim: L'affermazione da verificare
sources_json: JSON con le fonti da analizzare
Returns:
JSON con il risultato della verifica è il numero di fonti
che supportano, contraddicono o non menzionano l'affermazione
"""
sources = json.loads(sources_json)
supporting = []
contradicting = []
neutral = []
claim_keywords = set(claim.lower().split())
for source in sources:
content = source.get("content", "").lower()
keyword_matches = sum(1 for kw in claim_keywords
if kw in content)
match_ratio = keyword_matches / max(len(claim_keywords), 1)
if match_ratio > 0.6:
supporting.append(source["url"])
elif match_ratio > 0.3:
neutral.append(source["url"])
else:
contradicting.append(source["url"])
total = len(sources)
confidence = len(supporting) / max(total, 1)
return json.dumps({
"claim": claim,
"supporting_sources": supporting,
"contradicting_sources": contradicting,
"neutral_sources": neutral,
"confidence": round(confidence, 2),
"verdict": "confirmed" if confidence > 0.6
else "uncertain" if confidence > 0.3
else "unverified"
})
@tool
def extract_key_findings(content: str, topic: str) -> str:
"""Estrae i finding principali da un testo rispetto a un topic.
Args:
content: Il testo da analizzare
topic: L'argomento di riferimento per l'estrazione
Returns:
JSON con i finding estratti e la loro rilevanza
"""
sentences = re.split(r'[.!?]+', content)
topic_keywords = set(topic.lower().split())
findings = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 20:
continue
words = set(sentence.lower().split())
relevance = len(words.intersection(topic_keywords))
relevance = relevance / max(len(topic_keywords), 1)
if relevance > 0.3:
findings.append({
"text": sentence[:200],
"relevance_score": round(relevance, 2)
})
findings.sort(key=lambda x: x["relevance_score"], reverse=True)
return json.dumps(findings[:10])
@tool
def detect_contradictions(findings_json: str) -> str:
"""Identifica contraddizioni tra i findings raccolti.
Args:
findings_json: JSON con i findings da analizzare
Returns:
JSON con le coppie di findings potenzialmente in contraddizione
"""
findings = json.loads(findings_json)
contradictions = []
negation_words = {"not", "no", "never", "non", "without",
"unlike", "contrary", "however", "but",
"decrease", "decline", "drop", "reduce"}
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
text1 = set(f1.get("text", "").lower().split())
text2 = set(f2.get("text", "").lower().split())
overlap = text1.intersection(text2)
neg_in_1 = bool(text1.intersection(negation_words))
neg_in_2 = bool(text2.intersection(negation_words))
if len(overlap) > 3 and neg_in_1 != neg_in_2:
contradictions.append({
"finding_1": f1.get("text", "")[:100],
"finding_2": f2.get("text", "")[:100],
"shared_keywords": list(overlap)[:5],
"severity": "high" if len(overlap) > 5 else "medium"
})
return json.dumps(contradictions)
エージェント アナリストの定義
analyst_llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
analyst_system_prompt = """Sei un Analyst Agent specializzato nel fact-checking
e nell'estrazione di insight da fonti multiple.
OBIETTIVO: Analizzare le fonti fornite dal Researcher, estrarre i finding
principali e verificarne la coerenza.
PROCESSO:
1. Per ogni fonte, usa extract_key_findings per estrarre i punti chiave
2. Usa cross_reference_check per verificare ogni finding chiave
3. Usa detect_contradictions per identificare incoerenze
4. Assegna un confidence score complessivo ai findings
OUTPUT: Produci un JSON con:
- findings: lista di finding con claim, evidence, source_urls, confidence
- overall_confidence: media pesata dei confidence score
- contradictions: lista di contraddizioni trovate
- recommendation: "proceed" se confidence > 0.7, "needs_more_sources" altrimenti
Sii critico e rigoroso. Non dare per scontato nulla."""
analyst_tools = [cross_reference_check, extract_key_findings,
detect_contradictions]
analyst_agent = create_react_agent(
model=analyst_llm,
tools=analyst_tools,
prompt=analyst_system_prompt
)
エージェント 3: 編集者
編集者は、最終レポートの作成を担当するエージェントです。構造化された所見を受け取る アナリストからの情報を受け取り、論理構造に整理し、学術基準に従って引用文をフォーマットします。 ユーザーが要求した形式 (Markdown、HTML、または JSON) でレポートを生成します。編集者は発明しない コンテンツ: 収集した情報を構造化、文脈化し、読みやすくすることに限定されます。 以前のエージェントによって確認されました。
エディタツール
from datetime import datetime
@tool
def apply_report_template(findings_json: str, query: str,
template_type: str = "executive") -> str:
"""Applica un template di report ai findings strutturati.
Args:
findings_json: JSON con i findings da formattare
query: La query originale dell'utente
template_type: Tipo di template (executive, technical, brief)
Returns:
Report formattato in Markdown
"""
findings = json.loads(findings_json)
date = datetime.now().strftime("%Y-%m-%d")
if template_type == "executive":
sections = [
f"# Research Report: {query}",
f"*Generated on {date}*\n",
"## Executive Summary\n",
"## Key Findings\n",
"## Detailed Analysis\n",
"## Sources and References\n",
"## Methodology\n",
"---",
"*This report was generated by an AI Research Assistant.*"
]
elif template_type == "technical":
sections = [
f"# Technical Analysis: {query}",
f"*Report Date: {date}*\n",
"## Abstract\n",
"## Data Sources\n",
"## Analysis\n",
"## Results\n",
"## Limitations\n",
"## References\n"
]
else:
sections = [
f"# Brief: {query}",
f"*{date}*\n",
"## Summary\n",
"## Key Points\n",
"## Sources\n"
]
return "\n".join(sections)
@tool
def format_citations(sources_json: str,
style: str = "apa") -> str:
"""Formatta le citazioni delle fonti secondo uno standard accademico.
Args:
sources_json: JSON con le fonti da citare
style: Stile di citazione (apa, chicago, ieee)
Returns:
Lista di citazioni formattate
"""
sources = json.loads(sources_json)
citations = []
for i, source in enumerate(sources, 1):
title = source.get("title", "Untitled")
url = source.get("url", "")
domain = source.get("domain", "")
date = source.get("extraction_date",
datetime.now().strftime("%Y-%m-%d"))
if style == "apa":
citation = (f"[{i}] {domain}. ({date}). "
f"*{title}*. Retrieved from {url}")
elif style == "ieee":
citation = (f"[{i}] \"{title},\" {domain}, "
f"{date}. [Online]. Available: {url}")
else:
citation = f"[{i}] {title}. {url} ({date})"
citations.append(citation)
return "\n".join(citations)
@tool
def generate_export(report_markdown: str,
output_format: str = "markdown") -> str:
"""Genera l'export finale del report nel formato richiesto.
Args:
report_markdown: Il report in formato Markdown
output_format: Formato di output (markdown, html, json)
Returns:
Il report nel formato specificato
"""
if output_format == "markdown":
return report_markdown
elif output_format == "html":
lines = report_markdown.split("\n")
html_lines = []
for line in lines:
if line.startswith("# "):
html_lines.append(f"<h1>{line[2:]}</h1>")
elif line.startswith("## "):
html_lines.append(f"<h2>{line[3:]}</h2>")
elif line.startswith("- "):
html_lines.append(f"<li>{line[2:]}</li>")
elif line.strip():
html_lines.append(f"<p>{line}</p>")
return "\n".join(html_lines)
elif output_format == "json":
return json.dumps({"report": report_markdown,
"format": output_format,
"generated_at": datetime.now().isoformat()})
return report_markdown
エディターエージェントの定義
editor_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
editor_system_prompt = """Sei un Editor Agent specializzato nella
produzione di report professionali e ben strutturati.
OBIETTIVO: Trasformare i findings dell'Analyst in un report leggibile
e citato correttamente.
PROCESSO:
1. Usa apply_report_template per creare la struttura del report
2. Popola ogni sezione con i findings pertinenti
3. Usa format_citations per generare la bibliografia
4. Usa generate_export per produrre il formato finale
REGOLE DI STILE:
- Scrivi in modo chiaro e professionale
- Ogni affermazione deve avere una citazione [N]
- Evidenzia il livello di confidenza per ogni finding
- Segnala esplicitamente le aree di incertezza
- Non inventare MAI dati o citazioni non presenti nei findings
OUTPUT: Il report completo nel formato richiesto."""
editor_tools = [apply_report_template, format_citations,
generate_export]
editor_agent = create_react_agent(
model=editor_llm,
tools=editor_tools,
prompt=editor_system_prompt
)
LangGraph によるオーケストレーション
次に、3 つのエージェントを LangGraph グラフに組み立ててみましょう。グラフは一連の流れを定義します アナリストと編集者の間にルーター ノードがあり、全体的な信頼スコアが適切かどうかをチェックします。 レポートの生成を続行する場合、または追加のソースが必要な場合は、これで十分です。
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
def run_researcher(state: ResearchState) -> dict:
"""Nodo Researcher: raccoglie e valida le fonti."""
query = state["query"]
max_sources = state.get("max_sources", 5)
result = researcher_agent.invoke({
"messages": [("human",
f"Cerca fonti per: {query}. "
f"Trova almeno {max_sources} fonti attendibili.")]
})
last_message = result["messages"][-1].content
try:
sources = json.loads(last_message)
except json.JSONDecodeError:
sources = []
return {
"sources": sources,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": result["messages"]
}
def run_analyst(state: ResearchState) -> dict:
"""Nodo Analyst: analizza le fonti e produce findings."""
sources = state["sources"]
result = analyst_agent.invoke({
"messages": [("human",
f"Analizza queste fonti e produci findings strutturati:\n"
f"{json.dumps(sources, indent=2)}")]
})
last_message = result["messages"][-1].content
try:
analysis = json.loads(last_message)
findings = analysis.get("findings", [])
confidence = analysis.get("overall_confidence", 0.0)
needs_more = analysis.get("recommendation") == "needs_more_sources"
except json.JSONDecodeError:
findings = []
confidence = 0.0
needs_more = True
return {
"findings": findings,
"overall_confidence": confidence,
"needs_more_sources": needs_more,
"messages": result["messages"]
}
def run_editor(state: ResearchState) -> dict:
"""Nodo Editor: genera il report finale."""
findings = state["findings"]
query = state["query"]
report_format = state.get("report_format", "markdown")
result = editor_agent.invoke({
"messages": [("human",
f"Genera un report per la query '{query}' "
f"basato su questi findings:\n"
f"{json.dumps(findings, indent=2)}\n"
f"Formato richiesto: {report_format}")]
})
report = result["messages"][-1].content
return {
"report": report,
"messages": result["messages"]
}
def should_continue_or_edit(state: ResearchState) -> str:
"""Router: decide se procedere all'Editor o tornare al Researcher."""
if (state.get("needs_more_sources", False)
and state.get("iteration_count", 0) < 3):
return "researcher"
return "editor"
# Costruzione del grafo
graph = StateGraph(ResearchState)
graph.add_node("researcher", run_researcher)
graph.add_node("analyst", run_analyst)
graph.add_node("editor", run_editor)
graph.add_edge(START, "researcher")
graph.add_edge("researcher", "analyst")
graph.add_conditional_edges(
"analyst",
should_continue_or_edit,
{"researcher": "researcher", "editor": "editor"}
)
graph.add_edge("editor", END)
memory = MemorySaver()
research_app = graph.compile(checkpointer=memory)
メモリの統合
私たちのシステムの重要な側面は、共有メモリの管理です。 3人のエージェントが行動する 同じ状態を維持できるようにするための長期記憶も必要です。 以前の研究から学ぶため、すでに分析され構築されているソースに再度アクセスすることを避けます。 徐々にドメインのナレッジ グラフが作成されます。
共有ナレッジグラフ
検索から抽出されたエンティティを追跡する単純なナレッジ グラフを実装します。 それらの関係と検証された事実。この構造はアナリストによって検討されています 歴史的コンテキストを使用して分析を強化し、エディターから相互参照を挿入します。
from typing import Dict, Set, Tuple
import sqlite3
class KnowledgeGraph:
"""Knowledge graph persistente per il Research Assistant."""
def __init__(self, db_path: str = "research_kg.db"):
self.conn = sqlite3.connect(db_path)
self._create_tables()
def _create_tables(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
entity_type TEXT,
first_seen TEXT,
mention_count INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject_id INTEGER REFERENCES entities(id),
predicate TEXT,
object_id INTEGER REFERENCES entities(id),
confidence REAL,
source_url TEXT
);
CREATE TABLE IF NOT EXISTS facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
statement TEXT,
confidence REAL,
verified_by INTEGER DEFAULT 0,
source_urls TEXT,
created_at TEXT
);
""")
self.conn.commit()
def add_entity(self, name: str, entity_type: str):
self.conn.execute("""
INSERT INTO entities (name, entity_type, first_seen)
VALUES (?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE
SET mention_count = mention_count + 1
""", (name, entity_type))
self.conn.commit()
def add_relation(self, subject: str, predicate: str,
obj: str, confidence: float,
source_url: str = ""):
self.add_entity(subject, "auto")
self.add_entity(obj, "auto")
sub_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(subject,)).fetchone()[0]
obj_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(obj,)).fetchone()[0]
self.conn.execute("""
INSERT INTO relations
(subject_id, predicate, object_id, confidence, source_url)
VALUES (?, ?, ?, ?, ?)
""", (sub_id, predicate, obj_id, confidence, source_url))
self.conn.commit()
def query_entity(self, name: str) -> Dict:
entity = self.conn.execute(
"SELECT * FROM entities WHERE name LIKE ?",
(f"%{name}%",)).fetchone()
if not entity:
return {}
relations = self.conn.execute("""
SELECT e2.name, r.predicate, r.confidence
FROM relations r
JOIN entities e2 ON r.object_id = e2.id
WHERE r.subject_id = ?
""", (entity[0],)).fetchall()
return {
"name": entity[1],
"type": entity[2],
"mentions": entity[4],
"relations": [{"target": r[0], "predicate": r[1],
"confidence": r[2]} for r in relations]
}
RAGの統合
私たちのリサーチアシスタントは、RAG (検索拡張生成) システムを統合しています。 アナリストは、ベクトル データベースにアーカイブされた内部文書や以前のレポートを参照できます。 ベクトル ストアとして Pinecone を使用し、埋め込み生成には OpenAI を使用します。
取得パイプライン
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = PineconeVectorStore(
index_name="research-assistant",
embedding=embeddings,
namespace="documents"
)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ". ", " "]
)
def index_document(content: str, metadata: dict):
"""Indicizza un documento nel vector store."""
chunks = text_splitter.split_text(content)
documents = [
Document(page_content=chunk, metadata={
**metadata,
"chunk_index": i,
"total_chunks": len(chunks)
})
for i, chunk in enumerate(chunks)
]
vectorstore.add_documents(documents)
def retrieve_relevant(query: str, top_k: int = 5) -> list:
"""Recupera i documenti più rilevanti per una query."""
results = vectorstore.similarity_search_with_score(
query, k=top_k
)
return [
{
"content": doc.page_content,
"metadata": doc.metadata,
"similarity_score": round(float(score), 3)
}
for doc, score in results
]
@tool
def search_internal_knowledge(query: str,
max_results: int = 5) -> str:
"""Cerca nei documenti interni e nei report precedenti.
Args:
query: La query di ricerca
max_results: Numero massimo di risultati
Returns:
JSON con i documenti rilevanti trovati nel knowledge base
"""
results = retrieve_relevant(query, top_k=max_results)
return json.dumps(results, indent=2)
エラー処理とフォールバック
マルチエージェント システムでは、エラー処理が重要な要素となります。どのエージェントも失敗する可能性があります ネットワークのタイムアウト、利用できない API、解析できないコンテンツ、レート制限など、さまざまな理由があります。 当社のシステムは、ツール レベル、エージェント レベル、および システムレベル。
3 レベルのフォールバック戦略
- ツールレベル: 各ツールは内部的に try/excel を使用して独自のエラーを処理します。 例外を伝播するのではなく、構造化されたエラー メッセージを返します。ソースなら に到達できない場合、ツールは HTTP コードのエラーとヒントを返します。
- エージェントレベル: エージェントがタスクを完了できなかった場合、ルーターは フローをリダイレクトできます。最初の情報源が利用できない場合、研究者は代替情報源を探します。 利用可能です。アナリストは、あまりにも多くの矛盾を発見した場合、追加の情報源を要求します。
- システムレベル: グラフには反復の最大制限があります (3 サイクル) 研究者兼分析者)。 3 回の反復後も信頼スコアが 0.5 未満のままの場合、システムは ただし、結果の信頼性が低いという明示的な警告を含むレポートが生成されます。 そして手動介入を提案します。
from langgraph.errors import NodeInterrupt
def run_researcher_with_fallback(state: ResearchState) -> dict:
"""Researcher con gestione errori e fallback."""
max_retries = 2
errors = list(state.get("errors", []))
for attempt in range(max_retries):
try:
result = run_researcher(state)
if not result.get("sources"):
errors.append(
f"Attempt {attempt+1}: No sources found"
)
continue
result["errors"] = errors
return result
except Exception as e:
errors.append(
f"Attempt {attempt+1}: {type(e).__name__}: {str(e)}"
)
return {
"sources": [],
"errors": errors,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": state.get("messages", [])
}
def manual_intervention_check(state: ResearchState) -> str:
"""Verifica se serve intervento manuale."""
iteration = state.get("iteration_count", 0)
confidence = state.get("overall_confidence", 0.0)
errors = state.get("errors", [])
if iteration >= 3 and confidence < 0.5:
raise NodeInterrupt(
f"Sistema bloccato dopo {iteration} iterazioni. "
f"Confidence: {confidence:.2f}. "
f"Errori: {len(errors)}. "
f"Intervento manuale richiesto."
)
if len(errors) > 5:
raise NodeInterrupt(
f"Troppi errori accumulati ({len(errors)}). "
f"Verificare la connettivita e i limiti API."
)
return "continue"
導入
Research Assistant を実稼働環境で使用できるようにするために、コンテナにパッケージ化します。 Docker を作成し、FastAPI 経由で REST サービスとして公開します。このアーキテクチャにより、拡張が可能になります システムを既存のアプリケーションと水平に統合し、パフォーマンスを監視します。 リアルタイム。
FastAPI ラッパー
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from uuid import uuid4
import asyncio
app = FastAPI(title="Research Assistant API", version="1.0.0")
class ResearchRequest(BaseModel):
query: str
max_sources: int = 5
report_format: str = "markdown"
class ResearchResponse(BaseModel):
job_id: str
status: str
report: str | None = None
confidence: float | None = None
sources_count: int | None = None
errors: list[str] = []
jobs: dict[str, ResearchResponse] = {}
async def execute_research(job_id: str, request: ResearchRequest):
"""Esegue la ricerca in background."""
try:
config = {"configurable": {"thread_id": job_id}}
initial_state = {
"query": request.query,
"max_sources": request.max_sources,
"report_format": request.report_format,
"messages": [],
"sources": [],
"findings": [],
"overall_confidence": 0.0,
"report": None,
"errors": [],
"iteration_count": 0,
"needs_more_sources": False
}
result = await asyncio.to_thread(
research_app.invoke, initial_state, config
)
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="completed",
report=result.get("report"),
confidence=result.get("overall_confidence"),
sources_count=len(result.get("sources", [])),
errors=result.get("errors", [])
)
except Exception as e:
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="failed",
errors=[str(e)]
)
@app.post("/research", response_model=ResearchResponse)
async def start_research(request: ResearchRequest,
background_tasks: BackgroundTasks):
job_id = str(uuid4())
jobs[job_id] = ResearchResponse(
job_id=job_id, status="processing")
background_tasks.add_task(execute_research, job_id, request)
return jobs[job_id]
@app.get("/research/{job_id}", response_model=ResearchResponse)
async def get_research_status(job_id: str):
if job_id not in jobs:
raise HTTPException(status_code=404,
detail="Job not found")
return jobs[job_id]
Docker Compose
version: "3.9"
services:
research-api:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- TAVILY_API_KEY=${TAVILY_API_KEY}
- PINECONE_API_KEY=${PINECONE_API_KEY}
- PINECONE_INDEX=research-assistant
volumes:
- ./data:/app/data
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
redis-data:
結果と教訓
本番環境のシステムメトリクス
| メトリック | 価値 | 注意事項 |
|---|---|---|
| レポートあたりの平均時間 | 2~4分 | クエリの複雑さとソースの数によって異なります。 |
| レポートの出典 | 5-12 | 検索ごとに平均 8 つの検証済みソース |
| 平均信頼スコア | 0.74 | 技術分野における 200 件の調査研究のサンプルについて |
| レポートあたりのコスト | $0.15 - $0.45 | Tavily API 呼び出しを含む、3 つのエージェントすべての GPT-4o |
| フォールバック率 | 12% | 研究者と分析者のサイクルを複数回必要とする研究 |
| 手動による介入 | 3% | 信頼度が 0.5 未満でレビューが必要なレポート |
学んだ教訓
- ツールの説明の質が差別化要因となる。説明付きツール 曖昧またはあいまいな場合、エージェントは誤った選択をしてしまいます。編集チームに時間を投資する パラメータの具体的な例を使用した正確な説明により、パフォーマンスが大幅に向上します。
- エラー処理は第一級の懸念事項であり、後付けではありません。で マルチエージェント システムでは、1 つのエージェントで処理されないエラーがカスケード的に伝播します。あらゆるツール また、グラフ内の各ノードには明示的なフォールバック戦略が必要です。
- 共有メモリが本当の差別化要因です。ナレッジグラフがなければ、 すべての検索はゼロから始まります。メモリを使用すると、システムは徐々に改善されます。 すでに分析されたエンティティを使用して、低品質のソースへの再訪問を回避し、コンテキストを構築します これにより、あらゆる新しい分析が強化されます。
- 信頼スコアは単に計算するだけでなく調整する必要があります。最初のスコア 彼らは楽観的すぎました。不明なドメインのソースに対するペナルティを導入しました。 複数の相互参照に対するボーナスと物議を醸すトピックに対する警告のしきい値。
- FinOps 戦略により運用コストを管理可能。 GPT-4o-miniを使用する 研究者 (より単純なタスクを行う人) には GPT-4o を使用し、アナリストと編集者にはコストを削減します。 品質に大きな影響を与えることなく、35% 削減されます。
結論
このケーススタディでは、概念がどのように機能するかを示す完全なマルチエージェント システムを構築しました。 これまでの 12 回の記事で検討した機能は、現実世界のアプリケーションに統合されます。自律的な研究 Assistant は学術的な演習ではありません。これは多数のドメインに適用できるアーキテクチャ パターンです。 財務デューデリジェンスから競合分析まで、科学調査からモニタリングまで 市場の。
このシステム構築で見えてきたポイントは企画の中心性 ツールの重要性は、補助的ではなく構造的な側面としてのエラー管理の重要性です。 継続的改善の要素としての共有メモリの価値。建築上のあらゆる決定は、 エージェントの数からモデルの選択に至るまで、具体的な指標と要件に基づいて決定する必要があります 実際のビジネスの。
シリーズの次の最終記事では、 「AI エージェントの未来: 新たな機能、 「AGI と現在の制限」、コードから調べて、どこへ行くのかを調べます。 AIエージェント。現在の限界、新たな機能、研究の方向性を分析します。 最も有望であり、この急速に進化するテクノロジーの経済的および倫理的影響です。







