서론: 자율적인 연구 시스템 구축
AI 에이전트에 관한 시리즈의 13번째 기사에서는 다음을 구축하여 이론에서 실습으로 이동합니다. 완전한 시스템: 자율 연구 조교 세 명의 전문 에이전트로 구성 구조화되고 검증된 연구 보고서를 작성하기 위해 협력하는 사람들입니다. 이 사례 연구는 이 모든 것을 통합합니다. 이전 12개 기사에서 살펴본 개념: 다중 에이전트 오케스트레이션, 공유 메모리, 고급 도구 호출, 테스트, 보안, FinOps 및 배포.
우리가 직면한 문제는 기업 세계에서 구체적이고 널리 퍼져 있습니다. 분석가는 수집해야 하는 문제입니다. 웹상의 다양한 소스로부터 정보를 수집하고 신뢰성을 검증하며 관련 인사이트를 추출하고 구조화된 보고서를 생성합니다. 수동으로 수행되는 이 프로세스에는 몇 시간의 작업이 필요합니다. 와 잘 설계된 멀티 에이전트 시스템을 사용하면 일정 수준을 유지하면서 시간을 몇 분으로 줄일 수 있습니다. 고품질이고 추적 가능합니다.
우리 시스템은 패턴을 통해 조정되는 3개의 에이전트 아키텍처를 기반으로 합니다. 잇달아 일어나는 e 핸드오프 오케스트레이션에 관한 기사에서 분석한 내용입니다. 각 상담원에게는 역할이 있습니다. 정확하고 전용 도구와 잘 정의된 입력/출력 계약이 필요합니다. 의사소통은 다음을 통해 이루어진다. 지속적인 공유 상태를 유지하며 시스템은 RAG(Retrieval-Augmented Generation)를 통합하여 내부 문서로 분석합니다.
이 기사에서 배울 내용
- 순차 및 핸드오프 패턴을 사용하여 자율 검색을 위한 다중 에이전트 아키텍처를 설계하는 방법
- 세 가지 전문 에이전트(연구원, 분석가, 편집자)의 완전한 구현
- 내부 문서로 분석을 강화하기 위해 RAG와 Pinecone의 통합
- 지식 그래프로 에이전트 간 공유 메모리 관리
- 다중 에이전트 시스템의 오류 처리 및 대체 전략
- API를 통해 시스템을 노출하기 위해 Docker Compose 및 FastAPI를 사용한 배포
- 생산 중인 시스템의 성능, 정확성 및 비용 지표
시스템 아키텍처
연구 보조원의 아키텍처는 순차적 파이프라인으로 배열된 세 개의 에이전트를 기반으로 합니다. 각 에이전트는 이전 에이전트의 출력을 수신하고 이를 자체 전문 도구로 처리하여 생성합니다. 다음 출력을 위한 구조화된 출력. 에이전트 간에 공유 상태가 컨텍스트를 유지합니다. 완전한 조사를 통해 각 에이전트가 각 단계에서 수집된 정보에 액세스할 수 있습니다. 전례.
다중 에이전트 시스템 흐름
| 단계 | 대리인 | 입력 | 출력 | 도구 |
|---|---|---|---|---|
| 1 | 연구원 | 사용자 쿼리 + 매개변수 | 요약으로 검증된 소스 | 웹 검색, URL 스크레이퍼, 소스 유효성 검사기 |
| 2 | 분석자 | 검증된 소스 | 신뢰도 점수를 사용한 결과 | 상호 참조, 키 추출기, 모순 감지기 |
| 3 | 편집자 | 구조화된 발견 항목 | 형식화된 최종 보고서 | 템플릿 엔진, 인용 포맷터, 내보내기 생성기 |
오케스트레이션 그래프
시스템은 오케스트레이션을 위해 LangGraph를 사용합니다. 메인 그래프는 순차 흐름을 정의합니다. 오류 사례와 설명 요청을 관리하는 조건부 가장자리가 있는 세 에이전트 사이. 예를 들어, 분석가는 데이터가 수집된 경우 연구원에게 추가 소스를 검색하도록 요청할 수 있습니다. 부족하거나 모순됩니다.
+------------------+
| User Query |
+--------+---------+
|
v
+--------+---------+
| RESEARCHER |
| (Web Search, |
| URL Scraping, |
| Validation) |
+--------+---------+
|
Fonti validate
|
v
+--------+---------+
+---->| ANALYST |
| | (Cross-Ref, |
| | Extraction, |
| | Fact-Check) |
| +--------+---------+
| |
Richiesta| Findings + Score
fonti | |
aggiuntive v
| +--------+---------+
+-----| ROUTER |
| (Score > 0.7?) |
+--------+---------+
|
Score OK
|
v
+--------+---------+
| EDITOR |
| (Template, |
| Citations, |
| Export) |
+--------+---------+
|
v
+--------+---------+
| Report Finale |
+------------------+
공유 상태의 정의
공유 상태는 에이전트 간 통신의 핵심입니다. 데이터의 구조를 정의합니다.
그래프를 통해 흐르므로 유형 안전성과 추적성이 보장됩니다. 우리는
TypedDict 각 상태 필드를 명시적으로 정의합니다.
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage
class Source(TypedDict):
url: str
title: str
content: str
credibility_score: float
domain: str
extraction_date: str
class Finding(TypedDict):
claim: str
evidence: List[str]
source_urls: List[str]
confidence: float
category: str
contradictions: Optional[List[str]]
class ResearchState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
query: str
max_sources: int
sources: List[Source]
findings: List[Finding]
overall_confidence: float
report: Optional[str]
report_format: str
errors: List[str]
iteration_count: int
needs_more_sources: bool
에이전트 1: 연구원
연구원은 출처 수집을 담당하는 대리인입니다. 사용자의 질의를 받아, 검색 하위 쿼리로 분류하고, Tavily API를 통해 웹을 검색하고, 다운로드합니다. 발견된 페이지의 내용을 분석하고 각 출처의 신뢰성을 검증합니다. 출력은 관련 콘텐츠 요약과 함께 검증된 소스 목록입니다.
도구의 정의
연구원은 세 가지 전문 도구를 가지고 있습니다. 하나는 웹 조사용이고 다른 하나는 스크래핑용입니다. 페이지의 내용과 출처의 신뢰성을 확인하기 위한 것입니다.
from langchain_core.tools import tool
from tavily import TavilyClient
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import json
tavily_client = TavilyClient(api_key="tvly-...")
@tool
def web_search(query: str, max_results: int = 5) -> str:
"""Cerca informazioni sul web per una query specifica.
Args:
query: La query di ricerca da eseguire
max_results: Numero massimo di risultati (default: 5)
Returns:
JSON con i risultati della ricerca inclusi URL, titolo e snippet
"""
response = tavily_client.search(
query=query,
max_results=max_results,
search_depth="advanced",
include_raw_content=True,
include_domains=["arxiv.org", "github.com", "medium.com",
"techcrunch.com", "reuters.com"]
)
results = []
for r in response.get("results", []):
results.append({
"url": r["url"],
"title": r["title"],
"snippet": r["content"][:500],
"raw_content": r.get("raw_content", "")[:2000],
"score": r.get("score", 0.0)
})
return json.dumps(results, indent=2)
@tool
def scrape_url(url: str) -> str:
"""Scarica e analizza il contenuto di una pagina web.
Args:
url: L'URL della pagina da analizzare
Returns:
Il testo estratto dalla pagina (max 3000 caratteri)
"""
try:
headers = {"User-Agent": "ResearchBot/1.0"}
response = httpx.get(url, headers=headers, timeout=15.0,
follow_redirects=True)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator="\n", strip=True)
return text[:3000]
except Exception as e:
return f"Errore durante lo scraping di {url}: {str(e)}"
@tool
def validate_source(url: str, title: str, content_snippet: str) -> str:
"""Valida la credibilita di una fonte basandosi su dominio e contenuto.
Args:
url: L'URL della fonte
title: Il titolo della pagina
content_snippet: Un estratto del contenuto
Returns:
JSON con il punteggio di credibilita e la motivazione
"""
domain = urlparse(url).netloc.lower()
high_credibility = ["arxiv.org", "nature.com", "science.org",
"ieee.org", "acm.org", "gov", ".edu"]
medium_credibility = ["github.com", "medium.com", "techcrunch.com",
"reuters.com", "bloomberg.com"]
score = 0.5
reasons = []
for hc in high_credibility:
if hc in domain:
score = 0.9
reasons.append(f"Dominio accademico/istituzionale: {domain}")
break
else:
for mc in medium_credibility:
if mc in domain:
score = 0.7
reasons.append(f"Dominio tecnico riconosciuto: {domain}")
break
if len(content_snippet) > 200:
score += 0.05
reasons.append("Contenuto sostanziale presente")
if title and len(title) > 10:
score += 0.02
reasons.append("Titolo descrittivo presente")
score = min(score, 1.0)
return json.dumps({
"url": url,
"credibility_score": round(score, 2),
"reasons": reasons,
"domain": domain
})
에이전트 연구원의 정의
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
researcher_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
researcher_system_prompt = """Sei un Research Agent specializzato nella
raccolta di fonti attendibili dal web.
OBIETTIVO: Data una query di ricerca, trova e valida fonti di alta qualità.
PROCESSO:
1. Analizza la query e identifica 2-3 sotto-query specifiche
2. Per ogni sotto-query, usa web_search per trovare risultati
3. Per i risultati più promettenti, usa scrape_url per ottenere il
contenuto completo
4. Usa validate_source per verificare la credibilita di ogni fonte
5. Restituisci SOLO le fonti con credibility_score >= 0.6
OUTPUT: Produci un JSON con la lista delle fonti validate, ciascuna con:
- url, title, content (summary del contenuto rilevante)
- credibility_score, domain, extraction_date
NON inventare informazioni. Se non trovi fonti sufficienti, segnalalo."""
researcher_tools = [web_search, scrape_url, validate_source]
researcher_agent = create_react_agent(
model=researcher_llm,
tools=researcher_tools,
prompt=researcher_system_prompt
)
에이전트 2: 분석가
분석가는 연구원이 검증한 소스를 받아 심층적으로 분석합니다. 그의 목표 핵심 결과를 추출하고, 상호 참조를 통해 소스 간의 일관성을 확인하고, 모순을 식별하십시오. 출력은 구조화된 발견 항목의 목록입니다. 이를 지원하는 소스의 수와 품질을 기반으로 신뢰도 점수를 얻습니다.
분석가 도구
from collections import Counter
import re
@tool
def cross_reference_check(claim: str, sources_json: str) -> str:
"""Verifica un'affermazione incrociando multiple fonti.
Args:
claim: L'affermazione da verificare
sources_json: JSON con le fonti da analizzare
Returns:
JSON con il risultato della verifica è il numero di fonti
che supportano, contraddicono o non menzionano l'affermazione
"""
sources = json.loads(sources_json)
supporting = []
contradicting = []
neutral = []
claim_keywords = set(claim.lower().split())
for source in sources:
content = source.get("content", "").lower()
keyword_matches = sum(1 for kw in claim_keywords
if kw in content)
match_ratio = keyword_matches / max(len(claim_keywords), 1)
if match_ratio > 0.6:
supporting.append(source["url"])
elif match_ratio > 0.3:
neutral.append(source["url"])
else:
contradicting.append(source["url"])
total = len(sources)
confidence = len(supporting) / max(total, 1)
return json.dumps({
"claim": claim,
"supporting_sources": supporting,
"contradicting_sources": contradicting,
"neutral_sources": neutral,
"confidence": round(confidence, 2),
"verdict": "confirmed" if confidence > 0.6
else "uncertain" if confidence > 0.3
else "unverified"
})
@tool
def extract_key_findings(content: str, topic: str) -> str:
"""Estrae i finding principali da un testo rispetto a un topic.
Args:
content: Il testo da analizzare
topic: L'argomento di riferimento per l'estrazione
Returns:
JSON con i finding estratti e la loro rilevanza
"""
sentences = re.split(r'[.!?]+', content)
topic_keywords = set(topic.lower().split())
findings = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 20:
continue
words = set(sentence.lower().split())
relevance = len(words.intersection(topic_keywords))
relevance = relevance / max(len(topic_keywords), 1)
if relevance > 0.3:
findings.append({
"text": sentence[:200],
"relevance_score": round(relevance, 2)
})
findings.sort(key=lambda x: x["relevance_score"], reverse=True)
return json.dumps(findings[:10])
@tool
def detect_contradictions(findings_json: str) -> str:
"""Identifica contraddizioni tra i findings raccolti.
Args:
findings_json: JSON con i findings da analizzare
Returns:
JSON con le coppie di findings potenzialmente in contraddizione
"""
findings = json.loads(findings_json)
contradictions = []
negation_words = {"not", "no", "never", "non", "without",
"unlike", "contrary", "however", "but",
"decrease", "decline", "drop", "reduce"}
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
text1 = set(f1.get("text", "").lower().split())
text2 = set(f2.get("text", "").lower().split())
overlap = text1.intersection(text2)
neg_in_1 = bool(text1.intersection(negation_words))
neg_in_2 = bool(text2.intersection(negation_words))
if len(overlap) > 3 and neg_in_1 != neg_in_2:
contradictions.append({
"finding_1": f1.get("text", "")[:100],
"finding_2": f2.get("text", "")[:100],
"shared_keywords": list(overlap)[:5],
"severity": "high" if len(overlap) > 5 else "medium"
})
return json.dumps(contradictions)
에이전트 분석가 정의
analyst_llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
analyst_system_prompt = """Sei un Analyst Agent specializzato nel fact-checking
e nell'estrazione di insight da fonti multiple.
OBIETTIVO: Analizzare le fonti fornite dal Researcher, estrarre i finding
principali e verificarne la coerenza.
PROCESSO:
1. Per ogni fonte, usa extract_key_findings per estrarre i punti chiave
2. Usa cross_reference_check per verificare ogni finding chiave
3. Usa detect_contradictions per identificare incoerenze
4. Assegna un confidence score complessivo ai findings
OUTPUT: Produci un JSON con:
- findings: lista di finding con claim, evidence, source_urls, confidence
- overall_confidence: media pesata dei confidence score
- contradictions: lista di contraddizioni trovate
- recommendation: "proceed" se confidence > 0.7, "needs_more_sources" altrimenti
Sii critico e rigoroso. Non dare per scontato nulla."""
analyst_tools = [cross_reference_check, extract_key_findings,
detect_contradictions]
analyst_agent = create_react_agent(
model=analyst_llm,
tools=analyst_tools,
prompt=analyst_system_prompt
)
에이전트 3: 편집자
편집자는 최종 보고서 작성을 담당하는 대리인입니다. 구조화된 발견 항목 수신 분석가로부터 논리적 구조로 정리하고 학문적 기준에 따라 인용 형식을 지정합니다. 사용자가 요청한 형식(Markdown, HTML 또는 JSON)으로 보고서를 생성합니다. 편집자는 발명하지 않는다 내용: 수집된 정보를 구조화하고, 맥락화하고, 읽을 수 있게 만드는 것으로 제한됩니다. 이전 에이전트에 의해 확인되었습니다.
편집기 도구
from datetime import datetime
@tool
def apply_report_template(findings_json: str, query: str,
template_type: str = "executive") -> str:
"""Applica un template di report ai findings strutturati.
Args:
findings_json: JSON con i findings da formattare
query: La query originale dell'utente
template_type: Tipo di template (executive, technical, brief)
Returns:
Report formattato in Markdown
"""
findings = json.loads(findings_json)
date = datetime.now().strftime("%Y-%m-%d")
if template_type == "executive":
sections = [
f"# Research Report: {query}",
f"*Generated on {date}*\n",
"## Executive Summary\n",
"## Key Findings\n",
"## Detailed Analysis\n",
"## Sources and References\n",
"## Methodology\n",
"---",
"*This report was generated by an AI Research Assistant.*"
]
elif template_type == "technical":
sections = [
f"# Technical Analysis: {query}",
f"*Report Date: {date}*\n",
"## Abstract\n",
"## Data Sources\n",
"## Analysis\n",
"## Results\n",
"## Limitations\n",
"## References\n"
]
else:
sections = [
f"# Brief: {query}",
f"*{date}*\n",
"## Summary\n",
"## Key Points\n",
"## Sources\n"
]
return "\n".join(sections)
@tool
def format_citations(sources_json: str,
style: str = "apa") -> str:
"""Formatta le citazioni delle fonti secondo uno standard accademico.
Args:
sources_json: JSON con le fonti da citare
style: Stile di citazione (apa, chicago, ieee)
Returns:
Lista di citazioni formattate
"""
sources = json.loads(sources_json)
citations = []
for i, source in enumerate(sources, 1):
title = source.get("title", "Untitled")
url = source.get("url", "")
domain = source.get("domain", "")
date = source.get("extraction_date",
datetime.now().strftime("%Y-%m-%d"))
if style == "apa":
citation = (f"[{i}] {domain}. ({date}). "
f"*{title}*. Retrieved from {url}")
elif style == "ieee":
citation = (f"[{i}] \"{title},\" {domain}, "
f"{date}. [Online]. Available: {url}")
else:
citation = f"[{i}] {title}. {url} ({date})"
citations.append(citation)
return "\n".join(citations)
@tool
def generate_export(report_markdown: str,
output_format: str = "markdown") -> str:
"""Genera l'export finale del report nel formato richiesto.
Args:
report_markdown: Il report in formato Markdown
output_format: Formato di output (markdown, html, json)
Returns:
Il report nel formato specificato
"""
if output_format == "markdown":
return report_markdown
elif output_format == "html":
lines = report_markdown.split("\n")
html_lines = []
for line in lines:
if line.startswith("# "):
html_lines.append(f"<h1>{line[2:]}</h1>")
elif line.startswith("## "):
html_lines.append(f"<h2>{line[3:]}</h2>")
elif line.startswith("- "):
html_lines.append(f"<li>{line[2:]}</li>")
elif line.strip():
html_lines.append(f"<p>{line}</p>")
return "\n".join(html_lines)
elif output_format == "json":
return json.dumps({"report": report_markdown,
"format": output_format,
"generated_at": datetime.now().isoformat()})
return report_markdown
편집자 에이전트 정의
editor_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
editor_system_prompt = """Sei un Editor Agent specializzato nella
produzione di report professionali e ben strutturati.
OBIETTIVO: Trasformare i findings dell'Analyst in un report leggibile
e citato correttamente.
PROCESSO:
1. Usa apply_report_template per creare la struttura del report
2. Popola ogni sezione con i findings pertinenti
3. Usa format_citations per generare la bibliografia
4. Usa generate_export per produrre il formato finale
REGOLE DI STILE:
- Scrivi in modo chiaro e professionale
- Ogni affermazione deve avere una citazione [N]
- Evidenzia il livello di confidenza per ogni finding
- Segnala esplicitamente le aree di incertezza
- Non inventare MAI dati o citazioni non presenti nei findings
OUTPUT: Il report completo nel formato richiesto."""
editor_tools = [apply_report_template, format_citations,
generate_export]
editor_agent = create_react_agent(
model=editor_llm,
tools=editor_tools,
prompt=editor_system_prompt
)
LangGraph를 사용한 오케스트레이션
이제 세 가지 에이전트를 LangGraph 그래프로 조합해 보겠습니다. 그래프는 순차 흐름을 정의합니다. 전체 신뢰도 점수를 확인하는 분석가와 편집자 사이의 라우터 노드 보고서 생성을 진행하기에 충분하거나 추가 소스가 필요한 경우입니다.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
def run_researcher(state: ResearchState) -> dict:
"""Nodo Researcher: raccoglie e valida le fonti."""
query = state["query"]
max_sources = state.get("max_sources", 5)
result = researcher_agent.invoke({
"messages": [("human",
f"Cerca fonti per: {query}. "
f"Trova almeno {max_sources} fonti attendibili.")]
})
last_message = result["messages"][-1].content
try:
sources = json.loads(last_message)
except json.JSONDecodeError:
sources = []
return {
"sources": sources,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": result["messages"]
}
def run_analyst(state: ResearchState) -> dict:
"""Nodo Analyst: analizza le fonti e produce findings."""
sources = state["sources"]
result = analyst_agent.invoke({
"messages": [("human",
f"Analizza queste fonti e produci findings strutturati:\n"
f"{json.dumps(sources, indent=2)}")]
})
last_message = result["messages"][-1].content
try:
analysis = json.loads(last_message)
findings = analysis.get("findings", [])
confidence = analysis.get("overall_confidence", 0.0)
needs_more = analysis.get("recommendation") == "needs_more_sources"
except json.JSONDecodeError:
findings = []
confidence = 0.0
needs_more = True
return {
"findings": findings,
"overall_confidence": confidence,
"needs_more_sources": needs_more,
"messages": result["messages"]
}
def run_editor(state: ResearchState) -> dict:
"""Nodo Editor: genera il report finale."""
findings = state["findings"]
query = state["query"]
report_format = state.get("report_format", "markdown")
result = editor_agent.invoke({
"messages": [("human",
f"Genera un report per la query '{query}' "
f"basato su questi findings:\n"
f"{json.dumps(findings, indent=2)}\n"
f"Formato richiesto: {report_format}")]
})
report = result["messages"][-1].content
return {
"report": report,
"messages": result["messages"]
}
def should_continue_or_edit(state: ResearchState) -> str:
"""Router: decide se procedere all'Editor o tornare al Researcher."""
if (state.get("needs_more_sources", False)
and state.get("iteration_count", 0) < 3):
return "researcher"
return "editor"
# Costruzione del grafo
graph = StateGraph(ResearchState)
graph.add_node("researcher", run_researcher)
graph.add_node("analyst", run_analyst)
graph.add_node("editor", run_editor)
graph.add_edge(START, "researcher")
graph.add_edge("researcher", "analyst")
graph.add_conditional_edges(
"analyst",
should_continue_or_edit,
{"researcher": "researcher", "editor": "editor"}
)
graph.add_edge("editor", END)
memory = MemorySaver()
research_app = graph.compile(checkpointer=memory)
메모리 통합
우리 시스템의 중요한 측면은 공유 메모리 관리입니다. 3명의 에이전트가 활동한다. 동일한 상태에 있지만 시스템을 허용하는 장기 기억도 필요합니다. 이전 연구로부터 배우기 위해 이미 분석된 소스를 다시 방문하지 말고 구축하세요. 점진적으로 해당 도메인의 지식 그래프를 생성합니다.
공유 지식 그래프
검색에서 추출된 엔터티를 추적하는 간단한 지식 그래프를 구현합니다. 확인된 사실과의 관계. 이 구조는 분석가의 자문을 받습니다. 역사적인 맥락을 통해 분석을 강화하고 편집기에서 상호 참조를 삽입합니다.
from typing import Dict, Set, Tuple
import sqlite3
class KnowledgeGraph:
"""Knowledge graph persistente per il Research Assistant."""
def __init__(self, db_path: str = "research_kg.db"):
self.conn = sqlite3.connect(db_path)
self._create_tables()
def _create_tables(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
entity_type TEXT,
first_seen TEXT,
mention_count INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject_id INTEGER REFERENCES entities(id),
predicate TEXT,
object_id INTEGER REFERENCES entities(id),
confidence REAL,
source_url TEXT
);
CREATE TABLE IF NOT EXISTS facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
statement TEXT,
confidence REAL,
verified_by INTEGER DEFAULT 0,
source_urls TEXT,
created_at TEXT
);
""")
self.conn.commit()
def add_entity(self, name: str, entity_type: str):
self.conn.execute("""
INSERT INTO entities (name, entity_type, first_seen)
VALUES (?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE
SET mention_count = mention_count + 1
""", (name, entity_type))
self.conn.commit()
def add_relation(self, subject: str, predicate: str,
obj: str, confidence: float,
source_url: str = ""):
self.add_entity(subject, "auto")
self.add_entity(obj, "auto")
sub_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(subject,)).fetchone()[0]
obj_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(obj,)).fetchone()[0]
self.conn.execute("""
INSERT INTO relations
(subject_id, predicate, object_id, confidence, source_url)
VALUES (?, ?, ?, ?, ?)
""", (sub_id, predicate, obj_id, confidence, source_url))
self.conn.commit()
def query_entity(self, name: str) -> Dict:
entity = self.conn.execute(
"SELECT * FROM entities WHERE name LIKE ?",
(f"%{name}%",)).fetchone()
if not entity:
return {}
relations = self.conn.execute("""
SELECT e2.name, r.predicate, r.confidence
FROM relations r
JOIN entities e2 ON r.object_id = e2.id
WHERE r.subject_id = ?
""", (entity[0],)).fetchall()
return {
"name": entity[1],
"type": entity[2],
"mentions": entity[4],
"relations": [{"target": r[0], "predicate": r[1],
"confidence": r[2]} for r in relations]
}
RAG 통합
우리 연구 보조원은 RAG(Retrieval-Augmented Generation) 시스템을 통합하여 다음을 수행합니다. 분석가는 벡터 데이터베이스에 보관된 내부 문서 및 이전 보고서를 참조할 수 있습니다. 우리는 벡터 저장소로 Pinecone을 사용하고 임베딩 생성을 위해 OpenAI를 사용합니다.
검색 파이프라인
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = PineconeVectorStore(
index_name="research-assistant",
embedding=embeddings,
namespace="documents"
)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ". ", " "]
)
def index_document(content: str, metadata: dict):
"""Indicizza un documento nel vector store."""
chunks = text_splitter.split_text(content)
documents = [
Document(page_content=chunk, metadata={
**metadata,
"chunk_index": i,
"total_chunks": len(chunks)
})
for i, chunk in enumerate(chunks)
]
vectorstore.add_documents(documents)
def retrieve_relevant(query: str, top_k: int = 5) -> list:
"""Recupera i documenti più rilevanti per una query."""
results = vectorstore.similarity_search_with_score(
query, k=top_k
)
return [
{
"content": doc.page_content,
"metadata": doc.metadata,
"similarity_score": round(float(score), 3)
}
for doc, score in results
]
@tool
def search_internal_knowledge(query: str,
max_results: int = 5) -> str:
"""Cerca nei documenti interni e nei report precedenti.
Args:
query: La query di ricerca
max_results: Numero massimo di risultati
Returns:
JSON con i documenti rilevanti trovati nel knowledge base
"""
results = retrieve_relevant(query, top_k=max_results)
return json.dumps(results, indent=2)
오류 처리 및 대체
다중 에이전트 시스템에서 오류 처리는 중요한 측면입니다. 모든 에이전트는 실패할 수 있습니다 다양한 이유: 네트워크 시간 초과, 사용할 수 없는 API, 구문 분석할 수 없는 콘텐츠, 속도 제한. 우리 시스템은 도구 수준, 에이전트 수준 및 에이전트 수준의 세 가지 수준에서 대체 전략을 구현합니다. 시스템 수준.
3단계 대체 전략
- 도구 수준: 각 도구는 내부적으로 try/Exception을 사용하여 자체 오류를 처리합니다. 예외를 전파하는 대신 구조화된 오류 메시지를 반환합니다. 소스인 경우 연결할 수 없는 경우 도구는 HTTP 코드 및 힌트와 함께 오류를 반환합니다.
- 상담원 수준: 에이전트가 작업을 완료하지 못하면 라우터는 흐름을 리디렉션할 수 있습니다. 연구원은 첫 번째 소스를 사용할 수 없는 경우 대체 소스를 찾습니다. 가능합니다. 분석가는 모순이 너무 많이 발견되면 추가 소스를 요청합니다.
- 시스템 수준: 그래프에는 최대 반복 제한(3사이클)이 있습니다. 연구원-분석가). 3번의 반복 후에도 신뢰도 점수가 0.5 미만으로 유지되면 시스템은 그러나 결과의 신뢰성이 낮다는 명시적인 경고가 포함된 보고서를 생성합니다. 수동 개입을 제안합니다.
from langgraph.errors import NodeInterrupt
def run_researcher_with_fallback(state: ResearchState) -> dict:
"""Researcher con gestione errori e fallback."""
max_retries = 2
errors = list(state.get("errors", []))
for attempt in range(max_retries):
try:
result = run_researcher(state)
if not result.get("sources"):
errors.append(
f"Attempt {attempt+1}: No sources found"
)
continue
result["errors"] = errors
return result
except Exception as e:
errors.append(
f"Attempt {attempt+1}: {type(e).__name__}: {str(e)}"
)
return {
"sources": [],
"errors": errors,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": state.get("messages", [])
}
def manual_intervention_check(state: ResearchState) -> str:
"""Verifica se serve intervento manuale."""
iteration = state.get("iteration_count", 0)
confidence = state.get("overall_confidence", 0.0)
errors = state.get("errors", [])
if iteration >= 3 and confidence < 0.5:
raise NodeInterrupt(
f"Sistema bloccato dopo {iteration} iterazioni. "
f"Confidence: {confidence:.2f}. "
f"Errori: {len(errors)}. "
f"Intervento manuale richiesto."
)
if len(errors) > 5:
raise NodeInterrupt(
f"Troppi errori accumulati ({len(errors)}). "
f"Verificare la connettivita e i limiti API."
)
return "continue"
전개
연구 보조원을 생산에 사용할 수 있도록 컨테이너에 포장합니다. Docker를 사용하여 FastAPI를 통해 REST 서비스로 노출합니다. 이 아키텍처를 사용하면 확장이 가능합니다. 수평적으로 시스템을 기존 애플리케이션과 통합하고 성능을 모니터링합니다. 실시간.
FastAPI 래퍼
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from uuid import uuid4
import asyncio
app = FastAPI(title="Research Assistant API", version="1.0.0")
class ResearchRequest(BaseModel):
query: str
max_sources: int = 5
report_format: str = "markdown"
class ResearchResponse(BaseModel):
job_id: str
status: str
report: str | None = None
confidence: float | None = None
sources_count: int | None = None
errors: list[str] = []
jobs: dict[str, ResearchResponse] = {}
async def execute_research(job_id: str, request: ResearchRequest):
"""Esegue la ricerca in background."""
try:
config = {"configurable": {"thread_id": job_id}}
initial_state = {
"query": request.query,
"max_sources": request.max_sources,
"report_format": request.report_format,
"messages": [],
"sources": [],
"findings": [],
"overall_confidence": 0.0,
"report": None,
"errors": [],
"iteration_count": 0,
"needs_more_sources": False
}
result = await asyncio.to_thread(
research_app.invoke, initial_state, config
)
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="completed",
report=result.get("report"),
confidence=result.get("overall_confidence"),
sources_count=len(result.get("sources", [])),
errors=result.get("errors", [])
)
except Exception as e:
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="failed",
errors=[str(e)]
)
@app.post("/research", response_model=ResearchResponse)
async def start_research(request: ResearchRequest,
background_tasks: BackgroundTasks):
job_id = str(uuid4())
jobs[job_id] = ResearchResponse(
job_id=job_id, status="processing")
background_tasks.add_task(execute_research, job_id, request)
return jobs[job_id]
@app.get("/research/{job_id}", response_model=ResearchResponse)
async def get_research_status(job_id: str):
if job_id not in jobs:
raise HTTPException(status_code=404,
detail="Job not found")
return jobs[job_id]
도커 작성
version: "3.9"
services:
research-api:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- TAVILY_API_KEY=${TAVILY_API_KEY}
- PINECONE_API_KEY=${PINECONE_API_KEY}
- PINECONE_INDEX=research-assistant
volumes:
- ./data:/app/data
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
redis-data:
결과와 교훈
프로덕션의 시스템 측정항목
| 미터법 | Valore | 메모 |
|---|---|---|
| 보고서당 평균 시간 | 2~4분 | 쿼리의 복잡성과 소스 수에 따라 다릅니다. |
| 보고서 출처 | 5-12 | 검색당 평균 8개의 검증된 소스 |
| 평균 신뢰도 점수 | 0.74 | 기술 분야의 200개 연구 샘플 |
| 보고서당 비용 | $0.15 - $0.45 | Tavily API 호출을 포함하여 에이전트 3개 모두에 대한 GPT-4o |
| 대체 비율 | 12% | 둘 이상의 연구자-분석가 주기가 필요한 연구 |
| 수동 개입 | 3% | 검토가 필요한 신뢰도가 0.5보다 낮은 보고서 |
배운 교훈
- 도구 설명의 품질이 차별화 요소입니다.. 설명이 포함된 도구 모호하거나 모호하면 에이전트가 잘못된 선택을 하게 됩니다. 편집팀에 시간을 투자하세요 매개변수의 구체적인 예와 함께 정확한 설명을 제공하면 성능이 크게 향상됩니다.
- 오류 처리는 나중에 고려하는 것이 아니라 가장 중요한 문제입니다.. 에서 다중 에이전트 시스템에서는 한 에이전트에서 처리되지 않은 오류가 계단식으로 전파됩니다. 모든 도구 그래프의 각 노드에는 명시적인 대체 전략이 있어야 합니다.
- 공유 메모리는 진정한 차별화 요소입니다.. 지식 그래프가 없으면, 모든 검색은 처음부터 시작됩니다. 메모리를 사용하면 시스템이 점진적으로 향상됩니다. 이미 분석된 엔터티, 품질이 낮은 소스를 다시 방문하는 것을 방지하고 컨텍스트를 구축합니다. 이는 모든 새로운 분석을 풍성하게 합니다.
- 신뢰도 점수는 단순히 계산하는 것이 아니라 보정되어야 합니다.. 첫 번째 점수 그들은 너무 낙관적이었습니다. 도메인을 알 수 없는 출처에 대한 처벌을 도입했으며, 논쟁의 여지가 있는 주제에 대한 다중 상호 참조 및 주의 임계값에 대한 보너스.
- FinOps 전략을 통해 운영 비용을 관리할 수 있습니다.. GPT-4o-mini 사용 간단한 작업을 수행하는 연구원과 분석가 및 편집자를 위한 GPT-4o는 비용을 절감합니다. 품질에 큰 영향을 주지 않고 35%까지 향상됩니다.
결론
이 사례 연구에서 우리는 개념이 어떻게 이루어지는지 보여주는 완전한 다중 에이전트 시스템을 구축했습니다. 이전 12개 기사에서 살펴본 내용은 실제 애플리케이션에 통합됩니다. 자율적인 연구 어시스턴트는 학술적인 활동이 아닙니다. 다양한 도메인에 적용할 수 있는 아키텍처 패턴입니다. 재무 실사부터 경쟁 분석까지, 과학 연구부터 모니터링까지 시장의.
본 시스템 구축에서 나타난 핵심은 기획의 중심성이다. 도구 중에서 보조적인 측면이 아닌 구조적인 측면으로서 오류 관리의 중요성은 지속적인 개선을 위한 요소로서 공유 메모리의 가치. 모든 건축적 결정은 에이전트 수부터 모델 선택까지 구체적인 지표와 요구 사항을 따라야 합니다. 실제 사업의.
시리즈의 다음 기사이자 마지막 기사에서는 "AI 에이전트의 미래: 새로운 기능, AGI 및 현재의 한계", 우리는 그들이 어디로 가고 있는지 탐색하기 위해 코드에서 찾아볼 것입니다. AI 에이전트. 현재의 한계, 새로운 역량, 연구 방향을 분석하겠습니다. 빠르게 발전하는 이 기술의 가장 유망하고 경제적, 윤리적 의미를 담고 있습니다.







