Introducere: Construirea unui sistem autonom de cercetare
În acest al treisprezecelea articol al seriei despre agenții AI, trecem de la teorie la practică prin construcție un sistem complet: a Asistent de cercetare autonom formată din trei agenţi specializaţi care colaborează pentru a produce rapoarte de cercetare structurate și verificate. Acest studiu de caz le integrează pe toate conceptele explorate în cele douăsprezece articole anterioare: orchestrare multi-agent, memorie partajată, apelare instrumente avansate, testare, securitate, FinOps și implementare.
Problema cu care ne confruntăm este concretă și răspândită în lumea întreprinderilor: un analist trebuie să colecteze informații din mai multe surse de pe web, verificați fiabilitatea acestora, extrageți informații relevante și elaborarea unui raport structurat. Acest proces, efectuat manual, necesită ore de muncă. Cu a sistem multi-agent bine conceput, putem reduce timpul la câteva minute menținând în același timp un nivel de înaltă calitate și urmăribile.
Sistemul nostru se bazează pe o arhitectură cu trei agenți coordonată prin modele Secvenţial e Ia mâna pe care le-am analizat în articolele despre orchestrare. Fiecare agent are un rol instrumente precise, dedicate și un contract de intrare/ieșire bine definit. Comunicarea are loc prin o stare partajată persistentă, iar sistemul integrează RAG (Retrieval-Augmented Generation) pentru a îmbogăți analiza cu documente interne.
Ce veți învăța în acest articol
- Cum să proiectați o arhitectură multi-agenți pentru căutarea autonomă cu modele secvențiale și Handoff
- Implementarea completa a trei agenti specializati: Cercetator, Analist si Editor
- Integrarea RAG cu Pinecone pentru a îmbogăți analiza cu documente interne
- Gestionarea memoriei partajate între agenți cu grafic de cunoștințe
- Strategii de gestionare a erorilor și de rezervă pentru sisteme multi-agenți
- Implementare cu Docker Compose și FastAPI pentru a expune sistemul prin API
- Performanța, acuratețea și măsurarea costurilor sistemului în producție
Arhitectura sistemului
Arhitectura asistentului nostru de cercetare se bazează pe trei agenți aranjați într-o conductă secvențială. Fiecare agent primește rezultatul celui precedent, îl prelucrează cu propriile sale instrumente specializate și produce o ieșire structurată pentru următorul. Între agenți, o stare comună menține contextul cercetare completă, permițând fiecărui agent să acceseze informațiile colectate din faze precedente.
Flux de sistem multi-agent
| Fază | Agent | Intrare | Ieșiri | Instrumente |
|---|---|---|---|---|
| 1 | Cercetător | Interogare utilizator + parametri | Surse validate cu rezumat | Căutare pe web, URL Scraper, Validator sursă |
| 2 | Analist | Surse validate | Constatări cu scor de încredere | Referință încrucișată, Extractor de chei, Detector de contradicții |
| 3 | Editor | Constatări structurate | Raport final formatat | Motor de șabloane, Formatator de citate, Generator de export |
Graficul de orchestrație
Sistemul folosește LangGraph pentru orchestrare. Graficul principal definește fluxul secvenţial între cei trei agenți, cu margini condiționate care gestionează cazurile de eroare și solicitările de clarificare. Analistul, de exemplu, poate solicita Cercetătorului să caute surse suplimentare dacă datele colectate sunt insuficiente sau contradictorii.
+------------------+
| User Query |
+--------+---------+
|
v
+--------+---------+
| RESEARCHER |
| (Web Search, |
| URL Scraping, |
| Validation) |
+--------+---------+
|
Fonti validate
|
v
+--------+---------+
+---->| ANALYST |
| | (Cross-Ref, |
| | Extraction, |
| | Fact-Check) |
| +--------+---------+
| |
Richiesta| Findings + Score
fonti | |
aggiuntive v
| +--------+---------+
+-----| ROUTER |
| (Score > 0.7?) |
+--------+---------+
|
Score OK
|
v
+--------+---------+
| EDITOR |
| (Template, |
| Citations, |
| Export) |
+--------+---------+
|
v
+--------+---------+
| Report Finale |
+------------------+
Definiția statului comun
Starea partajată este inima comunicării dintre agenți. Definește structura datelor care
acestea curg prin grafic, asigurând siguranța tipului și trasabilitatea. Folosim a
TypedDict pentru a defini în mod explicit fiecare câmp de stat.
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage
class Source(TypedDict):
url: str
title: str
content: str
credibility_score: float
domain: str
extraction_date: str
class Finding(TypedDict):
claim: str
evidence: List[str]
source_urls: List[str]
confidence: float
category: str
contradictions: Optional[List[str]]
class ResearchState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
query: str
max_sources: int
sources: List[Source]
findings: List[Finding]
overall_confidence: float
report: Optional[str]
report_format: str
errors: List[str]
iteration_count: int
needs_more_sources: bool
Agentul 1: Cercetatorul
Cercetatorul este agentul responsabil cu colectarea surselor. Primește interogarea utilizatorului, îl descompune în subinterogări de căutare, caută pe web prin API-ul Tavily, descărcări și analizează conținutul paginilor găsite și validează credibilitatea fiecărei surse. Rezultatul este o listă de surse validate cu un rezumat al conținutului relevant.
Definiţia Tools
Cercetatorul are trei instrumente specializate: unul pentru cercetarea web, unul pentru scraping a conţinutului paginilor şi unul pentru validarea credibilităţii surselor.
from langchain_core.tools import tool
from tavily import TavilyClient
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import json
tavily_client = TavilyClient(api_key="tvly-...")
@tool
def web_search(query: str, max_results: int = 5) -> str:
"""Cerca informazioni sul web per una query specifica.
Args:
query: La query di ricerca da eseguire
max_results: Numero massimo di risultati (default: 5)
Returns:
JSON con i risultati della ricerca inclusi URL, titolo e snippet
"""
response = tavily_client.search(
query=query,
max_results=max_results,
search_depth="advanced",
include_raw_content=True,
include_domains=["arxiv.org", "github.com", "medium.com",
"techcrunch.com", "reuters.com"]
)
results = []
for r in response.get("results", []):
results.append({
"url": r["url"],
"title": r["title"],
"snippet": r["content"][:500],
"raw_content": r.get("raw_content", "")[:2000],
"score": r.get("score", 0.0)
})
return json.dumps(results, indent=2)
@tool
def scrape_url(url: str) -> str:
"""Scarica e analizza il contenuto di una pagina web.
Args:
url: L'URL della pagina da analizzare
Returns:
Il testo estratto dalla pagina (max 3000 caratteri)
"""
try:
headers = {"User-Agent": "ResearchBot/1.0"}
response = httpx.get(url, headers=headers, timeout=15.0,
follow_redirects=True)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator="\n", strip=True)
return text[:3000]
except Exception as e:
return f"Errore durante lo scraping di {url}: {str(e)}"
@tool
def validate_source(url: str, title: str, content_snippet: str) -> str:
"""Valida la credibilita di una fonte basandosi su dominio e contenuto.
Args:
url: L'URL della fonte
title: Il titolo della pagina
content_snippet: Un estratto del contenuto
Returns:
JSON con il punteggio di credibilita e la motivazione
"""
domain = urlparse(url).netloc.lower()
high_credibility = ["arxiv.org", "nature.com", "science.org",
"ieee.org", "acm.org", "gov", ".edu"]
medium_credibility = ["github.com", "medium.com", "techcrunch.com",
"reuters.com", "bloomberg.com"]
score = 0.5
reasons = []
for hc in high_credibility:
if hc in domain:
score = 0.9
reasons.append(f"Dominio accademico/istituzionale: {domain}")
break
else:
for mc in medium_credibility:
if mc in domain:
score = 0.7
reasons.append(f"Dominio tecnico riconosciuto: {domain}")
break
if len(content_snippet) > 200:
score += 0.05
reasons.append("Contenuto sostanziale presente")
if title and len(title) > 10:
score += 0.02
reasons.append("Titolo descrittivo presente")
score = min(score, 1.0)
return json.dumps({
"url": url,
"credibility_score": round(score, 2),
"reasons": reasons,
"domain": domain
})
Definiția Agent Researcher
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
researcher_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
researcher_system_prompt = """Sei un Research Agent specializzato nella
raccolta di fonti attendibili dal web.
OBIETTIVO: Data una query di ricerca, trova e valida fonti di alta qualità.
PROCESSO:
1. Analizza la query e identifica 2-3 sotto-query specifiche
2. Per ogni sotto-query, usa web_search per trovare risultati
3. Per i risultati più promettenti, usa scrape_url per ottenere il
contenuto completo
4. Usa validate_source per verificare la credibilita di ogni fonte
5. Restituisci SOLO le fonti con credibility_score >= 0.6
OUTPUT: Produci un JSON con la lista delle fonti validate, ciascuna con:
- url, title, content (summary del contenuto rilevante)
- credibility_score, domain, extraction_date
NON inventare informazioni. Se non trovi fonti sufficienti, segnalalo."""
researcher_tools = [web_search, scrape_url, validate_source]
researcher_agent = create_react_agent(
model=researcher_llm,
tools=researcher_tools,
prompt=researcher_system_prompt
)
Agentul 2: Analistul
Analistul primește sursele validate de Cercetător și le analizează în profunzime. Scopul lui este de a extrage concluziile cheie, de a verifica coerența între surse prin referințe încrucișate și identifica orice contradicții. Rezultatul este o listă de constatări structurate, fiecare cu un scor de încredere bazat pe numărul și calitatea surselor care îl susțin.
Instrumentul pentru analist
from collections import Counter
import re
@tool
def cross_reference_check(claim: str, sources_json: str) -> str:
"""Verifica un'affermazione incrociando multiple fonti.
Args:
claim: L'affermazione da verificare
sources_json: JSON con le fonti da analizzare
Returns:
JSON con il risultato della verifica è il numero di fonti
che supportano, contraddicono o non menzionano l'affermazione
"""
sources = json.loads(sources_json)
supporting = []
contradicting = []
neutral = []
claim_keywords = set(claim.lower().split())
for source in sources:
content = source.get("content", "").lower()
keyword_matches = sum(1 for kw in claim_keywords
if kw in content)
match_ratio = keyword_matches / max(len(claim_keywords), 1)
if match_ratio > 0.6:
supporting.append(source["url"])
elif match_ratio > 0.3:
neutral.append(source["url"])
else:
contradicting.append(source["url"])
total = len(sources)
confidence = len(supporting) / max(total, 1)
return json.dumps({
"claim": claim,
"supporting_sources": supporting,
"contradicting_sources": contradicting,
"neutral_sources": neutral,
"confidence": round(confidence, 2),
"verdict": "confirmed" if confidence > 0.6
else "uncertain" if confidence > 0.3
else "unverified"
})
@tool
def extract_key_findings(content: str, topic: str) -> str:
"""Estrae i finding principali da un testo rispetto a un topic.
Args:
content: Il testo da analizzare
topic: L'argomento di riferimento per l'estrazione
Returns:
JSON con i finding estratti e la loro rilevanza
"""
sentences = re.split(r'[.!?]+', content)
topic_keywords = set(topic.lower().split())
findings = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 20:
continue
words = set(sentence.lower().split())
relevance = len(words.intersection(topic_keywords))
relevance = relevance / max(len(topic_keywords), 1)
if relevance > 0.3:
findings.append({
"text": sentence[:200],
"relevance_score": round(relevance, 2)
})
findings.sort(key=lambda x: x["relevance_score"], reverse=True)
return json.dumps(findings[:10])
@tool
def detect_contradictions(findings_json: str) -> str:
"""Identifica contraddizioni tra i findings raccolti.
Args:
findings_json: JSON con i findings da analizzare
Returns:
JSON con le coppie di findings potenzialmente in contraddizione
"""
findings = json.loads(findings_json)
contradictions = []
negation_words = {"not", "no", "never", "non", "without",
"unlike", "contrary", "however", "but",
"decrease", "decline", "drop", "reduce"}
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
text1 = set(f1.get("text", "").lower().split())
text2 = set(f2.get("text", "").lower().split())
overlap = text1.intersection(text2)
neg_in_1 = bool(text1.intersection(negation_words))
neg_in_2 = bool(text2.intersection(negation_words))
if len(overlap) > 3 and neg_in_1 != neg_in_2:
contradictions.append({
"finding_1": f1.get("text", "")[:100],
"finding_2": f2.get("text", "")[:100],
"shared_keywords": list(overlap)[:5],
"severity": "high" if len(overlap) > 5 else "medium"
})
return json.dumps(contradictions)
Definiția agentului analist
analyst_llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
analyst_system_prompt = """Sei un Analyst Agent specializzato nel fact-checking
e nell'estrazione di insight da fonti multiple.
OBIETTIVO: Analizzare le fonti fornite dal Researcher, estrarre i finding
principali e verificarne la coerenza.
PROCESSO:
1. Per ogni fonte, usa extract_key_findings per estrarre i punti chiave
2. Usa cross_reference_check per verificare ogni finding chiave
3. Usa detect_contradictions per identificare incoerenze
4. Assegna un confidence score complessivo ai findings
OUTPUT: Produci un JSON con:
- findings: lista di finding con claim, evidence, source_urls, confidence
- overall_confidence: media pesata dei confidence score
- contradictions: lista di contraddizioni trovate
- recommendation: "proceed" se confidence > 0.7, "needs_more_sources" altrimenti
Sii critico e rigoroso. Non dare per scontato nulla."""
analyst_tools = [cross_reference_check, extract_key_findings,
detect_contradictions]
analyst_agent = create_react_agent(
model=analyst_llm,
tools=analyst_tools,
prompt=analyst_system_prompt
)
Agentul 3: Editorul
Editorul este agentul responsabil cu realizarea raportului final. Primește constatări structurate de la Analist, le organizează într-o structură logică, formatează citatele conform standardelor academice și generează raportul în formatul solicitat de utilizator (Markdown, HTML sau JSON). Editorul nu inventează continuturi: se limiteaza la structurarea, contextualizarea si lizibila informatiilor colectate și verificate de agenți anteriori.
Instrumente editor
from datetime import datetime
@tool
def apply_report_template(findings_json: str, query: str,
template_type: str = "executive") -> str:
"""Applica un template di report ai findings strutturati.
Args:
findings_json: JSON con i findings da formattare
query: La query originale dell'utente
template_type: Tipo di template (executive, technical, brief)
Returns:
Report formattato in Markdown
"""
findings = json.loads(findings_json)
date = datetime.now().strftime("%Y-%m-%d")
if template_type == "executive":
sections = [
f"# Research Report: {query}",
f"*Generated on {date}*\n",
"## Executive Summary\n",
"## Key Findings\n",
"## Detailed Analysis\n",
"## Sources and References\n",
"## Methodology\n",
"---",
"*This report was generated by an AI Research Assistant.*"
]
elif template_type == "technical":
sections = [
f"# Technical Analysis: {query}",
f"*Report Date: {date}*\n",
"## Abstract\n",
"## Data Sources\n",
"## Analysis\n",
"## Results\n",
"## Limitations\n",
"## References\n"
]
else:
sections = [
f"# Brief: {query}",
f"*{date}*\n",
"## Summary\n",
"## Key Points\n",
"## Sources\n"
]
return "\n".join(sections)
@tool
def format_citations(sources_json: str,
style: str = "apa") -> str:
"""Formatta le citazioni delle fonti secondo uno standard accademico.
Args:
sources_json: JSON con le fonti da citare
style: Stile di citazione (apa, chicago, ieee)
Returns:
Lista di citazioni formattate
"""
sources = json.loads(sources_json)
citations = []
for i, source in enumerate(sources, 1):
title = source.get("title", "Untitled")
url = source.get("url", "")
domain = source.get("domain", "")
date = source.get("extraction_date",
datetime.now().strftime("%Y-%m-%d"))
if style == "apa":
citation = (f"[{i}] {domain}. ({date}). "
f"*{title}*. Retrieved from {url}")
elif style == "ieee":
citation = (f"[{i}] \"{title},\" {domain}, "
f"{date}. [Online]. Available: {url}")
else:
citation = f"[{i}] {title}. {url} ({date})"
citations.append(citation)
return "\n".join(citations)
@tool
def generate_export(report_markdown: str,
output_format: str = "markdown") -> str:
"""Genera l'export finale del report nel formato richiesto.
Args:
report_markdown: Il report in formato Markdown
output_format: Formato di output (markdown, html, json)
Returns:
Il report nel formato specificato
"""
if output_format == "markdown":
return report_markdown
elif output_format == "html":
lines = report_markdown.split("\n")
html_lines = []
for line in lines:
if line.startswith("# "):
html_lines.append(f"<h1>{line[2:]}</h1>")
elif line.startswith("## "):
html_lines.append(f"<h2>{line[3:]}</h2>")
elif line.startswith("- "):
html_lines.append(f"<li>{line[2:]}</li>")
elif line.strip():
html_lines.append(f"<p>{line}</p>")
return "\n".join(html_lines)
elif output_format == "json":
return json.dumps({"report": report_markdown,
"format": output_format,
"generated_at": datetime.now().isoformat()})
return report_markdown
Definirea Agentului Editor
editor_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
editor_system_prompt = """Sei un Editor Agent specializzato nella
produzione di report professionali e ben strutturati.
OBIETTIVO: Trasformare i findings dell'Analyst in un report leggibile
e citato correttamente.
PROCESSO:
1. Usa apply_report_template per creare la struttura del report
2. Popola ogni sezione con i findings pertinenti
3. Usa format_citations per generare la bibliografia
4. Usa generate_export per produrre il formato finale
REGOLE DI STILE:
- Scrivi in modo chiaro e professionale
- Ogni affermazione deve avere una citazione [N]
- Evidenzia il livello di confidenza per ogni finding
- Segnala esplicitamente le aree di incertezza
- Non inventare MAI dati o citazioni non presenti nei findings
OUTPUT: Il report completo nel formato richiesto."""
editor_tools = [apply_report_template, format_citations,
generate_export]
editor_agent = create_react_agent(
model=editor_llm,
tools=editor_tools,
prompt=editor_system_prompt
)
Orchestrație cu LangGraph
Acum să adunăm cei trei agenți într-un grafic LangGraph. Graficul definește fluxul secvenţial cu un nod de router între analist și editor care verifică dacă scorul general de încredere este suficient pentru a continua cu generarea raportului sau dacă sunt necesare surse suplimentare.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
def run_researcher(state: ResearchState) -> dict:
"""Nodo Researcher: raccoglie e valida le fonti."""
query = state["query"]
max_sources = state.get("max_sources", 5)
result = researcher_agent.invoke({
"messages": [("human",
f"Cerca fonti per: {query}. "
f"Trova almeno {max_sources} fonti attendibili.")]
})
last_message = result["messages"][-1].content
try:
sources = json.loads(last_message)
except json.JSONDecodeError:
sources = []
return {
"sources": sources,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": result["messages"]
}
def run_analyst(state: ResearchState) -> dict:
"""Nodo Analyst: analizza le fonti e produce findings."""
sources = state["sources"]
result = analyst_agent.invoke({
"messages": [("human",
f"Analizza queste fonti e produci findings strutturati:\n"
f"{json.dumps(sources, indent=2)}")]
})
last_message = result["messages"][-1].content
try:
analysis = json.loads(last_message)
findings = analysis.get("findings", [])
confidence = analysis.get("overall_confidence", 0.0)
needs_more = analysis.get("recommendation") == "needs_more_sources"
except json.JSONDecodeError:
findings = []
confidence = 0.0
needs_more = True
return {
"findings": findings,
"overall_confidence": confidence,
"needs_more_sources": needs_more,
"messages": result["messages"]
}
def run_editor(state: ResearchState) -> dict:
"""Nodo Editor: genera il report finale."""
findings = state["findings"]
query = state["query"]
report_format = state.get("report_format", "markdown")
result = editor_agent.invoke({
"messages": [("human",
f"Genera un report per la query '{query}' "
f"basato su questi findings:\n"
f"{json.dumps(findings, indent=2)}\n"
f"Formato richiesto: {report_format}")]
})
report = result["messages"][-1].content
return {
"report": report,
"messages": result["messages"]
}
def should_continue_or_edit(state: ResearchState) -> str:
"""Router: decide se procedere all'Editor o tornare al Researcher."""
if (state.get("needs_more_sources", False)
and state.get("iteration_count", 0) < 3):
return "researcher"
return "editor"
# Costruzione del grafo
graph = StateGraph(ResearchState)
graph.add_node("researcher", run_researcher)
graph.add_node("analyst", run_analyst)
graph.add_node("editor", run_editor)
graph.add_edge(START, "researcher")
graph.add_edge("researcher", "analyst")
graph.add_conditional_edges(
"analyst",
should_continue_or_edit,
{"researcher": "researcher", "editor": "editor"}
)
graph.add_edge("editor", END)
memory = MemorySaver()
research_app = graph.compile(checkpointer=memory)
Integrarea memoriei
Un aspect crucial al sistemului nostru este gestionarea memoriei partajate. Cei trei agenți operează pe aceeași stare, dar avem nevoie și de memorie pe termen lung care să permită sistemului pentru a învăța din cercetările anterioare, evitați să revedeți sursele deja analizate și să construiți progresiv un grafic de cunoștințe al domeniului.
Graficul de cunoștințe partajate
Implementăm un grafic simplu de cunoștințe care urmărește entitățile extrase din căutări, relaţiile dintre ele şi faptele verificate. Această structură este consultată de analist pentru a îmbogăți analizele cu context istoric și de la Editor pentru a introduce referințe încrucișate.
from typing import Dict, Set, Tuple
import sqlite3
class KnowledgeGraph:
"""Knowledge graph persistente per il Research Assistant."""
def __init__(self, db_path: str = "research_kg.db"):
self.conn = sqlite3.connect(db_path)
self._create_tables()
def _create_tables(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
entity_type TEXT,
first_seen TEXT,
mention_count INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject_id INTEGER REFERENCES entities(id),
predicate TEXT,
object_id INTEGER REFERENCES entities(id),
confidence REAL,
source_url TEXT
);
CREATE TABLE IF NOT EXISTS facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
statement TEXT,
confidence REAL,
verified_by INTEGER DEFAULT 0,
source_urls TEXT,
created_at TEXT
);
""")
self.conn.commit()
def add_entity(self, name: str, entity_type: str):
self.conn.execute("""
INSERT INTO entities (name, entity_type, first_seen)
VALUES (?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE
SET mention_count = mention_count + 1
""", (name, entity_type))
self.conn.commit()
def add_relation(self, subject: str, predicate: str,
obj: str, confidence: float,
source_url: str = ""):
self.add_entity(subject, "auto")
self.add_entity(obj, "auto")
sub_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(subject,)).fetchone()[0]
obj_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(obj,)).fetchone()[0]
self.conn.execute("""
INSERT INTO relations
(subject_id, predicate, object_id, confidence, source_url)
VALUES (?, ?, ?, ?, ?)
""", (sub_id, predicate, obj_id, confidence, source_url))
self.conn.commit()
def query_entity(self, name: str) -> Dict:
entity = self.conn.execute(
"SELECT * FROM entities WHERE name LIKE ?",
(f"%{name}%",)).fetchone()
if not entity:
return {}
relations = self.conn.execute("""
SELECT e2.name, r.predicate, r.confidence
FROM relations r
JOIN entities e2 ON r.object_id = e2.id
WHERE r.subject_id = ?
""", (entity[0],)).fetchall()
return {
"name": entity[1],
"type": entity[2],
"mentions": entity[4],
"relations": [{"target": r[0], "predicate": r[1],
"confidence": r[2]} for r in relations]
}
Integrare RAG
Asistentul nostru de cercetare integrează un sistem RAG (Retrieval-Augmented Generation) care permite Analistul să consulte documentele interne și rapoartele anterioare arhivate într-o bază de date vectorială. Folosim Pinecone ca magazin de vectori și OpenAI pentru generarea de încorporare.
Conducta de recuperare
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = PineconeVectorStore(
index_name="research-assistant",
embedding=embeddings,
namespace="documents"
)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ". ", " "]
)
def index_document(content: str, metadata: dict):
"""Indicizza un documento nel vector store."""
chunks = text_splitter.split_text(content)
documents = [
Document(page_content=chunk, metadata={
**metadata,
"chunk_index": i,
"total_chunks": len(chunks)
})
for i, chunk in enumerate(chunks)
]
vectorstore.add_documents(documents)
def retrieve_relevant(query: str, top_k: int = 5) -> list:
"""Recupera i documenti più rilevanti per una query."""
results = vectorstore.similarity_search_with_score(
query, k=top_k
)
return [
{
"content": doc.page_content,
"metadata": doc.metadata,
"similarity_score": round(float(score), 3)
}
for doc, score in results
]
@tool
def search_internal_knowledge(query: str,
max_results: int = 5) -> str:
"""Cerca nei documenti interni e nei report precedenti.
Args:
query: La query di ricerca
max_results: Numero massimo di risultati
Returns:
JSON con i documenti rilevanti trovati nel knowledge base
"""
results = retrieve_relevant(query, top_k=max_results)
return json.dumps(results, indent=2)
Gestionarea erorilor și alternativă
Într-un sistem multi-agent, tratarea erorilor este un aspect critic. Fiecare agent poate eșua din diferite motive: expirări ale rețelei, API-uri indisponibile, conținut care nu poate fi analizat, limite de rată. Sistemul nostru implementează strategii de rezervă la trei niveluri: la nivel de instrument, la nivel de agent și la nivel de sistem.
Strategie de rezervă pe trei niveluri
- Nivelul sculei: fiecare instrument tratează intern propriile erori cu try/except, returnarea mesajelor de eroare structurate în loc de propagarea excepțiilor. Dacă o sursă nu este accesibil, instrumentul returnează o eroare cu codul HTTP și un indiciu.
- La nivel de agent: Dacă un agent nu își finalizează sarcina, routerul poate redirecționa fluxul. Cercetătorul caută surse alternative dacă primele nu sunt disponibile disponibile. Analistul solicită surse suplimentare dacă găsește prea multe contradicții.
- La nivel de sistem: graficul are o limită maximă de iterații (3 cicluri Cercetător-Analist). Dacă după 3 iterații scorul de încredere rămâne sub 0,5, sistemul Cu toate acestea, generează un raport cu un avertisment explicit despre fiabilitatea scăzută a rezultatelor și sugerează intervenția manuală.
from langgraph.errors import NodeInterrupt
def run_researcher_with_fallback(state: ResearchState) -> dict:
"""Researcher con gestione errori e fallback."""
max_retries = 2
errors = list(state.get("errors", []))
for attempt in range(max_retries):
try:
result = run_researcher(state)
if not result.get("sources"):
errors.append(
f"Attempt {attempt+1}: No sources found"
)
continue
result["errors"] = errors
return result
except Exception as e:
errors.append(
f"Attempt {attempt+1}: {type(e).__name__}: {str(e)}"
)
return {
"sources": [],
"errors": errors,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": state.get("messages", [])
}
def manual_intervention_check(state: ResearchState) -> str:
"""Verifica se serve intervento manuale."""
iteration = state.get("iteration_count", 0)
confidence = state.get("overall_confidence", 0.0)
errors = state.get("errors", [])
if iteration >= 3 and confidence < 0.5:
raise NodeInterrupt(
f"Sistema bloccato dopo {iteration} iterazioni. "
f"Confidence: {confidence:.2f}. "
f"Errori: {len(errors)}. "
f"Intervento manuale richiesto."
)
if len(errors) > 5:
raise NodeInterrupt(
f"Troppi errori accumulati ({len(errors)}). "
f"Verificare la connettivita e i limiti API."
)
return "continue"
Desfăşurare
Pentru ca Asistentul nostru de cercetare să fie utilizabil în producție, îl ambalăm într-un container Docker și expuneți-l ca serviciu REST prin FastAPI. Această arhitectură vă permite să scalați pe orizontală, integrați sistemul cu aplicațiile existente și monitorizați performanța în timp real.
FastAPI Wrapper
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from uuid import uuid4
import asyncio
app = FastAPI(title="Research Assistant API", version="1.0.0")
class ResearchRequest(BaseModel):
query: str
max_sources: int = 5
report_format: str = "markdown"
class ResearchResponse(BaseModel):
job_id: str
status: str
report: str | None = None
confidence: float | None = None
sources_count: int | None = None
errors: list[str] = []
jobs: dict[str, ResearchResponse] = {}
async def execute_research(job_id: str, request: ResearchRequest):
"""Esegue la ricerca in background."""
try:
config = {"configurable": {"thread_id": job_id}}
initial_state = {
"query": request.query,
"max_sources": request.max_sources,
"report_format": request.report_format,
"messages": [],
"sources": [],
"findings": [],
"overall_confidence": 0.0,
"report": None,
"errors": [],
"iteration_count": 0,
"needs_more_sources": False
}
result = await asyncio.to_thread(
research_app.invoke, initial_state, config
)
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="completed",
report=result.get("report"),
confidence=result.get("overall_confidence"),
sources_count=len(result.get("sources", [])),
errors=result.get("errors", [])
)
except Exception as e:
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="failed",
errors=[str(e)]
)
@app.post("/research", response_model=ResearchResponse)
async def start_research(request: ResearchRequest,
background_tasks: BackgroundTasks):
job_id = str(uuid4())
jobs[job_id] = ResearchResponse(
job_id=job_id, status="processing")
background_tasks.add_task(execute_research, job_id, request)
return jobs[job_id]
@app.get("/research/{job_id}", response_model=ResearchResponse)
async def get_research_status(job_id: str):
if job_id not in jobs:
raise HTTPException(status_code=404,
detail="Job not found")
return jobs[job_id]
Docker Compose
version: "3.9"
services:
research-api:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- TAVILY_API_KEY=${TAVILY_API_KEY}
- PINECONE_API_KEY=${PINECONE_API_KEY}
- PINECONE_INDEX=research-assistant
volumes:
- ./data:/app/data
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
redis-data:
Rezultate și lecții învățate
Măsuri de sistem în producție
| Metric | Valoare | Note |
|---|---|---|
| Timp mediu per raport | 2-4 minute | Depinde de complexitatea interogării și de numărul de surse |
| Surse pentru rapoarte | 5-12 | Media de 8 surse validate per căutare |
| Scorul mediu de încredere | 0,74 | Pe un eșantion de 200 de studii de cercetare în domeniul tehnologic |
| Cost pe raport | 0,15 USD - 0,45 USD | GPT-4o pentru toți cei 3 agenți, inclusiv apelurile API Tavily |
| Rata de retragere | 12% | Cercetare care necesită mai mult de un ciclu Cercetător-Analist |
| Intervenții manuale | 3% | Rapoarte cu încredere mai mică de 0,5 care necesită revizuire |
Lecții învățate
- Calitatea descrierilor instrumentelor este factorul de diferențiere. Instrument cu descrieri vag sau ambiguu îl determină pe agent să facă alegeri greșite. Investește timp în echipa editorială de descrieri precise, cu exemple concrete ale parametrilor, îmbunătățește drastic performanța.
- Tratarea erorilor este o preocupare de primă clasă, nu o idee ulterioară. Într-o sistem multi-agent, o eroare netratată într-un agent se propagă în cascadă. Fiecare instrument iar fiecare nod din grafic trebuie să aibă o strategie explicită de rezervă.
- Memoria partajată este adevăratul factor de diferențiere. Fără graficul de cunoștințe, fiecare căutare începe de la zero. Cu memoria, sistemul se îmbunătățește progresiv: recunoaște entități deja analizate, evită revizuirea surselor de calitate scăzută și construiește context care îmbogățește fiecare nouă analiză.
- Scorul de încredere trebuie calibrat, nu doar calculat. Primele scoruri erau prea optimisti. Am introdus penalități pentru sursele cu un domeniu necunoscut, bonusuri pentru mai multe referințe încrucișate și praguri de precauție pentru subiecte controversate.
- Costul operațional este gestionabil cu strategiile FinOps. Utilizați GPT-4o-mini pentru cercetător (care face sarcini mai simple) și GPT-4o pentru analist și editor reduce costurile cu 35% fără un impact semnificativ asupra calității.
Concluzii
În acest studiu de caz am construit un sistem complet multi-agent care demonstrează modul în care conceptele explorate în cele douăsprezece articole anterioare se integrează într-o aplicație din lumea reală. Cercetare autonomă Asistentul nu este un exercițiu academic: este un model arhitectural aplicabil în numeroase domenii, de la due diligence financiară la analiză competitivă, de la cercetare științifică la monitorizare a pietei.
Punctele cheie care au reieșit din construcția acestui sistem sunt centralitatea planificării a instrumentelor, importanța managementului erorilor ca aspect structural și nu auxiliar este valoarea memoriei partajate ca factor de îmbunătățire continuă. Fiecare decizie arhitecturală, de la numărul de agenți până la alegerea modelelor, trebuie ghidat de metrici și cerințe concrete a afacerilor reale.
În următorul și ultimul articol al seriei, „Viitorul agenților AI: Capabilități emergente, AGI și limitări curente", vom căuta din cod pentru a explora unde se îndreaptă Agenți AI. Vom analiza limitările actuale, capacitățile emergente, direcțiile de cercetare cele mai promițătoare și implicațiile economice și etice ale acestei tehnologii cu evoluție rapidă.







