!pip install -q langchain langchain-openai langgraph langsmith

Observability for Multi-Turn LLM Conversations with LangSmith
A practical guide to tracing, monitoring, and debugging multi-turn LLM conversations
Article: Observability for Multi-Turn LLM Conversations
Table of Contents
1. Setup & Installation
2. Environment Configuration
3. Define Tools
4. Build LangGraph Agent
5. Run with Tracing
6. Multi-Turn Conversation Threading
7. Token Usage & Cost Tracking
8. Latency Tracking
9. Custom Tracing with @traceable
Configure LangSmith tracing and verify the connection.
import os

# Enable LangSmith tracing and point it at a named project; the OpenAI key
# is needed by the ChatOpenAI calls later in this guide.
tracing_env = {
    "LANGSMITH_TRACING": "true",
    "LANGSMITH_API_KEY": "your-langsmith-api-key",
    "LANGSMITH_PROJECT": "multi-turn-observability",
    "OPENAI_API_KEY": "your-openai-api-key",
}
os.environ.update(tracing_env)

# Instantiate a client to verify the LangSmith connection works.
from langsmith import Client

client = Client()
print(f"LangSmith connected. Projects available: {len(list(client.list_projects()))}")

3. Define Tools
Create tools that the agent can use during conversations.
from langchain_core.tools import tool
@tool
def search_orders(customer_id: str) -> str:
    """Search for a customer's recent orders by customer ID."""
    # Simulated order data, keyed by customer ID.
    orders_by_customer = {
        "C001": [
            {"order_id": "ORD-1234", "product": "Wireless Headphones", "status": "Delivered", "total": "$79.99"},
            {"order_id": "ORD-1235", "product": "USB-C Cable", "status": "Shipped", "total": "$12.99"},
        ],
        "C002": [
            {"order_id": "ORD-1300", "product": "Laptop Stand", "status": "Processing", "total": "$45.00"},
        ],
    }
    matches = orders_by_customer.get(customer_id, [])
    # Unknown customers (or customers with no orders) get a readable message.
    if not matches:
        return f"No orders found for customer {customer_id}"
    return str(matches)
@tool
def cancel_order(order_id: str) -> str:
    """Cancel an order by order ID. Returns confirmation or error."""
    # Simulated cancellation logic: only these orders are still cancellable.
    cancellable_ids = {"ORD-1235", "ORD-1300"}
    if order_id not in cancellable_ids:
        return f"Order {order_id} cannot be cancelled (already delivered or not found)."
    return f"Order {order_id} has been successfully cancelled. Refund will be processed in 3-5 business days."
@tool
def get_product_info(product_name: str) -> str:
    """Get detailed information about a product."""
    # Simulated product catalog keyed by exact product name.
    catalog = {
        "Wireless Headphones": {
            "price": "$79.99",
            "warranty": "1 year",
            "features": ["Noise cancelling", "30hr battery", "Bluetooth 5.3"],
        },
        "USB-C Cable": {
            "price": "$12.99",
            "warranty": "6 months",
            "features": ["100W charging", "10Gbps data", "2m length"],
        },
        "Laptop Stand": {
            "price": "$45.00",
            "warranty": "2 years",
            "features": ["Adjustable height", "Aluminum", "Foldable"],
        },
    }
    try:
        return str(catalog[product_name])
    except KeyError:
        return f"Product '{product_name}' not found."
# Register every tool the agent is allowed to call.
tools = [search_orders, cancel_order, get_product_info]
print(f"Defined {len(tools)} tools: {[t.name for t in tools]}")

4. Build LangGraph Agent
Build a stateful agent using LangGraph with tool-calling capabilities.
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
# Initialize the LLM with tool binding.
# temperature=0 keeps tool-selection decisions as deterministic as possible,
# which makes traces easier to compare across runs.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)  # model can now emit structured tool calls
# Define the agent logic
def should_continue(state: MessagesState) -> str:
    """Route to the tool node when the latest AI message requested tool calls,
    otherwise end the graph run."""
    newest = state["messages"][-1]
    return "tools" if newest.tool_calls else END
def call_model(state: MessagesState) -> dict:
    """Invoke the tool-bound LLM on the full conversation history.

    The system prompt is prepended on every call rather than stored in state,
    so it never accumulates in the checkpointed message list.
    """
    system_prompt = {
        "role": "system",
        "content": (
            "You are a helpful customer support agent. "
            "Use the available tools to look up orders, cancel orders, "
            "and get product information. Be concise and helpful."
        ),
    }
    reply = llm_with_tools.invoke([system_prompt, *state["messages"]])
    # Returning a single-message list appends it to the MessagesState history.
    return {"messages": [reply]}
# Build the graph: one agent node that may loop through a tool node.
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", ToolNode(tools))  # executes any tool calls the agent emits
workflow.add_edge(START, "agent")
# After each agent turn, either run the requested tools or finish.
workflow.add_conditional_edges("agent", should_continue, ["tools", END])
workflow.add_edge("tools", "agent")  # feed tool results back to the agent
# Compile with memory for multi-turn conversations:
# MemorySaver checkpoints state per thread_id so history persists across invokes.
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
print("Agent compiled successfully with memory checkpointer.")

5. Run with Tracing
Run the agent and observe traces in LangSmith.
import uuid

# Create a thread ID for conversation tracking; the checkpointer keys
# conversation state on it, and LangSmith groups traces by it.
thread_id = str(uuid.uuid4())
config = {
    "configurable": {"thread_id": thread_id},
    # Custom metadata is attached to the trace and is filterable in LangSmith.
    "metadata": {"session_type": "demo", "user_id": "demo-user"},
}
# Run a single query through the compiled graph.
result = app.invoke(
    {"messages": [HumanMessage(content="Can you look up orders for customer C001?")]},
    config=config,
)
# Print the final response (last message in the returned state).
print(f"Thread ID: {thread_id}")
print(f"Response: {result['messages'][-1].content}")

6. Multi-Turn Conversation Threading
Simulate a multi-turn conversation with shared thread context.
# Start a new conversation thread. Because every invoke below reuses this
# thread_id, the checkpointer restores prior turns automatically — each call
# only needs to pass the new human message.
thread_id = str(uuid.uuid4())
config = {
    "configurable": {"thread_id": thread_id},
    "metadata": {
        "session_type": "multi-turn-demo",
        "user_id": "customer-123",
        "channel": "web",
    },
}
# Turn 1: Ask about orders.
print("=== Turn 1 ===")
result1 = app.invoke(
    {"messages": [HumanMessage(content="Hi, I'm customer C001. Can you show me my orders?")]},
    config=config,
)
print(f"Agent: {result1['messages'][-1].content}\n")
# Turn 2: Ask about a specific product (relies on turn-1 context being restored).
print("=== Turn 2 ===")
result2 = app.invoke(
    {"messages": [HumanMessage(content="Tell me more about the Wireless Headphones.")]},
    config=config,
)
print(f"Agent: {result2['messages'][-1].content}\n")
# Turn 3: Cancel an order.
print("=== Turn 3 ===")
result3 = app.invoke(
    {"messages": [HumanMessage(content="Can you cancel order ORD-1235?")]},
    config=config,
)
print(f"Agent: {result3['messages'][-1].content}")
print(f"\nAll 3 turns traced under thread: {thread_id}")

7. Token Usage & Cost Tracking
Aggregate token usage and estimated costs from LangSmith traces.
from datetime import datetime, timedelta, timezone

# Query root runs from the project for the last hour.
# Fix: use an aware UTC datetime — LangSmith run timestamps are in UTC, so a
# naive local datetime.now() would shift the window by the local UTC offset.
project_name = "multi-turn-observability"
runs = list(client.list_runs(
    project_name=project_name,
    filter='eq(is_root, true)',
    start_time=datetime.now(timezone.utc) - timedelta(hours=1),
))
# Aggregate token usage and estimated cost across the fetched runs.
total_prompt_tokens = 0
total_completion_tokens = 0
total_cost = 0.0
for run in runs:
    # Skip runs with no recorded token usage (total_tokens None or 0);
    # `or 0` guards against individual fields being None on a run.
    if run.total_tokens:
        total_prompt_tokens += run.prompt_tokens or 0
        total_completion_tokens += run.completion_tokens or 0
        total_cost += run.total_cost or 0.0
print(f"Runs analyzed: {len(runs)}")
print(f"Total prompt tokens: {total_prompt_tokens:,}")
print(f"Total completion tokens: {total_completion_tokens:,}")
print(f"Total tokens: {total_prompt_tokens + total_completion_tokens:,}")
print(f"Estimated total cost: ${total_cost:.4f}")
if runs:
    avg_tokens = (total_prompt_tokens + total_completion_tokens) / len(runs)
    avg_cost = total_cost / len(runs)
    print(f"\nAvg tokens per conversation: {avg_tokens:,.0f}")
    print(f"Avg cost per conversation: ${avg_cost:.4f}")

8. Latency Tracking
Identify slow runs and analyze latency patterns.
from datetime import timezone

# Find slow runs (latency > 5 seconds).
# Fix: use an aware UTC datetime — LangSmith run timestamps are in UTC, so a
# naive local datetime.now() would shift the one-hour window by the UTC offset.
slow_runs = list(client.list_runs(
    project_name=project_name,
    filter='and(eq(is_root, true), gt(latency, "5s"))',
    start_time=datetime.now(timezone.utc) - timedelta(hours=1),
))
print(f"Slow runs (>5s): {len(slow_runs)}")
for run in slow_runs:
    # end_time is unset while a run is still in flight; guard before subtracting.
    latency = (run.end_time - run.start_time).total_seconds() if run.end_time else None
    print(f" - Run ID: {run.id}")
    # Fix: compare against None, not truthiness — a legitimate 0.00s latency
    # is falsy and would previously have printed "N/A".
    print(f" Latency: {latency:.2f}s" if latency is not None else " Latency: N/A")
    print(f" Status: {run.status}")
    print(f" Tokens: {run.total_tokens}")
    print()
# Overall latency statistics over all root runs in the same window.
all_runs = list(client.list_runs(
    project_name=project_name,
    filter='eq(is_root, true)',
    start_time=datetime.now(timezone.utc) - timedelta(hours=1),
))
latencies = []
for run in all_runs:
    if run.end_time and run.start_time:
        latencies.append((run.end_time - run.start_time).total_seconds())
if latencies:
    import statistics
    print(f"\nLatency Statistics:")
    print(f" Mean: {statistics.mean(latencies):.2f}s")
    print(f" Median: {statistics.median(latencies):.2f}s")
    print(f" P95: {sorted(latencies)[int(len(latencies)*0.95)]:.2f}s")
    print(f" Max: {max(latencies):.2f}s")

9. Custom Tracing with @traceable
Use the @traceable decorator to add custom tracing to any function.
from langsmith import traceable
from langchain_openai import ChatOpenAI
@traceable(name="preprocess_input")
def preprocess_input(user_input: str) -> str:
    """Normalize raw user text: trim surrounding whitespace and lowercase it."""
    return user_input.strip().lower()
@traceable(name="generate_response")
def generate_response(processed_input: str) -> str:
    """Generate a reply for the preprocessed input using the LLM."""
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    conversation = [
        {"role": "system", "content": "You are a helpful assistant. Be concise."},
        {"role": "user", "content": processed_input},
    ]
    return model.invoke(conversation).content
@traceable(name="postprocess_output")
def postprocess_output(response: str) -> str:
    """Post-process the LLM response, appending a disclaimer when the reply
    touches sensitive topics (financial, medical, or legal)."""
    sensitive_terms = ("invest", "medical", "legal")
    lowered = response.lower()
    if any(term in lowered for term in sensitive_terms):
        response += "\n\n*Disclaimer: This is not professional advice.*"
    return response
@traceable(name="full_pipeline")
def full_pipeline(user_input: str) -> str:
    """Run preprocess -> generate -> postprocess as a single traced pipeline.

    Each stage is itself @traceable, so the trace shows a nested hierarchy
    under this root span.
    """
    return postprocess_output(generate_response(preprocess_input(user_input)))
# Run the pipeline end to end. The response will likely mention "invest",
# which would trigger the disclaimer appended by postprocess_output.
result = full_pipeline("What are the benefits of index fund investing?")
print(f"Result:\n{result}")
print("\n--- Check LangSmith dashboard for the full trace hierarchy ---")