RAG in Production: Scaling, Caching, and Observability

Vector database selection, semantic caching, query routing, hybrid search, monitoring, and cost optimization for production RAG systems

Open In Colab

📖 Read the full article


Table of Contents

  1. Setup & Installation
  2. Vector Database Selection
  3. Semantic Caching
  4. Query Routing
  5. Hybrid Search Tuning
  6. Observability & Monitoring
  7. Cost Optimization
  8. Production Checklist

1. Setup & Installation

!pip install -q langchain langchain-openai langchain-community faiss-cpu numpy rank-bm25
import os
# os.environ["OPENAI_API_KEY"] = "your-api-key-here"  # Uncomment and set

2. Vector Database Selection

Choosing the right vector database depends on scale, latency, hosting preference, and feature requirements.

# Feature matrix for common vector stores. Each row is
# (name, Type, Scale, Hybrid Search, Best For); the dict below is keyed by name.
_DB_FIELDS = ("Type", "Scale", "Hybrid Search", "Best For")
_DB_ROWS = (
    ("Pinecone", "Fully managed SaaS", "Billions of vectors",
     "Yes (sparse+dense)", "Teams wanting zero-ops, quick start"),
    ("Qdrant", "Self-hosted or Cloud", "Billions (disk-backed)",
     "Yes (payload filtering)", "Flexible filtering, Rust performance"),
    ("Weaviate", "Self-hosted or Cloud", "Hundreds of millions",
     "Yes (BM25+vector)", "Built-in ML modules, GraphQL API"),
    ("pgvector", "PostgreSQL extension", "Millions",
     "Via SQL joins", "Existing Postgres infra, ACID compliance"),
    ("FAISS", "In-memory library", "Billions (single machine)",
     "Manual implementation", "Prototyping, research, batch processing"),
    ("ChromaDB", "Embedded / client-server", "Millions",
     "Metadata filtering", "Local dev, simple use cases"),
)
vector_db_comparison = {
    name: dict(zip(_DB_FIELDS, values)) for name, *values in _DB_ROWS
}

# Print one indented "field: value" section per database.
print("Vector Database Comparison:", "=" * 70, sep="\n")
for db, details in vector_db_comparison.items():
    print(f"\n{db}:")
    print("\n".join(f"  {key}: {value}" for key, value in details.items()))
# Example: FAISS with production-style config
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

# Demo corpus as (text, category) pairs; each becomes a Document whose
# metadata carries the category.
_corpus = [
    ("Kubernetes orchestrates containerized workloads.", "infra"),
    ("Prometheus monitors system metrics and alerts.", "monitoring"),
    ("Redis provides in-memory caching for low latency.", "caching"),
    ("PostgreSQL is a relational database with ACID compliance.", "database"),
    ("Nginx serves as a reverse proxy and load balancer.", "infra"),
]
docs = [
    Document(page_content=text, metadata={"category": category})
    for text, category in _corpus
]

# Embed the corpus and index it in FAISS.
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_documents(docs, embeddings)

# Retrieve the top 3 matches together with their scores and metadata.
results = vectorstore.similarity_search_with_score("caching solutions", k=3)
for doc, score in results:
    category = doc.metadata["category"]
    print(f"  [{category}] {doc.page_content} (score: {score:.4f})")

3. Semantic Caching

Cache responses for semantically similar queries to avoid redundant LLM calls.

import numpy as np
import time


class SemanticCache:
    """In-memory response cache looked up by embedding similarity.

    Entries are kept in insertion order as
    ``(query_embedding, query_text, response)`` tuples and searched with a
    linear scan, so lookup cost grows with the number of cached entries.

    Args:
        embeddings: Object exposing ``embed_query(text)`` that returns a
            numeric vector for the text.
        threshold: Minimum cosine similarity for a cached entry to count as
            a hit.
    """

    def __init__(self, embeddings, threshold=0.92):
        self.embeddings = embeddings
        self.threshold = threshold
        self.cache = []  # List of (query_embedding, query_text, response)
        self.stats = {"hits": 0, "misses": 0}

    def _cosine_similarity(self, a, b):
        """Return the cosine similarity of two vectors as a Python float.

        A zero-length vector has no direction, so the similarity is defined
        as 0.0 here instead of dividing by zero and propagating NaN.
        """
        a = np.asarray(a, dtype=float)
        b = np.asarray(b, dtype=float)
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        if denominator == 0.0:
            return 0.0
        return float(np.dot(a, b) / denominator)

    def get(self, query: str):
        """Look up the cached response most similar to ``query``.

        Returns:
            ``(response, best_score, matched_query)``. On a miss, ``response``
            and ``matched_query`` are ``None`` and ``best_score`` is the
            highest similarity seen (0.0 when nothing scored above zero).
            Updates ``self.stats`` on every call.
        """
        query_embedding = self.embeddings.embed_query(query)
        best_score = 0.0
        best_entry = None
        for cached_emb, cached_query, cached_response in self.cache:
            score = self._cosine_similarity(query_embedding, cached_emb)
            if score > best_score:
                best_score = score
                best_entry = (cached_query, cached_response)

        if best_entry is not None and best_score >= self.threshold:
            self.stats["hits"] += 1
            matched_query, response = best_entry
            return response, best_score, matched_query
        self.stats["misses"] += 1
        return None, best_score, None

    def put(self, query: str, response: str):
        """Embed ``query`` and append it with its ``response`` to the cache."""
        query_embedding = self.embeddings.embed_query(query)
        self.cache.append((query_embedding, query, response))


# Demo semantic caching
cache = SemanticCache(embeddings, threshold=0.90)

# Seed the cache with two already-answered queries
cache.put(
    "How does Kubernetes work?",
    "Kubernetes orchestrates containers across clusters...",
)
cache.put(
    "What is Redis used for?",
    "Redis is an in-memory data store used for caching...",
)

# Two paraphrases of the seeded queries plus one unrelated query
queries_to_test = [
    "How does K8s work?",
    "What's Redis for?",
    "What is machine learning?",
]

for q in queries_to_test:
    response, score, matched = cache.get(q)
    if response:
        status = "HIT"
    else:
        status = "MISS"
    print(f"[{status}] '{q}' (similarity: {score:.4f})")
    if not matched:
        continue
    print(f"  Matched: '{matched}'")

print(f"\nCache stats: {cache.stats}")

4. Query Routing

Route queries to the most efficient data source or strategy.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from typing import Literal

# Chat model shared by the routing chain; temperature=0 is passed to the client.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


# Schema for one routing decision: a backend name from a closed set plus a
# short justification.
# NOTE(review): the docstring and Field descriptions below are presumably
# forwarded to the model as schema text by `with_structured_output` -- treat
# them as runtime strings and confirm before rewording.
class QueryRoute(BaseModel):
    """Route query to the best backend."""
    # One of the four backend identifiers listed in the Literal.
    target: Literal["vector_store", "sql_database", "api", "llm_only"] = Field(
        description="Best backend for this query"
    )
    # Free-text explanation for the chosen target.
    reasoning: str = Field(description="Brief reasoning")


# Prompt text describing each backend; {query} is filled in per request.
_ROUTE_TEMPLATE = (
    "Route this query to the best backend:\n"
    "- vector_store: Semantic text search (documentation, articles)\n"
    "- sql_database: Structured data queries (counts, aggregations, filtering)\n"
    "- api: Real-time data (stock prices, weather, live status)\n"
    "- llm_only: General knowledge, no external data needed\n\n"
    "Query: {query}"
)

# Ask the model for output shaped like QueryRoute, then chain prompt -> model.
router = llm.with_structured_output(QueryRoute)
route_prompt = ChatPromptTemplate.from_template(_ROUTE_TEMPLATE)
route_chain = route_prompt | router

# One sample query per backend description in the prompt.
test_queries = [
    "What is the SLA for our API?",
    "How many customers signed up last month?",
    "What's the current price of GOOGL?",
    "Explain the CAP theorem",
]

print("Query Routing Results:", "=" * 60, sep="\n")
for q in test_queries:
    result = route_chain.invoke({"query": q})
    print(
        f"\nQ: {q}",
        f"  Target: {result.target}",
        f"  Reason: {result.reasoning}",
        sep="\n",
    )

5. Hybrid Search Tuning

Combine BM25 (keyword) and vector (semantic) search using Reciprocal Rank Fusion (RRF).

from rank_bm25 import BM25Okapi

# Simple hybrid search implementation: one corpus, two retrievers
texts = [
    "Kubernetes manages container orchestration and scaling",
    "Docker containers package applications with dependencies",
    "Prometheus alerting monitors service health metrics",
    "Redis cache improves response latency significantly",
    "PostgreSQL handles ACID-compliant relational data",
]

# BM25 retriever over lowercased, whitespace-split tokens
tokenized = list(map(str.split, map(str.lower, texts)))
bm25 = BM25Okapi(tokenized)

# Vector retriever: a separate FAISS index over the same texts, built with
# the embeddings object created earlier
faiss_docs = [Document(page_content=passage) for passage in texts]
vs = FAISS.from_documents(faiss_docs, embeddings)


def reciprocal_rank_fusion(results_list, k=60):
    """Merge multiple ranked lists using Reciprocal Rank Fusion (RRF).

    Each document receives ``1 / (k + rank)`` from every list it appears in,
    where ``rank`` starts at 1 for the first item of the list. The per-list
    contributions are summed.

    Args:
        results_list: Iterable of ranked lists, best item first. Every item
            is a ``(doc_text, score)`` pair; only the position is used, the
            score is ignored.
        k: Smoothing constant; larger values flatten the gap between ranks.

    Returns:
        List of ``(doc_text, fused_score)`` pairs sorted by fused score,
        highest first. Ties keep first-seen order.
    """
    scores = {}
    for results in results_list:
        seen_in_list = set()
        # Ranks are 1-based, so the top item contributes 1 / (k + 1) and
        # k=0 stays well-defined.
        for rank, (doc_text, _) in enumerate(results, start=1):
            # A document repeated inside one list only counts at its best rank.
            if doc_text in seen_in_list:
                continue
            seen_in_list.add(doc_text)
            scores[doc_text] = scores.get(doc_text, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


def hybrid_search(query, alpha=0.5, k=3):
    """Run BM25 and vector search for ``query`` and fuse the rankings.

    Uses the module-level ``bm25`` index, ``texts`` corpus and ``vs`` vector
    store.

    Args:
        query: Search text; lowercased and whitespace-split for BM25.
        alpha: Balance between the two retrievers, intended range 0..1.
            0.5 weights both equally (plain RRF), values towards 1 favour
            the vector ranking, values towards 0 favour BM25.
        k: Number of results taken from each retriever and returned.

    Returns:
        ``(fused, bm25_ranked, vector_ranked)``, each a list of
        ``(text, score)`` pairs. ``fused`` holds at most ``k`` entries.
    """
    # BM25 results: best score first. Documents scoring 0 (presumably no
    # query term matched) are dropped so they cannot earn rank credit.
    bm25_scores = bm25.get_scores(query.lower().split())
    bm25_ranked = sorted(
        ((text, score) for text, score in zip(texts, bm25_scores) if score > 0),
        key=lambda pair: pair[1],
        reverse=True,
    )[:k]

    # Vector results, kept in the order the store returns them
    # (assumed best match first -- confirm for the configured store).
    vector_results = vs.similarity_search_with_score(query, k=k)
    vector_ranked = [(doc.page_content, score) for doc, score in vector_results]

    # Weighted RRF: weights are scaled to sum to 2 so that alpha=0.5 gives
    # each list weight 1.0, i.e. exactly the unweighted fusion.
    fused_scores = {}
    weighted_lists = (
        (2.0 * (1.0 - alpha), bm25_ranked),
        (2.0 * alpha, vector_ranked),
    )
    for weight, ranked in weighted_lists:
        if weight == 0:
            # A retriever with zero weight contributes no documents at all.
            continue
        for text, rrf_score in reciprocal_rank_fusion([ranked]):
            fused_scores[text] = fused_scores.get(text, 0.0) + weight * rrf_score
    fused = sorted(fused_scores.items(), key=lambda pair: pair[1], reverse=True)
    return fused[:k], bm25_ranked, vector_ranked


query = "container orchestration scaling"
fused, bm25_res, vector_res = hybrid_search(query)

# Print the three rankings; fused RRF scores are small, so they get six
# decimal places instead of four.
print(f"Query: '{query}'")
for heading, rows, decimals in (
    ("BM25 results", bm25_res, 4),
    ("Vector results", vector_res, 4),
    ("RRF Fused results", fused, 6),
):
    print(f"\n{heading}:")
    for text, score in rows:
        print(f"  {score:.{decimals}f} | {text}")

6. Observability & Monitoring

Track retrieval quality, latency, and LLM cost in production.

import time
from dataclasses import dataclass, field


@dataclass
class RAGMetrics:
    """Running counters and latency samples for a RAG pipeline.

    Call ``record_query`` once per handled query and ``summary`` to get
    aggregate figures.
    """

    query_count: int = 0
    total_latency_ms: float = 0.0  # sum of retrieval + generation latency
    retrieval_latencies: list = field(default_factory=list)
    generation_latencies: list = field(default_factory=list)
    empty_retrievals: int = 0  # queries for which docs_count was 0
    cache_hits: int = 0
    token_usage: int = 0

    def record_query(self, retrieval_ms, generation_ms, docs_count, cached, tokens):
        """Record one query.

        Args:
            retrieval_ms: Retrieval latency in milliseconds.
            generation_ms: Generation latency in milliseconds.
            docs_count: Number of documents retrieved; 0 counts as an
                empty retrieval.
            cached: Truthy when the query was served from cache.
            tokens: Tokens consumed by the query.
        """
        self.query_count += 1
        self.retrieval_latencies.append(retrieval_ms)
        self.generation_latencies.append(generation_ms)
        self.total_latency_ms += retrieval_ms + generation_ms
        if docs_count == 0:
            self.empty_retrievals += 1
        if cached:
            self.cache_hits += 1
        self.token_usage += tokens

    def summary(self):
        """Return aggregate metrics.

        Returns:
            The string ``"No queries recorded."`` when nothing has been
            recorded; otherwise a dict of averages, the nearest-rank p95
            retrieval latency, hit/empty rates in percent and the total
            token count. Values are rounded to one decimal place.
        """
        if not self.query_count:
            return "No queries recorded."

        sample_count = len(self.retrieval_latencies)
        avg_retrieval = sum(self.retrieval_latencies) / sample_count
        avg_generation = sum(self.generation_latencies) / len(self.generation_latencies)

        # Nearest-rank percentile: the value at 1-based rank ceil(0.95 * n).
        # Integer arithmetic avoids float rounding pushing the rank up by one.
        p95_rank = -(-95 * sample_count // 100)
        p95_retrieval = sorted(self.retrieval_latencies)[max(p95_rank - 1, 0)]

        return {
            "total_queries": self.query_count,
            "avg_retrieval_ms": round(avg_retrieval, 1),
            "avg_generation_ms": round(avg_generation, 1),
            "avg_total_ms": round(self.total_latency_ms / self.query_count, 1),
            "p95_retrieval_ms": round(p95_retrieval, 1),
            "cache_hit_rate": round(self.cache_hits / self.query_count * 100, 1),
            "empty_retrieval_rate": round(self.empty_retrievals / self.query_count * 100, 1),
            "total_tokens": self.token_usage,
        }


# Simulate production metrics with a fixed seed so the output is repeatable
import random

random.seed(42)
metrics = RAGMetrics()

# Keyword arguments are evaluated left to right, so the random draws happen
# in the order: retrieval, generation, doc count, cache flag, tokens.
for _ in range(100):
    metrics.record_query(
        retrieval_ms=random.gauss(50, 15),
        generation_ms=random.gauss(200, 50),
        docs_count=random.choice([0, 2, 3, 3, 3, 4, 5]),
        cached=random.random() < 0.3,
        tokens=random.randint(100, 500),
    )

print("Production RAG Metrics (simulated 100 queries):", "=" * 50, sep="\n")
print("\n".join(f"  {key}: {value}" for key, value in metrics.summary().items()))

7. Cost Optimization

Strategies to reduce LLM and embedding costs in production.

# Cost-saving techniques. Each row is
# (strategy, Savings, Implementation, Trade-off); the dict is keyed by strategy.
_STRATEGY_FIELDS = ("Savings", "Implementation", "Trade-off")
_STRATEGY_ROWS = (
    ("Semantic Caching", "30-60% LLM costs",
     "Cache responses for similar queries",
     "Stale answers, threshold tuning"),
    ("Model Cascading", "50-80% per simple query",
     "Route simple queries to cheap model, complex to GPT-4",
     "Routing latency, potential quality loss"),
    ("Embedding Caching", "Eliminate repeat embedding costs",
     "Cache embeddings for frequently asked questions",
     "Memory usage, cache invalidation"),
    ("Chunk Deduplication", "10-30% fewer chunks to embed",
     "Hash chunks, skip duplicates",
     "Near-duplicates may slip through"),
    ("Prompt Compression", "40-60% token reduction",
     "LongLLMLinguaCompressor, summary-based context",
     "Information loss, compression latency"),
)
cost_strategies = {
    name: dict(zip(_STRATEGY_FIELDS, values)) for name, *values in _STRATEGY_ROWS
}

# Print one indented "field: value" section per strategy.
print("Cost Optimization Strategies:", "=" * 60, sep="\n")
for strategy, details in cost_strategies.items():
    print(f"\n{strategy}:")
    print("\n".join(f"  {key}: {value}" for key, value in details.items()))

# Cost estimation: monthly input-token spend with and without a cache
queries_per_month = 100_000
avg_tokens_per_query = 2000
gpt4o_mini_price = 0.15 / 1_000_000  # per input token
cache_hit_rate = 0.35

# Every cache hit is treated as a query that costs nothing.
base_cost = queries_per_month * avg_tokens_per_query * gpt4o_mini_price
cached_cost = base_cost * (1 - cache_hit_rate)

print()
print("=" * 60)
print(
    f"\nMonthly Cost Estimate ({queries_per_month:,} queries):",
    f"  Without caching: ${base_cost:.2f}",
    f"  With {cache_hit_rate:.0%} cache hit rate: ${cached_cost:.2f}",
    f"  Monthly savings: ${base_cost - cached_cost:.2f}",
    sep="\n",
)

8. Production Checklist

# Checklist grouped by category; stored as a list of (category, items) pairs.
production_checklist = list({
    "Infrastructure": [
        "Choose vector DB based on scale and feature needs",
        "Set up semantic caching with appropriate threshold",
        "Implement query routing for multi-source retrieval",
        "Configure hybrid search with tuned RRF weights",
    ],
    "Quality": [
        "Set up automated evaluation (RAGAS/DeepEval) in CI",
        "Monitor retrieval relevance and generation faithfulness",
        "A/B test chunking strategies and retrieval parameters",
        "Implement feedback collection from end users",
    ],
    "Observability": [
        "Track latency percentiles (p50, p95, p99)",
        "Alert on empty retrieval rate spikes",
        "Log all queries and retrieved contexts for debugging",
        "Monitor token usage and cost per query",
    ],
    "Cost Control": [
        "Implement model cascading (cheap → expensive)",
        "Cache embeddings for static documents",
        "Set rate limits and cost budgets",
        "Use prompt compression for large contexts",
    ],
}.items())

# Print every item as an unchecked box under its category heading.
print("Production RAG Checklist:", "=" * 60, sep="\n")
for category, items in production_checklist:
    print(f"\n{category}:")
    print(*(f"  [ ] {item}" for item in items), sep="\n")