はじめに: 理論から生産まで
以前の記事では、AI エージェントを構築するための個別のフレームワーク、LangChain と グラフベースのワークフローには LangGraph、構造化されたチームには CrewAI、会話には AutoGen マルチエージェント。次に、最も複雑な課題に取り組んでみましょう。 マルチエージェント システムを調整する方法 生産中、信頼性、スケーラビリティ、可観測性はオプションではありません。 しかし、基本的な要件。
2 つまたは 3 つのエージェントを備えたプロトタイプから、数十のエージェントを備えた運用準備が整ったシステムに移行します。 調整には大幅なアーキテクチャの飛躍が必要です。オーケストレーション パターンは現在、 標準化されるほど成熟しているかどうかを決定するのは、適切なアーキテクチャの選択です。 プロジェクト全体の成功か失敗か。よくある間違いは、複雑さを過小評価することです。 最近の研究によると、単純なアプローチ (「エージェントのバッグ」) では、 17 倍のエラー エスカレーション、各エージェントが問題を増幅させる 解決するのではなく、他人のことを考えます。
この記事では、5 つの標準オーケストレーション パターン、3 つのアーキテクチャを分析します。 メイン (ハブアンドスポーク、ピアツーピア、メッセージキュー)、状態管理戦略 分散、耐障害性、監視。目標は、実践的な指導を提供することです。 堅牢で保守可能なマルチエージェント システムを設計します。
この記事で学べること
- 5 つの標準的なオーケストレーション パターン: シーケンシャル、コンカレント、グループ チャット、ハンドオフ、プランファースト
- ハブアンドスポーク アーキテクチャ: 中央コーディネーターをいつ、どのように使用するか
- ピアツーピア アーキテクチャ: エージェント間の直接通信
- メッセージ キュー アーキテクチャ: RabbitMQ、Kafka、Pulsar によるデカップリングと回復力
- 分散状態管理: イベント ソーシング、CQRS、分散トランザクション
- フォールト トレランス: サーキット ブレーカー、タイムアウト、再試行ポリシー、グレースフル デグラデーション
- 監視と可観測性: 分散トレース、特殊なメトリクスとダッシュボード
- 意思決定マトリックス: ユースケースに適したアーキテクチャを選択する方法
5 つの標準的なオーケストレーション パターン
マルチエージェント オーケストレーションは 5 つの基本パターンに基づいており、それぞれが最適化されています。 特定の種類のワークロード。複雑なシステムでは、これらのパターンが組み合わされます。 メインフローにはシーケンシャルパターンを使用でき、サブタスクを並列化するにはコンカレントを使用できます。 専門的な決定を委任するハンドオフ。
オーケストレーションの5つのパターン
| パターン | 流れ | プロ | に対して | 使用事例 |
|---|---|---|---|---|
| 一連 | A→B→C | シンプル、予測可能 | 遅い、類似点がない | ETL パイプライン、コンテンツ ワークフロー |
| 同時 | A、B、C を並列に | 高速かつ効率的 | 同期の複雑さ | マルチソース分析、検索 |
| グループチャット | 共同スレッド | 柔軟で新しい | 予測不能、高価 | ブレーンストーミング、反復レビュー |
| 渡す | 動的な委任 | 適応型、特化型 | 複雑なルーティング | カスタマーサポート、トリアージ |
| 計画第一 | 計画→実行 | 戦略的で最適化された | 初期オーバーヘッド | 複雑なタスク、デザイン |
パターン1:シーケンシャル
パターンでは 一連、エージェントは線形チェーンで動作します。出力 エージェント A の入力はエージェント B の入力となり、その出力はエージェント C の入力になります。 各エージェントは、その前のエージェントの結果を強化、変換、または検証します。それがパターンです 実装とデバッグが簡単になります。
# Pattern Sequential: Pipeline di Content Creation
from typing import List, Dict
class SequentialPipeline:
def __init__(self, agents: List):
self.agents = agents
async def execute(self, initial_input: str) -> Dict:
current_output = initial_input
results = []
for agent in self.agents:
result = await agent.process(current_output)
results.append({
"agent": agent.name,
"input": current_output,
"output": result
})
current_output = result
return {
"final_output": current_output,
"pipeline_trace": results
}
# Utilizzo: Researcher -> Writer -> Editor -> Publisher
pipeline = SequentialPipeline([
ResearchAgent("Researcher"),
WriterAgent("Writer"),
EditorAgent("Editor"),
PublisherAgent("Publisher")
])
result = await pipeline.execute(
"Scrivi un articolo sulle best practices di Kubernetes"
)
Sequential パターンの利点は次のとおりです。 完全なトレーサビリティ:各フェーズごと 入力、出力、および責任のあるエージェントを正確に知っています。主な欠点は、 合計時間は、並列処理の可能性を除いた、すべてのエージェントの時間の合計です。
パターン 2: 同時実行
パターンでは 同時、複数のエージェントが独立したタスクを並行して実行します。 結果は専用のコンポーネントによって集約されます。このパターンは次の場合に最適です。 サブタスク間には依存関係がないため、速度が重要です。
import asyncio
from typing import List, Dict
class ConcurrentOrchestrator:
def __init__(self, agents: List, aggregator):
self.agents = agents
self.aggregator = aggregator
async def execute(self, task: str) -> Dict:
# Esecuzione parallela di tutti gli agenti
tasks = [agent.process(task) for agent in self.agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filtraggio errori e aggregazione
successful = []
errors = []
for agent, result in zip(self.agents, results):
if isinstance(result, Exception):
errors.append({"agent": agent.name, "error": str(result)})
else:
successful.append({"agent": agent.name, "result": result})
# Aggregazione dei risultati
final = await self.aggregator.aggregate(successful)
return {
"final_output": final,
"successful_agents": len(successful),
"failed_agents": len(errors),
"errors": errors
}
# Utilizzo: 3 agenti analizzano lo stesso topic da angolazioni diverse
orchestrator = ConcurrentOrchestrator(
agents=[
TechnicalAnalyst("TechAnalyst"),
MarketAnalyst("MarketAnalyst"),
RiskAnalyst("RiskAnalyst")
],
aggregator=SynthesisAgent("Synthesizer")
)
パターン3:グループチャット
パターン グループチャット エージェントが連携する共同スレッドを作成します。 AutoGen で見たように、彼らは自由に会話します。最も柔軟なパターンです しかし、最も予測不可能でもあります。結果の品質は以下に大きく依存します。 役割と終了基準の定義。
パターン 4: ハンドオフ
パターンでは 渡す、ルーティング エージェントがタスクを分析し、委任します。 最適な専門エージェントへ。これは顧客のシステムで主流のパターンです サポート AI。トリアージ エージェントがリクエストを分類し、適切な部門にルーティングします。
class HandoffRouter:
def __init__(self, specialists: Dict[str, 'Agent']):
self.specialists = specialists
self.router_llm = RouterLLM()
async def route(self, task: str) -> Dict:
# L'LLM determina quale specialista gestire il task
classification = await self.router_llm.classify(task)
specialist_name = classification["specialist"]
confidence = classification["confidence"]
if confidence < 0.7:
# Bassa confidenza: escalation a un umano
return {
"status": "escalated",
"reason": "Low confidence routing",
"confidence": confidence
}
specialist = self.specialists.get(specialist_name)
if not specialist:
return {"status": "error", "reason": f"No specialist: {specialist_name}"}
result = await specialist.handle(task)
return {
"status": "completed",
"specialist": specialist_name,
"confidence": confidence,
"result": result
}
# Configurazione per customer support
router = HandoffRouter({
"billing": BillingAgent("BillingSpecialist"),
"technical": TechnicalAgent("TechSupport"),
"sales": SalesAgent("SalesSpecialist"),
"general": GeneralAgent("GeneralSupport")
})
パターン 5: 計画優先
パターンでは 計画第一、計画エージェントが複雑なタスクを分析し、 それをサブタスクに分解し、依存関係と実行順序を定義してから、 オーケストレーターが計画を実行します。このパターンは、次のような複雑なタスクの基本です。 行動の前に戦略が必要です。
class PlanFirstOrchestrator:
def __init__(self, planner, executor_pool: Dict[str, 'Agent']):
self.planner = planner
self.executor_pool = executor_pool
async def execute(self, complex_task: str) -> Dict:
# Fase 1: Pianificazione
plan = await self.planner.create_plan(complex_task)
# plan = [
# {"step": 1, "agent": "researcher", "task": "...", "deps": []},
# {"step": 2, "agent": "analyst", "task": "...", "deps": [1]},
# {"step": 3, "agent": "writer", "task": "...", "deps": [1, 2]}
# ]
# Fase 2: Esecuzione rispettando le dipendenze
completed = {}
for step in self._topological_sort(plan):
# Attendi dipendenze
dep_results = {
d: completed[d] for d in step["deps"]
}
agent = self.executor_pool[step["agent"]]
result = await agent.process(
task=step["task"],
context=dep_results
)
completed[step["step"]] = result
return {"plan": plan, "results": completed}
def _topological_sort(self, plan):
"""Ordina i passi rispettando le dipendenze"""
# Implementazione topological sort per DAG
sorted_steps = []
visited = set()
# ... sorting logic ...
return sorted_steps
ハブアンドスポークアーキテクチャ
アーキテクチャ ハブアンドスポーク 中央コーディネーター (ハブ) を提供します。 ワーカー エージェント (スポーク) のグループを管理します。すべての通信はハブを経由します。 エージェントは互いに直接通信しません。これは最も建築的なモデルです 運用環境のマルチエージェント システムでは一般的です。
Architettura Hub-and-Spoke:
+------------------+
| ORCHESTRATOR |
| (Hub) |
+--------+---------+
|
+---------+-------+-------+---------+
| | | |
+-----v----+ +--v------+ +----v-----+ +-v--------+
| Agent A | | Agent B | | Agent C | | Agent D |
| (Search) | | (Analyze| | (Write) | | (Review) |
+----------+ +---------+ +----------+ +----------+
(Spoke) (Spoke) (Spoke) (Spoke)
ハブアンドスポークの利点
- 集中管理: ハブでは全員のステータスを完全に把握できます エージェントは、情報に基づいてルーティングと優先順位について決定を下すことができます。
- 簡易モニタリング: すべての通信は単一のポイントを通過します。 ロギング、トレース、監査が簡単になります。
- 一元的なエラー管理: ハブは再試行、フォールバック、および すべてのエージェントに対して一貫してサーキット ブレーカーを適用します。
- 簡単進化: ワーカー エージェントの追加または削除は非常に簡単です 他のエージェントを変更せずにハブに登録します。
ハブアンドスポークの欠点
- 単一障害点: ハブに障害が発生すると、システム全体が停止します。 ハブに高可用性 (HA) が必要となるため、複雑さが増します。
- ボトルネック: エージェントの数が増えるとハブがボトルネックになります または通信頻度が大幅に増加します。
- 限られたスケーラビリティ: 一定数のエージェントを超えてスケーリングするには、 ハブをシャーディングするか、別のアーキテクチャに移行します。
- 追加の遅延: エージェント間の通信ごとにホップスルーが追加されます ハブに接続され、エンドツーエンドの遅延が増加します。
class HubOrchestrator:
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.results: Dict[str, any] = {}
self.circuit_breakers: Dict[str, CircuitBreaker] = {}
def register_agent(self, name: str, agent: 'Agent'):
self.agents[name] = agent
self.circuit_breakers[name] = CircuitBreaker(
failure_threshold=3,
recovery_timeout=60
)
async def dispatch(self, agent_name: str, task: Dict) -> Dict:
cb = self.circuit_breakers[agent_name]
if cb.is_open():
return {"status": "circuit_open", "agent": agent_name}
try:
result = await asyncio.wait_for(
self.agents[agent_name].process(task),
timeout=30.0
)
cb.record_success()
return result
except asyncio.TimeoutError:
cb.record_failure()
return {"status": "timeout", "agent": agent_name}
except Exception as e:
cb.record_failure()
return {"status": "error", "agent": agent_name, "error": str(e)}
ピアツーピアアーキテクチャ
建築において ピアツーピア (P2P)、エージェントは直接通信します 中央のコーディネーターなしで、彼らの間で。各エージェントは自律しており、検出して対話することができます システム内の他のエージェントと。このモデルは古典的な分散システムからインスピレーションを受けています ゴシッププロトコルやコンセンサスアルゴリズムなど。
Architettura Peer-to-Peer:
+----------+ messaggi diretti +----------+
| Agent A | <---------------------> | Agent B |
+-----+----+ +----+-----+
| |
| +----------+ |
+-------->| Agent C |<-------------+
| +----+-----+ |
| | |
| +----v-----+ |
+-------->| Agent D |<-------------+
+----------+
ピアツーピアの利点
- 単一障害点がない: 1 つのエージェントが失敗しても、他のエージェントは続行します 機能し、作業を再分配できます。
- 水平方向のスケーラビリティ: 新しいエージェントを追加する場合は変更は必要ありません アーキテクチャに接続したら、ネットワークに接続するだけです。
- 低遅延: エージェント間の直接通信により、ホップスルーが排除されます。 中央コーディネーター。
- 自然な回復力: システムは自動的に障害に適応します。 動作中のエージェントを介して通信を再ルーティングします。
ピアツーピアの欠点
- 高い複雑性: 検出、ルーティング、コンセンサスロジックが分散されています 各エージェントに存在するため、システムの開発と保守がより困難になります。
- 一貫性の課題: 自律エージェントと自律エージェントの間で一貫した状態を維持します。 分散システムの基本的な問題 (CAP 定理)。
- 複雑なデバッグ: 中央の観測点がない場合、トレースします。 コミュニケーション フローと問題の診断は大幅に困難になります。
- 通信オーバーヘッド: 各エージェントは多くのエージェントとの接続を維持する必要があります。 他のエージェントの影響により、ネットワーク リソースの消費が増加します。
メッセージキューのアーキテクチャ
に基づいたアーキテクチャ メッセージキュー メッセージブローカーを導入します(たとえば、 RabbitMQ、Apache Kafka、または Apache Pulsar) をエージェント間の仲介者として使用します。エージェント 彼らは特定のトピックまたはキューにメッセージをパブリッシュし、他のエージェントはそれらを受信するためにサブスクライブします。 このモデルはアーキテクチャを実装しています イベント駆動型 デカップリングを提供します。 優れた復元力と拡張性。
Architettura Message Queue:
+----------+ +----------+
| Agent A |---publish---> +----------------+ -->| Agent C |
+----------+ | | +----------+
| MESSAGE BROKER |
+----------+ | (Kafka/RabbitMQ| +----------+
| Agent B |---publish---> | /Pulsar) | -->| Agent D |
+----------+ +----------------+ +----------+
|
+------v-------+
| Dead Letter |
| Queue (DLQ) |
+--------------+
Topic/Code:
- tasks.research (Agent A pubblica, Agent C consuma)
- tasks.analysis (Agent A pubblica, Agent D consuma)
- results.research (Agent C pubblica, Agent B consuma)
- errors.global (tutti pubblicano, monitoring consuma)
メッセージキューの利点
- 完全なデカップリング: エージェントはお互いを直接知る必要はありません。彼らはコミュニケーションをとります トピックを通じて、システムの他の部分に影響を与えることなくエージェントを追加または削除できます。
- 本来の回復力: エージェントが 受信者が一時的にオフラインになっています。再びアクティブになると、蓄積されたメッセージが処理されます。
- 独立したスケーラビリティ: 各エージェント タイプは個別にスケーリングできます。 分析がボトルネックである場合は、分析エージェント インスタンスをさらに追加します。
- リプレイと監査: Kafka を使用すると、メッセージ履歴全体が利用可能になります。 再生、デバッグ、監査のコンプライアンス。
メッセージキューの欠点
- 運用の複雑さ- 運用環境で Kafka または RabbitMQ クラスターを管理する 重要な DevOps スキルと専用インフラストラクチャが必要です。
- 追加の遅延: シリアル化、永続化、および逆シリアル化 直接通信に比べて、メッセージの遅延が発生します。
- 可能な一貫性: システムは本質的に最終的に一貫性があります。 これは、強い一貫性を必要とするワークフローでは問題になる可能性があります。
- インフラストラクチャコスト: メッセージ ブローカーはアドオンです これにはリソース、監視、メンテナンスが必要です。
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class KafkaAgentBus:
def __init__(self, bootstrap_servers: str):
self.servers = bootstrap_servers
self.producer = None
self.consumers = {}
async def start(self):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.servers,
value_serializer=lambda v: json.dumps(v).encode()
)
await self.producer.start()
async def publish(self, topic: str, message: Dict):
await self.producer.send_and_wait(topic, message)
async def subscribe(self, topic: str, handler):
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=self.servers,
value_deserializer=lambda v: json.loads(v.decode()),
group_id=f"agent-group-{topic}"
)
await consumer.start()
async for msg in consumer:
try:
await handler(msg.value)
except Exception as e:
# Pubblica su Dead Letter Queue
await self.publish("errors.dlq", {
"original_topic": topic,
"message": msg.value,
"error": str(e)
})
# Utilizzo
bus = KafkaAgentBus("localhost:9092")
await bus.start()
# Agent A pubblica task di ricerca
await bus.publish("tasks.research", {
"task_id": "t-001",
"query": "Best practices microservizi 2026",
"priority": "high"
})
# Agent C consuma e processa
await bus.subscribe("tasks.research", research_agent.handle)
分散状態管理
マルチエージェント システムでは、 stato それは管理するのが最もデリケートな問題です。 各エージェントは、グローバル状態の部分的または古いビューを持っている可能性があり、その結果、 一貫性のない決定。分散方式で状態を管理するには、主に 3 つのアプローチがあります。
イベントソーシング
Con l'イベントソーシング、状態は現在のスナップショットとして保存されません。 ただし、不変の一連のイベントとして。現在の状態はすべてのイベントを再生することで取得されます 最初から。このアプローチでは、完全な監査証跡と再構築機能が提供されます。 任意の時点での状態。
class EventStore:
def __init__(self):
self.events: List[Dict] = []
def append(self, event: Dict):
event["timestamp"] = datetime.utcnow().isoformat()
event["sequence"] = len(self.events)
self.events.append(event)
def get_state(self, entity_id: str) -> Dict:
"""Ricostruisci lo stato applicando tutti gli eventi"""
state = {}
for event in self.events:
if event.get("entity_id") == entity_id:
state = self._apply_event(state, event)
return state
def _apply_event(self, state: Dict, event: Dict) -> Dict:
event_type = event["type"]
if event_type == "TaskCreated":
state["status"] = "created"
state["data"] = event["data"]
elif event_type == "TaskAssigned":
state["assigned_to"] = event["agent"]
state["status"] = "assigned"
elif event_type == "TaskCompleted":
state["status"] = "completed"
state["result"] = event["result"]
return state
CQRS (コマンドクエリ責任分離)
CQRS 書き込み操作 (コマンド) を読み取り操作から分離します。 (質問)。コマンドはイベント ストアを通じて状態を変更しますが、クエリはイベント ストアから読み取ります。 最適化された投影。これにより、読み取りと書き込みを独立してスケールできるようになります。 そして、クエリの種類ごとに最適化された読み取りモデルを使用します。
- コマンド側: コマンドの検証、イベントの生成、一貫性の維持
- クエリ側: 特定のクエリ用に最適化された、具体化されたプロジェクションを読み取ります
- 予測: イベントによって非同期的に更新される非正規化ビュー
分散トランザクション: Saga パターン
操作に複数のエージェントが関与し、アトミックである必要がある場合、 サーガパターン 分散トランザクションを一連のローカル トランザクションとして処理します。 破産した場合の自己賠償訴訟。
class SagaOrchestrator:
def __init__(self):
self.steps: List[SagaStep] = []
def add_step(self, execute_fn, compensate_fn, name: str):
self.steps.append(SagaStep(execute_fn, compensate_fn, name))
async def execute(self) -> Dict:
completed = []
try:
for step in self.steps:
result = await step.execute()
completed.append((step, result))
return {"status": "success", "steps": len(completed)}
except Exception as e:
# Compensazione in ordine inverso
for step, _ in reversed(completed):
try:
await step.compensate()
except Exception as comp_error:
# Log errore compensazione, non propagare
logger.error(f"Compensation failed: {comp_error}")
return {"status": "rolled_back", "error": str(e)}
# Utilizzo: transazione multi-agente
saga = SagaOrchestrator()
saga.add_step(
execute_fn=lambda: research_agent.analyze(data),
compensate_fn=lambda: research_agent.cleanup(),
name="research"
)
saga.add_step(
execute_fn=lambda: writer_agent.generate(analysis),
compensate_fn=lambda: writer_agent.discard_draft(),
name="writing"
)
saga.add_step(
execute_fn=lambda: publisher_agent.publish(article),
compensate_fn=lambda: publisher_agent.unpublish(),
name="publishing"
)
フォールトトレランス
実稼働環境のマルチエージェント システムでは、障害は避けられません。応答する LLM エラー、ネットワークタイムアウト、不正な形式の出力を生成するエージェント、規定を超えるコストが発生します。 予算。堅牢なシステムは、伝播することなく、これらすべてのシナリオを適切に処理する必要があります。 カスケードエラー。重要なのは失敗に備えた設計です。
サーキットブレーカーのパターン
Il サーキットブレーカー エージェントの障害を監視し、数値の後に 連続エラーを構成可能。「回路を開き」、さらなる呼び出しを防止します。 クールダウン期間。これにより、エージェントの失敗によって引き起こされるカスケード効果が防止されます。 システム全体の速度低下または障害。
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed" # Normale operativita
OPEN = "open" # Circuito aperto, rifiuta chiamate
HALF_OPEN = "half_open" # Test con una singola chiamata
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = timedelta(seconds=recovery_timeout)
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.success_count = 0
def is_open(self) -> bool:
if self.state == CircuitState.OPEN:
if datetime.utcnow() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
return False
return True
return False
def record_success(self):
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count += 1
def record_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
指数バックオフを使用した再試行ポリシー
Le 再試行ポリシー 失敗した操作をいつどのように再試行するかを定義します。 指数関数的バックオフは、あまりに頻繁な再試行によるサービスの過負荷を防ぎます。 試行の間隔を徐々に長くしていきます。
import random
async def retry_with_backoff(
fn,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True
):
for attempt in range(max_retries + 1):
try:
return await fn()
except Exception as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay *= (0.5 + random.random())
logger.warning(
f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
f"Retrying in {delay:.1f}s"
)
await asyncio.sleep(delay)
グレースフルデグラデーション
La 優雅な劣化 システムが常に結果を生成するようにします 一部のコンポーネントに障害が発生した場合でも役立ちます。一般的なエラーを返す代わりに、 システムの出力品質は低下しますが、機能は維持されます。
- フォールバックエージェント: プライマリ エージェントに障害が発生した場合、より単純なバックアップ エージェント (ルールベースなど) が代わりに使用されます。
- 部分的な結果: 5 人のエージェントのうち 3 人がタスクを完了した場合、システムは免責事項を含む部分的な結果を返します。
- 結果キャッシュ: 以前の同様の応答がフォールバックとして返されます
- デフォルト値: 重要でないフィールドの場合、担当エージェントが応答しない場合はデフォルト値が使用されます
監視と可観測性
監視のないマルチエージェント システムは、暗闇の中で運転するようなものです。ザ」可観測性 それは、システムの出力を観察することによってシステムの内部状態を理解する能力です。私にとって マルチエージェント システムでは、ロギング、メトリクス、分散トレースの 3 つの柱が必要です。
分散トレーシング
Il 分散トレーシング すべてのエージェントを介してリクエストを追跡します。 彼らはそれを処理し、フローのエンドツーエンドの視覚化を作成します。のようなツール イェーガー o ジップキンス どこに蓄積されているかを確認できるようにする レイテンシーとエラーが発生する場所。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
tracer = trace.get_tracer("multi-agent-system")
class TracedAgent:
def __init__(self, agent, name: str):
self.agent = agent
self.name = name
async def process(self, task: Dict) -> Dict:
with tracer.start_as_current_span(
f"agent.{self.name}.process",
attributes={
"agent.name": self.name,
"agent.task_type": task.get("type", "unknown"),
"agent.task_id": task.get("id", "")
}
) as span:
try:
result = await self.agent.process(task)
span.set_attribute("agent.status", "success")
span.set_attribute("agent.tokens_used",
result.get("tokens", 0))
return result
except Exception as e:
span.set_status(trace.StatusCode.ERROR, str(e))
span.record_exception(e)
raise
マルチエージェント システムの主要な指標
標準のインフラストラクチャ指標 (CPU、メモリ、ネットワーク) に加えて、マルチエージェント システム システムの健全性とパフォーマンスを監視するには、特定のメトリックが必要です。
重要な指標
| メトリック | 説明 | ターゲット | アラーム |
|---|---|---|---|
| エージェントの待ち時間 (p95) | 95 パーセンタイルでの応答時間 | エージェントあたり 5 秒未満 | > 15秒 |
| エラー率 | エージェントあたりの失敗したタスクの割合 | < 2% | > 10% |
| トークンコスト/タスク | タスクあたりのトークンの平均コスト | 予算に応じて | > 平均の 2 倍 |
| ループ検出率 | ループのある会話の割合 | < 1% | > 5% |
| ハンドオフ成功率 | 成功した委任の割合 | > 95% | < 85% |
| エンドツーエンドの遅延 | 入力から結果までの合計時間 | 30代未満 | > 60代 |
| サーキットブレーカーが開く | ブレーカーの開放回数/時間 | 0 | > 3 / 時間 |
特化した監視プラットフォーム
マルチエージェント システムを監視するには、Grafana や Prometheus などの汎用ツールを使用します。 特殊なプラットフォームと組み合わせる:
- ラング・スミス: トレース、デバッグ、評価のための LangChain プラットフォーム LLM アプリケーション。コール チェーンの視覚化、コスト分析、A/B テストを提供します。
- ラングフューズ: 可観測性と可観測性に重点を置いた、LangSmith に代わるオープンソース AI アプリケーションの分析。
- 重みとバイアス: 実験の追跡、プロンプトのバージョン管理、および パフォーマンスを長期的に監視します。
- プロメテウス + グラファナ: インフラストラクチャとカスタム メトリクス用のクラシック スタック、 設定可能なアラート機能を備えています。
- イェーガー/ジプキン: 異なるエンドツーエンドの分散トレース用 システムエージェント。
意思決定マトリックス: 適切なアーキテクチャの選択
アーキテクチャの選択は、複数の要因に依存します。普遍的な解決策はありません。 各プロジェクトには異なる制約と優先順位があります。次の表に実践的なガイドを示します。 決定を導くために。
マルチエージェントアーキテクチャの意思決定マトリックス
| 要素 | ハブアンドスポーク | ピアツーピア | メッセージキュー |
|---|---|---|---|
| チームの規模 | エージェント2~10名 | 5~50人のエージェント | 10 ~ 100 人以上のエージェント |
| 必要なレイテンシ | 中 (10 秒未満) | 低 (<5 秒) | 耐性がある (30 代未満) |
| フォールトトレランス | 中 (HA ハブ) | 高 (ネイティブ) | 非常に高い(ブローカー) |
| 運用の複雑さ | 低い | 高い | 中~高 |
| インフラストラクチャのコスト | ベース | 中くらい | 高い |
| デバッグ | 簡単 | 難しい | 中くらい |
| スケーラビリティ | 限定 | 良い | 素晴らしい |
| 理想的な使用例 | MVP、小規模チーム | 分散システム | エンタープライズ、高信頼性 |
実践的なアドバイス
ほとんどのプロジェクトでは、次から始めます。 ハブアンドスポーク。簡単なことですが、 デバッグが簡単で、エージェントが 10 未満のシステムには十分です。スケーラビリティの場合 問題になる、~に移行する メッセージキュー。アーキテクチャ ピアツーピア マルチエージェント AI システムでは必要になることはほとんどありません。 レイテンシが重要な要素である特定の使用例でのみ考慮されます。 エージェントの数は非常に多いです。
結論
マルチエージェント オーケストレーションは、アーキテクチャとアーキテクチャの両方の専門知識を必要とする分野です。 ソフトウェアとAI。私たちが分析したパターン (順次、同時、グループ チャット、 Handoff、Plan-First) は、複雑なシステムを構築するために組み合わされる基本的な構成要素です。 3 つのアーキテクチャ (ハブ アンド スポーク、ピアツーピア、メッセージ キュー) には、異なるトレードオフがあります。 シンプルさ、拡張性、回復力の間で。
重要なメッセージは次のとおりです。 複雑さを過小評価しないでください。マルチエージェントシステム 本番環境では、単に「複数のエージェントが連携する」だけではありません。それは分散システムです。 分散システムの古典的なすべての問題 (一貫性、耐障害性、可観測性)、 LLM の非決定性によって増幅されます。耐障害性、監視、および 当初からの状態管理はオーバーエンジニアリングではなく、生き残ることです。
次の記事では、 AIエージェント用のメモリ: 装備方法 短期および長期記憶エージェント、検索 (RAG)、埋め込みおよびベクトル パターン データベース、および記憶がエージェントの推論および計画能力にどのような影響を与えるかについて説明します。







