!pip install -q langchain-openai langgraph langchain-core

Multi-Agent RAG Orchestration Patterns
Supervisor, swarm, and hierarchical topologies for coordinating specialized retrieval agents
Table of Contents
import os
# os.environ["OPENAI_API_KEY"] = "your-key"

2. Why Multi-Agent?
| Problem | Single Agent | Multi-Agent |
|---|---|---|
| Tool selection | Accuracy drops with N tools | 2-4 focused tools per agent |
| Prompt bloat | One prompt covers all domains | Domain-specific prompts |
| Error isolation | One failure derails chain | Scoped to one agent |
| Debugging | Opaque | Each agent independently inspectable |
3. Supervisor Pattern
A central supervisor agent routes tasks to specialized workers.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
import math
# Shared model for every agent; temperature=0 to keep routing replies terse
# and (mostly) deterministic — the supervisor parses them as single words.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# --- Worker tools ---
@tool
def search_vector_store(query: str) -> str:
    """Search documentation vector store for relevant passages."""
    # Stub retriever for the demo: canned passages with the query echoed back.
    return "Retrieved: LangGraph uses state machines... Checkpointers save state... Query: " + query
@tool
def search_web(query: str) -> str:
    """Search the web for recent information."""
    # Stub web search: fixed snippet tagged with the incoming query.
    return "Web results for: {} — Latest release notes and benchmarks...".format(query)
@tool
def query_database(sql_description: str) -> str:
    """Query analytics database with natural language."""
    # Stub analytics backend: always reports the same canned metrics.
    return "DB: " + sql_description + " → 1,234 active users, 89% retention"
@tool
def calc(expression: str) -> str:
    """Evaluate a math expression."""
    # NOTE(review): eval with empty __builtins__ is NOT a real sandbox —
    # attribute-chain tricks can still escape it. Acceptable for a demo with
    # trusted input only; do not expose to untrusted users.
    namespace = {"sqrt": math.sqrt, "abs": abs}
    try:
        value = eval(expression, {"__builtins__": {}}, namespace)
    except Exception as e:
        return f"Error: {e}"
    return str(value)
# --- Create worker agents ---
# Each worker gets a narrow role prompt and only two tools, per the
# "Why Multi-Agent?" table above (focused tools → better selection accuracy).
retrieval_agent = create_react_agent(
model=llm, tools=[search_vector_store, search_web],
prompt="You are a retrieval specialist. Search and cite sources.",
)
analysis_agent = create_react_agent(
model=llm, tools=[query_database, calc],
prompt="You are a data analyst. Query databases and calculate.",
)
# Registry the supervisor uses both to dispatch work and to validate
# that the LLM's routing reply names a real worker.
WORKERS = {"retrieval": retrieval_agent, "analysis": analysis_agent}
print("Workers created: retrieval, analysis")

# --- Supervisor Graph ---
class SupervisorState(TypedDict):
    # Conversation history; the add_messages reducer appends instead of replacing.
    messages: Annotated[list, add_messages]
    # Supervisor's latest routing decision: "retrieval", "analysis", or "FINISH".
    next_agent: str
    # Accumulated worker results, keyed by worker name.
    agent_outputs: dict
def supervisor_node(state: SupervisorState) -> dict:
    """Ask the LLM which worker should run next, or FINISH when done."""
    outputs = state.get("agent_outputs", {})
    if outputs:
        summary = "\n".join(f"[{name}]: {text}" for name, text in outputs.items())
    else:
        summary = "No outputs yet."
    system_prompt = (
        "You are a supervisor coordinating agents.\n"
        "Available: retrieval (searches docs/web), analysis (queries DB/calculates)\n"
        f"Agent outputs so far: {summary}\n"
        "Decide which agent to call next. If done, say FINISH.\n"
        "Reply with one word: retrieval, analysis, or FINISH."
    )
    reply = llm.invoke([{"role": "system", "content": system_prompt}, *state["messages"]])
    choice = reply.content.strip().lower()
    # Anything other than a known worker name is treated as FINISH.
    return {"next_agent": choice if choice in WORKERS else "FINISH"}
def worker_node(agent_name: str):
    """Build a graph node that runs the named worker agent."""
    def node(state: SupervisorState) -> dict:
        # Recover the original user question from the message history.
        user_msg = ""
        for msg in state["messages"]:
            if isinstance(msg, HumanMessage):
                user_msg = msg.content
                break
        prior = state.get("agent_outputs", {})
        notes = "\n".join(f"[{name}]: {text}" for name, text in prior.items())
        query = f"{user_msg}\nContext: {notes}" if notes else user_msg
        outcome = WORKERS[agent_name].invoke({"messages": [{"role": "user", "content": query}]})
        answer = outcome["messages"][-1].content
        # Merge this worker's answer into the accumulated outputs.
        return {"agent_outputs": {**prior, agent_name: answer}}
    return node
def route_after_supervisor(state: SupervisorState) -> str:
    """Map the supervisor's decision onto the next graph node name."""
    choice = state.get("next_agent", "FINISH")
    if choice == "FINISH":
        return "synthesize"
    return choice
def synthesize_node(state: SupervisorState) -> dict:
    """Fold all worker outputs into one final assistant answer."""
    outputs = state.get("agent_outputs", {})
    sections = []
    for name, text in outputs.items():
        sections.append(f"**{name}**:\n{text}")
    context = "\n\n".join(sections)
    # The first message is assumed to be the original user question.
    question = state["messages"][0].content
    response = llm.invoke([
        {"role": "system", "content": "Synthesize agent outputs into a clear answer."},
        {"role": "user", "content": f"Question: {question}\n\nOutputs:\n{context}"},
    ])
    return {"messages": [{"role": "assistant", "content": response.content}]}
# Build graph: supervisor loops with workers until it says FINISH,
# then a synthesis step produces the final answer.
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("retrieval", worker_node("retrieval"))
graph.add_node("analysis", worker_node("analysis"))
graph.add_node("synthesize", synthesize_node)
graph.add_edge(START, "supervisor")
# Supervisor's decision picks the next hop; FINISH routes to synthesis.
graph.add_conditional_edges("supervisor", route_after_supervisor, {
    "retrieval": "retrieval", "analysis": "analysis", "synthesize": "synthesize",
})
# Workers always report back to the supervisor for the next decision.
graph.add_edge("retrieval", "supervisor")
graph.add_edge("analysis", "supervisor")
graph.add_edge("synthesize", END)
supervisor_app = graph.compile()
print("✅ Supervisor graph compiled")

# Demo run: a question that needs both DB analysis and doc retrieval.
result = supervisor_app.invoke({
    "messages": [{"role": "user", "content": "How many active users do we have and what do the docs say about growth targets?"}],
    "next_agent": "", "agent_outputs": {},
})
print(result["messages"][-1].content)

# 4. Swarm Pattern (Handoffs)
No central coordinator — each agent can hand off to any other agent dynamically.
from langgraph.types import Command
class SwarmState(TypedDict):
    # Conversation history; the add_messages reducer appends instead of replacing.
    messages: Annotated[list, add_messages]
    # Name of the node that handled the request (set by triage).
    current_agent: str
def triage_node(state: SwarmState) -> Command:
    """Classify the request and jump directly to the matching agent node."""
    verdict = llm.invoke([
        {"role": "system", "content": "Classify: 'retrieval' for info lookup, 'analysis' for data, 'chat' for simple."},
        *state["messages"],
    ])
    target = verdict.content.strip().lower()
    # Fall back to the generalist when the classifier goes off-script.
    if target not in {"retrieval", "analysis", "chat"}:
        target = "chat"
    return Command(goto=target, update={"current_agent": target})
def retrieval_node(state: SwarmState) -> Command:
    """Answer lookup-style questions, then end the run."""
    prompt = [{"role": "system", "content": "You are a retrieval specialist. Answer with available info."}]
    prompt.extend(state["messages"])
    reply = llm.invoke(prompt)
    return Command(goto=END, update={"messages": [reply]})
def analysis_node(state: SwarmState) -> Command:
    """Handle data/analysis questions, then end the run."""
    prompt = [{"role": "system", "content": "You are a data analyst. Provide calculations and interpretations."}]
    prompt.extend(state["messages"])
    reply = llm.invoke(prompt)
    return Command(goto=END, update={"messages": [reply]})
def chat_node(state: SwarmState) -> Command:
    """Generalist fallback for simple conversational requests."""
    prompt = [{"role": "system", "content": "You are a helpful assistant."}]
    prompt.extend(state["messages"])
    reply = llm.invoke(prompt)
    return Command(goto=END, update={"messages": [reply]})
# Swarm topology: only the entry edge is static. Every node returns a
# Command whose goto performs the hop, so no conditional edges are declared.
swarm_graph = StateGraph(SwarmState)
swarm_graph.add_node("triage", triage_node)
swarm_graph.add_node("retrieval", retrieval_node)
swarm_graph.add_node("analysis", analysis_node)
swarm_graph.add_node("chat", chat_node)
swarm_graph.add_edge(START, "triage")
swarm_app = swarm_graph.compile()
# Demo run: a casual question should be triaged to the generalist "chat" node.
result = swarm_app.invoke({
    "messages": [{"role": "user", "content": "What's the weather like?"}],
    "current_agent": "",
})
print(f"Routed to: {result['current_agent']}")
print(result["messages"][-1].content[:300])

# 5. Pattern Comparison
| Topology | Control Flow | Best For |
|---|---|---|
| Supervisor | Central coordinator routes | Structured multi-step workflows |
| Swarm | Peer-to-peer handoffs | Dynamic routing, customer service |
| Hierarchical | Tree of supervisors | Large-scale domain-separated teams |