Why Multi-Agent?
Single agents hit limits. They lose context over long tasks, struggle when one task demands diverse skills, and become less reliable as complexity grows.
Multi-agent systems address these limits by dividing work among specialized agents. Here's how to design them.
Architecture Patterns
Pattern 1: Sequential Pipeline
Agents run in sequence, each transforming the previous agent's output:

    class SequentialPipeline:
        def __init__(self, agents: list[Agent]):
            self.agents = agents

        async def run(self, text: str) -> str:
            current = text
            for agent in self.agents:
                current = await agent.process(current)
            return current

    # Example: Document processing pipeline
    pipeline = SequentialPipeline([
        ExtractionAgent(),     # Extract key information
        ValidationAgent(),     # Verify accuracy
        SummarizationAgent(),  # Create summary
        FormattingAgent(),     # Format output
    ])
Use when: Tasks have clear stages, each building on the previous.
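To make the pipeline concrete, here is a minimal runnable sketch. The `Agent` protocol and the two toy agents (`StripAgent`, `UppercaseAgent`) are hypothetical stand-ins for real model-backed agents; only the chaining logic mirrors the pattern above.

```python
import asyncio
from typing import Protocol


class Agent(Protocol):
    """Hypothetical interface: anything with an async process(str) -> str."""

    async def process(self, text: str) -> str: ...


class StripAgent:
    async def process(self, text: str) -> str:
        # Stand-in for an extraction step: trim surrounding whitespace.
        return text.strip()


class UppercaseAgent:
    async def process(self, text: str) -> str:
        # Stand-in for a formatting step.
        return text.upper()


class SequentialPipeline:
    def __init__(self, agents: list[Agent]):
        self.agents = agents

    async def run(self, text: str) -> str:
        current = text
        for agent in self.agents:
            # Each stage receives the previous stage's output.
            current = await agent.process(current)
        return current


pipeline = SequentialPipeline([StripAgent(), UppercaseAgent()])
result = asyncio.run(pipeline.run("  quarterly report  "))
print(result)  # QUARTERLY REPORT
```

Because every stage shares one signature, stages can be reordered or swapped without touching the pipeline class.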
Pattern 2: Router/Specialist
A router agent directs queries to specialists:
    class RouterArchitecture:
        def __init__(
            self,
            router: RouterAgent,
            specialists: dict[str, Agent]
        ):
            self.router = router
            self.specialists = specialists

        async def run(self, query: str) -> str:
            # Router decides which specialist to use
            routing_decision = await self.router.route(query)
            specialist = self.specialists[routing_decision.specialist]

            # Specialist handles the query
            return await specialist.process(
                query,
                context=routing_decision.context
            )

    # Example: Customer service
    architecture = RouterArchitecture(
        router=IntentRouter(),
        specialists={
            "billing": BillingAgent(),
            "technical": TechnicalSupportAgent(),
            "sales": SalesAgent(),
            "general": GeneralAgent()
        }
    )
Use when: Different query types need different expertise.
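One detail worth sketching: a direct dictionary lookup on the router's answer raises `KeyError` if the router names a specialist that isn't registered. The example below is a hedged illustration of one way to handle that, by falling back to a default specialist. `KeywordRouter`, `EchoSpecialist`, `RoutingDecision`, and `dispatch` are hypothetical names, and the keyword matching stands in for a model call.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class RoutingDecision:
    specialist: str
    context: dict = field(default_factory=dict)


class KeywordRouter:
    """Toy router: picks a specialist by keyword instead of calling a model."""

    KEYWORDS = {"invoice": "billing", "refund": "billing", "crash": "technical"}

    async def route(self, query: str) -> RoutingDecision:
        lowered = query.lower()
        for keyword, name in self.KEYWORDS.items():
            if keyword in lowered:
                return RoutingDecision(name, {"matched": keyword})
        return RoutingDecision("unknown")


class EchoSpecialist:
    def __init__(self, name: str):
        self.name = name

    async def process(self, query: str, context: dict) -> str:
        return f"[{self.name}] {query}"


async def dispatch(router, specialists: dict, query: str, default: str = "general") -> str:
    decision = await router.route(query)
    # Fall back to the default specialist instead of raising KeyError.
    specialist = specialists.get(decision.specialist, specialists[default])
    return await specialist.process(query, context=decision.context)


specialists = {name: EchoSpecialist(name) for name in ("billing", "technical", "general")}
print(asyncio.run(dispatch(KeywordRouter(), specialists, "My app keeps crashing")))
```

Whether an unknown route should fall back silently or raise is a design choice; a fallback keeps the user served, while an error surfaces routing bugs sooner.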
Pattern 3: Hierarchical Delegation
A manager agent breaks down tasks and delegates:
    class HierarchicalSystem:
        def __init__(self, manager: ManagerAgent, workers: list[Agent]):
            self.manager = manager
            self.workers = workers

        async def run(self, task: str) -> str:
            # Manager creates execution plan
            plan = await self.manager.plan(task)

            results = {}
            for subtask in plan.subtasks:
                # select_worker (not shown) maps a subtask to a suitable worker
                worker = self.select_worker(subtask)
                results[subtask.id] = await worker.execute(subtask)

            # Manager synthesizes results
            return await self.manager.synthesize(task, results)

    # Example: Research task
    system = HierarchicalSystem(
        manager=ResearchManager(),
        workers=[
            WebSearchAgent(),
            DocumentAnalysisAgent(),
            DataExtractionAgent(),
            SynthesisAgent()
        ]
    )
Use when: Complex tasks need decomposition and synthesis.
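The loop above awaits each subtask in turn. When subtasks don't depend on one another, one option is to run them concurrently with `asyncio.gather`. The sketch below assumes independent subtasks and a hypothetical selection rule (look up the worker by a `kind` field); `Subtask`, `EchoWorker`, and `run_subtasks` are illustrative names, not part of the system above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Subtask:
    id: str
    kind: str
    payload: str


class EchoWorker:
    def __init__(self, kind: str):
        self.kind = kind

    async def execute(self, subtask: Subtask) -> str:
        await asyncio.sleep(0)  # yield control, as a real I/O-bound call would
        return f"{self.kind}:{subtask.payload}"


async def run_subtasks(subtasks: list[Subtask], workers: dict[str, EchoWorker]) -> dict[str, str]:
    # Select a worker by the subtask's kind; this lookup rule is an assumption.
    coros = [workers[s.kind].execute(s) for s in subtasks]
    # gather returns results in the same order as the awaitables passed in.
    outputs = await asyncio.gather(*coros)
    return {s.id: out for s, out in zip(subtasks, outputs)}


workers = {"search": EchoWorker("search"), "extract": EchoWorker("extract")}
plan = [Subtask("t1", "search", "llm agents"), Subtask("t2", "extract", "table 3")]
results = asyncio.run(run_subtasks(plan, workers))
print(results)
```

If a later subtask consumes an earlier one's output, the sequential loop remains the right shape.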
Pattern 4: Collaborative Discussion
Agents discuss and refine together:
    class CollaborativeDiscussion:
        def __init__(
            self,
            agents: list[Agent],
            moderator: ModeratorAgent,
            max_rounds: int = 5
        ):
            self.agents = agents
            self.moderator = moderator
            self.max_rounds = max_rounds

        async def run(self, topic: str) -> str:
            discussion = Discussion(topic)

            for _ in range(self.max_rounds):
                for agent in self.agents:
                    contribution = await agent.contribute(discussion)
                    discussion.add(agent.name, contribution)

                # Check for consensus
                if await self.moderator.has_consensus(discussion):
                    break

            return await self.moderator.summarize(discussion)

    # Example: Code review
    system = CollaborativeDiscussion(
        agents=[
            SecurityReviewer(),
            PerformanceReviewer(),
            MaintainabilityReviewer()
        ],
        moderator=ReviewModerator()
    )
Use when: Multiple perspectives improve output quality.
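A small simulation shows how the round loop terminates. Everything here is a toy under stated assumptions: `Reviewer` objects a fixed number of times before approving, and the `Moderator`'s consensus rule (everyone approved in the latest round) is just one possible definition.

```python
import asyncio


class Discussion:
    def __init__(self, topic: str):
        self.topic = topic
        self.entries: list[tuple[str, str]] = []

    def add(self, name: str, contribution: str) -> None:
        self.entries.append((name, contribution))


class Reviewer:
    """Toy agent: objects `objections` times, then approves."""

    def __init__(self, name: str, objections: int):
        self.name = name
        self.remaining = objections

    async def contribute(self, discussion: Discussion) -> str:
        if self.remaining > 0:
            self.remaining -= 1
            return "object"
        return "approve"


class Moderator:
    def __init__(self, n_agents: int):
        self.n_agents = n_agents

    async def has_consensus(self, discussion: Discussion) -> bool:
        # Consensus rule (an assumption): everyone approved in the latest round.
        latest = discussion.entries[-self.n_agents:]
        return all(vote == "approve" for _, vote in latest)


async def discuss(agents, moderator, topic: str, max_rounds: int = 5) -> tuple[int, bool]:
    """Return (rounds used, whether consensus was reached)."""
    discussion = Discussion(topic)
    for round_number in range(1, max_rounds + 1):
        for agent in agents:
            discussion.add(agent.name, await agent.contribute(discussion))
        if await moderator.has_consensus(discussion):
            return round_number, True
    return max_rounds, False


agents = [Reviewer("security", 2), Reviewer("performance", 0)]
outcome = asyncio.run(discuss(agents, Moderator(len(agents)), "PR review"))
print(outcome)  # (3, True)
```

Returning whether consensus was reached, rather than only a summary, lets the caller distinguish agreement from a discussion that simply ran out of rounds.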
Communication Protocols
Structured Messages
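    from datetime import datetime
    from typing import Literal, Optional

    from pydantic import BaseModel

    class AgentMessage(BaseModel):
        sender: str
        recipient: str
        message_type: Literal["request", "response", "info", "error"]
        content: dict
        requires_response: bool
        deadline: Optional[datetime] = None  # no deadline unless one is set
        priority: int = 5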
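As an illustration of how a `priority` field might be used, here is a sketch of a priority mailbox built on `asyncio.PriorityQueue`. It uses a plain dataclass so it runs on the standard library alone, and it assumes that lower numbers mean more urgent, which the message schema above does not specify. `Message` and `Mailbox` are hypothetical names.

```python
import asyncio
import itertools
from dataclasses import dataclass, field


@dataclass
class Message:
    sender: str
    recipient: str
    content: dict = field(default_factory=dict)
    priority: int = 5  # assumption: lower number = more urgent


class Mailbox:
    def __init__(self):
        self._queue = asyncio.PriorityQueue()
        # Tie-breaker: keeps FIFO order within a priority and avoids
        # comparing Message objects directly.
        self._counter = itertools.count()

    async def send(self, message: Message) -> None:
        await self._queue.put((message.priority, next(self._counter), message))

    async def receive(self) -> Message:
        _, _, message = await self._queue.get()
        return message


async def demo() -> list[str]:
    box = Mailbox()
    await box.send(Message("planner", "worker", {"task": "summarize"}))
    await box.send(Message("monitor", "worker", {"alert": "timeout"}, priority=1))
    await box.send(Message("planner", "worker", {"task": "format"}))
    return [(await box.receive()).sender for _ in range(3)]


order = asyncio.run(demo())
print(order)  # ['monitor', 'planner', 'planner']
```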
Shared State
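    import asyncio
    from datetime import datetime, timezone
    from typing import Any

    class SharedState:
        def __init__(self):
            self.state = {}
            self.locks = {}

        async def read(self, key: str) -> Any:
            # Returns the stored record (value plus metadata), or None
            return self.state.get(key)

        async def write(self, key: str, value: Any, agent_id: str):
            # One lock per key serializes concurrent writers
            async with self.locks.setdefault(key, asyncio.Lock()):
                self.state[key] = {
                    "value": value,
                    "updated_by": agent_id,
                    "updated_at": datetime.now(timezone.utc),
                }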
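Note that locking only the write does not protect a read-modify-write sequence: two agents can read the same value, each compute an update, and one update is lost. A possible remedy, sketched below with a hypothetical `AtomicState.update` method, is to hold the per-key lock across the whole cycle.

```python
import asyncio
from collections import defaultdict
from typing import Any, Callable


class AtomicState:
    def __init__(self):
        self._state = {}
        self._locks = defaultdict(asyncio.Lock)  # one lock per key, created on demand

    async def update(self, key: str, fn: Callable[[Any], Any], default: Any = None) -> Any:
        # Hold the per-key lock across the whole read-modify-write cycle.
        async with self._locks[key]:
            current = self._state.get(key, default)
            await asyncio.sleep(0)  # simulate an await between read and write
            self._state[key] = fn(current)
            return self._state[key]

    def get(self, key: str) -> Any:
        return self._state.get(key)


async def demo() -> int:
    state = AtomicState()
    # 50 concurrent increments of the same counter.
    await asyncio.gather(
        *(state.update("count", lambda v: v + 1, default=0) for _ in range(50))
    )
    return state.get("count")


total = asyncio.run(demo())
print(total)  # 50
```

Without the lock, the simulated await between read and write would let every task read the initial value, and most increments would be lost.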
Error Handling in Multi-Agent Systems
    import asyncio

    class ResilientMultiAgent:
        # validate_result and log_error are assumed to be defined elsewhere

        async def run_with_fallback(
            self,
            task: str,
            primary_agent: Agent,
            fallback_agent: Agent
        ) -> str:
            try:
                result = await asyncio.wait_for(
                    primary_agent.run(task),
                    timeout=30.0
                )
                if self.validate_result(result):
                    return result
            except Exception as e:  # includes timeouts raised by wait_for
                self.log_error(primary_agent, e)

            # Fallback: reached on error, timeout, or failed validation
            return await fallback_agent.run(task)

        async def run_with_retry(
            self,
            task: str,
            agent: Agent,
            max_retries: int = 3
        ) -> str:
            last_error = None
            for attempt in range(max_retries):
                try:
                    return await agent.run(task)
                except RecoverableError as e:
                    last_error = e
                    # Exponential backoff; skip the wait after the final attempt
                    if attempt < max_retries - 1:
                        await asyncio.sleep(2 ** attempt)

            raise MaxRetriesExceeded(last_error) from last_error
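To see the retry path in action, the sketch below runs it against a toy agent that fails a fixed number of times. `FlakyAgent` is hypothetical, the two exception classes are minimal stand-ins, and the `base_delay` parameter is an addition so the example finishes quickly.

```python
import asyncio


class RecoverableError(Exception):
    """Stand-in for a transient failure such as a rate limit."""


class MaxRetriesExceeded(Exception):
    pass


class FlakyAgent:
    """Toy agent that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def run(self, task: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise RecoverableError(f"transient failure #{self.calls}")
        return f"done: {task}"


async def run_with_retry(agent, task: str, max_retries: int = 3, base_delay: float = 0.01) -> str:
    last_error = None
    for attempt in range(max_retries):
        try:
            return await agent.run(task)
        except RecoverableError as exc:
            last_error = exc
            if attempt < max_retries - 1:
                # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise MaxRetriesExceeded(str(last_error)) from last_error


agent = FlakyAgent(failures=2)
answer = asyncio.run(run_with_retry(agent, "summarize"))
print(answer, agent.calls)  # done: summarize 3
```

Only `RecoverableError` triggers a retry here; any other exception propagates immediately, which keeps programming errors from being retried.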
Observability
    from datetime import datetime, timezone

    class MultiAgentTracer:
        def trace_execution(self, system_id: str):
            return ExecutionTrace(
                system_id=system_id,
                spans=[]
            )

        def start_agent_span(
            self,
            trace: ExecutionTrace,
            agent_id: str,
            task: str
        ) -> Span:
            span = Span(
                trace_id=trace.id,
                agent_id=agent_id,
                task=task,
                start_time=datetime.now(timezone.utc)
            )
            trace.spans.append(span)
            return span

        def end_agent_span(
            self,
            span: Span,
            result: str,
            tokens_used: int
        ):
            span.end_time = datetime.now(timezone.utc)
            span.result = result
            span.tokens_used = tokens_used
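With separate start and end calls, a span stays open if the agent raises before the end call runs. One way to guard against that, sketched here with hypothetical `Span`, `Trace`, and `agent_span` definitions, is a context manager that closes the span in a `finally` block and records the error.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Span:
    agent_id: str
    task: str
    start: float
    end: Optional[float] = None
    error: Optional[str] = None


@dataclass
class Trace:
    system_id: str
    spans: list = field(default_factory=list)


@contextmanager
def agent_span(trace: Trace, agent_id: str, task: str):
    span = Span(agent_id, task, start=time.perf_counter())
    trace.spans.append(span)
    try:
        yield span
    except Exception as exc:
        span.error = repr(exc)  # record the failure, then let it propagate
        raise
    finally:
        span.end = time.perf_counter()  # always close the span


trace = Trace("demo")
with agent_span(trace, "extractor", "parse invoice"):
    pass  # the agent call would go here

try:
    with agent_span(trace, "validator", "check totals"):
        raise ValueError("totals mismatch")
except ValueError:
    pass

for s in trace.spans:
    print(s.agent_id, s.error, f"{s.end - s.start:.6f}s")
```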
Conclusion
Multi-agent architectures unlock capabilities single agents can't achieve. Choose your pattern based on task structure, start simple, and add complexity only when needed. And always—always—build in observability from day one.