graph LR
subgraph Traditional["Traditional API"]
A1["Request"] --> A2["Process<br/>~100ms"] --> A3["Response"]
end
subgraph Agent["Retrieval Agent"]
B1["Query"] --> B2["LLM Call 1<br/>~2s"]
B2 --> B3["Tool Call<br/>~1s"]
B3 --> B4["LLM Call 2<br/>~2s"]
B4 --> B5["Tool Call<br/>~3s"]
B5 --> B6["LLM Call 3<br/>~2s"]
B6 --> B7["Response"]
end
style Traditional fill:#F2F2F2,stroke:#D9D9D9
style Agent fill:#F2F2F2,stroke:#D9D9D9
style A3 fill:#27ae60,color:#fff,stroke:#333
style B7 fill:#27ae60,color:#fff,stroke:#333
Deploying Retrieval Agents in Production
Durable execution, crash recovery, scaling agent workers, streaming intermediate results, and cost monitoring
Keywords: durable execution, crash recovery, agent workers, streaming, cost monitoring, token budget, LangGraph Platform, Temporal, OpenTelemetry, production deployment, task queue, checkpointing, agent observability, scaling agents

Introduction
Building a retrieval agent that works in a notebook is one thing. Running it in production — where requests spike unpredictably, processes crash mid-execution, users expect real-time feedback, and every LLM call costs money — is an entirely different challenge. The gap between “demo agent” and “production agent” is not about smarter prompts or better tools. It is about infrastructure: durable execution, crash recovery, horizontal scaling, intermediate-result streaming, and cost controls.
A retrieval agent in production faces problems that never appear in development:
- A worker process crashes after the third tool call in a five-step plan — does the user get an error, or does the agent resume from where it left off?
- A hundred users submit deep-research queries simultaneously — does the system queue and distribute work, or does it fall over?
- A query takes 45 seconds across six LLM calls — does the user stare at a spinner, or see each reasoning step as it happens?
- A runaway agent loops 20 times on the same tool — who notices, and who stops it before the bill arrives?
This article covers the infrastructure patterns that answer these questions. We walk through durable execution with Temporal and LangGraph Platform, crash recovery via checkpointing and event sourcing, scaling with task queues and worker pools, streaming intermediate agent steps to users, and cost monitoring with OpenTelemetry and token budgets. Every pattern includes Python code you can adapt to your own stack.
Why Production Deployment Is Hard
Retrieval agents differ from traditional web services in ways that break standard deployment assumptions:
| Characteristic | Traditional API | Retrieval Agent |
|---|---|---|
| Latency | 50–200 ms | 10–120 seconds |
| State | Stateless (single request) | Stateful (multi-step conversation) |
| External calls | 0–2 per request | 3–20 per request (LLM + tools) |
| Cost per request | Sub-cent (compute only) | $0.01–$1.00+ (LLM tokens) |
| Failure modes | Timeout, 5xx | Mid-execution crash, tool error, token budget exhaustion, infinite loop |
| Output | Complete response | Incremental (thought → action → observation → …) |
These differences mean you cannot just wrap an agent in a Flask endpoint behind a load balancer. You need purpose-built infrastructure for long-running, stateful, multi-step workloads.
Durable Execution: The Core Abstraction
Durable execution ensures that a long-running process can survive infrastructure failures — process crashes, deployments, machine restarts — and resume from exactly where it left off. For retrieval agents, this means a five-step reasoning chain does not restart from scratch when the worker handling it goes down.
How Durable Execution Works
sequenceDiagram
participant U as User
participant S as Orchestrator
participant W1 as Worker A
participant W2 as Worker B
participant DB as State Store
U->>S: Submit query
S->>W1: Assign task
W1->>DB: Save state (step 1)
W1->>DB: Save state (step 2)
Note over W1: Worker crashes!
S-->>W2: Reassign task
W2->>DB: Load state (step 2)
W2->>DB: Save state (step 3)
W2->>U: Return final answer
The pattern has three components:
- Orchestrator — Assigns tasks to workers and detects failures via heartbeats or timeouts
- State store — Persists execution state (messages, tool results, intermediate outputs) after each step
- Workers — Stateless processes that pick up tasks, load state, execute the next step, and save results
If a worker crashes, the orchestrator reassigns the task to another worker, which loads the last saved state and continues.
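To make the orchestrator's role concrete, here is a minimal sketch of heartbeat-based failure detection, assuming workers write worker:heartbeat:&lt;id&gt; keys to Redis and track their in-flight tasks in agent:inflight:&lt;id&gt; lists (both key names are illustrative, not tied to any framework):
import time
import redis

HEARTBEAT_TIMEOUT = 30  # seconds without a heartbeat before a worker is presumed dead
r = redis.Redis(decode_responses=True)

def reap_dead_workers() -> None:
    """Requeue tasks held by workers whose heartbeat has gone stale."""
    now = time.time()
    for key in r.scan_iter("worker:heartbeat:*"):
        worker_id = key.rsplit(":", 1)[-1]
        last_beat = float(r.get(key) or 0)
        if now - last_beat > HEARTBEAT_TIMEOUT:
            # Push the dead worker's in-flight tasks back onto the shared queue;
            # whichever worker picks them up loads the last checkpoint and resumes.
            while (task := r.lpop(f"agent:inflight:{worker_id}")) is not None:
                r.rpush("agent:task_queue", task)
            r.delete(key)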
The Checkpoint Pattern
The simplest durable execution pattern for agents is step-level checkpointing — save the full agent state after every LLM call or tool execution:
import json
import hashlib
from dataclasses import dataclass, field, asdict
from typing import Any
@dataclass
class AgentCheckpoint:
"""Serializable snapshot of agent execution state."""
run_id: str
step: int
messages: list[dict[str, str]]
tool_results: list[dict[str, Any]]
total_tokens: int = 0
total_cost_usd: float = 0.0
status: str = "running" # running | completed | failed
def serialize(self) -> str:
return json.dumps(asdict(self))
@classmethod
def deserialize(cls, data: str) -> "AgentCheckpoint":
return cls(**json.loads(data))
class CheckpointStore:
"""Persist and retrieve agent checkpoints."""
def __init__(self, backend: str = "redis"):
if backend == "redis":
import redis
self._client = redis.Redis(decode_responses=True)
self._backend = backend
def save(self, checkpoint: AgentCheckpoint) -> None:
key = f"agent:checkpoint:{checkpoint.run_id}"
self._client.set(key, checkpoint.serialize())
self._client.expire(key, 86400) # 24h TTL
def load(self, run_id: str) -> AgentCheckpoint | None:
key = f"agent:checkpoint:{run_id}"
data = self._client.get(key)
if data:
return AgentCheckpoint.deserialize(data)
return None
def delete(self, run_id: str) -> None:
self._client.delete(f"agent:checkpoint:{run_id}")
Integrating Checkpoints into the Agent Loop
from openai import OpenAI
import uuid
client = OpenAI()
store = CheckpointStore()
def run_durable_agent(
query: str,
run_id: str | None = None,
model: str = "gpt-4o-mini",
max_steps: int = 10,
) -> str:
"""Agent loop with step-level checkpointing for crash recovery."""
# Resume from checkpoint or start fresh
if run_id:
checkpoint = store.load(run_id)
else:
run_id = str(uuid.uuid4())
checkpoint = None
if checkpoint and checkpoint.status == "completed":
return checkpoint.messages[-1]["content"]
if checkpoint is None:
checkpoint = AgentCheckpoint(
run_id=run_id,
step=0,
messages=[
{"role": "system", "content": "You are a helpful retrieval agent."},
{"role": "user", "content": query},
],
tool_results=[],
)
store.save(checkpoint)
for step in range(checkpoint.step, max_steps):
# Call LLM
response = client.chat.completions.create(
model=model,
messages=checkpoint.messages,
tools=TOOL_SCHEMAS,
tool_choice="auto",
temperature=0,
)
msg = response.choices[0].message
checkpoint.messages.append(msg.model_dump())
checkpoint.total_tokens += response.usage.total_tokens
checkpoint.step = step + 1
# No tool calls → final answer
if not msg.tool_calls:
checkpoint.status = "completed"
store.save(checkpoint)
return msg.content
# Execute tool calls
for tool_call in msg.tool_calls:
result = execute_tool(tool_call)
checkpoint.messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result),
})
checkpoint.tool_results.append({
"step": step,
"tool": tool_call.function.name,
"result": str(result)[:500],
})
# Checkpoint after every step
store.save(checkpoint)
checkpoint.status = "failed"
store.save(checkpoint)
return "Agent reached maximum steps."If the process crashes at any point, calling run_durable_agent(query, run_id=same_id) will load the last checkpoint and resume from the exact step where execution stopped — without re-executing completed LLM calls or tool invocations.
Crash Recovery Patterns
There are three main strategies for recovering from crashes during agent execution, each with different trade-offs:
Strategy 1: Checkpoint-Resume
Save state after every step. On crash, load the last checkpoint and continue.
# Already shown above — the simplest and most common pattern
# Pro: Simple, no replay
# Con: Requires serializable state
Strategy 2: Event Sourcing
Store every event (LLM call, tool result) as an immutable log. On recovery, replay the full event log to reconstruct state.
@dataclass
class AgentEvent:
run_id: str
sequence: int
event_type: str # "user_query" | "llm_call" | "tool_result" | "final_answer"
payload: dict
timestamp: float
class EventStore:
"""Append-only event log for agent execution."""
def __init__(self):
import redis
self._client = redis.Redis(decode_responses=True)
def append(self, event: AgentEvent) -> None:
key = f"agent:events:{event.run_id}"
self._client.rpush(key, json.dumps(asdict(event)))
def replay(self, run_id: str) -> list[AgentEvent]:
key = f"agent:events:{run_id}"
raw_events = self._client.lrange(key, 0, -1)
return [
AgentEvent(**json.loads(e)) for e in raw_events
]
def recover_from_events(run_id: str) -> AgentCheckpoint:
"""Rebuild agent state by replaying the event log."""
store = EventStore()
events = store.replay(run_id)
messages = [{"role": "system", "content": "You are a helpful retrieval agent."}]
total_tokens = 0
for event in events:
if event.event_type == "user_query":
messages.append({"role": "user", "content": event.payload["query"]})
elif event.event_type == "llm_call":
messages.append(event.payload["message"])
total_tokens += event.payload.get("tokens", 0)
elif event.event_type == "tool_result":
messages.append({
"role": "tool",
"tool_call_id": event.payload["tool_call_id"],
"content": event.payload["result"],
})
return AgentCheckpoint(
run_id=run_id,
step=len(events),
messages=messages,
tool_results=[],
total_tokens=total_tokens,
)
Strategy 3: Idempotent Replay (Temporal-Style)
Temporal’s approach: the workflow function itself is deterministic. On crash, Temporal replays the workflow from the beginning, but cached results from previous activity executions are returned instantly instead of re-executing.
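Outside of Temporal, you can approximate the idea with a results cache keyed by (run_id, step, activity): the loop replays deterministically from step 0, but completed side effects return their cached result instead of calling the LLM or tool again. The sketch below reuses the Redis client from CheckpointStore; call_llm_raw is a hypothetical undecorated LLM helper, and real Temporal additionally handles determinism, versioning, and timers for you.
def cached_activity(run_id: str, step: int, name: str, fn, *args, **kwargs):
    """Replay-safe side effect: execute once, return the cached result on every replay."""
    key = f"agent:activity:{run_id}:{step}:{name}"
    cached = store._client.get(key)  # reuse the Redis client from CheckpointStore
    if cached is not None:
        return json.loads(cached)
    result = fn(*args, **kwargs)  # result must be JSON-serializable for this sketch
    store._client.set(key, json.dumps(result), ex=86400)
    return result

# Inside a deterministic agent loop, every side effect goes through the wrapper:
#   msg = cached_activity(run_id, step, "llm", call_llm_raw, messages)
#   obs = cached_activity(run_id, step, "tool", execute_tool, tool_call)
# After a crash the loop replays from step 0, but completed steps return instantly.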
Comparison
| Strategy | Complexity | Recovery speed | Storage | Best for |
|---|---|---|---|---|
| Checkpoint-Resume | Low | Instant | Last state only | Simple agents, <10 steps |
| Event Sourcing | Medium | O(n) replay | Full history | Auditing, debugging |
| Idempotent Replay | High | O(n) replay (cached) | Event log + results | Long workflows, Temporal |
Scaling Agent Workers
A single agent worker can handle one request at a time (due to synchronous LLM calls). Scaling requires distributing work across a pool of workers via a task queue.
Architecture
graph TD
subgraph API["API Layer"]
A1["Load Balancer"]
end
subgraph Queue["Task Queue"]
Q1["Redis / RabbitMQ / SQS"]
end
subgraph Workers["Agent Worker Pool"]
W1["Worker 1"]
W2["Worker 2"]
W3["Worker 3"]
W4["Worker N"]
end
subgraph Storage["Shared State"]
S1["Postgres / Redis"]
end
A1 --> Q1
Q1 --> W1
Q1 --> W2
Q1 --> W3
Q1 --> W4
W1 --> S1
W2 --> S1
W3 --> S1
W4 --> S1
style API fill:#F2F2F2,stroke:#D9D9D9
style Queue fill:#F2F2F2,stroke:#D9D9D9
style Workers fill:#F2F2F2,stroke:#D9D9D9
style Storage fill:#F2F2F2,stroke:#D9D9D9
Task Queue with Celery
from celery import Celery
import json
app = Celery("agent_workers", broker="redis://localhost:6379/0")
app.conf.update(
task_serializer="json",
result_backend="redis://localhost:6379/1",
task_acks_late=True, # Re-deliver on crash
worker_prefetch_multiplier=1, # One task at a time
task_time_limit=120, # Hard timeout: 2 minutes
task_soft_time_limit=90, # Soft timeout: 1.5 minutes
)
@app.task(bind=True, max_retries=2, default_retry_delay=5)
def run_agent_task(self, query: str, run_id: str | None = None):
    """Execute an agent run as a distributed task."""
    # Fall back to the Celery task id so retries resume the same checkpoint
    run_id = run_id or self.request.id
    try:
        result = run_durable_agent(query, run_id=run_id)
        return {"status": "completed", "answer": result, "run_id": run_id}
    except Exception as exc:
        # Retry with the same run_id to resume from checkpoint
        raise self.retry(exc=exc)
Async Task Queue with asyncio
For higher concurrency without Celery overhead:
import asyncio
import redis.asyncio as aioredis
class AgentWorkerPool:
"""Lightweight async worker pool using Redis as a task queue."""
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.redis = aioredis.Redis()
self.queue_key = "agent:task_queue"
async def submit(self, query: str, run_id: str) -> None:
"""Submit a task to the queue."""
task = json.dumps({"query": query, "run_id": run_id})
await self.redis.rpush(self.queue_key, task)
async def worker(self, worker_id: int) -> None:
"""Worker loop: pull tasks, execute agents, save results."""
while True:
# Blocking pop with 5s timeout
item = await self.redis.blpop(self.queue_key, timeout=5)
if item is None:
continue
_, raw_task = item
task = json.loads(raw_task)
try:
    # run_durable_agent is synchronous; run it in a worker thread so this
    # coroutine does not block the event loop for the other workers
    result = await asyncio.to_thread(
        run_durable_agent, task["query"], run_id=task["run_id"]
    )
await self.redis.set(
f"agent:result:{task['run_id']}",
json.dumps({"status": "completed", "answer": result}),
)
except Exception as e:
await self.redis.set(
f"agent:result:{task['run_id']}",
json.dumps({"status": "failed", "error": str(e)}),
)
async def start(self) -> None:
"""Start the worker pool."""
workers = [
asyncio.create_task(self.worker(i))
for i in range(self.num_workers)
]
await asyncio.gather(*workers)
Scaling Heuristics
| Factor | Guideline |
|---|---|
| Workers per CPU | 2–4 (most time is spent waiting on LLM API I/O) |
| Queue depth alert | Trigger auto-scaling when queue depth > 2× worker count |
| Task timeout | 2× expected max agent runtime (e.g., 120s if max is 60s) |
| Retry limit | 2–3 retries with exponential backoff |
| Heartbeat interval | Every 10–30s for long-running tasks |
| Prefetch | 1 task per worker (agents are not parallelizable) |
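A sketch of the queue-depth rule from the table, assuming the Redis task queue from the previous snippet and a hypothetical scale_to(n) coroutine that calls whatever autoscaler you run (Kubernetes HPA, an ECS service update, etc.):
async def autoscale_loop(redis_client, scale_to, current_workers: int = 4) -> None:
    """Poll queue depth and request more workers when the queue backs up."""
    while True:
        depth = await redis_client.llen("agent:task_queue")
        if depth > 2 * current_workers:           # queue depth > 2× workers → scale out
            current_workers = min(current_workers * 2, 64)
            await scale_to(current_workers)
        elif depth == 0 and current_workers > 4:  # drained → scale back toward the floor
            current_workers = max(current_workers // 2, 4)
            await scale_to(current_workers)
        await asyncio.sleep(30)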
Streaming Intermediate Results
Users should not wait 30 seconds staring at a loading spinner. Stream each reasoning step — thought, action, observation — as it happens.
Server-Sent Events (SSE) Pattern
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def stream_agent_steps(query: str, run_id: str):
"""Generator that yields agent steps as SSE events."""
checkpoint = AgentCheckpoint(
run_id=run_id, step=0,
messages=[
{"role": "system", "content": "You are a helpful retrieval agent."},
{"role": "user", "content": query},
],
tool_results=[],
)
for step in range(10):
# Send "thinking" event
yield f"data: {json.dumps({'type': 'thinking', 'step': step})}\n\n"
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=checkpoint.messages,
tools=TOOL_SCHEMAS,
tool_choice="auto",
temperature=0,
stream=True,
)
# Stream LLM tokens and accumulate any tool calls from the deltas
full_content = ""
pending_tool_calls: dict[int, dict] = {}
finish_reason = None
for chunk in response:
    choice = chunk.choices[0]
    finish_reason = choice.finish_reason or finish_reason
    delta = choice.delta
    if delta.content:
        full_content += delta.content
        yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"
    for tc in delta.tool_calls or []:
        entry = pending_tool_calls.setdefault(tc.index, {"id": None, "name": None, "arguments": ""})
        if tc.id:
            entry["id"] = tc.id
        if tc.function and tc.function.name:
            entry["name"] = tc.function.name
            yield f"data: {json.dumps({'type': 'tool_call', 'tool': tc.function.name})}\n\n"
        if tc.function and tc.function.arguments:
            entry["arguments"] += tc.function.arguments
# No accumulated tool calls → this was the final answer
if finish_reason != "tool_calls":
    yield f"data: {json.dumps({'type': 'answer', 'content': full_content})}\n\n"
    yield "data: [DONE]\n\n"
    return
# Record the assistant turn so the tool results that follow have a parent message
assistant_tool_calls = [
    {"id": e["id"], "type": "function", "function": {"name": e["name"], "arguments": e["arguments"]}}
    for e in pending_tool_calls.values()
]
checkpoint.messages.append(
    {"role": "assistant", "content": full_content or None, "tool_calls": assistant_tool_calls}
)
# Execute tools and stream observations
for tool_call in assistant_tool_calls:
    name = tool_call["function"]["name"]
    yield f"data: {json.dumps({'type': 'tool_start', 'tool': name})}\n\n"
    result = execute_tool(tool_call)  # tool_call is a plain dict rebuilt from the streamed deltas
    yield f"data: {json.dumps({'type': 'observation', 'tool': name, 'result': str(result)[:500]})}\n\n"
    checkpoint.messages.append({
        "role": "tool",
        "tool_call_id": tool_call["id"],
        "content": str(result),
    })
yield "data: [DONE]\n\n"
@app.get("/agent/stream")
async def agent_stream(query: str):
run_id = str(uuid.uuid4())
return StreamingResponse(
stream_agent_steps(query, run_id),
media_type="text/event-stream",
)
LangGraph Streaming
LangGraph provides built-in streaming with event types for each graph node:
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
agent = create_react_agent(model=llm, tools=tools)
async def stream_langgraph_agent(query: str):
"""Stream LangGraph agent execution step by step."""
async for event in agent.astream_events(
{"messages": [{"role": "user", "content": query}]},
version="v2",
):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
yield {
"type": "token",
"content": content,
"node": event.get("metadata", {}).get("langgraph_node"),
}
elif kind == "on_tool_start":
yield {
"type": "tool_start",
"tool": event["name"],
"input": event["data"].get("input"),
}
elif kind == "on_tool_end":
yield {
"type": "tool_end",
"tool": event["name"],
"output": str(event["data"].output)[:500],
}Double-Texting: Handling User Interruptions
What happens when a user sends a new message while the agent is still processing the previous one? LangGraph Platform supports four strategies:
| Strategy | Behavior | Use case |
|---|---|---|
| Reject | Return error; ignore the new input | Strict sequential processing |
| Queue | Finish current run, then process new input | Ordered processing |
| Interrupt | Cancel current run, start new one from latest state | Conversational UX |
| Rollback | Cancel current run, revert to state before it started, process new input | Clean restart |
# LangGraph Platform handles double-texting via API configuration
# When self-hosting, implement Interrupt strategy:
class InterruptibleAgent:
"""Agent that supports cancellation for double-texting."""
def __init__(self):
self._cancel_event = asyncio.Event()
def cancel(self) -> None:
self._cancel_event.set()
async def run_step(self, checkpoint):
"""Run a single agent step, checking for cancellation."""
if self._cancel_event.is_set():
raise asyncio.CancelledError("Interrupted by new user message")
# aclient is an AsyncOpenAI() client (async counterpart of the sync client used elsewhere)
response = await aclient.chat.completions.create(
model="gpt-4o-mini",
messages=checkpoint.messages,
tools=TOOL_SCHEMAS,
tool_choice="auto",
)
return response
Cost Monitoring and Token Budgeting
LLM API calls are the dominant cost in agent deployments. A single agent run can consume 10,000–100,000 tokens across multiple steps. Without monitoring, costs spiral out of control.
Token Budget Enforcement
from dataclasses import dataclass
@dataclass
class TokenBudget:
"""Per-run token budget with real-time tracking."""
max_input_tokens: int = 50_000
max_output_tokens: int = 10_000
max_total_cost_usd: float = 0.50
max_steps: int = 15
# Tracking
used_input_tokens: int = 0
used_output_tokens: int = 0
used_cost_usd: float = 0.0
steps_taken: int = 0
# Pricing (per 1M tokens)
INPUT_PRICE = {"gpt-4o-mini": 0.15, "gpt-4o": 2.50}
OUTPUT_PRICE = {"gpt-4o-mini": 0.60, "gpt-4o": 10.00}
def record_usage(self, model: str, input_tokens: int, output_tokens: int):
self.used_input_tokens += input_tokens
self.used_output_tokens += output_tokens
self.used_cost_usd += (
input_tokens * self.INPUT_PRICE.get(model, 1.0) / 1_000_000
+ output_tokens * self.OUTPUT_PRICE.get(model, 4.0) / 1_000_000
)
self.steps_taken += 1
def check(self) -> str | None:
"""Return a violation reason, or None if within budget."""
if self.used_input_tokens > self.max_input_tokens:
return f"Input token limit exceeded ({self.used_input_tokens}/{self.max_input_tokens})"
if self.used_output_tokens > self.max_output_tokens:
return f"Output token limit exceeded ({self.used_output_tokens}/{self.max_output_tokens})"
if self.used_cost_usd > self.max_total_cost_usd:
return f"Cost limit exceeded (${self.used_cost_usd:.4f}/${self.max_total_cost_usd})"
if self.steps_taken > self.max_steps:
return f"Step limit exceeded ({self.steps_taken}/{self.max_steps})"
return None
def run_budget_aware_agent(query: str, budget: TokenBudget) -> dict:
"""Agent loop with per-run token budget enforcement."""
messages = [
{"role": "system", "content": "You are a helpful retrieval agent."},
{"role": "user", "content": query},
]
model = "gpt-4o-mini"
for step in range(budget.max_steps):
response = client.chat.completions.create(
model=model,
messages=messages,
tools=TOOL_SCHEMAS,
tool_choice="auto",
temperature=0,
)
usage = response.usage
budget.record_usage(model, usage.prompt_tokens, usage.completion_tokens)
# Check budget after every LLM call
violation = budget.check()
if violation:
return {
"status": "budget_exceeded",
"reason": violation,
"partial_answer": messages[-1].get("content", ""),
"cost_usd": budget.used_cost_usd,
"total_tokens": budget.used_input_tokens + budget.used_output_tokens,
}
msg = response.choices[0].message
messages.append(msg.model_dump())
if not msg.tool_calls:
return {
"status": "completed",
"answer": msg.content,
"cost_usd": budget.used_cost_usd,
"total_tokens": budget.used_input_tokens + budget.used_output_tokens,
"steps": budget.steps_taken,
}
for tool_call in msg.tool_calls:
result = execute_tool(tool_call)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result),
})
return {"status": "max_steps", "cost_usd": budget.used_cost_usd}OpenTelemetry Instrumentation
OpenTelemetry’s GenAI semantic conventions define standard metrics for LLM operations. Key metrics for cost monitoring:
| Metric | Type | What it tracks |
|---|---|---|
| gen_ai.client.token.usage | Histogram | Input and output token counts per call |
| gen_ai.client.operation.duration | Histogram | Latency per LLM call in seconds |
| gen_ai.server.time_to_first_token | Histogram | Time to first streaming token |
| gen_ai.server.time_per_output_token | Histogram | Decode speed (tokens/sec) |
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
# Setup
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(OTLPSpanExporter())
)
reader = PeriodicExportingMetricReader(OTLPMetricExporter())
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
tracer = trace.get_tracer("agent.service")
meter = metrics.get_meter("agent.service")
# Define metrics following OTel GenAI conventions
token_usage = meter.create_histogram(
"gen_ai.client.token.usage",
unit="{token}",
description="Number of input and output tokens used",
)
operation_duration = meter.create_histogram(
"gen_ai.client.operation.duration",
unit="s",
description="GenAI operation duration",
)
agent_run_cost = meter.create_histogram(
"agent.run.cost",
unit="USD",
description="Total cost per agent run",
)
agent_run_steps = meter.create_histogram(
"agent.run.steps",
unit="{step}",
description="Number of steps per agent run",
)
def instrumented_llm_call(messages, model, tools=None):
"""LLM call with OpenTelemetry tracing and metrics."""
import time
with tracer.start_as_current_span(
f"chat {model}",
attributes={
"gen_ai.operation.name": "chat",
"gen_ai.request.model": model,
"gen_ai.provider.name": "openai",
},
) as span:
start = time.perf_counter()
response = client.chat.completions.create(
model=model,
messages=messages,
tools=tools,
tool_choice="auto" if tools else None,
temperature=0,
)
duration = time.perf_counter() - start
# Record span attributes
usage = response.usage
span.set_attribute("gen_ai.response.model", response.model)
span.set_attribute("gen_ai.usage.input_tokens", usage.prompt_tokens)
span.set_attribute("gen_ai.usage.output_tokens", usage.completion_tokens)
# Record metrics
attrs = {
"gen_ai.operation.name": "chat",
"gen_ai.request.model": model,
"gen_ai.provider.name": "openai",
}
token_usage.record(
usage.prompt_tokens,
{**attrs, "gen_ai.token.type": "input"},
)
token_usage.record(
usage.completion_tokens,
{**attrs, "gen_ai.token.type": "output"},
)
operation_duration.record(duration, attrs)
return response
Cost Dashboard Queries
With OpenTelemetry data flowing to your observability backend, build dashboards to answer:
# Prometheus/PromQL examples
# Total tokens consumed in the last hour
sum(increase(gen_ai_client_token_usage_sum[1h]))
# Average cost per agent run (using custom metric)
histogram_quantile(0.5, rate(agent_run_cost_bucket[1h]))
# P95 latency per LLM call
histogram_quantile(0.95, rate(gen_ai_client_operation_duration_bucket[1h]))
# Token usage by model
sum by (gen_ai_request_model) (
increase(gen_ai_client_token_usage_sum[1h])
)
# Runs exceeding budget (custom counter)
increase(agent_budget_violations_total[1h])
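The agent_budget_violations_total series in the last query is not part of the GenAI conventions; it is a custom counter you emit yourself. A minimal sketch using the meter defined earlier:
# Custom counter backing the agent_budget_violations_total query above
budget_violations = meter.create_counter(
    "agent.budget.violations",
    unit="{violation}",
    description="Agent runs that hit a token, cost, or step limit",
)

def check_and_record(budget: TokenBudget, model: str) -> str | None:
    """Wrap budget.check() so every violation also produces a metric point."""
    violation = budget.check()
    if violation:
        budget_violations.add(1, {"gen_ai.request.model": model})
    return violation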
Deploying with LangGraph Platform
LangGraph Platform (formerly LangGraph Cloud) provides managed infrastructure for deploying LangGraph agents at scale. It handles task queues, Postgres checkpointing, streaming, double-texting, and monitoring out of the box.
Architecture
graph TD
subgraph Platform["LangGraph Platform"]
API["HTTP/WS API"]
TQ["Task Queue"]
PG["Postgres Checkpointer"]
W["Auto-scaled Workers"]
LS["LangSmith Integration"]
end
U["Users"] --> API
API --> TQ
TQ --> W
W --> PG
W --> LS
style Platform fill:#F2F2F2,stroke:#D9D9D9
style U fill:#4a90d9,color:#fff,stroke:#333
style API fill:#9b59b6,color:#fff,stroke:#333
style TQ fill:#e67e22,color:#fff,stroke:#333
style PG fill:#27ae60,color:#fff,stroke:#333
style W fill:#1abc9c,color:#fff,stroke:#333
style LS fill:#f5a623,color:#fff,stroke:#333
Key Features
| Feature | Description |
|---|---|
| Durable execution | Postgres-backed checkpointer persists state after every node |
| Horizontal scaling | Auto-scaled task queues distribute runs across workers |
| Streaming | Built-in SSE streaming of tokens, tool calls, and node transitions |
| Double-texting | Four strategies: reject, queue, interrupt, rollback |
| Background jobs | Async runs with polling or webhook notification |
| Cron jobs | Scheduled agent runs (e.g., daily report generation) |
| LangSmith tracing | Automatic tracing, monitoring, and cost tracking |
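Once deployed, clients reach the graph through the langgraph_sdk package. The snippet below sketches a streaming call against a running deployment; the URL is an assumption, the graph name retrieval_agent matches the langgraph.json in the next subsection, and argument names may vary slightly between SDK versions.
from langgraph_sdk import get_client

lg_client = get_client(url="http://localhost:8123")  # your deployment URL

async def ask(question: str) -> None:
    thread = await lg_client.threads.create()
    async for part in lg_client.runs.stream(
        thread["thread_id"],
        "retrieval_agent",  # graph name from langgraph.json below
        input={"messages": [{"role": "user", "content": question}]},
        stream_mode="events",
    ):
        print(part.event, part.data)  # token, tool, and node events as they arrive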
Self-Hosted Deployment
For teams that cannot use a managed service, LangGraph Platform can be self-hosted:
# langgraph.json — deployment configuration
{
"graphs": {
"retrieval_agent": "./agent.py:graph"
},
"dependencies": ["langchain-openai", "langchain-community"],
"env": {
"OPENAI_API_KEY": "",
"LANGSMITH_API_KEY": ""
}
}
# agent.py — the graph definition
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI
class State(TypedDict):
messages: Annotated[list, add_messages]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)
def agent(state: State) -> dict:
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def should_continue(state: State) -> str:
last = state["messages"][-1]
if hasattr(last, "tool_calls") and last.tool_calls:
return "tools"
return END
builder = StateGraph(State)
builder.add_node("agent", agent)
builder.add_node("tools", ToolNode(tools))
builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
builder.add_edge("tools", "agent")
# Compile with Postgres checkpointer for durable execution
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost:5432/langgraph"
)
graph = builder.compile(checkpointer=checkpointer)
Deploying with Temporal
Temporal is a general-purpose durable execution platform. Unlike LangGraph Platform (which is agent-specific), Temporal handles any long-running workflow — making it ideal when agents are one component of a larger business process.
Modeling an Agent as a Temporal Workflow
from datetime import timedelta
from dataclasses import dataclass
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.common import RetryPolicy
@dataclass
class AgentInput:
query: str
model: str = "gpt-4o-mini"
max_steps: int = 10
@dataclass
class LLMCallInput:
messages: list
model: str
tools: list | None = None
@dataclass
class ToolCallInput:
tool_name: str
tool_args: dict
# Activities: non-deterministic operations (LLM calls, tool execution)
@activity.defn
async def call_llm(input: LLMCallInput) -> dict:
"""Call the LLM API. This is an Activity because it has side effects."""
response = client.chat.completions.create(
model=input.model,
messages=input.messages,
tools=input.tools,
tool_choice="auto" if input.tools else None,
temperature=0,
)
return {
"message": response.choices[0].message.model_dump(),
"usage": {
"input_tokens": response.usage.prompt_tokens,
"output_tokens": response.usage.completion_tokens,
},
}
@activity.defn
async def execute_tool_activity(input: ToolCallInput) -> str:
"""Execute a tool. Activity because it may call external APIs."""
tool_fn = TOOLS.get(input.tool_name)
if not tool_fn:
return f"Unknown tool: {input.tool_name}"
return str(tool_fn(**input.tool_args))
# Workflow: the deterministic agent loop
@workflow.defn(name="RetrievalAgent")
class RetrievalAgentWorkflow:
@workflow.run
async def run(self, input: AgentInput) -> dict:
messages = [
{"role": "system", "content": "You are a helpful retrieval agent."},
{"role": "user", "content": input.query},
]
total_tokens = 0
for step in range(input.max_steps):
# Call LLM (as an Activity — survives crashes)
result = await workflow.execute_activity(
call_llm,
LLMCallInput(
messages=messages,
model=input.model,
tools=TOOL_SCHEMAS,
),
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(maximum_attempts=3),
)
msg = result["message"]
messages.append(msg)
total_tokens += (
result["usage"]["input_tokens"]
+ result["usage"]["output_tokens"]
)
# No tool calls → done
tool_calls = msg.get("tool_calls", [])
if not tool_calls:
return {
"answer": msg.get("content", ""),
"total_tokens": total_tokens,
"steps": step + 1,
}
# Execute each tool call as an Activity
for tc in tool_calls:
tool_result = await workflow.execute_activity(
execute_tool_activity,
ToolCallInput(
tool_name=tc["function"]["name"],
tool_args=json.loads(tc["function"]["arguments"]),
),
start_to_close_timeout=timedelta(seconds=30),
)
messages.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": tool_result,
})
return {"answer": "Max steps reached", "total_tokens": total_tokens}
# Worker setup
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="agent-tasks",
workflows=[RetrievalAgentWorkflow],
activities=[call_llm, execute_tool_activity],
)
await worker.run()
Temporal vs. LangGraph Platform
| Aspect | Temporal | LangGraph Platform |
|---|---|---|
| Purpose | General-purpose durable execution | Agent-specific deployment |
| Crash recovery | Automatic via event sourcing + replay | Postgres checkpointing |
| Streaming | Requires custom implementation (queries/signals) | Built-in SSE streaming |
| Scaling | Worker pools with task queues | Managed auto-scaling |
| Double-texting | Custom via signals/cancellation | Built-in (4 strategies) |
| Agent frameworks | Framework-agnostic | LangGraph-specific |
| Observability | Temporal UI + custom metrics | LangSmith integration |
| Self-hosting | Docker Compose or Kubernetes | Docker or managed cloud |
| Best for | Agents embedded in larger business workflows | Standalone agent deployments |
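The "requires custom implementation" row for Temporal streaming usually means exposing progress through a workflow query: the workflow records each completed step in local state, and the API layer polls it. A sketch against the RetrievalAgentWorkflow above; the progress query and the two-second polling cadence are illustrative.
import asyncio

# Inside RetrievalAgentWorkflow, record steps and expose them with a query:
#
#     @workflow.query
#     def progress(self) -> list[str]:
#         return self._progress   # appended to after each LLM call / tool result
#
async def poll_progress(temporal_client: Client, workflow_id: str) -> None:
    """API-side polling loop that relays workflow progress to the user."""
    handle = temporal_client.get_workflow_handle(workflow_id)
    seen = 0
    while True:
        steps: list[str] = await handle.query("progress")
        for step in steps[seen:]:
            print(step)  # forward to an SSE / WebSocket stream in a real server
        seen = len(steps)
        await asyncio.sleep(2)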
Production Deployment Checklist
Before deploying a retrieval agent to production, verify each item:
Infrastructure
- Durable execution in place: step-level checkpointing (Redis/Postgres) or a workflow engine such as Temporal
- Task queue and worker pool sized for expected concurrency, with shared state reachable from every worker
Reliability
- Crash recovery tested: kill a worker mid-run and confirm the run resumes from its last checkpoint
- Retries with backoff, task timeouts, and late acks so orphaned tasks are re-delivered
Cost Controls
- Per-run budget enforced on input tokens, output tokens, total cost, and step count
- Alerts on cost anomalies and budget violations
Observability
- Every LLM call traced and token usage recorded via OpenTelemetry GenAI conventions
- run_id propagated through logs and traces for end-to-end debugging
User Experience
- Intermediate steps streamed to the client (thoughts, tool calls, observations)
- Double-texting strategy chosen: reject, queue, interrupt, or rollback
Conclusion
Deploying retrieval agents in production requires infrastructure that traditional web services do not need. The core challenges — long-running stateful execution, unpredictable costs, multi-step failure modes, and real-time user feedback — each require purpose-built solutions.
Key takeaways:
- Durable execution is non-negotiable — Agents take 10–120 seconds across multiple steps. Without checkpointing, any infrastructure failure restarts the entire run, wasting tokens and time. Use step-level checkpointing (simplest), event sourcing (for audit trails), or Temporal (for complex business workflows).
- Scale with task queues, not threads — Agent workers are I/O bound (waiting on LLM APIs). Distribute work via Redis, RabbitMQ, or managed task queues. Set prefetch to 1 (one task per worker) and auto-scale on queue depth.
- Stream everything — Users need real-time feedback during long runs. Stream thoughts, tool calls, and observations via Server-Sent Events. Handle double-texting with interrupt or queue strategies.
- Budget every run — Token costs are the primary operational expense. Enforce per-run limits on input tokens, output tokens, total cost, and step count. Use OpenTelemetry’s GenAI semantic conventions (gen_ai.client.token.usage, gen_ai.client.operation.duration) for standard metrics across providers.
- LangGraph Platform provides agent-specific infrastructure (checkpointing, streaming, double-texting, LangSmith tracing) as a managed service. Temporal provides general-purpose durable execution for agents that are part of larger business workflows.
- Observability is the foundation — You cannot control what you cannot see. Instrument every LLM call with distributed tracing, record token counts as metrics, set alerts on cost anomalies, and log every step with a run_id for end-to-end debugging.
Start with the simplest infrastructure that covers your scale — a single worker with Redis checkpointing, SSE streaming, and a token budget. Add task queues when you need concurrency, Temporal when you need business workflow orchestration, and managed platforms when you need operational simplicity.
References
- LangChain, “Announcing LangGraph v0.1 & LangGraph Cloud: Running Agents at Scale, Reliably,” blog.langchain.com, Jun. 2024. Available: https://blog.langchain.com/langgraph-cloud/
- Temporal Technologies, “Core Application — Python SDK,” docs.temporal.io, 2024. Available: https://docs.temporal.io/develop/python/core-application
- OpenTelemetry, “Semantic Conventions for Generative AI Systems,” opentelemetry.io, 2025. Available: https://opentelemetry.io/docs/specs/semconv/gen-ai/
- T. R. Sumers, S. Yao, K. Narasimhan, and T. L. Griffiths, “Cognitive Architectures for Language Agents,” TMLR, arXiv:2309.02427, 2024. Available: https://arxiv.org/abs/2309.02427
- Z. Xi et al., “The Rise and Potential of Large Language Model Based Agents: A Survey,” arXiv:2309.07864, 2023. Available: https://arxiv.org/abs/2309.07864
- LangChain, “LangSmith Observability,” docs.langchain.com, 2024. Available: https://docs.langchain.com/langsmith/observability
- S. Yao et al., “ReAct: Synergizing Reasoning and Acting in Language Models,” ICLR 2023, arXiv:2210.03629. Available: https://arxiv.org/abs/2210.03629
Read More
- Understand the ReAct agent loop that these deployment patterns protect — the Thought-Action-Observation cycle, stopping conditions, and error recovery.
- Wrap your tools for production with Tool Use and Function Calling — schema validation, error handling, and dynamic tool selection.
- Deploy LangGraph agents with checkpointers, streaming, and human-in-the-loop — the framework-level patterns that LangGraph Platform automates.
- Scale multi-agent orchestration to production — supervisor routing, parallel fan-out, and coordinator patterns across worker pools.
- Persist agent memory across sessions — checkpoint memory alongside execution state for durable long-running agents.
- Add checkpointing to planning and query decomposition — multi-step plans need step-level recovery to avoid re-executing completed sub-queries.
- Deploy deep research agents with budget controls — open-ended investigation requires strict token budgets and cost monitoring.
- Enforce guardrails and safety in production — rate limits, authorization gates, and input validation as production-critical safety layers.
- Monitor deployed agents with evaluation and debugging — trajectory scoring, LangSmith traces, and automated failure classification.
- Track production behavior with Observability for Multi-Turn LLM Conversations — the observability foundation that makes cost monitoring and debugging possible.
- Apply FinOps Best Practices for LLM Applications — organizational patterns for managing LLM spend at scale.