Multi-Agent Systems: Orchestration and Communication
Multi-agent systems are one of the most explosive trends in AI in 2025: Gartner recorded a 1,445% increase in queries about "multi-agent systems" between Q1 2024 and Q2 2025. The reason is simple: many real-world problems are too complex for a single AI agent. A research agent that must collect data, analyze it, write a report, and send it by email is not solving one problem: it is solving four specialized problems, each requiring different skills.
In this article we build real multi-agent systems: from the basic architecture (orchestrator + specialized agents) to advanced patterns like ReAct (Reasoning + Acting), Plan-and-Execute, Supervisor-Worker and LangGraph for stateful orchestration. Every section includes working Python code and patterns tested on real problems.
What You Will Learn
- Architecture and patterns of multi-agent systems
- ReAct pattern: Reasoning + Acting with tool calling
- Plan-and-Execute: planning separated from execution
- Supervisor pattern: orchestrator coordinating specialized agents
- LangGraph for stateful orchestration with control graphs
- Communication and coordination between agents
- Error handling and fallback in multi-agent systems
- Monitoring and observability for multi-agent pipelines
1. Multi-Agent System Architecture
A multi-agent system is composed of multiple agents (LLM + set of tools) that collaborate to achieve a common goal. The key to success is the separation of responsibilities: each agent specializes in a specific domain and knows when to delegate to other agents.
MULTI-AGENT TOPOLOGIES:
1. NETWORK (fully connected):
Every agent can call any other agent.
Pro: maximum flexibility
Con: hard to control, risk of loops
A ◄──► B
▲ ▲
└──► C ◄┘
2. SUPERVISOR (star topology):
A central agent orchestrates all others.
Pro: centralized control, easy to debug
Con: single point of failure, potential bottleneck
SUPERVISOR
├──► Agent A (research)
├──► Agent B (analysis)
└──► Agent C (reporting)
3. HIERARCHICAL:
Multiple supervisors organized in hierarchy.
Pro: scalability, clear separation of concerns
Con: increased latency, coordination complexity
Manager
├──► Sub-manager A
│ ├──► Worker A1
│ └──► Worker A2
└──► Sub-manager B
├──► Worker B1
└──► Worker B2
4. PIPELINE (sequential):
Each agent processes the previous agent's output.
Pro: simple, deterministic, easy to debug
Con: rigid, no feedback loops
Input ──► A ──► B ──► C ──► Output
TOPOLOGY SELECTION:
- Well-defined tasks, clear order → Pipeline
- Dynamic routing tasks → Supervisor
- Complex problems with sub-problems → Hierarchical
- Exploratory research → Network (with guardrails)
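Before reaching for any framework, the pipeline topology can be sketched in a few lines of plain Python. The agent functions below are illustrative stand-ins for real LLM calls with tools; the point is only the shape of the control flow:

```python
from typing import Callable, List

# Each "agent" here is just a function from text to text; in a real
# system it would wrap an LLM call plus its tools.
def research_agent(text: str) -> str:
    return text + " | researched"

def analysis_agent(text: str) -> str:
    return text + " | analyzed"

def report_agent(text: str) -> str:
    return text + " | reported"

def run_pipeline(agents: List[Callable[[str], str]], task: str) -> str:
    """Pipeline topology: each agent consumes the previous agent's output."""
    output = task
    for agent in agents:
        output = agent(output)
    return output

result = run_pipeline([research_agent, analysis_agent, report_agent], "Q4 sales")
print(result)  # Q4 sales | researched | analyzed | reported
```

Swapping the list for a router function that picks the next agent dynamically is exactly the step that turns a pipeline into a supervisor topology.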
2. ReAct Pattern: Reasoning + Acting
ReAct (Reasoning + Acting) is the fundamental pattern for AI agents with tool access. The agent alternates between Thought (reasoning about what to do), Action (executing a tool) and Observation (interpreting the result), repeating the cycle until reaching the final answer.
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub
# ============================================================
# TOOL DEFINITIONS
# ============================================================
@tool
def search_web(query: str) -> str:
"""Search for information on the web. Use for current data or news."""
return f"Search results for '{query}': [simulated results for demo]"
@tool
def analyze_data(data: str, analysis_type: str) -> str:
"""
Analyze numerical or textual data.
analysis_type can be: 'summary', 'trend', 'anomaly', 'comparison'
"""
return f"{analysis_type} analysis completed for: {data}"
@tool
def generate_report(title: str, sections: str) -> str:
"""
Generate a structured report.
sections: JSON with report sections
"""
return f"Report '{title}' generated with sections: {sections}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""Send an email. Use to communicate results to end user."""
return f"Email sent to {to} with subject '{subject}'"
@tool
def query_database(query: str, database: str = "main") -> str:
"""
Execute queries on internal databases.
database: 'main' for primary DB, 'analytics' for DW
"""
return f"Query on {database}: [simulated results]"
# ============================================================
# CREATE REACT AGENT
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools = [search_web, analyze_data, generate_report, send_email, query_database]
# Standard ReAct prompt from LangChain Hub
react_prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm=llm, tools=tools, prompt=react_prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=10, # Prevent infinite loops
    early_stopping_method="force",  # "generate" is not supported with runnable agents in recent LangChain versions
handle_parsing_errors=True
)
# Complex multi-step task
task = """
Analyze Q4 2024 sales performance from the analytics database,
search for recent industry news, generate a report with recommendations,
and send it to manager@company.com
"""
result = agent_executor.invoke({"input": task})
print(f"\nFinal answer:\n{result['output']}")
# Typical ReAct reasoning trace:
# Thought: I need to get sales data from the database first
# Action: query_database
# Action Input: {"query": "SELECT * FROM sales WHERE quarter='Q4' AND year=2024"}
# Observation: [sales data]
# Thought: Now I search for recent industry news
# Action: search_web
# ...
3. Plan-and-Execute Pattern
The Plan-and-Execute pattern separates planning from execution: first an LLM creates a detailed plan of necessary steps, then agents execute each step sequentially (or in parallel). Compared to ReAct, it allows a more holistic view of the problem and better complexity management.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict
import json
class PlanAndExecuteAgent:
"""
Plan-and-Execute agent:
1. PLANNER: creates a step-by-step plan
2. EXECUTOR: executes each step with available tools
3. RESPONDER: synthesizes results into a final response
"""
def __init__(self, llm, tools_dict: Dict[str, callable]):
self.llm = llm
self.tools = tools_dict
self.planner_chain = (
ChatPromptTemplate.from_template("""
You are an expert planner. Create a detailed step-by-step plan to complete the task.
Available tools: {available_tools}
Task: {task}
Create a plan as a JSON list of steps. Each step must have:
- "step_id": progressive number (1, 2, 3...)
- "description": description of the action
- "tool": name of the tool to use (or "llm" for pure reasoning)
- "depends_on": list of step_ids that must complete first
Reply ONLY with JSON, nothing else.""")
| llm
| StrOutputParser()
)
self.executor_chain = (
ChatPromptTemplate.from_template("""
Execute this step of the plan:
Step: {step}
Previous step results: {previous_results}
Briefly describe what you did and the result obtained.""")
| llm
| StrOutputParser()
)
self.responder_chain = (
ChatPromptTemplate.from_template("""
Original task: {task}
Plan executed with these results:
{step_results}
Synthesize the results into a complete, well-structured response for the user.""")
| llm
| StrOutputParser()
)
def run(self, task: str) -> dict:
"""Execute task with Plan-and-Execute"""
# STEP 1: Planning
available_tools = list(self.tools.keys())
plan_json = self.planner_chain.invoke({
"task": task,
"available_tools": ", ".join(available_tools)
})
        try:
            # Strip optional markdown code fences (```json ... ```) before parsing
            clean = plan_json.strip()
            if clean.startswith("```"):
                clean = clean.strip("`")
                if clean.lower().startswith("json"):
                    clean = clean[4:]
            plan = json.loads(clean.strip())
        except (json.JSONDecodeError, TypeError):
            # Fallback: treat the whole task as a single pure-LLM step
            plan = [{"step_id": 1, "description": task, "tool": "llm", "depends_on": []}]
print(f"Plan created: {len(plan)} steps")
# STEP 2: Sequential execution
step_results = {}
for step in plan:
step_id = str(step["step_id"])
tool_name = step.get("tool", "llm")
print(f" Executing step {step_id}: {step['description'][:60]}...")
if tool_name != "llm" and tool_name in self.tools:
try:
result = self.tools[tool_name](step.get("tool_input", step["description"]))
step_results[step_id] = f"Tool {tool_name}: {result}"
except Exception as e:
step_results[step_id] = f"Error in {tool_name}: {str(e)}"
else:
result = self.executor_chain.invoke({
"step": step["description"],
"previous_results": json.dumps(step_results)
})
step_results[step_id] = result
# STEP 3: Final synthesis
final_response = self.responder_chain.invoke({
"task": task,
"step_results": json.dumps(step_results, indent=2)
})
return {"plan": plan, "step_results": step_results, "final_response": final_response}
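The `depends_on` field the planner emits is parsed above but not yet exploited: execution is strictly sequential. A small scheduler sketch shows how dependencies could instead group steps into batches that are safe to run in parallel (the plan data here is illustrative, matching the JSON schema the planner prompt requests):

```python
from typing import Dict, List

def schedule_batches(plan: List[Dict]) -> List[List[int]]:
    """Group plan steps into batches: a step runs only after all of its
    depends_on steps are done, and independent steps share a batch."""
    remaining = {step["step_id"]: set(step.get("depends_on", [])) for step in plan}
    batches = []
    done = set()
    while remaining:
        # A step is ready when all of its dependencies are already done
        ready = [sid for sid, deps in remaining.items() if deps <= done]
        if not ready:
            raise ValueError("Cyclic dependencies in plan")
        batches.append(sorted(ready))
        done.update(ready)
        for sid in ready:
            del remaining[sid]
    return batches

plan = [
    {"step_id": 1, "description": "query sales DB", "depends_on": []},
    {"step_id": 2, "description": "search industry news", "depends_on": []},
    {"step_id": 3, "description": "analyze combined data", "depends_on": [1, 2]},
    {"step_id": 4, "description": "write report", "depends_on": [3]},
]
print(schedule_batches(plan))  # [[1, 2], [3], [4]]
```

Each inner list could then be dispatched concurrently (e.g. with a thread pool), which matters when independent steps each involve a slow LLM or tool call.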
4. LangGraph: Stateful Orchestration with Graphs
LangGraph is LangChain's library for building stateful multi-agent systems as explicit directed control graphs which, unlike pure DAG pipelines, may contain cycles. It allows explicitly defining the control flow between agents, managing shared state, and implementing feedback loops. It is the best choice for complex multi-agent systems in production.
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, List, Literal
import operator
# ============================================================
# SHARED STATE
# ============================================================
class AgentState(TypedDict):
"""Shared state across all agents in the graph"""
messages: Annotated[List, operator.add] # Append-only message list
task: str # Original task
research_results: str
analysis_results: str
final_report: str
next_agent: str
error_count: int
# ============================================================
# SPECIALIZED AGENTS
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def supervisor_agent(state: AgentState) -> AgentState:
"""Supervisor: decides which agent to call next"""
system_msg = SystemMessage(content="""You are a supervisor AI coordinating a team of agents.
Decide which agent to call based on the current task state.
Available agents:
- research: collects information and data
- analysis: analyzes collected data
- report_writer: writes the final report
- FINISH: task is complete
Reply with ONE name from the available options.""")
messages = [system_msg, HumanMessage(content=f"""
Task: {state['task']}
Research done: {'Yes' if state['research_results'] else 'No'}
Analysis done: {'Yes' if state['analysis_results'] else 'No'}
Report written: {'Yes' if state['final_report'] else 'No'}
Which agent should act now?""")]
response = llm.invoke(messages)
next_agent = response.content.strip().lower()
valid_agents = ["research", "analysis", "report_writer", "finish"]
if next_agent not in valid_agents:
next_agent = "research"
return {"next_agent": next_agent, "messages": [response]}
def research_agent(state: AgentState) -> AgentState:
"""Specialized research and data collection agent"""
messages = [
SystemMessage(content="You are a research AI. Collect relevant data for the task."),
HumanMessage(content=f"Gather information for: {state['task']}")
]
response = llm.invoke(messages)
return {"research_results": response.content, "messages": [response]}
def analysis_agent(state: AgentState) -> AgentState:
"""Specialized data analysis agent"""
messages = [
SystemMessage(content="You are an analysis AI. Analyze the collected data."),
HumanMessage(content=f"""
Data to analyze:
{state['research_results']}
Original task: {state['task']}
Provide structured analysis.""")
]
response = llm.invoke(messages)
return {"analysis_results": response.content, "messages": [response]}
def report_writer_agent(state: AgentState) -> AgentState:
"""Specialized report writing agent"""
messages = [
SystemMessage(content="You are a technical report writer AI."),
HumanMessage(content=f"""
Task: {state['task']}
Research: {state['research_results']}
Analysis: {state['analysis_results']}
Write a professional and complete report.""")
]
response = llm.invoke(messages)
return {"final_report": response.content, "messages": [response]}
# ============================================================
# ROUTING
# ============================================================
def route_to_agent(state: AgentState) -> Literal["research", "analysis", "report_writer", "__end__"]:
next_agent = state.get("next_agent", "research")
if next_agent == "finish" or state.get("error_count", 0) > 5:
return END
return next_agent
# ============================================================
# BUILD GRAPH
# ============================================================
workflow = StateGraph(AgentState)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_agent)
workflow.add_node("analysis", analysis_agent)
workflow.add_node("report_writer", report_writer_agent)
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{"research": "research", "analysis": "analysis", "report_writer": "report_writer", END: END}
)
# All agents return to supervisor
workflow.add_edge("research", "supervisor")
workflow.add_edge("analysis", "supervisor")
workflow.add_edge("report_writer", "supervisor")
app = workflow.compile()
# ============================================================
# EXECUTION
# ============================================================
initial_state = {
"task": "Analyze 2025 AI market trends and prepare an executive report",
"messages": [], "research_results": "", "analysis_results": "",
"final_report": "", "next_agent": "", "error_count": 0
}
final_state = app.invoke(initial_state, config={"recursion_limit": 20})
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
5. Agent Communication and Coordination
Effective communication between agents is crucial. There are three main patterns: message passing (agents exchange structured messages), shared state (global shared state as in LangGraph) and blackboard (shared database where agents read and write).
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid
from enum import Enum
class MessageType(Enum):
TASK = "task"
RESULT = "result"
ERROR = "error"
STATUS = "status"
QUERY = "query"
RESPONSE = "response"
@dataclass
class AgentMessage:
"""Structured message for inter-agent communication"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
message_type: MessageType = MessageType.TASK
content: Any = None
metadata: Dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
correlation_id: Optional[str] = None
def create_response(self, content: Any,
message_type: MessageType = MessageType.RESULT) -> 'AgentMessage':
"""Create a response to this message"""
return AgentMessage(
from_agent=self.to_agent,
to_agent=self.from_agent,
message_type=message_type,
content=content,
correlation_id=self.message_id
)
class MessageBus:
"""Message bus for async communication between agents"""
def __init__(self):
self.queues: Dict[str, List[AgentMessage]] = {}
self.history: List[AgentMessage] = []
def register_agent(self, agent_id: str):
self.queues[agent_id] = []
def send(self, message: AgentMessage):
if message.to_agent not in self.queues:
self.queues[message.to_agent] = []
self.queues[message.to_agent].append(message)
self.history.append(message)
def receive(self, agent_id: str) -> Optional[AgentMessage]:
if agent_id in self.queues and self.queues[agent_id]:
return self.queues[agent_id].pop(0)
return None
def broadcast(self, message: AgentMessage, agents: List[str]):
for agent_id in agents:
msg_copy = AgentMessage(
from_agent=message.from_agent, to_agent=agent_id,
message_type=message.message_type, content=message.content,
correlation_id=message.message_id
)
self.send(msg_copy)
# Usage example
bus = MessageBus()
bus.register_agent("supervisor")
bus.register_agent("research")
# Supervisor assigns task to research
task_msg = AgentMessage(
from_agent="supervisor", to_agent="research",
message_type=MessageType.TASK,
content="Research AI trends for 2025",
metadata={"priority": "high"}
)
bus.send(task_msg)
# Research receives and responds
received = bus.receive("research")
if received:
result_msg = received.create_response(
content="Found 50 relevant sources on AI trends 2025"
)
bus.send(result_msg)
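The third communication pattern mentioned above, the blackboard, can be sketched as a shared store with write, read, and watch semantics. This is a minimal in-memory version with illustrative names; a production blackboard would typically sit on Redis or a database:

```python
from typing import Any, Callable, Dict, List

class Blackboard:
    """Shared store where agents post partial results and subscribers
    react when a key they are waiting on is written."""
    def __init__(self):
        self.data: Dict[str, Any] = {}
        self.watchers: Dict[str, List[Callable[[Any], None]]] = {}

    def write(self, key: str, value: Any, author: str):
        self.data[key] = {"value": value, "author": author}
        # Notify every agent watching this key
        for callback in self.watchers.get(key, []):
            callback(value)

    def read(self, key: str):
        entry = self.data.get(key)
        return entry["value"] if entry else None

    def watch(self, key: str, callback: Callable[[Any], None]):
        self.watchers.setdefault(key, []).append(callback)

board = Blackboard()
triggered = []
# The analysis agent reacts as soon as research results appear
board.watch("research_results", lambda v: triggered.append(f"analyzing: {v}"))
board.write("research_results", "50 sources found", author="research")
print(board.read("research_results"))  # 50 sources found
print(triggered)                       # ['analyzing: 50 sources found']
```

Compared to the message bus, the blackboard decouples agents completely: writers do not need to know who consumes their results.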
6. Error Handling and Monitoring
Multi-agent systems introduce new failure vectors: agents stuck in loops, communication errors, malformed outputs, excessive latencies. Implementing circuit breakers, timeouts and specific monitoring is essential.
import time
from functools import wraps
from enum import Enum
from typing import Dict
class CircuitState(Enum):
CLOSED = "closed" # Working normally
OPEN = "open" # Blocked (too many errors)
HALF_OPEN = "half_open" # Testing: try one call
class CircuitBreaker:
"""
Circuit breaker for AI agents.
Prevents cascade failure when an agent fails repeatedly.
"""
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == CircuitState.OPEN:
elapsed = time.time() - self.last_failure_time
if elapsed > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception(
f"Circuit OPEN: agent unavailable. "
f"Retry in {self.recovery_timeout - elapsed:.0f}s"
)
try:
result = func(*args, **kwargs)
self.failure_count = 0
self.state = CircuitState.CLOSED
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
return wrapper
class MultiAgentMonitor:
"""Monitoring for multi-agent systems"""
def __init__(self):
self.agent_stats: Dict[str, Dict] = {}
def record_agent_call(self, agent_id, success, duration_ms, tokens=0):
if agent_id not in self.agent_stats:
self.agent_stats[agent_id] = {
"total_calls": 0, "successes": 0, "failures": 0,
"total_tokens": 0, "total_duration_ms": 0
}
stats = self.agent_stats[agent_id]
stats["total_calls"] += 1
stats["successes" if success else "failures"] += 1
stats["total_tokens"] += tokens
stats["total_duration_ms"] += duration_ms
def get_system_health(self) -> dict:
results = {}
for agent_id, stats in self.agent_stats.items():
total = stats["total_calls"]
success_rate = stats["successes"] / total if total > 0 else 0
results[agent_id] = {
"success_rate": success_rate,
"avg_duration_ms": stats["total_duration_ms"] / max(1, total),
"status": "healthy" if success_rate > 0.9 else
"degraded" if success_rate > 0.7 else "critical"
}
return results
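Timeouts, mentioned alongside circuit breakers at the start of this section, can be enforced per agent call with a thread-based wrapper. This is a simple sketch (names are illustrative); asyncio-based agents would use `asyncio.wait_for` instead, and note that Python cannot actually kill the worker thread:

```python
import concurrent.futures
from functools import wraps

def with_timeout(seconds: float):
    """Decorator: run the agent call in a worker thread and give up if it
    exceeds the time budget. The underlying call may keep running in the
    background, since Python threads cannot be forcibly terminated."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = pool.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=seconds)
            except concurrent.futures.TimeoutError:
                raise TimeoutError(f"{func.__name__} exceeded {seconds}s timeout")
            finally:
                # Do not block waiting for a stuck worker
                pool.shutdown(wait=False)
        return wrapper
    return decorator

@with_timeout(10.0)
def slow_agent(task: str) -> str:
    # Stand-in for a real LLM call
    return f"done: {task}"

print(slow_agent("summarize Q4 data"))  # done: summarize Q4 data
```

Stacking this under the `CircuitBreaker` decorator gives both protections at once: a timed-out call counts as a failure toward opening the circuit.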
7. Best Practices and Anti-Patterns
Multi-Agent Best Practices
- Single responsibility principle: each agent must have a clearly defined and limited role. An agent that does "everything" is hard to debug and improve.
- Timeout on every agent: no agent should be able to block the system indefinitely. Set realistic timeouts (10-30 seconds for LLM agents).
- Explicit, serializable state: use TypedDict or Pydantic to define state. It must be serializable (JSON) for logging, debugging and recovery.
- Circuit breaker for every agent: prevent cascade failures. If an agent fails repeatedly, the circuit breaker stops calls and allows recovery.
- Trace every decision: log every supervisor decision with explicit reasoning. Debugging multi-agent systems without traces is nearly impossible.
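The last practice, tracing every supervisor decision, can start as simply as an append-only, JSON-serializable log. A minimal sketch with illustrative field names:

```python
import json
import time
from typing import Dict, List

class DecisionTrace:
    """Append-only trace of supervisor routing decisions,
    serializable to JSON for post-mortem debugging."""
    def __init__(self):
        self.entries: List[Dict] = []

    def record(self, agent_chosen: str, reasoning: str, state_summary: Dict):
        self.entries.append({
            "timestamp": time.time(),
            "agent_chosen": agent_chosen,
            "reasoning": reasoning,
            "state": state_summary,
        })

    def dump(self) -> str:
        return json.dumps(self.entries, indent=2)

trace = DecisionTrace()
trace.record("research", "no research results yet", {"research_done": False})
trace.record("analysis", "research complete, analysis missing", {"research_done": True})
print(len(trace.entries))  # 2
```

Hooking `record` into the supervisor node (right after the routing LLM call) is usually enough to reconstruct why a run looped or stalled.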
Anti-Patterns to Avoid
- Overly generic agents: a "universal assistant" agent is not more effective than a single LLM, but is much more expensive and harder to control.
- No iteration limit: without max_iterations, a ReAct system can loop infinitely, consuming tokens and money. Always set a limit.
- Mutable global state shared between agents: sharing mutable state introduces race conditions. Use immutable messages or centrally managed state.
- Blindly trusting agent output: an agent can produce malformed output or hallucinations. Always validate output before passing it to the next agent.
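The last anti-pattern suggests putting a validation gate between agents. A dependency-free sketch of such a gate (in production a Pydantic model per agent output would be the natural choice; the schema here is illustrative):

```python
def validate_agent_output(output: dict, required_fields: dict) -> list:
    """Return a list of problems found; an empty list means the
    output can safely be handed to the next agent."""
    problems = []
    for field, expected_type in required_fields.items():
        if field not in output:
            problems.append(f"missing field: {field}")
        elif not isinstance(output[field], expected_type):
            problems.append(
                f"{field}: expected {expected_type.__name__}, "
                f"got {type(output[field]).__name__}"
            )
    return problems

schema = {"summary": str, "confidence": float, "sources": list}
good = {"summary": "AI market up", "confidence": 0.9, "sources": ["report-a"]}
bad = {"summary": "AI market up", "confidence": "high"}

print(validate_agent_output(good, schema))  # []
print(validate_agent_output(bad, schema))
# ['confidence: expected float, got str', 'missing field: sources']
```

When validation fails, the supervisor can re-prompt the producing agent with the list of problems instead of propagating malformed data downstream.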
Conclusions
Multi-agent systems represent the future of AI applied to complex problems. We explored fundamental patterns (ReAct, Plan-and-Execute, Supervisor), LangGraph for stateful orchestration, inter-agent communication patterns, and error handling in production.
Key takeaways:
- Choose the right topology: pipeline for linear flows, supervisor for dynamic routing
- ReAct is excellent for exploratory tasks with tools; Plan-and-Execute for structured tasks
- LangGraph is the best choice for complex production systems
- Each agent must have a single, well-defined role
- Circuit breakers and timeouts are indispensable in production
- Trace every supervisor decision for effective debugging
In the next article we will explore Prompt Engineering in Production: templates, versioning and systematic testing to ensure quality and consistency over time.
Continue the Series
- Article 7: Context Window Management
- Article 8: Multi-Agent Systems (current)
- Article 9: Prompt Engineering in Production
- Article 10: Knowledge Graphs for AI