マルチエージェント システム: オーケストレーションとコミュニケーション
I マルチエージェントシステム これらは、2025 年の AI における最も爆発的なトレンドの 1 つです。 Gartner の報告によると、 2024 年第 1 四半期と 2025 年第 2 四半期。理由は簡単です。実際の問題の多くは複雑すぎるからです。 単一の AI エージェントの場合。データを収集し、分析しなければならない調査エージェント レポートを書いて電子メールで送信することは 1 つの問題ではなく、4 つの問題があります。 さまざまなスキルを必要とするスペシャリスト。
この記事では、基本的なアーキテクチャから実際のマルチエージェント システムを構築します。 (オーケストレーター + 専門エージェント) から次のような高度なパターンまで 反応する (推理+演技)、 計画と実行, 監督者兼作業者 e ランググラフ オーケストレーション用 ステートフルな。各セクションには、実際の問題でテストされた実際の Python コードとパターンが含まれています。
何を学ぶか
- マルチエージェント システムのアーキテクチャとパターン
- ReActパターン:推論+ツール呼び出しによる行動
- 計画と実行: 計画と実行を分離する
- スーパーバイザー パターン: 専門エージェントを調整するオーケストレーター
- コントロール グラフを使用したステートフル オーケストレーションのための LangGraph
- エージェント間のコミュニケーションと調整
- マルチエージェント システムでのエラー処理とフォールバック
- マルチエージェントパイプラインの監視と可観測性
1. マルチエージェントシステムのアーキテクチャ
マルチエージェントシステムは複数のエージェントで構成されます。 エージェント (LLM + ツールセット) 共通の目標を達成するために協力する人たち。成功の鍵は 責任の分離: 各エージェントの専門分野は次のとおりです。 特定のドメインを管理し、他のエージェントにいつ委任すべきかを知っています。
TOPOLOGIE MULTI-AGENT:
1. NETWORK (fully connected):
Ogni agente può chiamare qualsiasi altro agente.
Pro: massima flessibilità
Con: difficile da controllare, rischio di loop
A ◄──► B
▲ ▲
└──► C ◄┘
2. SUPERVISOR (star topology):
Un agente centrale orchestra tutti gli altri.
Pro: controllo centralizzato, facile da debuggare
Con: single point of failure, bottleneck
SUPERVISOR
├──► Agent A (ricerca)
├──► Agent B (analisi)
└──► Agent C (report)
3. HIERARCHICAL:
Supervisori multipli organizzati in gerarchia.
Pro: scalabilità, separazione chiara delle responsabilità
Con: latenza aumentata, complessità coordinazione
Manager
├──► Sub-manager A
│ ├──► Worker A1
│ └──► Worker A2
└──► Sub-manager B
├──► Worker B1
└──► Worker B2
4. PIPELINE (sequential):
Ogni agente processa l'output del precedente.
Pro: semplice, deterministico, facile da debuggare
Con: rigido, nessuna retroazione
Input ──► A ──► B ──► C ──► Output
SCELTA DELLA TOPOLOGIA:
- Task ben definiti, ordine chiaro → Pipeline
- Task con routing dinamico → Supervisor
- Problemi complessi con sottoproblemi → Hierarchical
- Ricerca esplorativa → Network (con guardrail)
2. ReAct パターン: 推論 + 行動
反応する (推論 + 行動) が AI エージェントの基本パターン ツールにアクセスできるようになります。エージェントは以下を交互に行います。 思い (何をすべきかについての推論)、 アクション (ツールの実行) e 観察 (の解釈 結果)、最終的な答えに到達するまでサイクルを繰り返します。
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub
from typing import Optional
import requests
import json
# ============================================================
# DEFINIZIONE DEI TOOL
# ============================================================
@tool
def search_web(query: str) -> str:
"""Cerca informazioni sul web. Usa per trovare dati aggiornati o notizie."""
# Integrazione con Tavily, SerpAPI o simili
# Per semplicità, simuliamo una risposta
return f"Risultati ricerca per '{query}': [risultati simulati per demo]"
@tool
def analyze_data(data: str, analysis_type: str) -> str:
"""
Analizza dati numerici o testuali.
analysis_type può essere: 'summary', 'trend', 'anomaly', 'comparison'
"""
return f"Analisi {analysis_type} completata: [analisi simulata]"
@tool
def generate_report(title: str, sections: str) -> str:
"""
Genera un report strutturato.
sections: JSON con le sezioni del report
"""
return f"Report '{title}' generato con sezioni: {sections}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""Invia un'email. Usa per comunicare risultati all'utente finale."""
return f"Email inviata a {to} con oggetto '{subject}'"
@tool
def query_database(query: str, database: str = "main") -> str:
"""
Esegue query su database interni.
database: 'main' per il DB principale, 'analytics' per il DW
"""
return f"Query su {database}: [risultati simulati]"
# ============================================================
# CREAZIONE AGENTE REACT
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Tool set disponibili all'agente
tools = [search_web, analyze_data, generate_report, send_email, query_database]
# Prompt ReAct standard (da LangChain Hub)
react_prompt = hub.pull("hwchase17/react")
# Il prompt include: instructions per pensiero/azione/osservazione
# con schema: "Thought: ...\nAction: tool_name\nAction Input: ...\nObservation: ..."
# Crea l'agente ReAct
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=react_prompt
)
# AgentExecutor: gestisce il loop thought-action-observation
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # Mostra ogni step del reasoning
max_iterations=10, # Previeni loop infiniti
early_stopping_method="generate", # Genera risposta se troppi step
handle_parsing_errors=True # Gestisci errori di parsing gracefully
)
# Esempio: task complesso che richiede più step
task = """
Analizza le performance di vendita Q4 2024 dal database analytics,
cerca notizie recenti sul settore, genera un report con raccomandazioni
e invialo a manager@company.com
"""
result = agent_executor.invoke({"input": task})
print(f"\nRisposta finale:\n{result['output']}")
# Output tipico del reasoning ReAct:
# Thought: Devo prima ottenere i dati di vendita dal database
# Action: query_database
# Action Input: {"query": "SELECT * FROM sales WHERE quarter='Q4' AND year=2024"}
# Observation: [dati vendite]
# Thought: Ora cerco notizie recenti sul settore
# Action: search_web
# Action Input: "notizie settore vendite Q4 2024"
# Observation: [risultati]
# Thought: Ho tutti i dati, genero il report
# Action: generate_report
# ...
3. 計画と実行のパターン
パターン 計画と実行 計画を実行から切り離す: まず LLM が必要な手順の詳細な計画を作成し、次にエージェントが実行します。 各ステップを順次 (または並行して) 実行します。 ReActと比較して視覚が可能 問題に対するより総合的なアプローチと、複雑さのより適切な管理。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict, Any
import json
class PlanAndExecuteAgent:
"""
Agente Plan-and-Execute:
1. PLANNER: crea un piano step-by-step
2. EXECUTOR: esegue ogni step con i tool disponibili
3. RESPONDER: sintetizza i risultati in una risposta finale
"""
def __init__(self, llm, tools_dict: Dict[str, callable]):
self.llm = llm
self.tools = tools_dict
# PLANNER: LLM ottimizzato per pianificazione
self.planner_chain = (
ChatPromptTemplate.from_template("""
Sei un pianificatore esperto. Crea un piano dettagliato step-by-step per completare
il seguente task.
Tool disponibili: {available_tools}
Task: {task}
Crea un piano come lista JSON di steps. Ogni step deve avere:
- "step_id": numero progressivo (1, 2, 3...)
- "description": descrizione dell'azione
- "tool": nome del tool da usare (o "llm" per ragionamento puro)
- "depends_on": lista di step_id che devono completarsi prima (per parallelismo)
Rispondi SOLO con il JSON, nient'altro.""")
| llm
| StrOutputParser()
)
# EXECUTOR: esegue un singolo step
self.executor_chain = (
ChatPromptTemplate.from_template("""
Esegui questo step del piano:
Step: {step}
Risultati steps precedenti: {previous_results}
Se devi usare un tool, fornisci l'input necessario in modo preciso.
Descrivi brevemente cosa hai fatto e il risultato ottenuto.""")
| llm
| StrOutputParser()
)
# RESPONDER: sintesi finale
self.responder_chain = (
ChatPromptTemplate.from_template("""
Task originale: {task}
Piano eseguito con questi risultati:
{step_results}
Sintetizza i risultati in una risposta completa e strutturata per l'utente.""")
| llm
| StrOutputParser()
)
def _parse_plan(self, plan_json: str) -> List[Dict]:
"""Parse del piano JSON"""
try:
# Rimuovi eventuali backtick o prefissi markdown
clean = plan_json.strip().strip('`').strip()
if clean.startswith('json'):
clean = clean[4:].strip()
return json.loads(clean)
except json.JSONDecodeError:
# Fallback: crea un piano semplice
return [{"step_id": 1, "description": "Esegui il task", "tool": "llm", "depends_on": []}]
def _execute_step(self, step: Dict, previous_results: Dict) -> str:
"""Esegui un singolo step del piano"""
tool_name = step.get("tool", "llm")
if tool_name != "llm" and tool_name in self.tools:
# Usa il tool specifico
try:
tool_input = step.get("tool_input", step["description"])
result = self.tools[tool_name](tool_input)
return f"Tool {tool_name} eseguito: {result}"
except Exception as e:
return f"Errore nel tool {tool_name}: {str(e)}"
else:
# Ragionamento puro con LLM
return self.executor_chain.invoke({
"step": step["description"],
"previous_results": json.dumps(previous_results, ensure_ascii=False)
})
def run(self, task: str) -> dict:
"""Esegui il task con Plan-and-Execute"""
print(f"\nTask: {task}\n")
# STEP 1: Pianificazione
available_tools = list(self.tools.keys())
plan_json = self.planner_chain.invoke({
"task": task,
"available_tools": ", ".join(available_tools)
})
plan = self._parse_plan(plan_json)
print(f"Piano creato: {len(plan)} steps")
# STEP 2: Esecuzione sequenziale degli step
step_results = {}
completed_steps = set()
# Ordina per dipendenze (topological sort semplificato)
for step in plan:
step_id = step["step_id"]
depends_on = step.get("depends_on", [])
# Aspetta che i prerequisiti siano completati
while not all(str(d) in completed_steps for d in depends_on):
import time; time.sleep(0.1) # In pratica usa async
print(f" Esecuzione step {step_id}: {step['description'][:60]}...")
result = self._execute_step(step, step_results)
step_results[str(step_id)] = result
completed_steps.add(str(step_id))
# STEP 3: Sintesi finale
final_response = self.responder_chain.invoke({
"task": task,
"step_results": json.dumps(step_results, ensure_ascii=False, indent=2)
})
return {
"plan": plan,
"step_results": step_results,
"final_response": final_response
}
# Utilizzo
tools_dict = {
"search_web": lambda q: f"Risultati ricerca: {q}",
"analyze_data": lambda d: f"Analisi completata per: {d}",
"generate_report": lambda t: f"Report generato: {t}",
}
agent = PlanAndExecuteAgent(llm=llm, tools_dict=tools_dict)
result = agent.run("Analizza i dati di vendita 2024 e prepara un report esecutivo")
print(result["final_response"])
4. LangGraph: グラフを使用したステートフル オーケストレーション
ランググラフ マルチエージェント システムを構築するための LangChain のライブラリです 有向非巡回グラフ (DAG) およびサイクルを含むグラフに基づくステートフル。定義できるようにします エージェント間のフローを明示的に制御し、共有状態を管理し、 フィードバックループを実装します。複雑なマルチエージェント システムに最適です。 生産中です。
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, List, Literal
import operator
import json
# ============================================================
# STATO CONDIVISO DEL SISTEMA
# ============================================================
class AgentState(TypedDict):
"""Stato condiviso tra tutti gli agenti nel grafo"""
messages: Annotated[List, operator.add] # Lista messaggi (append-only)
task: str # Task originale
plan: List[str] # Piano di esecuzione
current_step: int # Step corrente
research_results: str # Output agente ricerca
analysis_results: str # Output agente analisi
final_report: str # Report finale
next_agent: str # Prossimo agente da chiamare
error_count: int # Contatore errori
# ============================================================
# AGENTI SPECIALIZZATI
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def supervisor_agent(state: AgentState) -> AgentState:
"""
Agente supervisor: decide quale agente chiamare.
Analizza lo stato corrente e decide il prossimo passo.
"""
system_msg = SystemMessage(content="""Sei un supervisor AI che coordina un team di agenti.
Devi decidere quale agente chiamare in base allo stato corrente del task.
Agenti disponibili:
- research: raccoglie informazioni e dati
- analysis: analizza i dati raccolti
- report_writer: scrive il report finale
- FINISH: il task è completato
Rispondi con UN SOLO nome tra quelli disponibili.""")
messages = [system_msg, HumanMessage(content=f"""
Task: {state['task']}
Research completata: {'Si' if state['research_results'] else 'No'}
Analisi completata: {'Si' if state['analysis_results'] else 'No'}
Report scritto: {'Si' if state['final_report'] else 'No'}
Quale agente deve agire ora?""")]
response = llm.invoke(messages)
next_agent = response.content.strip().lower()
# Valida la risposta
valid_agents = ["research", "analysis", "report_writer", "finish"]
if next_agent not in valid_agents:
next_agent = "research" # Fallback sicuro
return {"next_agent": next_agent, "messages": [response]}
def research_agent(state: AgentState) -> AgentState:
"""Agente specializzato in ricerca e raccolta dati"""
messages = [
SystemMessage(content="Sei un ricercatore AI. Raccogli dati rilevanti per il task."),
HumanMessage(content=f"Raccogli informazioni per: {state['task']}")
]
response = llm.invoke(messages)
return {
"research_results": response.content,
"messages": [response]
}
def analysis_agent(state: AgentState) -> AgentState:
"""Agente specializzato in analisi dei dati"""
messages = [
SystemMessage(content="Sei un analista AI. Analizza i dati raccolti."),
HumanMessage(content=f"""
Dati da analizzare:
{state['research_results']}
Task originale: {state['task']}
Fornisci un'analisi strutturata.""")
]
response = llm.invoke(messages)
return {
"analysis_results": response.content,
"messages": [response]
}
def report_writer_agent(state: AgentState) -> AgentState:
"""Agente specializzato nella scrittura di report"""
messages = [
SystemMessage(content="Sei un writer AI specializzato in report tecnici."),
HumanMessage(content=f"""
Task: {state['task']}
Ricerca: {state['research_results']}
Analisi: {state['analysis_results']}
Scrivi un report professionale e completo.""")
]
response = llm.invoke(messages)
return {
"final_report": response.content,
"messages": [response]
}
# ============================================================
# ROUTING FUNCTION
# ============================================================
def route_to_agent(state: AgentState) -> Literal["research", "analysis", "report_writer", END]:
"""Funzione di routing: decide il nodo successivo nel grafo"""
next_agent = state.get("next_agent", "research")
if next_agent == "finish" or state.get("error_count", 0) > 5:
return END
return next_agent
# ============================================================
# COSTRUZIONE DEL GRAFO
# ============================================================
# Crea il grafo stateful
workflow = StateGraph(AgentState)
# Aggiungi nodi (agenti)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_agent)
workflow.add_node("analysis", analysis_agent)
workflow.add_node("report_writer", report_writer_agent)
# Definisci entry point
workflow.set_entry_point("supervisor")
# Definisci edges condizionali
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{
"research": "research",
"analysis": "analysis",
"report_writer": "report_writer",
END: END
}
)
# Tutti gli agenti tornano al supervisor
workflow.add_edge("research", "supervisor")
workflow.add_edge("analysis", "supervisor")
workflow.add_edge("report_writer", "supervisor")
# Compila il grafo
app = workflow.compile()
# ============================================================
# ESECUZIONE
# ============================================================
initial_state = {
"task": "Analizza le tendenze del mercato AI nel 2025 e prepara un report esecutivo",
"messages": [],
"plan": [],
"current_step": 0,
"research_results": "",
"analysis_results": "",
"final_report": "",
"next_agent": "",
"error_count": 0
}
# Esegui il grafo
config = {"recursion_limit": 20}
final_state = app.invoke(initial_state, config=config)
print("\n=== REPORT FINALE ===")
print(final_state["final_report"])
5. エージェント間のコミュニケーションと調整
エージェント間の効果的なコミュニケーションは非常に重要です。主に次の 3 つのパターンがあります。 メッセージパッシング (エージェントは構造化されたメッセージを交換します)、 共有状態 (LangGraph のようなグローバル共有状態) e 黒板 (エージェントが書き込みおよび読み取りを行う共有データベース)。
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid
from enum import Enum
class MessageType(Enum):
TASK = "task" # Assegna un task a un agente
RESULT = "result" # Restituisce il risultato di un task
ERROR = "error" # Segnala un errore
STATUS = "status" # Aggiornamento di stato
QUERY = "query" # Richiesta di informazioni
RESPONSE = "response" # Risposta a una query
@dataclass
class AgentMessage:
"""Messaggio strutturato per comunicazione inter-agent"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
message_type: MessageType = MessageType.TASK
content: Any = None
metadata: Dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
correlation_id: Optional[str] = None # Per tracciare conversazioni
def is_response_to(self, original: 'AgentMessage') -> bool:
return self.correlation_id == original.message_id
def create_response(self, content: Any, message_type: MessageType = MessageType.RESULT):
"""Crea una risposta a questo messaggio"""
return AgentMessage(
from_agent=self.to_agent,
to_agent=self.from_agent,
message_type=message_type,
content=content,
correlation_id=self.message_id
)
class MessageBus:
"""Bus di messaggi per comunicazione asincrona tra agenti"""
def __init__(self):
self.queues: Dict[str, List[AgentMessage]] = {}
self.history: List[AgentMessage] = []
def register_agent(self, agent_id: str):
"""Registra un agente nel bus"""
self.queues[agent_id] = []
def send(self, message: AgentMessage):
"""Invia un messaggio a un agente"""
if message.to_agent not in self.queues:
self.queues[message.to_agent] = []
self.queues[message.to_agent].append(message)
self.history.append(message)
def receive(self, agent_id: str) -> Optional[AgentMessage]:
"""Ricevi il primo messaggio dalla coda di un agente"""
if agent_id in self.queues and self.queues[agent_id]:
return self.queues[agent_id].pop(0)
return None
def broadcast(self, message: AgentMessage, agents: List[str]):
"""Invia lo stesso messaggio a più agenti"""
for agent_id in agents:
msg_copy = AgentMessage(
from_agent=message.from_agent,
to_agent=agent_id,
message_type=message.message_type,
content=message.content,
metadata=message.metadata,
correlation_id=message.message_id
)
self.send(msg_copy)
def get_conversation(self, correlation_id: str) -> List[AgentMessage]:
"""Ottieni tutti i messaggi di una conversazione"""
return [m for m in self.history if
m.message_id == correlation_id or
m.correlation_id == correlation_id]
# Esempio utilizzo
bus = MessageBus()
bus.register_agent("supervisor")
bus.register_agent("research")
bus.register_agent("analysis")
# Supervisor assegna task alla ricerca
task_msg = AgentMessage(
from_agent="supervisor",
to_agent="research",
message_type=MessageType.TASK,
content="Ricerca tendenze AI 2025",
metadata={"priority": "high", "deadline": "2025-03-01"}
)
bus.send(task_msg)
# Research riceve e risponde
received = bus.receive("research")
if received:
result_msg = received.create_response(
content="Trovate 50 fonti rilevanti sulle tendenze AI 2025",
message_type=MessageType.RESULT
)
bus.send(result_msg)
6. エラーの管理と監視
マルチエージェント システムでは、エージェントのスタックという新たな障害ベクトルが発生します。 ループ、通信エラー、不正な出力、過剰な遅延。それは基本的なことです サーキット ブレーカー、タイムアウト、および特定の監視を実装します。
import asyncio
import time
from functools import wraps
from typing import Callable, TypeVar
from enum import Enum
T = TypeVar('T')
class CircuitState(Enum):
CLOSED = "closed" # Funziona normalmente
OPEN = "open" # Bloccato (troppi errori)
HALF_OPEN = "half_open" # Test: prova una chiamata
class CircuitBreaker:
"""
Circuit breaker per agenti AI.
Previene cascade failure quando un agente fallisce ripetutamente.
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_max_calls: int = 1
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise Exception(
f"Circuit OPEN: agente non disponibile. "
f"Retry in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
)
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise Exception("Circuit HALF_OPEN: attendere recovery")
self.half_open_calls += 1
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
return wrapper
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
class MultiAgentMonitor:
"""Monitoring per sistemi multi-agent"""
def __init__(self):
self.agent_stats: Dict[str, Dict] = {}
self.task_history: List[Dict] = []
def record_agent_call(
self,
agent_id: str,
task: str,
success: bool,
duration_ms: float,
tokens_used: int = 0
):
"""Registra una chiamata a un agente"""
if agent_id not in self.agent_stats:
self.agent_stats[agent_id] = {
"total_calls": 0, "successes": 0, "failures": 0,
"total_tokens": 0, "total_duration_ms": 0
}
stats = self.agent_stats[agent_id]
stats["total_calls"] += 1
stats["successes" if success else "failures"] += 1
stats["total_tokens"] += tokens_used
stats["total_duration_ms"] += duration_ms
def get_system_health(self) -> dict:
"""Ritorna lo stato di salute del sistema"""
results = {}
for agent_id, stats in self.agent_stats.items():
total = stats["total_calls"]
success_rate = stats["successes"] / total if total > 0 else 0
avg_duration = stats["total_duration_ms"] / total if total > 0 else 0
results[agent_id] = {
"success_rate": success_rate,
"avg_duration_ms": avg_duration,
"total_calls": total,
"status": "healthy" if success_rate > 0.9 else
"degraded" if success_rate > 0.7 else "critical"
}
return results
7. ベストプラクティスとアンチパターン
マルチエージェントのベストプラクティス
- 単一責任の原則: 各エージェントは明確に定義された限定された役割を持たなければなりません。 「すべて」を行うエージェントは、デバッグや改善が困難です。
- 各エージェントのタイムアウト: どのエージェントもシステムを無期限にブロックできるべきではありません。現実的なタイムアウトを設定します (LLM エージェントの場合は 10 ~ 30 秒)。
- 明示的でシリアル化可能な状態: TypedDict または Pydantic を使用して状態を定義します。ロギング、デバッグ、リカバリのためにシリアル化可能 (JSON) である必要があります。
- 各エージェントのサーキット ブレーカー: カスケード障害を防ぎます。エージェントが繰り返し失敗すると、サーキット ブレーカーが通話を停止し、回復できるようにします。
- あらゆる決定を追跡します。 すべての監督者の決定を明確な理由とともに記録します。トレースなしでマルチエージェント システムをデバッグすることはほぼ不可能です。
避けるべきアンチパターン
- 一般的すぎるエージェント: 「ユニバーサル アシスタント」エージェントは、単一の LLM と同等の効果はありませんが、はるかに高価であり、制御が困難です。
- 反復制限なし: max_iterations がないと、ReAct システムは無限ループで実行され、トークンとお金を消費する可能性があります。常に制限を設定します。
- 共有された変更可能なグローバル状態: エージェント間で可変状態を共有すると、競合状態が発生します。不変メッセージまたは集中管理された状態を使用します。
- エージェントの出力を盲目的に信頼する: エージェントは、奇形または幻覚性の出力を生成する可能性があります。出力を次のエージェントに渡す前に、必ず出力を検証してください。
結論
マルチエージェント システムは、複雑な問題に適用される AI の未来を表します。 基本的なパターン (ReAct、計画と実行、スーパーバイザー) を調査しました。 ステートフル オーケストレーション、エージェント間の通信パターン、および 本番環境でのエラー管理。
重要なポイント:
- 適切なトポロジを選択します: リニア フローの場合はパイプライン、動的ルーティングの場合はスーパーバイザ
- ReAct は、ツールを使用した探索的なタスクに最適です。構造化されたタスクの計画と実行
- LangGraph は実稼働環境の複雑なシステムに最適な選択肢です
- 各エージェントには明確に定義された単一の役割が必要です
- サーキットブレーカーとタイムアウトは本番環境では不可欠です
- 効果的なデバッグのためにすべての監督者の決定を追跡します
次の記事では、 生産における迅速なエンジニアリング: 品質と一貫性を確保するためのテンプレート、バージョン管理、体系的なテスト 時間が経つにつれて。
シリーズは続く
- 第 7 条: コンテキスト ウィンドウの管理
- 第 8 条: マルチエージェント システム (現行)
- 第 9 条: 生産における迅速なエンジニアリング
- 第 10 条: AI のナレッジ グラフ
さらに詳しく: MLOps: モデルの提供 e RAG 用の LangChain.







