Giriş: Otonom Bir Araştırma Sistemi Oluşturmak
Yapay zeka ajanları hakkındaki serinin bu on üçüncü makalesinde, teoriden pratiğe geçiyoruz. eksiksiz bir sistem: a Otonom Araştırma Asistanı üç uzman temsilciden oluşur Yapılandırılmış ve doğrulanmış araştırma raporları üretmek için işbirliği yapanlar. Bu vaka çalışması hepsini birleştiriyor önceki on iki makalede incelenen kavramlar: çok aracılı düzenleme, paylaşılan bellek, gelişmiş araç çağırma, test etme, güvenlik, FinOps ve dağıtım.
Karşılaştığımız sorun kurumsal dünyada somut ve yaygın: Bir analistin Web'deki birden fazla kaynaktan bilgi alın, güvenilirliğini doğrulayın, ilgili içgörüleri çıkarın ve yapılandırılmış bir rapor hazırlayın. Manuel olarak gerçekleştirilen bu işlem saatlerce çalışma gerektirir. bir ile iyi tasarlanmış çoklu ajan sistemi sayesinde, belirli bir seviyeyi korurken süreyi birkaç dakikaya indirebiliriz yüksek kalitede ve izlenebilir.
Sistemimiz kalıplarla koordine edilen üç aracılı bir mimariye dayanmaktadır. Sıralı e Dokunma orkestrasyonla ilgili makalelerde analiz ettiğimiz şey. Her temsilcinin bir rolü vardır hassas, özel araçlar ve iyi tanımlanmış bir girdi/çıktı sözleşmesi. İletişim aracılığıyla gerçekleşir kalıcı bir paylaşılan durum ve sistem, zenginleştirmek için RAG'yi (Geri Alma-Artırılmış Üretim) entegre eder Dahili belgelerle analiz.
Bu Makalede Neler Öğreneceksiniz?
- Sıralı ve Aktarma modelleriyle özerk arama için çok aracılı bir mimari nasıl tasarlanır
- Üç uzman aracının eksiksiz uygulanması: Araştırmacı, Analist ve Editör
- Analizi dahili belgelerle zenginleştirmek için RAG'ın Çam Kozalağı ile entegrasyonu
- Bilgi grafiği ile aracılar arasında paylaşılan hafızanın yönetimi
- Çok aracılı sistemler için hata işleme ve geri dönüş stratejileri
- Sistemi API aracılığıyla kullanıma sunmak için Docker Compose ve FastAPI ile dağıtım
- Sistemin üretimdeki performans, doğruluk ve maliyet metrikleri
Sistem Mimarisi
Araştırma Asistanımızın mimarisi sıralı bir boru hattında düzenlenmiş üç aracıya dayanmaktadır. Her etmen bir öncekinin çıktısını alır, bunu kendi özel araçlarıyla işler ve üretir. bir sonraki için yapılandırılmış bir çıktı. Aracılar arasında paylaşılan bir durum bağlamı korur Araştırmanın tamamlanması, her temsilcinin aşamalardan toplanan bilgilere erişmesine olanak tanır emsaller.
Çok Aracılı Sistem Akışı
| Faz | Ajan | Giriş | Çıkışlar | Aletler |
|---|---|---|---|---|
| 1 | Araştırmacı | Kullanıcı sorgusu + parametreler | Özetle doğrulanan kaynaklar | Web Arama, URL Kazıyıcı, Kaynak Doğrulayıcı |
| 2 | Analist | Doğrulanmış kaynaklar | Güven puanına sahip bulgular | Çapraz Referans, Anahtar Çıkarıcı, Çelişki Dedektörü |
| 3 | Editör | Yapılandırılmış bulgular | Biçimlendirilmiş nihai rapor | Şablon Motoru, Alıntı Formatlayıcı, Dışa Aktarma Oluşturucu |
Düzenleme Grafiği
Sistem düzenleme için LangGraph'ı kullanır. Ana grafik sıralı akışı tanımlar üç aracı arasında, hata durumlarını ve açıklama taleplerini yöneten koşullu kenarlar bulunur. Örneğin Analist, toplanan veriler aşağıdaki durumlarda Araştırmacıdan ek kaynaklar aramasını talep edebilir: yetersiz veya çelişkilidir.
+------------------+
| User Query |
+--------+---------+
|
v
+--------+---------+
| RESEARCHER |
| (Web Search, |
| URL Scraping, |
| Validation) |
+--------+---------+
|
Fonti validate
|
v
+--------+---------+
+---->| ANALYST |
| | (Cross-Ref, |
| | Extraction, |
| | Fact-Check) |
| +--------+---------+
| |
Richiesta| Findings + Score
fonti | |
aggiuntive v
| +--------+---------+
+-----| ROUTER |
| (Score > 0.7?) |
+--------+---------+
|
Score OK
|
v
+--------+---------+
| EDITOR |
| (Template, |
| Citations, |
| Export) |
+--------+---------+
|
v
+--------+---------+
| Report Finale |
+------------------+
Paylaşılan Durumun Tanımı
Paylaşılan durum, etmenler arasındaki iletişimin kalbidir. Verinin yapısını tanımlar
grafik boyunca akarak tip güvenliği ve izlenebilirlik sağlarlar. Bir kullanıyoruz
TypedDict Her durum alanını açıkça tanımlamak için.
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage
class Source(TypedDict):
url: str
title: str
content: str
credibility_score: float
domain: str
extraction_date: str
class Finding(TypedDict):
claim: str
evidence: List[str]
source_urls: List[str]
confidence: float
category: str
contradictions: Optional[List[str]]
class ResearchState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
query: str
max_sources: int
sources: List[Source]
findings: List[Finding]
overall_confidence: float
report: Optional[str]
report_format: str
errors: List[str]
iteration_count: int
needs_more_sources: bool
Ajan 1: Araştırmacı
Araştırmacı, kaynakların toplanmasından sorumlu aracıdır. Kullanıcının sorgusunu alır, bunu arama alt sorgularına ayırır, Tavily API aracılığıyla web'de arama yapar, indirmeler yapar bulunan sayfaların içeriğini analiz eder ve her kaynağın güvenilirliğini doğrular. Çıktı, ilgili içeriğin özetiyle birlikte doğrulanmış kaynakların bir listesidir.
Araçların Tanımı
Araştırmacının üç özel aracı vardır: biri web araştırması için, diğeri kazıma için sayfaların içeriğinin değerlendirilmesi ve kaynakların güvenilirliğinin doğrulanması için bir tane.
from langchain_core.tools import tool
from tavily import TavilyClient
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import json
tavily_client = TavilyClient(api_key="tvly-...")
@tool
def web_search(query: str, max_results: int = 5) -> str:
"""Cerca informazioni sul web per una query specifica.
Args:
query: La query di ricerca da eseguire
max_results: Numero massimo di risultati (default: 5)
Returns:
JSON con i risultati della ricerca inclusi URL, titolo e snippet
"""
response = tavily_client.search(
query=query,
max_results=max_results,
search_depth="advanced",
include_raw_content=True,
include_domains=["arxiv.org", "github.com", "medium.com",
"techcrunch.com", "reuters.com"]
)
results = []
for r in response.get("results", []):
results.append({
"url": r["url"],
"title": r["title"],
"snippet": r["content"][:500],
"raw_content": r.get("raw_content", "")[:2000],
"score": r.get("score", 0.0)
})
return json.dumps(results, indent=2)
@tool
def scrape_url(url: str) -> str:
"""Scarica e analizza il contenuto di una pagina web.
Args:
url: L'URL della pagina da analizzare
Returns:
Il testo estratto dalla pagina (max 3000 caratteri)
"""
try:
headers = {"User-Agent": "ResearchBot/1.0"}
response = httpx.get(url, headers=headers, timeout=15.0,
follow_redirects=True)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator="\n", strip=True)
return text[:3000]
except Exception as e:
return f"Errore durante lo scraping di {url}: {str(e)}"
@tool
def validate_source(url: str, title: str, content_snippet: str) -> str:
"""Valida la credibilita di una fonte basandosi su dominio e contenuto.
Args:
url: L'URL della fonte
title: Il titolo della pagina
content_snippet: Un estratto del contenuto
Returns:
JSON con il punteggio di credibilita e la motivazione
"""
domain = urlparse(url).netloc.lower()
high_credibility = ["arxiv.org", "nature.com", "science.org",
"ieee.org", "acm.org", "gov", ".edu"]
medium_credibility = ["github.com", "medium.com", "techcrunch.com",
"reuters.com", "bloomberg.com"]
score = 0.5
reasons = []
for hc in high_credibility:
if hc in domain:
score = 0.9
reasons.append(f"Dominio accademico/istituzionale: {domain}")
break
else:
for mc in medium_credibility:
if mc in domain:
score = 0.7
reasons.append(f"Dominio tecnico riconosciuto: {domain}")
break
if len(content_snippet) > 200:
score += 0.05
reasons.append("Contenuto sostanziale presente")
if title and len(title) > 10:
score += 0.02
reasons.append("Titolo descrittivo presente")
score = min(score, 1.0)
return json.dumps({
"url": url,
"credibility_score": round(score, 2),
"reasons": reasons,
"domain": domain
})
Ajan Araştırmacının Tanımı
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
researcher_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
researcher_system_prompt = """Sei un Research Agent specializzato nella
raccolta di fonti attendibili dal web.
OBIETTIVO: Data una query di ricerca, trova e valida fonti di alta qualità.
PROCESSO:
1. Analizza la query e identifica 2-3 sotto-query specifiche
2. Per ogni sotto-query, usa web_search per trovare risultati
3. Per i risultati più promettenti, usa scrape_url per ottenere il
contenuto completo
4. Usa validate_source per verificare la credibilita di ogni fonte
5. Restituisci SOLO le fonti con credibility_score >= 0.6
OUTPUT: Produci un JSON con la lista delle fonti validate, ciascuna con:
- url, title, content (summary del contenuto rilevante)
- credibility_score, domain, extraction_date
NON inventare informazioni. Se non trovi fonti sufficienti, segnalalo."""
researcher_tools = [web_search, scrape_url, validate_source]
researcher_agent = create_react_agent(
model=researcher_llm,
tools=researcher_tools,
prompt=researcher_system_prompt
)
Ajan 2: Analist
Analist, Araştırmacı tarafından doğrulanan kaynakları alır ve bunları derinlemesine analiz eder. Onun hedefi temel bulguları çıkarmak, çapraz referanslama yoluyla kaynaklar arasındaki tutarlılığı doğrulamak ve herhangi bir çelişkiyi tanımlayın. Çıktı, yapılandırılmış bulguların bir listesidir; her biri onu destekleyen kaynakların sayısına ve kalitesine dayalı bir güven puanı ile.
Analist Aracı
from collections import Counter
import re
@tool
def cross_reference_check(claim: str, sources_json: str) -> str:
"""Verifica un'affermazione incrociando multiple fonti.
Args:
claim: L'affermazione da verificare
sources_json: JSON con le fonti da analizzare
Returns:
JSON con il risultato della verifica è il numero di fonti
che supportano, contraddicono o non menzionano l'affermazione
"""
sources = json.loads(sources_json)
supporting = []
contradicting = []
neutral = []
claim_keywords = set(claim.lower().split())
for source in sources:
content = source.get("content", "").lower()
keyword_matches = sum(1 for kw in claim_keywords
if kw in content)
match_ratio = keyword_matches / max(len(claim_keywords), 1)
if match_ratio > 0.6:
supporting.append(source["url"])
elif match_ratio > 0.3:
neutral.append(source["url"])
else:
contradicting.append(source["url"])
total = len(sources)
confidence = len(supporting) / max(total, 1)
return json.dumps({
"claim": claim,
"supporting_sources": supporting,
"contradicting_sources": contradicting,
"neutral_sources": neutral,
"confidence": round(confidence, 2),
"verdict": "confirmed" if confidence > 0.6
else "uncertain" if confidence > 0.3
else "unverified"
})
@tool
def extract_key_findings(content: str, topic: str) -> str:
"""Estrae i finding principali da un testo rispetto a un topic.
Args:
content: Il testo da analizzare
topic: L'argomento di riferimento per l'estrazione
Returns:
JSON con i finding estratti e la loro rilevanza
"""
sentences = re.split(r'[.!?]+', content)
topic_keywords = set(topic.lower().split())
findings = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 20:
continue
words = set(sentence.lower().split())
relevance = len(words.intersection(topic_keywords))
relevance = relevance / max(len(topic_keywords), 1)
if relevance > 0.3:
findings.append({
"text": sentence[:200],
"relevance_score": round(relevance, 2)
})
findings.sort(key=lambda x: x["relevance_score"], reverse=True)
return json.dumps(findings[:10])
@tool
def detect_contradictions(findings_json: str) -> str:
"""Identifica contraddizioni tra i findings raccolti.
Args:
findings_json: JSON con i findings da analizzare
Returns:
JSON con le coppie di findings potenzialmente in contraddizione
"""
findings = json.loads(findings_json)
contradictions = []
negation_words = {"not", "no", "never", "non", "without",
"unlike", "contrary", "however", "but",
"decrease", "decline", "drop", "reduce"}
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
text1 = set(f1.get("text", "").lower().split())
text2 = set(f2.get("text", "").lower().split())
overlap = text1.intersection(text2)
neg_in_1 = bool(text1.intersection(negation_words))
neg_in_2 = bool(text2.intersection(negation_words))
if len(overlap) > 3 and neg_in_1 != neg_in_2:
contradictions.append({
"finding_1": f1.get("text", "")[:100],
"finding_2": f2.get("text", "")[:100],
"shared_keywords": list(overlap)[:5],
"severity": "high" if len(overlap) > 5 else "medium"
})
return json.dumps(contradictions)
Temsilci Analisti tanımı
analyst_llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
analyst_system_prompt = """Sei un Analyst Agent specializzato nel fact-checking
e nell'estrazione di insight da fonti multiple.
OBIETTIVO: Analizzare le fonti fornite dal Researcher, estrarre i finding
principali e verificarne la coerenza.
PROCESSO:
1. Per ogni fonte, usa extract_key_findings per estrarre i punti chiave
2. Usa cross_reference_check per verificare ogni finding chiave
3. Usa detect_contradictions per identificare incoerenze
4. Assegna un confidence score complessivo ai findings
OUTPUT: Produci un JSON con:
- findings: lista di finding con claim, evidence, source_urls, confidence
- overall_confidence: media pesata dei confidence score
- contradictions: lista di contraddizioni trovate
- recommendation: "proceed" se confidence > 0.7, "needs_more_sources" altrimenti
Sii critico e rigoroso. Non dare per scontato nulla."""
analyst_tools = [cross_reference_check, extract_key_findings,
detect_contradictions]
analyst_agent = create_react_agent(
model=analyst_llm,
tools=analyst_tools,
prompt=analyst_system_prompt
)
Ajan 3: Editör
Editör, nihai raporun hazırlanmasından sorumlu olan temsilcidir. Yapılandırılmış bulguları alır Analistten alır, bunları mantıksal bir yapı halinde düzenler, alıntıları akademik standartlara göre biçimlendirir raporu kullanıcının istediği formatta (Markdown, HTML veya JSON) oluşturur. Editör icat etmez İçerikler: Toplanan bilgilerin yapılandırılması, bağlamsallaştırılması ve okunabilir hale getirilmesiyle kendisini sınırlar. ve önceki acenteler tarafından doğrulandı.
Düzenleyici Araçları
from datetime import datetime
@tool
def apply_report_template(findings_json: str, query: str,
template_type: str = "executive") -> str:
"""Applica un template di report ai findings strutturati.
Args:
findings_json: JSON con i findings da formattare
query: La query originale dell'utente
template_type: Tipo di template (executive, technical, brief)
Returns:
Report formattato in Markdown
"""
findings = json.loads(findings_json)
date = datetime.now().strftime("%Y-%m-%d")
if template_type == "executive":
sections = [
f"# Research Report: {query}",
f"*Generated on {date}*\n",
"## Executive Summary\n",
"## Key Findings\n",
"## Detailed Analysis\n",
"## Sources and References\n",
"## Methodology\n",
"---",
"*This report was generated by an AI Research Assistant.*"
]
elif template_type == "technical":
sections = [
f"# Technical Analysis: {query}",
f"*Report Date: {date}*\n",
"## Abstract\n",
"## Data Sources\n",
"## Analysis\n",
"## Results\n",
"## Limitations\n",
"## References\n"
]
else:
sections = [
f"# Brief: {query}",
f"*{date}*\n",
"## Summary\n",
"## Key Points\n",
"## Sources\n"
]
return "\n".join(sections)
@tool
def format_citations(sources_json: str,
style: str = "apa") -> str:
"""Formatta le citazioni delle fonti secondo uno standard accademico.
Args:
sources_json: JSON con le fonti da citare
style: Stile di citazione (apa, chicago, ieee)
Returns:
Lista di citazioni formattate
"""
sources = json.loads(sources_json)
citations = []
for i, source in enumerate(sources, 1):
title = source.get("title", "Untitled")
url = source.get("url", "")
domain = source.get("domain", "")
date = source.get("extraction_date",
datetime.now().strftime("%Y-%m-%d"))
if style == "apa":
citation = (f"[{i}] {domain}. ({date}). "
f"*{title}*. Retrieved from {url}")
elif style == "ieee":
citation = (f"[{i}] \"{title},\" {domain}, "
f"{date}. [Online]. Available: {url}")
else:
citation = f"[{i}] {title}. {url} ({date})"
citations.append(citation)
return "\n".join(citations)
@tool
def generate_export(report_markdown: str,
output_format: str = "markdown") -> str:
"""Genera l'export finale del report nel formato richiesto.
Args:
report_markdown: Il report in formato Markdown
output_format: Formato di output (markdown, html, json)
Returns:
Il report nel formato specificato
"""
if output_format == "markdown":
return report_markdown
elif output_format == "html":
lines = report_markdown.split("\n")
html_lines = []
for line in lines:
if line.startswith("# "):
html_lines.append(f"<h1>{line[2:]}</h1>")
elif line.startswith("## "):
html_lines.append(f"<h2>{line[3:]}</h2>")
elif line.startswith("- "):
html_lines.append(f"<li>{line[2:]}</li>")
elif line.strip():
html_lines.append(f"<p>{line}</p>")
return "\n".join(html_lines)
elif output_format == "json":
return json.dumps({"report": report_markdown,
"format": output_format,
"generated_at": datetime.now().isoformat()})
return report_markdown
Editör Aracısını Tanımlama
editor_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
editor_system_prompt = """Sei un Editor Agent specializzato nella
produzione di report professionali e ben strutturati.
OBIETTIVO: Trasformare i findings dell'Analyst in un report leggibile
e citato correttamente.
PROCESSO:
1. Usa apply_report_template per creare la struttura del report
2. Popola ogni sezione con i findings pertinenti
3. Usa format_citations per generare la bibliografia
4. Usa generate_export per produrre il formato finale
REGOLE DI STILE:
- Scrivi in modo chiaro e professionale
- Ogni affermazione deve avere una citazione [N]
- Evidenzia il livello di confidenza per ogni finding
- Segnala esplicitamente le aree di incertezza
- Non inventare MAI dati o citazioni non presenti nei findings
OUTPUT: Il report completo nel formato richiesto."""
editor_tools = [apply_report_template, format_citations,
generate_export]
editor_agent = create_react_agent(
model=editor_llm,
tools=editor_tools,
prompt=editor_system_prompt
)
LangGraph ile Düzenleme
Şimdi üç aracıyı LangGraph grafiğinde toplayalım. Grafik sıralı akışı tanımlar Analist ile Editör arasında genel güven puanının olup olmadığını kontrol eden bir yönlendirici düğüm bulunur. raporun oluşturulmasına devam etmek için yeterlidir veya ek kaynaklara ihtiyaç duyulur.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
def run_researcher(state: ResearchState) -> dict:
"""Nodo Researcher: raccoglie e valida le fonti."""
query = state["query"]
max_sources = state.get("max_sources", 5)
result = researcher_agent.invoke({
"messages": [("human",
f"Cerca fonti per: {query}. "
f"Trova almeno {max_sources} fonti attendibili.")]
})
last_message = result["messages"][-1].content
try:
sources = json.loads(last_message)
except json.JSONDecodeError:
sources = []
return {
"sources": sources,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": result["messages"]
}
def run_analyst(state: ResearchState) -> dict:
"""Nodo Analyst: analizza le fonti e produce findings."""
sources = state["sources"]
result = analyst_agent.invoke({
"messages": [("human",
f"Analizza queste fonti e produci findings strutturati:\n"
f"{json.dumps(sources, indent=2)}")]
})
last_message = result["messages"][-1].content
try:
analysis = json.loads(last_message)
findings = analysis.get("findings", [])
confidence = analysis.get("overall_confidence", 0.0)
needs_more = analysis.get("recommendation") == "needs_more_sources"
except json.JSONDecodeError:
findings = []
confidence = 0.0
needs_more = True
return {
"findings": findings,
"overall_confidence": confidence,
"needs_more_sources": needs_more,
"messages": result["messages"]
}
def run_editor(state: ResearchState) -> dict:
"""Nodo Editor: genera il report finale."""
findings = state["findings"]
query = state["query"]
report_format = state.get("report_format", "markdown")
result = editor_agent.invoke({
"messages": [("human",
f"Genera un report per la query '{query}' "
f"basato su questi findings:\n"
f"{json.dumps(findings, indent=2)}\n"
f"Formato richiesto: {report_format}")]
})
report = result["messages"][-1].content
return {
"report": report,
"messages": result["messages"]
}
def should_continue_or_edit(state: ResearchState) -> str:
"""Router: decide se procedere all'Editor o tornare al Researcher."""
if (state.get("needs_more_sources", False)
and state.get("iteration_count", 0) < 3):
return "researcher"
return "editor"
# Costruzione del grafo
graph = StateGraph(ResearchState)
graph.add_node("researcher", run_researcher)
graph.add_node("analyst", run_analyst)
graph.add_node("editor", run_editor)
graph.add_edge(START, "researcher")
graph.add_edge("researcher", "analyst")
graph.add_conditional_edges(
"analyst",
should_continue_or_edit,
{"researcher": "researcher", "editor": "editor"}
)
graph.add_edge("editor", END)
memory = MemorySaver()
research_app = graph.compile(checkpointer=memory)
Bellek Entegrasyonu
Sistemimizin önemli bir yönü, paylaşılan hafızanın yönetimidir. Üç ajan çalışıyor aynı durumda, ancak aynı zamanda sisteme izin veren uzun süreli belleğe de ihtiyacımız var. önceki araştırmalardan ders almak, halihazırda analiz edilmiş olan kaynakları tekrar ziyaret etmekten kaçınmak ve aşamalı olarak etki alanının bilgi grafiği.
Paylaşılan Bilgi Grafiği
Aramalardan çıkarılan varlıkları izleyen basit bir bilgi grafiği uyguluyoruz, aralarındaki ilişkiler ve doğrulanmış gerçekler. Bu yapıya Analist tarafından danışılır analizleri tarihsel bağlamla zenginleştirmek ve Editörden çapraz referanslar eklemek için.
from typing import Dict, Set, Tuple
import sqlite3
class KnowledgeGraph:
"""Knowledge graph persistente per il Research Assistant."""
def __init__(self, db_path: str = "research_kg.db"):
self.conn = sqlite3.connect(db_path)
self._create_tables()
def _create_tables(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
entity_type TEXT,
first_seen TEXT,
mention_count INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject_id INTEGER REFERENCES entities(id),
predicate TEXT,
object_id INTEGER REFERENCES entities(id),
confidence REAL,
source_url TEXT
);
CREATE TABLE IF NOT EXISTS facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
statement TEXT,
confidence REAL,
verified_by INTEGER DEFAULT 0,
source_urls TEXT,
created_at TEXT
);
""")
self.conn.commit()
def add_entity(self, name: str, entity_type: str):
self.conn.execute("""
INSERT INTO entities (name, entity_type, first_seen)
VALUES (?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE
SET mention_count = mention_count + 1
""", (name, entity_type))
self.conn.commit()
def add_relation(self, subject: str, predicate: str,
obj: str, confidence: float,
source_url: str = ""):
self.add_entity(subject, "auto")
self.add_entity(obj, "auto")
sub_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(subject,)).fetchone()[0]
obj_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(obj,)).fetchone()[0]
self.conn.execute("""
INSERT INTO relations
(subject_id, predicate, object_id, confidence, source_url)
VALUES (?, ?, ?, ?, ?)
""", (sub_id, predicate, obj_id, confidence, source_url))
self.conn.commit()
def query_entity(self, name: str) -> Dict:
entity = self.conn.execute(
"SELECT * FROM entities WHERE name LIKE ?",
(f"%{name}%",)).fetchone()
if not entity:
return {}
relations = self.conn.execute("""
SELECT e2.name, r.predicate, r.confidence
FROM relations r
JOIN entities e2 ON r.object_id = e2.id
WHERE r.subject_id = ?
""", (entity[0],)).fetchall()
return {
"name": entity[1],
"type": entity[2],
"mentions": entity[4],
"relations": [{"target": r[0], "predicate": r[1],
"confidence": r[2]} for r in relations]
}
RAG Entegrasyonu
Araştırma Asistanımız, izin veren bir RAG (Geri Alma-Artırılmış Üretim) sistemini entegre eder. Analistin bir vektör veritabanında arşivlenen dahili belgelere ve önceki raporlara bakması. Pinecone'u vektör deposu olarak ve OpenAI'yi yerleştirme oluşturma için kullanıyoruz.
Alma hattı
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = PineconeVectorStore(
index_name="research-assistant",
embedding=embeddings,
namespace="documents"
)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ". ", " "]
)
def index_document(content: str, metadata: dict):
"""Indicizza un documento nel vector store."""
chunks = text_splitter.split_text(content)
documents = [
Document(page_content=chunk, metadata={
**metadata,
"chunk_index": i,
"total_chunks": len(chunks)
})
for i, chunk in enumerate(chunks)
]
vectorstore.add_documents(documents)
def retrieve_relevant(query: str, top_k: int = 5) -> list:
"""Recupera i documenti più rilevanti per una query."""
results = vectorstore.similarity_search_with_score(
query, k=top_k
)
return [
{
"content": doc.page_content,
"metadata": doc.metadata,
"similarity_score": round(float(score), 3)
}
for doc, score in results
]
@tool
def search_internal_knowledge(query: str,
max_results: int = 5) -> str:
"""Cerca nei documenti interni e nei report precedenti.
Args:
query: La query di ricerca
max_results: Numero massimo di risultati
Returns:
JSON con i documenti rilevanti trovati nel knowledge base
"""
results = retrieve_relevant(query, top_k=max_results)
return json.dumps(results, indent=2)
Hata İşleme ve Geri Dönüş
Çok etmenli bir sistemde hata yönetimi kritik bir husustur. Her ajan başarısız olabilir farklı nedenlerden dolayı: ağ zaman aşımları, kullanılamayan API'ler, ayrıştırılamayan içerik, hız sınırları. Sistemimiz geri dönüş stratejilerini üç düzeyde uygular: araç düzeyi, aracı düzeyi ve sistem düzeyinde.
Üç Seviyeli Geri Dönüş Stratejisi
- Araç seviyesi: her araç dahili olarak kendi hatalarını try/hariç ile yönetir, İstisnaları yaymak yerine yapılandırılmış hata mesajlarını döndürmek. Eğer bir kaynak ulaşılamıyorsa araç, HTTP koduyla birlikte bir hata ve bir ipucu döndürür.
- Aracı düzeyinde: Bir aracı görevini tamamlayamazsa yönlendirici akışı yönlendirebilir. Araştırmacı, ilk kaynaklar mevcut değilse alternatif kaynaklar arar mevcut. Analist çok fazla çelişki bulursa ek kaynaklar ister.
- Sistem düzeyinde: grafiğin maksimum yineleme sınırı vardır (3 döngü) Araştırmacı-Analist). 3 tekrardan sonra güven puanı 0,5'in altında kalırsa sistem Ancak sonuçların güvenilirliğinin düşük olduğuna dair açık bir uyarı içeren bir rapor oluşturur. ve manuel müdahaleyi önerir.
from langgraph.errors import NodeInterrupt
def run_researcher_with_fallback(state: ResearchState) -> dict:
"""Researcher con gestione errori e fallback."""
max_retries = 2
errors = list(state.get("errors", []))
for attempt in range(max_retries):
try:
result = run_researcher(state)
if not result.get("sources"):
errors.append(
f"Attempt {attempt+1}: No sources found"
)
continue
result["errors"] = errors
return result
except Exception as e:
errors.append(
f"Attempt {attempt+1}: {type(e).__name__}: {str(e)}"
)
return {
"sources": [],
"errors": errors,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": state.get("messages", [])
}
def manual_intervention_check(state: ResearchState) -> str:
"""Verifica se serve intervento manuale."""
iteration = state.get("iteration_count", 0)
confidence = state.get("overall_confidence", 0.0)
errors = state.get("errors", [])
if iteration >= 3 and confidence < 0.5:
raise NodeInterrupt(
f"Sistema bloccato dopo {iteration} iterazioni. "
f"Confidence: {confidence:.2f}. "
f"Errori: {len(errors)}. "
f"Intervento manuale richiesto."
)
if len(errors) > 5:
raise NodeInterrupt(
f"Troppi errori accumulati ({len(errors)}). "
f"Verificare la connettivita e i limiti API."
)
return "continue"
Dağıtım
Araştırma Görevlimizi üretime uygun hale getirmek için bir kap içerisinde paketliyoruz. Docker'ı oluşturun ve bunu FastAPI aracılığıyla bir REST hizmeti olarak kullanıma sunun. Bu mimari ölçeklendirmenize olanak tanır yatay olarak sistemi mevcut uygulamalarla entegre edin ve performansı izleyin gerçek zamanlı.
FastAPI Sarıcı
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from uuid import uuid4
import asyncio
app = FastAPI(title="Research Assistant API", version="1.0.0")
class ResearchRequest(BaseModel):
query: str
max_sources: int = 5
report_format: str = "markdown"
class ResearchResponse(BaseModel):
job_id: str
status: str
report: str | None = None
confidence: float | None = None
sources_count: int | None = None
errors: list[str] = []
jobs: dict[str, ResearchResponse] = {}
async def execute_research(job_id: str, request: ResearchRequest):
"""Esegue la ricerca in background."""
try:
config = {"configurable": {"thread_id": job_id}}
initial_state = {
"query": request.query,
"max_sources": request.max_sources,
"report_format": request.report_format,
"messages": [],
"sources": [],
"findings": [],
"overall_confidence": 0.0,
"report": None,
"errors": [],
"iteration_count": 0,
"needs_more_sources": False
}
result = await asyncio.to_thread(
research_app.invoke, initial_state, config
)
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="completed",
report=result.get("report"),
confidence=result.get("overall_confidence"),
sources_count=len(result.get("sources", [])),
errors=result.get("errors", [])
)
except Exception as e:
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="failed",
errors=[str(e)]
)
@app.post("/research", response_model=ResearchResponse)
async def start_research(request: ResearchRequest,
background_tasks: BackgroundTasks):
job_id = str(uuid4())
jobs[job_id] = ResearchResponse(
job_id=job_id, status="processing")
background_tasks.add_task(execute_research, job_id, request)
return jobs[job_id]
@app.get("/research/{job_id}", response_model=ResearchResponse)
async def get_research_status(job_id: str):
if job_id not in jobs:
raise HTTPException(status_code=404,
detail="Job not found")
return jobs[job_id]
Docker Oluşturma
version: "3.9"
services:
research-api:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- TAVILY_API_KEY=${TAVILY_API_KEY}
- PINECONE_API_KEY=${PINECONE_API_KEY}
- PINECONE_INDEX=research-assistant
volumes:
- ./data:/app/data
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
redis-data:
Sonuçlar ve Alınan Dersler
Üretimde Sistem Metrikleri
| Metrik | Değer | Notlar |
|---|---|---|
| Rapor başına ortalama süre | 2-4 dakika | Sorgunun karmaşıklığına ve kaynak sayısına bağlıdır |
| Rapor kaynakları | 5-12 | Arama başına ortalama 8 doğrulanmış kaynak |
| Ortalama güven puanı | 0,74 | Teknoloji alanında 200 araştırmadan oluşan bir örnek üzerinde |
| Rapor başına maliyet | 0,15 ABD Doları - 0,45 ABD Doları | Tavily API çağrıları da dahil olmak üzere 3 temsilcinin tümü için GPT-4o |
| Geri çekilme oranı | %12 | Birden fazla Araştırmacı-Analist döngüsü gerektiren araştırmalar |
| Manuel müdahaleler | 3% | Güvenirliği 0,5'ten düşük olan ve inceleme gerektiren raporlar |
Öğrenilen Dersler
- Araç açıklamalarının kalitesi farklılaştırıcı faktördür. Açıklamaları olan araç Belirsiz veya muğlaklık, temsilcinin yanlış seçimler yapmasına neden olur. Editör ekibine zaman ayırın Parametrelerin somut örnekleriyle birlikte kesin açıklamaların yapılması performansı büyük ölçüde artırır.
- Hata yönetimi birinci sınıf bir konudur, sonradan akla gelen bir düşünce değildir. bir Çok aracılı sistemde, bir aracıdaki işlenmeyen bir hata art arda yayılır. Her araç ve grafikteki her düğümün açık bir geri dönüş stratejisi olması gerekir.
- Paylaşılan hafıza gerçek farklılaştırıcıdır. Bilgi grafiği olmadan, her arama sıfırdan başlar. Bellek sayesinde sistem giderek gelişir: tanır Halihazırda analiz edilmiş varlıklar, düşük kaliteli kaynakları tekrar ziyaret etmekten kaçınır ve bağlam oluşturur bu da her yeni analizi zenginleştirir.
- Güven puanı yalnızca hesaplanmamalı, kalibre edilmelidir. İlk puanlar fazla iyimserdiler. Etki alanı bilinmeyen kaynaklar için cezalar uyguladık, Çoklu çapraz referanslar için bonuslar ve tartışmalı konular için uyarı eşikleri.
- İşletme maliyeti FinOps stratejileriyle yönetilebilir. GPT-4o-mini'yi kullanın Araştırmacı (daha basit görevleri yapan) için ve Analist ve Editör için GPT-4o maliyeti azaltır Kalite üzerinde önemli bir etki olmaksızın %35 oranında.
Sonuçlar
Bu vaka çalışmasında kavramların nasıl çalıştığını gösteren eksiksiz bir çoklu etmen sistemi oluşturduk. Önceki on iki makalede incelenen yöntemler, gerçek dünyadaki bir uygulamaya entegre edilmektedir. Otonom Araştırma Asistan akademik bir çalışma değildir; çok sayıda alana uygulanabilen mimari bir modeldir. mali durum tespitinden rekabet analizine, bilimsel araştırmalardan izlemeye kadar pazarın.
Bu sistemin inşasında ortaya çıkan kilit noktalar planlamanın merkeziliğidir. Araçlar arasında, hata yönetiminin yardımcı değil yapısal bir yön olarak önemi, sürekli iyileştirme faktörü olarak paylaşılan hafızanın değeri. Her mimari karar, Temsilci sayısından model seçimine kadar her şeyin somut ölçümler ve gereksinimler tarafından yönlendirilmesi gerekir gerçek işletmelerin.
Serinin bir sonraki ve son makalesinde, "Yapay Zeka Ajanlarının Geleceği: Gelişen Yetenekler, YGZ ve Mevcut Sınırlamalar", nereye gittiklerini keşfetmek için koddan yukarı bakacağız AI ajanları. Mevcut sınırlamaları, ortaya çıkan yetenekleri ve araştırma yönlerini analiz edeceğiz Bu hızla gelişen teknolojinin en umut verici ve ekonomik ve etik sonuçları.







