Multi-Agent RAG Orchestration Patterns

Supervisor, swarm, and hierarchical topologies for coordinating specialized retrieval agents

Open In Colab

📖 Read the full article


Table of Contents

  1. Setup
  2. Why Multi-Agent?
  3. Supervisor Pattern
  4. Swarm Pattern (Handoffs)
  5. Pattern Comparison
!pip install -q langchain-openai langgraph langchain-core
import os
# os.environ["OPENAI_API_KEY"] = "your-key"

2. Why Multi-Agent?

| Problem | Single Agent | Multi-Agent |
|---|---|---|
| Tool selection | Accuracy drops with N tools | 2-4 focused tools per agent |
| Prompt bloat | One prompt covers all domains | Domain-specific prompts |
| Error isolation | One failure derails chain | Scoped to one agent |
| Debugging | Opaque | Each agent independently inspectable |

3. Supervisor Pattern

A central supervisor agent routes tasks to specialized workers.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
import math

# One shared chat model for every agent; temperature=0 keeps routing
# decisions (which must match exact worker names) as deterministic as possible.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


# --- Worker tools ---
@tool
def search_vector_store(query: str) -> str:
    """Search documentation vector store for relevant passages."""
    # Stub for the demo: returns a canned passage echoing the query.
    # Replace with a real vector-store retrieval call in production.
    # NOTE: the docstring above doubles as the tool description shown to the
    # LLM, so it is part of runtime behavior.
    return f"Retrieved: LangGraph uses state machines... Checkpointers save state... Query: {query}"

@tool
def search_web(query: str) -> str:
    """Search the web for recent information."""
    # Stub for the demo: returns a canned result echoing the query.
    # Replace with a real web-search API call in production.
    return f"Web results for: {query} — Latest release notes and benchmarks..."

@tool
def query_database(sql_description: str) -> str:
    """Query analytics database with natural language."""
    # Stub for the demo: returns fixed metrics regardless of the request.
    # Replace with a real NL-to-SQL / analytics backend in production.
    return f"DB: {sql_description} → 1,234 active users, 89% retention"

@tool
def calc(expression: str) -> str:
    """Evaluate a math expression."""
    # SECURITY: eval() on model-generated text is risky even with an empty
    # __builtins__ (attribute-chain escapes exist). Acceptable for a demo;
    # use a real expression parser (e.g. an ast-based evaluator) in production.
    #
    # Whitelisted names available to expressions. Extends the original
    # {sqrt, abs} set backward-compatibly with common math helpers.
    allowed = {
        "sqrt": math.sqrt, "abs": abs, "round": round,
        "min": min, "max": max, "pow": pow,
        "pi": math.pi, "e": math.e,
    }
    try:
        return str(eval(expression, {"__builtins__": {}}, allowed))
    except Exception as e:
        # Return the error as text so the calling agent can observe and recover.
        return f"Error: {e}"


# --- Create worker agents ---
# --- Create worker agents ---
# Each worker is a prebuilt ReAct agent with a small, domain-focused toolset
# and its own specialist prompt.
retrieval_agent = create_react_agent(
    model=llm, tools=[search_vector_store, search_web],
    prompt="You are a retrieval specialist. Search and cite sources.",
)
analysis_agent = create_react_agent(
    model=llm, tools=[query_database, calc],
    prompt="You are a data analyst. Query databases and calculate.",
)

# Registry keyed by worker name: the supervisor validates its routing decision
# against these keys, and worker nodes use it to look up the agent to invoke.
WORKERS = {"retrieval": retrieval_agent, "analysis": analysis_agent}
print("Workers created: retrieval, analysis")
# --- Supervisor Graph ---
class SupervisorState(TypedDict):
    """Shared state threaded through the supervisor graph."""
    # Conversation history; the add_messages reducer appends instead of overwriting.
    messages: Annotated[list, add_messages]
    # Supervisor's latest routing decision: a worker name or "FINISH".
    next_agent: str
    # Accumulated worker results, keyed by worker name.
    agent_outputs: dict


def supervisor_node(state: SupervisorState) -> dict:
    """Decide which worker runs next, or FINISH when enough context exists.

    Builds a routing prompt from the worker outputs gathered so far and asks
    the LLM for a one-word decision. The reply is normalized (case, whitespace,
    surrounding quotes/punctuation) before matching against WORKERS, so replies
    like "Retrieval." or "'analysis'" no longer fall through to FINISH and end
    the run prematurely. Anything still unrecognized maps to FINISH.
    """
    agent_outputs = state.get("agent_outputs", {})
    context = "\n".join(f"[{k}]: {v}" for k, v in agent_outputs.items()) or "No outputs yet."

    routing_prompt = f"""You are a supervisor coordinating agents.
Available: retrieval (searches docs/web), analysis (queries DB/calculates)
Agent outputs so far: {context}
Decide which agent to call next. If done, say FINISH.
Reply with one word: retrieval, analysis, or FINISH."""

    response = llm.invoke([{"role": "system", "content": routing_prompt}, *state["messages"]])
    # Strip quotes/punctuation the model may add around the one-word answer.
    next_agent = response.content.strip().lower().strip("'\"`.!,:; ")
    return {"next_agent": next_agent if next_agent in WORKERS else "FINISH"}


def worker_node(agent_name: str):
    """Build a graph node that delegates the user's question to one worker.

    The returned node forwards the original human message — augmented with any
    outputs other agents have already produced — to the named worker, then
    records the worker's final reply under its name in ``agent_outputs``.
    """
    def node(state: SupervisorState) -> dict:
        # Recover the original user question from the running message history.
        user_msg = ""
        for msg in state["messages"]:
            if isinstance(msg, HumanMessage):
                user_msg = msg.content
                break

        prior = state.get("agent_outputs", {})
        context_lines = [f"[{name}]: {output}" for name, output in prior.items()]
        if context_lines:
            full_query = f"{user_msg}\nContext: " + "\n".join(context_lines)
        else:
            full_query = user_msg

        result = WORKERS[agent_name].invoke({"messages": [{"role": "user", "content": full_query}]})
        final_msg = result["messages"][-1].content
        # Merge rather than replace so earlier agents' outputs are kept.
        return {"agent_outputs": {**prior, agent_name: final_msg}}

    return node


def route_after_supervisor(state: SupervisorState) -> str:
    """Map the supervisor's decision to the next graph node name.

    FINISH (or a missing decision) routes to the synthesis step; any other
    value is a validated worker name and is passed through unchanged.
    """
    decision = state.get("next_agent", "FINISH")
    if decision == "FINISH":
        return "synthesize"
    return decision


def synthesize_node(state: SupervisorState) -> dict:
    """Fuse all accumulated worker outputs into one final assistant answer."""
    outputs = state.get("agent_outputs", {})
    sections = [f"**{name}**:\n{text}" for name, text in outputs.items()]
    context = "\n\n".join(sections)
    # The first message in history is the original user question.
    question = state["messages"][0].content
    prompt_messages = [
        {"role": "system", "content": "Synthesize agent outputs into a clear answer."},
        {"role": "user", "content": f"Question: {question}\n\nOutputs:\n{context}"},
    ]
    response = llm.invoke(prompt_messages)
    return {"messages": [{"role": "assistant", "content": response.content}]}


# Build graph
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("retrieval", worker_node("retrieval"))
graph.add_node("analysis", worker_node("analysis"))
graph.add_node("synthesize", synthesize_node)

# Entry point: every run starts with a supervisor decision.
graph.add_edge(START, "supervisor")
# The supervisor either dispatches to a worker or, on FINISH, to synthesis.
graph.add_conditional_edges("supervisor", route_after_supervisor, {
    "retrieval": "retrieval", "analysis": "analysis", "synthesize": "synthesize",
})
# Workers always loop back to the supervisor for the next routing decision.
graph.add_edge("retrieval", "supervisor")
graph.add_edge("analysis", "supervisor")
graph.add_edge("synthesize", END)

supervisor_app = graph.compile()
print("✅ Supervisor graph compiled")
# Demo run: this question needs both workers — a DB metric (analysis)
# and a documentation lookup (retrieval) — before synthesis.
result = supervisor_app.invoke({
    "messages": [{"role": "user", "content": "How many active users do we have and what do the docs say about growth targets?"}],
    "next_agent": "", "agent_outputs": {},
})

print(result["messages"][-1].content)

4. Swarm Pattern (Handoffs)

No central coordinator — each agent can hand off to any other agent dynamically.

from langgraph.types import Command


class SwarmState(TypedDict):
    """Shared state threaded through the swarm graph."""
    # Conversation history; the add_messages reducer appends instead of overwriting.
    messages: Annotated[list, add_messages]
    # Name of the agent the triage step routed to (kept for inspection).
    current_agent: str


def triage_node(state: SwarmState) -> Command:
    """Classify the request and hand off to the matching specialist agent.

    Asks the LLM for a one-word category. Because the system prompt itself
    quotes the labels ('retrieval', ...), the model may echo the quotes; the
    reply is therefore normalized (case, whitespace, surrounding quotes and
    punctuation stripped) before matching, so "'Retrieval'." routes correctly
    instead of silently falling back. Unrecognized replies go to chat.
    """
    response = llm.invoke([
        {"role": "system", "content": "Classify: 'retrieval' for info lookup, 'analysis' for data, 'chat' for simple."},
        *state["messages"],
    ])
    # Strip quotes/punctuation the model may echo from the quoted labels above.
    target = response.content.strip().lower().strip("'\"`.!,:; ")
    if target not in ("retrieval", "analysis", "chat"):
        target = "chat"
    # Command both routes (goto) and records the decision (update) in one step.
    return Command(goto=target, update={"current_agent": target})


def retrieval_node(state: SwarmState) -> Command:
    """Answer information-lookup requests as the retrieval specialist, then end."""
    system_msg = {"role": "system", "content": "You are a retrieval specialist. Answer with available info."}
    reply = llm.invoke([system_msg, *state["messages"]])
    return Command(update={"messages": [reply]}, goto=END)


def analysis_node(state: SwarmState) -> Command:
    """Answer data questions as the analysis specialist, then end."""
    system_msg = {"role": "system", "content": "You are a data analyst. Provide calculations and interpretations."}
    reply = llm.invoke([system_msg, *state["messages"]])
    return Command(update={"messages": [reply]}, goto=END)


def chat_node(state: SwarmState) -> Command:
    """Handle simple conversational requests as the fallback agent, then end."""
    system_msg = {"role": "system", "content": "You are a helpful assistant."}
    reply = llm.invoke([system_msg, *state["messages"]])
    return Command(update={"messages": [reply]}, goto=END)


# Build the swarm graph. Only the entry edge is declared statically: each node
# returns a Command whose `goto` picks the next hop at runtime, so no
# conditional edges are needed.
swarm_graph = StateGraph(SwarmState)
swarm_graph.add_node("triage", triage_node)
swarm_graph.add_node("retrieval", retrieval_node)
swarm_graph.add_node("analysis", analysis_node)
swarm_graph.add_node("chat", chat_node)
swarm_graph.add_edge(START, "triage")

swarm_app = swarm_graph.compile()

# Demo run: a casual question should be triaged to the "chat" agent.
result = swarm_app.invoke({
    "messages": [{"role": "user", "content": "What's the weather like?"}],
    "current_agent": "",
})
print(f"Routed to: {result['current_agent']}")
# Truncate long model output for display.
print(result["messages"][-1].content[:300])

5. Pattern Comparison

| Topology | Control Flow | Best For |
|---|---|---|
| Supervisor | Central coordinator routes | Structured multi-step workflows |
| Swarm | Peer-to-peer handoffs | Dynamic routing, customer service |
| Hierarchical | Tree of supervisors | Large-scale domain-separated teams |