!pip install -q langchain langchain-openai langchain-community faiss-cpu numpy rank-bm25

RAG in Production: Scaling, Caching, and Observability
Vector database selection, semantic caching, query routing, hybrid search, monitoring, and cost optimization for production RAG systems
Table of Contents
1. Setup & Installation
import os
# os.environ["OPENAI_API_KEY"] = "your-api-key-here"  # Uncomment and set

2. Vector Database Selection
Choosing the right vector database depends on scale, latency, hosting preference, and feature requirements.
# Selection matrix: each candidate store rated on the four criteria that
# usually decide the choice (hosting model, scale ceiling, hybrid-search
# support, and sweet-spot use case).
vector_db_comparison = {
    "Pinecone": {
        "Type": "Fully managed SaaS",
        "Scale": "Billions of vectors",
        "Hybrid Search": "Yes (sparse+dense)",
        "Best For": "Teams wanting zero-ops, quick start",
    },
    "Qdrant": {
        "Type": "Self-hosted or Cloud",
        "Scale": "Billions (disk-backed)",
        "Hybrid Search": "Yes (payload filtering)",
        "Best For": "Flexible filtering, Rust performance",
    },
    "Weaviate": {
        "Type": "Self-hosted or Cloud",
        "Scale": "Hundreds of millions",
        "Hybrid Search": "Yes (BM25+vector)",
        "Best For": "Built-in ML modules, GraphQL API",
    },
    "pgvector": {
        "Type": "PostgreSQL extension",
        "Scale": "Millions",
        "Hybrid Search": "Via SQL joins",
        "Best For": "Existing Postgres infra, ACID compliance",
    },
    "FAISS": {
        "Type": "In-memory library",
        "Scale": "Billions (single machine)",
        "Hybrid Search": "Manual implementation",
        "Best For": "Prototyping, research, batch processing",
    },
    "ChromaDB": {
        "Type": "Embedded / client-server",
        "Scale": "Millions",
        "Hybrid Search": "Metadata filtering",
        "Best For": "Local dev, simple use cases",
    },
}

print("Vector Database Comparison:")
print("=" * 70)
for store, criteria in vector_db_comparison.items():
    print(f"\n{store}:")
    for criterion, rating in criteria.items():
        print(f" {criterion}: {rating}")
# Example: FAISS with production-style config
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
docs = [
Document(page_content="Kubernetes orchestrates containerized workloads.", metadata={"category": "infra"}),
Document(page_content="Prometheus monitors system metrics and alerts.", metadata={"category": "monitoring"}),
Document(page_content="Redis provides in-memory caching for low latency.", metadata={"category": "caching"}),
Document(page_content="PostgreSQL is a relational database with ACID compliance.", metadata={"category": "database"}),
Document(page_content="Nginx serves as a reverse proxy and load balancer.", metadata={"category": "infra"}),
]
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_documents(docs, embeddings)
# Retrieve with metadata
results = vectorstore.similarity_search_with_score("caching solutions", k=3)
for doc, score in results:
print(f" [{doc.metadata['category']}] {doc.page_content} (score: {score:.4f})")3. Semantic Caching
Cache responses for semantically similar queries to avoid redundant LLM calls.
import numpy as np
import time
class SemanticCache:
    """Cache LLM responses keyed on embedding similarity, not exact text.

    Stores (query_embedding, query_text, response) triples; a lookup is a
    hit when the best cosine similarity to a cached query meets
    ``threshold``. Lookup is a linear scan, so this suits small caches or
    prototyping (production systems would back this with a vector index).
    """

    def __init__(self, embeddings, threshold=0.92):
        self.embeddings = embeddings  # must expose embed_query(str) -> vector
        self.threshold = threshold    # minimum cosine similarity for a hit
        self.cache = []  # List of (query_embedding, query_text, response)
        self.stats = {"hits": 0, "misses": 0}

    def _cosine_similarity(self, a, b):
        """Cosine similarity of two vectors; 0.0 if either has zero norm."""
        a, b = np.array(a), np.array(b)
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        # Guard: a degenerate all-zero embedding would otherwise yield
        # a NaN (0/0) that silently poisons the best-score comparison.
        if denom == 0.0:
            return 0.0
        return float(np.dot(a, b) / denom)

    def get(self, query: str):
        """Look up a semantically similar cached query.

        Returns:
            (response, best_score, matched_query) — response and
            matched_query are None on a miss; best_score is always the
            highest similarity seen (0.0 for an empty cache).
        """
        query_embedding = self.embeddings.embed_query(query)
        best_score = 0.0
        best_response = None
        for cached_emb, cached_query, cached_response in self.cache:
            score = self._cosine_similarity(query_embedding, cached_emb)
            if score > best_score:
                best_score = score
                best_response = (cached_query, cached_response)
        if best_score >= self.threshold and best_response:
            self.stats["hits"] += 1
            return best_response[1], best_score, best_response[0]
        self.stats["misses"] += 1
        return None, best_score, None

    def put(self, query: str, response: str):
        """Embed and store a query/response pair for future lookups."""
        query_embedding = self.embeddings.embed_query(query)
        self.cache.append((query_embedding, query, response))
# Demo semantic caching
cache = SemanticCache(embeddings, threshold=0.90)
# First query (cache miss)
cache.put("How does Kubernetes work?", "Kubernetes orchestrates containers across clusters...")
cache.put("What is Redis used for?", "Redis is an in-memory data store used for caching...")
# Similar query (should be cache hit)
queries_to_test = [
"How does K8s work?",
"What's Redis for?",
"What is machine learning?",
]
for q in queries_to_test:
response, score, matched = cache.get(q)
status = "HIT" if response else "MISS"
print(f"[{status}] '{q}' (similarity: {score:.4f})")
if matched:
print(f" Matched: '{matched}'")
print(f"\nCache stats: {cache.stats}")4. Query Routing
Route queries to the most efficient data source or strategy.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from typing import Literal
# Deterministic routing model: temperature=0 for reproducible routing
# decisions; gpt-4o-mini keeps per-query routing cost low.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
class QueryRoute(BaseModel):
    """Route query to the best backend."""
    # NOTE: these Field descriptions are emitted in the structured-output
    # schema the LLM sees, so their wording affects routing quality —
    # treat them as prompt text, not just documentation.
    target: Literal["vector_store", "sql_database", "api", "llm_only"] = Field(
        description="Best backend for this query"
    )
    reasoning: str = Field(description="Brief reasoning")
router = llm.with_structured_output(QueryRoute)
route_prompt = ChatPromptTemplate.from_template(
"Route this query to the best backend:\n"
"- vector_store: Semantic text search (documentation, articles)\n"
"- sql_database: Structured data queries (counts, aggregations, filtering)\n"
"- api: Real-time data (stock prices, weather, live status)\n"
"- llm_only: General knowledge, no external data needed\n\n"
"Query: {query}"
)
route_chain = route_prompt | router
test_queries = [
"What is the SLA for our API?",
"How many customers signed up last month?",
"What's the current price of GOOGL?",
"Explain the CAP theorem",
]
print("Query Routing Results:")
print("=" * 60)
for q in test_queries:
result = route_chain.invoke({"query": q})
print(f"\nQ: {q}")
print(f" Target: {result.target}")
print(f" Reason: {result.reasoning}")5. Hybrid Search Tuning
Combine BM25 (keyword) and vector (semantic) search using Reciprocal Rank Fusion (RRF).
from rank_bm25 import BM25Okapi
# Simple hybrid search implementation
texts = [
    "Kubernetes manages container orchestration and scaling",
    "Docker containers package applications with dependencies",
    "Prometheus alerting monitors service health metrics",
    "Redis cache improves response latency significantly",
    "PostgreSQL handles ACID-compliant relational data",
]

# Keyword leg: BM25 over a lowercased, whitespace-tokenized corpus.
tokenized = [doc.lower().split() for doc in texts]
bm25 = BM25Okapi(tokenized)

# Semantic leg: a fresh FAISS index reusing the embeddings model above.
faiss_docs = [Document(page_content=t) for t in texts]
vs = FAISS.from_documents(faiss_docs, embeddings)
def reciprocal_rank_fusion(results_list, k=60):
    """Merge multiple ranked lists using RRF.

    Each document's fused score is the sum over input lists of
    1 / (rank + k), where rank is its 0-based position in that list.
    The original per-list scores are ignored — only ordering matters.
    Returns (doc_text, fused_score) pairs sorted best-first.
    """
    fused = {}
    for ranked in results_list:
        for position, entry in enumerate(ranked):
            text = entry[0]
            fused[text] = fused.get(text, 0) + 1.0 / (position + k)
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)
def hybrid_search(query, alpha=0.5, k=3):
    """Hybrid keyword + semantic retrieval fused with weighted RRF.

    Fix: ``alpha`` was previously accepted but silently ignored — both
    legs were always fused with equal weight. It now controls the
    keyword-vs-vector balance.

    Args:
        query: Free-text search query.
        alpha: Weight of the BM25 leg in [0, 1]; 1.0 is keyword-only,
            0.0 is vector-only. Weights are scaled by 2 so the default
            0.5 gives each leg weight 1.0 and reproduces the classic
            unweighted RRF scores (backward compatible).
        k: Results to take from each leg and to return after fusion.

    Returns:
        (fused, bm25_ranked, vector_ranked) — fused is a list of
        (text, rrf_score) pairs sorted best-first; the other two are the
        per-leg (text, raw_score) rankings for inspection.
    """
    # Keyword leg: score every corpus document against the tokenized query.
    bm25_scores = bm25.get_scores(query.lower().split())
    bm25_ranked = sorted(zip(texts, bm25_scores), key=lambda x: x[1], reverse=True)[:k]
    # Semantic leg: nearest neighbours from the FAISS index.
    vector_results = vs.similarity_search_with_score(query, k=k)
    vector_ranked = [(doc.page_content, score) for doc, score in vector_results]
    # Weighted RRF (rrf_k=60 is the conventional damping constant).
    fused_scores = {}
    for weight, ranked in ((2.0 * alpha, bm25_ranked), (2.0 * (1.0 - alpha), vector_ranked)):
        for rank, (text, _) in enumerate(ranked):
            fused_scores[text] = fused_scores.get(text, 0.0) + weight / (rank + 60)
    fused = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    return fused[:k], bm25_ranked, vector_ranked
query = "container orchestration scaling"
fused, bm25_res, vector_res = hybrid_search(query)
print(f"Query: '{query}'")
print(f"\nBM25 results:")
for text, score in bm25_res:
print(f" {score:.4f} | {text}")
print(f"\nVector results:")
for text, score in vector_res:
print(f" {score:.4f} | {text}")
print(f"\nRRF Fused results:")
for text, score in fused:
print(f" {score:.6f} | {text}")6. Observability & Monitoring
Track retrieval quality, latency, and LLM cost in production.
import time
from dataclasses import dataclass, field
@dataclass
class RAGMetrics:
    """Running counters for RAG latency, retrieval quality, and token cost."""
    query_count: int = 0                  # total queries recorded
    total_latency_ms: float = 0.0         # sum of retrieval + generation time
    retrieval_latencies: list = field(default_factory=list)
    generation_latencies: list = field(default_factory=list)
    empty_retrievals: int = 0             # queries where retrieval found nothing
    cache_hits: int = 0
    token_usage: int = 0

    def record_query(self, retrieval_ms, generation_ms, docs_count, cached, tokens):
        """Fold one query's measurements into the running counters."""
        self.query_count += 1
        self.retrieval_latencies.append(retrieval_ms)
        self.generation_latencies.append(generation_ms)
        self.total_latency_ms += retrieval_ms + generation_ms
        self.empty_retrievals += docs_count == 0
        self.cache_hits += bool(cached)
        self.token_usage += tokens

    def summary(self):
        """Aggregate stats as a dict, or a message string if nothing recorded."""
        if not self.query_count:
            return "No queries recorded."
        n = self.query_count
        retrievals = self.retrieval_latencies
        generations = self.generation_latencies
        # Nearest-rank style p95: element at 95% of the sorted sample.
        p95_retrieval = sorted(retrievals)[int(len(retrievals) * 0.95)]
        return {
            "total_queries": n,
            "avg_retrieval_ms": round(sum(retrievals) / len(retrievals), 1),
            "avg_generation_ms": round(sum(generations) / len(generations), 1),
            "avg_total_ms": round(self.total_latency_ms / n, 1),
            "p95_retrieval_ms": round(p95_retrieval, 1),
            "cache_hit_rate": round(self.cache_hits / n * 100, 1),
            "empty_retrieval_rate": round(self.empty_retrievals / n * 100, 1),
            "total_tokens": self.token_usage,
        }
# Simulate production metrics
metrics = RAGMetrics()
import random
random.seed(42)
for i in range(100):
retrieval_ms = random.gauss(50, 15)
generation_ms = random.gauss(200, 50)
docs_count = random.choice([0, 2, 3, 3, 3, 4, 5])
cached = random.random() < 0.3
tokens = random.randint(100, 500)
metrics.record_query(retrieval_ms, generation_ms, docs_count, cached, tokens)
print("Production RAG Metrics (simulated 100 queries):")
print("=" * 50)
for key, value in metrics.summary().items():
print(f" {key}: {value}")7. Cost Optimization
Strategies to reduce LLM and embedding costs in production.
# Catalogue of cost levers: expected savings, how to implement each one,
# and the trade-off you accept in exchange.
cost_strategies = {
    "Semantic Caching": {
        "Savings": "30-60% LLM costs",
        "Implementation": "Cache responses for similar queries",
        "Trade-off": "Stale answers, threshold tuning",
    },
    "Model Cascading": {
        "Savings": "50-80% per simple query",
        "Implementation": "Route simple queries to cheap model, complex to GPT-4",
        "Trade-off": "Routing latency, potential quality loss",
    },
    "Embedding Caching": {
        "Savings": "Eliminate repeat embedding costs",
        "Implementation": "Cache embeddings for frequently asked questions",
        "Trade-off": "Memory usage, cache invalidation",
    },
    "Chunk Deduplication": {
        "Savings": "10-30% fewer chunks to embed",
        "Implementation": "Hash chunks, skip duplicates",
        "Trade-off": "Near-duplicates may slip through",
    },
    "Prompt Compression": {
        "Savings": "40-60% token reduction",
        "Implementation": "LongLLMLinguaCompressor, summary-based context",
        "Trade-off": "Information loss, compression latency",
    },
}

print("Cost Optimization Strategies:")
print("=" * 60)
for strategy_name, aspects in cost_strategies.items():
    print(f"\n{strategy_name}:")
    for aspect, note in aspects.items():
        print(f" {aspect}: {note}")
# Cost estimation
print("\n" + "=" * 60)
queries_per_month = 100_000
avg_tokens_per_query = 2000
gpt4o_mini_price = 0.15 / 1_000_000 # per input token
cache_hit_rate = 0.35
base_cost = queries_per_month * avg_tokens_per_query * gpt4o_mini_price
cached_cost = base_cost * (1 - cache_hit_rate)
print(f"\nMonthly Cost Estimate ({queries_per_month:,} queries):")
print(f" Without caching: ${base_cost:.2f}")
print(f" With {cache_hit_rate:.0%} cache hit rate: ${cached_cost:.2f}")
print(f" Monthly savings: ${base_cost - cached_cost:.2f}")8. Production Checklist
# Launch checklist grouped by concern; order reflects rollout sequence.
production_checklist = [
    ("Infrastructure", [
        "Choose vector DB based on scale and feature needs",
        "Set up semantic caching with appropriate threshold",
        "Implement query routing for multi-source retrieval",
        "Configure hybrid search with tuned RRF weights",
    ]),
    ("Quality", [
        "Set up automated evaluation (RAGAS/DeepEval) in CI",
        "Monitor retrieval relevance and generation faithfulness",
        "A/B test chunking strategies and retrieval parameters",
        "Implement feedback collection from end users",
    ]),
    ("Observability", [
        "Track latency percentiles (p50, p95, p99)",
        "Alert on empty retrieval rate spikes",
        "Log all queries and retrieved contexts for debugging",
        "Monitor token usage and cost per query",
    ]),
    ("Cost Control", [
        "Implement model cascading (cheap → expensive)",
        "Cache embeddings for static documents",
        "Set rate limits and cost budgets",
        "Use prompt compression for large contexts",
    ]),
]

print("Production RAG Checklist:")
print("=" * 60)
for category, tasks in production_checklist:
    print(f"\n{category}:")
    print("\n".join(f" [ ] {task}" for task in tasks))