소개: 이론에서 생산까지
이전 기사에서 우리는 AI 에이전트 구축을 위한 개별 프레임워크인 LangChain과 그래프 기반 워크플로를 위한 LangGraph, 구조화된 팀을 위한 CrewAI, 대화를 위한 AutoGen 다중 에이전트. 이제 가장 복잡한 문제에 직면해 보겠습니다. 다중 에이전트 시스템을 조정하는 방법 생산 중, 신뢰성, 확장성 및 관찰 가능성은 선택 사항이 아닙니다. 그러나 기본 요구 사항.
2~3개의 에이전트가 포함된 프로토타입에서 수십 개의 에이전트가 포함된 프로덕션 지원 시스템으로 전환 조정에는 상당한 아키텍처적 도약이 필요합니다. 오케스트레이션 패턴은 이제 표준화될 만큼 충분히 성숙했는지 여부를 결정하는 것은 올바른 아키텍처의 선택입니다. 전체 프로젝트의 성공 또는 실패. 흔히 저지르는 실수는 복잡성을 과소평가하는 것입니다. 최근 연구에 따르면 순진한 접근 방식("에이전트 가방")은 17배 오류 에스컬레이션, 각 에이전트가 문제를 증폭시키는 경우 다른 사람들의 문제를 해결하는 대신.
이 기사에서는 5가지 표준 오케스트레이션 패턴, 3가지 아키텍처를 분석합니다. 기본(허브 앤 스포크, 피어 투 피어, 메시지 큐), 상태 관리 전략 분산, 내결함성 및 모니터링. 목표는 다음과 같은 실질적인 지침을 제공하는 것입니다. 강력하고 유지 관리가 가능한 다중 에이전트 시스템을 설계합니다.
이 기사에서 배울 내용
- 5가지 표준 오케스트레이션 패턴: 순차, 동시, 그룹 채팅, 핸드오프, 계획 우선
- 허브 앤 스포크 아키텍처: 중앙 코디네이터를 사용하는 시기와 방법
- Peer-to-Peer 아키텍처: 에이전트 간 직접 통신
- 메시지 큐 아키텍처: RabbitMQ, Kafka, Pulsar를 사용한 분리 및 복원력
- 분산 상태 관리: 이벤트 소싱, CQRS, 분산 트랜잭션
- 내결함성: 회로 차단기, 시간 초과, 재시도 정책, 점진적 성능 저하
- 모니터링 및 관찰 가능성: 분산 추적, 특수 측정항목 및 대시보드
- 결정 매트릭스: 사용 사례에 적합한 아키텍처를 선택하는 방법
5가지 표준 오케스트레이션 패턴
다중 에이전트 오케스트레이션은 다섯 가지 기본 패턴을 기반으로 하며 각 패턴은 다음에 최적화되어 있습니다. 특정 유형의 작업 부하. 복잡한 시스템에서는 다음 패턴이 결합됩니다. 주요 흐름에는 순차 패턴을 사용할 수 있고 하위 작업 병렬화에는 동시 패턴을 사용할 수 있습니다. 전문적인 결정을 위임하기 위한 핸드오프(Handoff).
오케스트레이션의 5가지 패턴
| 패턴 | 흐름 | 찬성 | 에 맞서 | 사용 사례 |
|---|---|---|---|---|
| 잇달아 일어나는 | A → B → C | 단순함, 예측 가능 | 느리고 유사점 없음 | ETL 파이프라인, 콘텐츠 워크플로우 |
| 경쟁 상대 | A, B, C 병렬 | 빠르고 효율적 | 동기화 복잡성 | 다중 소스 분석, 검색 |
| 그룹채팅 | 협업 스레드 | 유연한 신흥 | 예측할 수 없고 비싸다 | 브레인스토밍, 반복 검토 |
| 핸드오프 | 동적 위임 | 적응형, 전문화형 | 복잡한 라우팅 | 고객 지원, 분류 |
| 계획 우선 | 계획 → 실행 | 전략적, 최적화됨 | 초기 간접비 | 복잡한 작업, 디자인 |
패턴 1: 순차적
패턴에서는 잇달아 일어나는, 에이전트는 선형 체인에서 작동합니다. 출력 에이전트 A의 입력은 에이전트 B의 입력이 되고, 그 출력은 에이전트 C의 입력이 됩니다. 각 에이전트는 이전 에이전트의 결과를 강화, 변환 또는 검증합니다. 패턴이에요 구현 및 디버깅이 더 쉽습니다.
# Pattern Sequential: Pipeline di Content Creation
from typing import List, Dict
class SequentialPipeline:
def __init__(self, agents: List):
self.agents = agents
async def execute(self, initial_input: str) -> Dict:
current_output = initial_input
results = []
for agent in self.agents:
result = await agent.process(current_output)
results.append({
"agent": agent.name,
"input": current_output,
"output": result
})
current_output = result
return {
"final_output": current_output,
"pipeline_trace": results
}
# Utilizzo: Researcher -> Writer -> Editor -> Publisher
pipeline = SequentialPipeline([
ResearchAgent("Researcher"),
WriterAgent("Writer"),
EditorAgent("Editor"),
PublisherAgent("Publisher")
])
result = await pipeline.execute(
"Scrivi un articolo sulle best practices di Kubernetes"
)
순차 패턴의 장점은 완벽한 추적성: 각 단계마다 입력, 출력 및 담당 에이전트를 정확히 알고 있습니다. 가장 큰 단점은 총 시간은 병렬 처리 가능성 없이 모든 에이전트의 시간을 합한 것입니다.
패턴 2: 동시
패턴에서는 경쟁 상대, 여러 에이전트가 독립적인 작업을 동시에 수행합니다. 그런 다음 결과는 전용 구성 요소에 의해 집계됩니다. 이 패턴은 다음과 같은 경우에 이상적입니다. 하위 작업은 서로 종속성이 없으며 속도가 중요합니다.
import asyncio
from typing import List, Dict
class ConcurrentOrchestrator:
def __init__(self, agents: List, aggregator):
self.agents = agents
self.aggregator = aggregator
async def execute(self, task: str) -> Dict:
# Esecuzione parallela di tutti gli agenti
tasks = [agent.process(task) for agent in self.agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filtraggio errori e aggregazione
successful = []
errors = []
for agent, result in zip(self.agents, results):
if isinstance(result, Exception):
errors.append({"agent": agent.name, "error": str(result)})
else:
successful.append({"agent": agent.name, "result": result})
# Aggregazione dei risultati
final = await self.aggregator.aggregate(successful)
return {
"final_output": final,
"successful_agents": len(successful),
"failed_agents": len(errors),
"errors": errors
}
# Utilizzo: 3 agenti analizzano lo stesso topic da angolazioni diverse
orchestrator = ConcurrentOrchestrator(
agents=[
TechnicalAnalyst("TechAnalyst"),
MarketAnalyst("MarketAnalyst"),
RiskAnalyst("RiskAnalyst")
],
aggregator=SynthesisAgent("Synthesizer")
)
패턴 3: 그룹 채팅
패턴 그룹채팅 에이전트가 협업 스레드를 생성합니다. AutoGen에서 본 것처럼 그들은 자유롭게 대화합니다. 가장 유연한 패턴입니다 하지만 예측하기 가장 어렵습니다. 결과의 품질은 다음에 따라 크게 달라집니다. 역할 및 종료 기준 정의.
패턴 4: 핸드오프
패턴에서는 핸드오프, 라우팅 에이전트가 작업을 분석하고 위임합니다. 가장 적합한 전문 에이전트에게 문의하세요. 고객 시스템의 지배적인 패턴입니다. 분류 에이전트가 요청을 분류하여 올바른 부서로 라우팅하는 AI를 지원합니다.
class HandoffRouter:
def __init__(self, specialists: Dict[str, 'Agent']):
self.specialists = specialists
self.router_llm = RouterLLM()
async def route(self, task: str) -> Dict:
# L'LLM determina quale specialista gestire il task
classification = await self.router_llm.classify(task)
specialist_name = classification["specialist"]
confidence = classification["confidence"]
if confidence < 0.7:
# Bassa confidenza: escalation a un umano
return {
"status": "escalated",
"reason": "Low confidence routing",
"confidence": confidence
}
specialist = self.specialists.get(specialist_name)
if not specialist:
return {"status": "error", "reason": f"No specialist: {specialist_name}"}
result = await specialist.handle(task)
return {
"status": "completed",
"specialist": specialist_name,
"confidence": confidence,
"result": result
}
# Configurazione per customer support
router = HandoffRouter({
"billing": BillingAgent("BillingSpecialist"),
"technical": TechnicalAgent("TechSupport"),
"sales": SalesAgent("SalesSpecialist"),
"general": GeneralAgent("GeneralSupport")
})
패턴 5: 계획 우선
패턴에서는 계획 우선, 기획 담당자가 복잡한 작업을 분석하고, 이를 하위 작업으로 분해하고 종속성과 실행 순서를 정의한 다음 오케스트레이터가 계획을 수행합니다. 이 패턴은 다음과 같은 복잡한 작업의 기본입니다. 행동하기 전에 전략이 필요합니다.
class PlanFirstOrchestrator:
def __init__(self, planner, executor_pool: Dict[str, 'Agent']):
self.planner = planner
self.executor_pool = executor_pool
async def execute(self, complex_task: str) -> Dict:
# Fase 1: Pianificazione
plan = await self.planner.create_plan(complex_task)
# plan = [
# {"step": 1, "agent": "researcher", "task": "...", "deps": []},
# {"step": 2, "agent": "analyst", "task": "...", "deps": [1]},
# {"step": 3, "agent": "writer", "task": "...", "deps": [1, 2]}
# ]
# Fase 2: Esecuzione rispettando le dipendenze
completed = {}
for step in self._topological_sort(plan):
# Attendi dipendenze
dep_results = {
d: completed[d] for d in step["deps"]
}
agent = self.executor_pool[step["agent"]]
result = await agent.process(
task=step["task"],
context=dep_results
)
completed[step["step"]] = result
return {"plan": plan, "results": completed}
def _topological_sort(self, plan):
"""Ordina i passi rispettando le dipendenze"""
# Implementazione topological sort per DAG
sorted_steps = []
visited = set()
# ... sorting logic ...
return sorted_steps
허브 앤 스포크 아키텍처
아키텍처 허브 앤 스포크 중앙 조정자(허브)를 제공합니다. 작업자 에이전트 그룹(스포크)을 관리합니다. 모든 통신은 허브를 통해 이루어집니다. 에이전트는 서로 직접 통신하지 않습니다. 이것은 가장 건축적인 모델입니다 프로덕션의 다중 에이전트 시스템에서 일반적입니다.
Architettura Hub-and-Spoke:
+------------------+
| ORCHESTRATOR |
| (Hub) |
+--------+---------+
|
+---------+-------+-------+---------+
| | | |
+-----v----+ +--v------+ +----v-----+ +-v--------+
| Agent A | | Agent B | | Agent C | | Agent D |
| (Search) | | (Analyze| | (Write) | | (Review) |
+----------+ +---------+ +----------+ +----------+
(Spoke) (Spoke) (Spoke) (Spoke)
허브 앤 스포크의 장점
- 중앙 집중식 제어: 허브는 모든 사람의 상태를 완벽하게 보여줍니다. 상담원은 라우팅 및 우선순위에 대해 정보를 바탕으로 결정을 내릴 수 있습니다.
- 간단한 모니터링: 모든 통신은 단일 지점을 통과합니다. 로깅, 추적 및 감사를 간단하게 만듭니다.
- 중앙 집중식 오류 관리: 허브는 재시도, 폴백 및 모든 에이전트에 대해 일관되게 회로 차단기를 적용합니다.
- 쉬운 진화: 작업자 에이전트를 추가하거나 제거하는 것은 그만큼 간단합니다. 다른 에이전트를 변경하지 않고 허브에 등록하세요.
허브 앤 스포크의 단점
- 단일 실패 지점: 허브에 장애가 발생하면 전체 시스템이 중지됩니다. 허브에 HA(고가용성)가 필요하므로 복잡성이 증가합니다.
- 병목: 에이전트 수가 많아지면 허브에서 병목 현상이 발생합니다. 또는 통신 빈도가 크게 증가합니다.
- 제한된 확장성: 특정 에이전트 수 이상으로 확장하려면 다음이 필요합니다. 허브를 샤딩하거나 다른 아키텍처로 마이그레이션합니다.
- 추가 대기 시간: 에이전트 간의 각 통신에는 홉스루가 추가됩니다. 허브로 인해 엔드투엔드 대기 시간이 늘어납니다.
class HubOrchestrator:
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.results: Dict[str, any] = {}
self.circuit_breakers: Dict[str, CircuitBreaker] = {}
def register_agent(self, name: str, agent: 'Agent'):
self.agents[name] = agent
self.circuit_breakers[name] = CircuitBreaker(
failure_threshold=3,
recovery_timeout=60
)
async def dispatch(self, agent_name: str, task: Dict) -> Dict:
cb = self.circuit_breakers[agent_name]
if cb.is_open():
return {"status": "circuit_open", "agent": agent_name}
try:
result = await asyncio.wait_for(
self.agents[agent_name].process(task),
timeout=30.0
)
cb.record_success()
return result
except asyncio.TimeoutError:
cb.record_failure()
return {"status": "timeout", "agent": agent_name}
except Exception as e:
cb.record_failure()
return {"status": "error", "agent": agent_name, "error": str(e)}
피어 투 피어 아키텍처
건축에서는 피어 투 피어(P2P), 상담원이 직접 통신합니다. 중앙 코디네이터 없이 그들 사이에서. 각 에이전트는 자율적이며 검색하고 상호 작용할 수 있습니다. 시스템의 다른 에이전트와 함께. 이 모델은 고전적인 분산 시스템에서 영감을 받았습니다. 가십 프로토콜 및 합의 알고리즘과 같은 것입니다.
Architettura Peer-to-Peer:
+----------+ messaggi diretti +----------+
| Agent A | <---------------------> | Agent B |
+-----+----+ +----+-----+
| |
| +----------+ |
+-------->| Agent C |<-------------+
| +----+-----+ |
| | |
| +----v-----+ |
+-------->| Agent D |<-------------+
+----------+
피어 투 피어의 장점
- 단일 장애 지점 없음: 한 에이전트가 실패하면 다른 에이전트가 계속 진행됩니다. 기능을 수행하고 작업을 재분배할 수 있습니다.
- 수평적 확장성: 새 에이전트를 추가하면 변경이 필요하지 않습니다. 아키텍처에 맞게 네트워크에 연결하기만 하면 됩니다.
- 낮은 대기 시간: 에이전트 간 직접 통신으로 홉스루(hop through)를 제거합니다. 중앙 코디네이터.
- 자연적인 회복력: 시스템이 자동으로 결함에 적응합니다. 작업 중인 에이전트를 통해 통신 경로를 다시 지정합니다.
피어 투 피어의 단점
- 높은 복잡성: 검색, 라우팅, 합의 로직이 분산되어 있습니다. 각 에이전트마다 시스템을 개발하고 유지 관리하기가 더 어려워집니다.
- 일관성 문제: 자율 에이전트와 에이전트 간의 일관된 상태를 유지합니다. 분산 시스템의 근본적인 문제(CAP 정리).
- 복잡한 디버깅: 중앙 관측점 없이 추적 의사소통 흐름과 문제 진단이 훨씬 더 어렵습니다.
- 통신 오버헤드: 각 에이전트는 다수의 에이전트와 연결을 유지해야 합니다. 다른 에이전트로 인해 네트워크 리소스 소비가 증가합니다.
메시지 대기열 아키텍처
기반으로 한 아키텍처 메시지 대기열 메시지 브로커를 소개합니다(예: RabbitMQ, Apache Kafka 또는 Apache Pulsar)를 에이전트 간의 중개자로 사용합니다. 에이전트 그들은 특정 주제나 대기열에 메시지를 게시하고 다른 에이전트는 이를 수신하기 위해 구독합니다. 이 모델은 아키텍처를 구현합니다. 이벤트 중심 디커플링을 제공하는, 복원력과 확장성이 뛰어납니다.
Architettura Message Queue:
+----------+ +----------+
| Agent A |---publish---> +----------------+ -->| Agent C |
+----------+ | | +----------+
| MESSAGE BROKER |
+----------+ | (Kafka/RabbitMQ| +----------+
| Agent B |---publish---> | /Pulsar) | -->| Agent D |
+----------+ +----------------+ +----------+
|
+------v-------+
| Dead Letter |
| Queue (DLQ) |
+--------------+
Topic/Code:
- tasks.research (Agent A pubblica, Agent C consuma)
- tasks.analysis (Agent A pubblica, Agent D consuma)
- results.research (Agent C pubblica, Agent B consuma)
- errors.global (tutti pubblicano, monitoring consuma)
메시지 대기열의 장점
- 총 디커플링: 상담원이 서로를 직접 알 필요는 없습니다. 그들은 의사소통을 한다 주제를 통해 시스템의 나머지 부분에 영향을 주지 않고 상담원을 추가하거나 제거할 수 있습니다.
- 기본 탄력성: 에이전트가 삭제된 경우에도 메시지는 대기열에 유지됩니다. 수신자가 일시적으로 오프라인 상태입니다. 다시 활성화되면 누적된 메시지를 처리합니다.
- 독립적인 확장성: 각 에이전트 유형은 독립적으로 확장될 수 있습니다. 분석으로 인해 병목 현상이 발생하는 경우 분석 에이전트 인스턴스를 더 추가합니다.
- 재생 및 감사: Kafka를 사용하면 전체 메시지 기록을 사용할 수 있습니다. 재생, 디버깅 및 감사 규정 준수.
메시지 대기열의 단점
- 운영 복잡성- 프로덕션에서 Kafka 또는 RabbitMQ 클러스터를 관리합니다. 상당한 DevOps 기술과 전용 인프라가 필요합니다.
- 추가 대기 시간: 직렬화, 지속성 및 역직렬화 직접 통신에 비해 메시지의 대기 시간이 발생합니다.
- 가능한 일관성: 시스템은 본질적으로 최종 일관성이 있습니다. 이는 강력한 일관성이 필요한 워크플로에 문제가 될 수 있습니다.
- 인프라 비용: 메시지 브로커는 추가 기능입니다. 리소스, 모니터링 및 유지 관리가 필요합니다.
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class KafkaAgentBus:
def __init__(self, bootstrap_servers: str):
self.servers = bootstrap_servers
self.producer = None
self.consumers = {}
async def start(self):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.servers,
value_serializer=lambda v: json.dumps(v).encode()
)
await self.producer.start()
async def publish(self, topic: str, message: Dict):
await self.producer.send_and_wait(topic, message)
async def subscribe(self, topic: str, handler):
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=self.servers,
value_deserializer=lambda v: json.loads(v.decode()),
group_id=f"agent-group-{topic}"
)
await consumer.start()
async for msg in consumer:
try:
await handler(msg.value)
except Exception as e:
# Pubblica su Dead Letter Queue
await self.publish("errors.dlq", {
"original_topic": topic,
"message": msg.value,
"error": str(e)
})
# Utilizzo
bus = KafkaAgentBus("localhost:9092")
await bus.start()
# Agent A pubblica task di ricerca
await bus.publish("tasks.research", {
"task_id": "t-001",
"query": "Best practices microservizi 2026",
"priority": "high"
})
# Agent C consuma e processa
await bus.subscribe("tasks.research", research_agent.handle)
분산 상태 관리
다중 에이전트 시스템에서는 상태 관리하기 가장 민감한 문제입니다. 각 에이전트는 전역 상태에 대해 부분적이거나 오래된 보기를 가질 수 있습니다. 일관되지 않은 결정. 분산 방식으로 상태를 관리하는 세 가지 주요 접근 방식이 있습니다.
이벤트 소싱
와 함께이벤트 소싱, 상태는 현재 스냅샷으로 저장되지 않습니다. 그러나 불변의 사건의 연속이다. 모든 이벤트를 재생하여 현재 상태를 얻습니다. 처음부터. 이 접근 방식은 완전한 감사 추적과 재구축 기능을 제공합니다. 어느 시점의 상태.
class EventStore:
def __init__(self):
self.events: List[Dict] = []
def append(self, event: Dict):
event["timestamp"] = datetime.utcnow().isoformat()
event["sequence"] = len(self.events)
self.events.append(event)
def get_state(self, entity_id: str) -> Dict:
"""Ricostruisci lo stato applicando tutti gli eventi"""
state = {}
for event in self.events:
if event.get("entity_id") == entity_id:
state = self._apply_event(state, event)
return state
def _apply_event(self, state: Dict, event: Dict) -> Dict:
event_type = event["type"]
if event_type == "TaskCreated":
state["status"] = "created"
state["data"] = event["data"]
elif event_type == "TaskAssigned":
state["assigned_to"] = event["agent"]
state["status"] = "assigned"
elif event_type == "TaskCompleted":
state["status"] = "completed"
state["result"] = event["result"]
return state
CQRS(명령 쿼리 책임 분리)
CQRS 읽기 작업에서 쓰기 작업(명령)을 분리합니다. (쿼리). 명령은 이벤트 저장소를 통해 상태를 변경하는 반면 쿼리는 이벤트 저장소에서 읽혀집니다. 최적화된 예측. 이를 통해 읽기 및 쓰기를 독립적으로 확장할 수 있습니다. 각 쿼리 유형에 최적화된 읽기 모델을 갖습니다.
- 지휘측: 명령의 유효성을 검사하고, 이벤트를 생성하고, 일관성을 유지합니다.
- 쿼리 측: 특정 쿼리에 최적화된 구체화된 예측을 읽습니다.
- 예상: 이벤트에 의해 비동기적으로 업데이트되는 비정규화된 뷰
분산 트랜잭션: 사가 패턴
작업에 여러 에이전트가 포함되고 원자성이어야 하는 경우 사가 패턴 분산 트랜잭션을 일련의 로컬 트랜잭션으로 처리합니다. 파산시 자체 보상 조치.
class SagaOrchestrator:
def __init__(self):
self.steps: List[SagaStep] = []
def add_step(self, execute_fn, compensate_fn, name: str):
self.steps.append(SagaStep(execute_fn, compensate_fn, name))
async def execute(self) -> Dict:
completed = []
try:
for step in self.steps:
result = await step.execute()
completed.append((step, result))
return {"status": "success", "steps": len(completed)}
except Exception as e:
# Compensazione in ordine inverso
for step, _ in reversed(completed):
try:
await step.compensate()
except Exception as comp_error:
# Log errore compensazione, non propagare
logger.error(f"Compensation failed: {comp_error}")
return {"status": "rolled_back", "error": str(e)}
# Utilizzo: transazione multi-agente
saga = SagaOrchestrator()
saga.add_step(
execute_fn=lambda: research_agent.analyze(data),
compensate_fn=lambda: research_agent.cleanup(),
name="research"
)
saga.add_step(
execute_fn=lambda: writer_agent.generate(analysis),
compensate_fn=lambda: writer_agent.discard_draft(),
name="writing"
)
saga.add_step(
execute_fn=lambda: publisher_agent.publish(article),
compensate_fn=lambda: publisher_agent.unpublish(),
name="publishing"
)
내결함성
프로덕션 환경의 다중 에이전트 시스템에서는 실패가 불가피합니다. 응답하는 LLM 오류, 네트워크 시간 초과, 잘못된 출력을 생성하는 에이전트, 예산. 강력한 시스템은 전파 없이 이러한 모든 시나리오를 우아하게 처리해야 합니다. 계단식 오류. 핵심은 실패를 위한 설계입니다.
회로 차단기 패턴
Il 회로 차단기 에이전트의 실패를 모니터링하고, 이후에는 연속 오류 구성 가능, "회로 개방"으로 추가 호출 방지 쿨다운 기간. 이는 실패한 에이전트로 인해 다음이 발생하는 연쇄 효과를 방지합니다. 전체 시스템의 속도 저하 또는 오류.
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed" # Normale operativita
OPEN = "open" # Circuito aperto, rifiuta chiamate
HALF_OPEN = "half_open" # Test con una singola chiamata
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = timedelta(seconds=recovery_timeout)
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.success_count = 0
def is_open(self) -> bool:
if self.state == CircuitState.OPEN:
if datetime.utcnow() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
return False
return True
return False
def record_success(self):
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count += 1
def record_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
지수 백오프를 사용한 재시도 정책
Le 재시도 정책 실패한 작업을 재시도하는 방법과 시기를 정의합니다. 지수 백오프는 너무 잦은 재시도로 인해 서비스가 과부하되는 것을 방지합니다. 시도 간격을 점차적으로 늘립니다.
import random
async def retry_with_backoff(
fn,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True
):
for attempt in range(max_retries + 1):
try:
return await fn()
except Exception as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay *= (0.5 + random.random())
logger.warning(
f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
f"Retrying in {delay:.1f}s"
)
await asyncio.sleep(delay)
우아한 저하
La 우아한 저하 시스템이 항상 결과를 생성하도록 보장 일부 구성 요소에 오류가 발생하는 경우에도 유용합니다. 일반적인 오류를 반환하는 대신, 시스템은 출력 품질을 저하시키지만 기능은 유지합니다.
- 대체 에이전트: 기본 에이전트가 실패하면 더 간단한 백업 에이전트(예: 규칙 기반)가 대신됩니다.
- 부분적인 결과: 에이전트 5명 중 3명이 작업을 완료하면 시스템은 고지 사항과 함께 부분 결과를 반환합니다.
- 결과 캐시: 이전의 유사한 응답이 대체 항목으로 반환됩니다.
- 기본값: 중요하지 않은 필드의 경우 담당 상담원이 응답하지 않는 경우 기본값이 사용됩니다.
모니터링 및 관찰 가능성
모니터링이 없는 다중 에이전트 시스템은 어둠 속에서 운전하는 것과 같습니다. 그만큼'관찰 가능성 출력을 관찰하여 시스템의 내부 상태를 이해하는 능력입니다. 나를 위해 다중 에이전트 시스템에는 로깅, 메트릭, 분산 추적이라는 세 가지 요소가 필요합니다.
분산 추적
Il 분산 추적 모든 에이전트를 통해 요청을 추적합니다. 그들은 이를 처리하여 흐름의 엔드투엔드 시각화를 생성합니다. 다음과 같은 도구 저격병 o 집킨스 어디에 쌓였는지 볼 수 있게 해주세요 대기 시간 및 오류가 발생하는 위치.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
tracer = trace.get_tracer("multi-agent-system")
class TracedAgent:
def __init__(self, agent, name: str):
self.agent = agent
self.name = name
async def process(self, task: Dict) -> Dict:
with tracer.start_as_current_span(
f"agent.{self.name}.process",
attributes={
"agent.name": self.name,
"agent.task_type": task.get("type", "unknown"),
"agent.task_id": task.get("id", "")
}
) as span:
try:
result = await self.agent.process(task)
span.set_attribute("agent.status", "success")
span.set_attribute("agent.tokens_used",
result.get("tokens", 0))
return result
except Exception as e:
span.set_status(trace.StatusCode.ERROR, str(e))
span.record_exception(e)
raise
다중 에이전트 시스템의 주요 지표
표준 인프라 지표(CPU, 메모리, 네트워크) 외에도 다중 에이전트 시스템 시스템 상태와 성능을 모니터링하려면 특정 측정항목이 필요합니다.
필수 지표
| 미터법 | 설명 | 목표 | 경보 |
|---|---|---|---|
| 에이전트 지연 시간(p95) | 95번째 백분위수에서의 응답 시간 | 에이전트당 5초 미만 | > 15초 |
| 오류율 | 에이전트당 실패한 작업 비율 | < 2% | > 10% |
| 토큰 비용 / 작업 | 작업당 토큰의 평균 비용 | 예산에 따른 | > 평균 2배 |
| 루프 감지율 | 루프가 있는 대화 비율 | < 1% | > 5% |
| 핸드오프 성공률 | 성공적인 위임 비율 | > 95% | < 85% |
| 종단 간 지연 시간 | 입력부터 결과까지의 총 시간 | 30초 미만 | > 60대 |
| 회로 차단기 열림 | 차단기 개방횟수/시간 | 0 | > 3/시간 |
전문 모니터링 플랫폼
다중 에이전트 시스템을 모니터링하려면 Grafana 및 Prometheus와 같은 일반 도구가 필요합니다. 전문 플랫폼과 결합:
- 랭스미스: 추적, 디버깅, 평가를 위한 LangChain 플랫폼 LLM 응용 프로그램. 콜 체인 시각화, 비용 분석 및 A/B 테스트를 제공합니다.
- 랭퓨즈: 관찰 가능성에 초점을 맞춘 LangSmith의 오픈 소스 대안 AI 애플리케이션을 위한 분석.
- 가중치 및 편향: 실험 추적, 신속한 버전 관리 및 시간 경과에 따른 성능 모니터링.
- 프로메테우스 + 그라파나: 인프라 및 사용자 정의 지표를 위한 클래식 스택, 구성 가능한 경고 포함.
- 예거/집킨: 서로 다른 엔드투엔드 분산 추적용 시스템 에이전트.
결정 매트릭스: 올바른 아키텍처 선택
아키텍처 선택은 여러 요인에 따라 달라집니다. 보편적인 해결책은 없습니다: 각 프로젝트에는 서로 다른 제약 조건과 우선 순위가 있습니다. 다음 표는 실용적인 가이드를 제공합니다 결정을 안내합니다.
다중 에이전트 아키텍처에 대한 결정 매트릭스
| 요인 | 허브 앤 스포크 | 피어 투 피어 | 메시지 대기열 |
|---|---|---|---|
| 팀 규모 | 에이전트 2~10명 | 5~50명의 에이전트 | 10~100명 이상의 상담원 |
| 필요한 지연 시간 | 중간(10초 미만) | 낮음(5초 미만) | 관대함(30초 미만) |
| 내결함성 | 중간(HA 허브) | 높음(기본) | 매우 높음(브로커) |
| 운영 복잡성 | 낮은 | 높은 | 중간-높음 |
| 인프라 비용 | 베이스 | 중간 | 높은 |
| 디버깅 | 쉬운 | 어려운 | 중간 |
| 확장성 | 제한된 | 좋은 | 훌륭한 |
| 이상적인 사용 사례 | MVP, 소규모 팀 | 분산 시스템 | 기업, 높은 신뢰성 |
실용적인 조언
대부분의 프로젝트에서는 다음으로 시작합니다. 허브 앤 스포크. 그것은 간단하다. 디버깅하기 쉽고 에이전트가 10개 미만인 시스템에 충분합니다. 확장성이 있는 경우 문제가 되어 다음 방향으로 이동합니다. 메시지 대기열. 아키텍처 피어 투 피어 다중 에이전트 AI 시스템에는 거의 필요하지 않으며, 대기 시간이 중요한 요소인 특정 사용 사례에만 고려됩니다. 상담원 수가 매우 많습니다.
결론
다중 에이전트 오케스트레이션은 아키텍처 및 아키텍처 전문 지식이 모두 필요한 분야입니다. 소프트웨어와 AI. 우리가 분석한 패턴(순차, 동시, 그룹 채팅, Handoff, Plan-First)는 복잡한 시스템을 구축하기 위해 결합되는 기본 빌딩 블록입니다. 세 가지 아키텍처(허브 앤 스포크, 피어 투 피어, 메시지 큐)는 서로 다른 장단점을 제공합니다. 단순성, 확장성 및 탄력성 사이.
핵심 메시지는 다음과 같습니다. 복잡성을 과소평가하지 마세요. 다중 에이전트 시스템 프로덕션에서는 단순히 "여러 에이전트가 협업"하는 것이 아닙니다. 이는 분산 시스템입니다. 분산 시스템의 모든 고전적인 문제(일관성, 내결함성, 관찰 가능성) LLM의 비결정성에 의해 증폭됩니다. 내결함성, 모니터링 및 처음부터 상태 관리는 과도한 엔지니어링이 아니라 생존입니다.
다음 기사에서는 AI 에이전트용 메모리: 장비하는 방법 단기 및 장기 기억 에이전트, 검색(RAG), 임베딩 및 벡터 패턴 데이터베이스, 그리고 기억이 에이전트의 추론 및 계획 능력에 어떻게 영향을 미치는지.







