graph TD
CLIENT["Client (Frontend / Mobile)"]
CLIENT -->|"GET /users/123"| API["REST API (FastAPI / Flask)"]
API -->|"200 OK + JSON body"| CLIENT
API --> AUTH["Authentication<br/>JWT / OAuth2"]
API --> VALID["Validation<br/>Pydantic / Marshmallow"]
API --> SERVICE["Service Layer<br/>Business Logic"]
SERVICE --> DB["Database"]
SERVICE --> CACHE["Cache (Redis)"]
style API fill:#56cc9d,stroke:#333,color:#fff
style AUTH fill:#ff7851,stroke:#333,color:#fff
style SERVICE fill:#6cc3d5,stroke:#333,color:#fff
Python SWE Interview QA - 4
Introduction
This is Part 4 of our Python SWE Interview QA series, focused on building production REST APIs — covering API design, concurrency under load, latency optimization, and security hardening. These are the questions that separate backend engineers who ship to production from those who only build prototypes.
For foundational Python topics, see Python SWE Interview QA - 1. For advanced internals, see Part 2. For design patterns and code structure, see Part 3.
Q1: How do you design a RESTful API in Python and what are the best practices?
Answer:
A well-designed REST API follows consistent conventions for resources, HTTP methods, status codes, and response formats — making it predictable and self-documenting.
RESTful URL Design
| Action | HTTP Method | URL | Status Code |
|---|---|---|---|
| List users | GET | /api/v1/users | 200 |
| Get one user | GET | /api/v1/users/{id} | 200 / 404 |
| Create user | POST | /api/v1/users | 201 |
| Update user (full) | PUT | /api/v1/users/{id} | 200 / 404 |
| Update user (partial) | PATCH | /api/v1/users/{id} | 200 / 404 |
| Delete user | DELETE | /api/v1/users/{id} | 204 / 404 |
| User’s orders | GET | /api/v1/users/{id}/orders | 200 |
FastAPI Implementation
from fastapi import FastAPI, HTTPException, Depends, Query, status
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime

app = FastAPI(title="User Service", version="1.0.0")

# Request/Response models (validation + documentation)
class UserCreate(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr
    role: str = "user"

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    role: str
    created_at: datetime
    model_config = {"from_attributes": True}

class PaginatedResponse(BaseModel):
    items: list[UserResponse]
    total: int
    page: int
    page_size: int

# Endpoints
@app.get("/api/v1/users", response_model=PaginatedResponse)
async def list_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    role: str | None = None,
):
    """List users with pagination and optional filtering."""
    users, total = await user_service.list_users(
        page=page, page_size=page_size, role=role
    )
    return PaginatedResponse(
        items=users, total=total, page=page, page_size=page_size
    )

@app.post("/api/v1/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: UserCreate):
    """Create a new user."""
    user = await user_service.create_user(user_data)
    return user

@app.get("/api/v1/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Get a single user by ID."""
    user = await user_service.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
API Design Best Practices
| Practice | Why |
|---|---|
| Use nouns for resources (/users), not verbs (/getUsers) | HTTP method already conveys the action |
| Version your API (/api/v1/...) | Non-breaking evolution |
| Use proper HTTP status codes | Clients can handle responses generically |
| Paginate list endpoints | Prevent unbounded responses |
| Use Pydantic for request/response models | Auto-validation + OpenAPI docs |
| Return consistent error format | {"detail": "...", "code": "..."} |
| Use plural nouns (/users, not /user) | Consistent collection semantics |
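To make the "paginate list endpoints" practice concrete, here is a minimal sketch of the arithmetic a service layer might use to turn `page` and `page_size` into SQL `OFFSET`/`LIMIT` values. The names `PageWindow` and `page_window` are hypothetical and not part of FastAPI; the bounds mirror the `Query(...)` constraints used in the endpoint above.

```python
from dataclasses import dataclass
from math import ceil


@dataclass(frozen=True)
class PageWindow:
    offset: int       # rows to skip (SQL OFFSET)
    limit: int        # rows to fetch (SQL LIMIT)
    total_pages: int  # number of pages for the given total


def page_window(page: int, page_size: int, total: int,
                max_page_size: int = 100) -> PageWindow:
    """Translate 1-based page parameters into OFFSET/LIMIT values."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if not 1 <= page_size <= max_page_size:
        raise ValueError(f"page_size must be between 1 and {max_page_size}")
    if total < 0:
        raise ValueError("total must be >= 0")
    return PageWindow(
        offset=(page - 1) * page_size,
        limit=page_size,
        total_pages=ceil(total / page_size),
    )
```

Offset pagination is simple but gets slower on deep pages because the database still scans the skipped rows; keyset (cursor) pagination is a common alternative for large tables.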
Q2: How do you handle concurrency in a production Python API?
Answer:
Production APIs must handle thousands of concurrent requests. Python achieves this through a combination of async I/O, worker processes, and connection pooling.
graph TD
LB["Load Balancer (Nginx)"]
LB --> W1["Uvicorn Worker 1<br/>(async event loop)"]
LB --> W2["Uvicorn Worker 2<br/>(async event loop)"]
LB --> W3["Uvicorn Worker 3<br/>(async event loop)"]
LB --> W4["Uvicorn Worker 4<br/>(async event loop)"]
W1 --> POOL["Connection Pool<br/>(asyncpg / aioredis)"]
W2 --> POOL
W3 --> POOL
W4 --> POOL
POOL --> DB["PostgreSQL"]
POOL --> REDIS["Redis"]
style LB fill:#56cc9d,stroke:#333,color:#fff
style POOL fill:#ffce67,stroke:#333
style DB fill:#6cc3d5,stroke:#333,color:#fff
Production Deployment Architecture
# gunicorn.conf.py: production server configuration
import multiprocessing

# Gunicorn's suggested starting point is (2 × CPU cores) + 1; tune under real load
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"  # Async workers
bind = "0.0.0.0:8000"
keepalive = 120
timeout = 30
graceful_timeout = 30
max_requests = 1000  # Restart worker after N requests (prevent memory leaks)
max_requests_jitter = 50  # Randomize restarts so workers do not all restart at once
Connection Pooling
from contextlib import asynccontextmanager

import asyncpg
import redis.asyncio as aioredis
from fastapi import Depends, FastAPI, HTTPException, Request

# `settings` is assumed to be the application's config object (database_url, redis_url)

# Application lifespan: create pools once at startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create connection pools
    app.state.db_pool = await asyncpg.create_pool(
        dsn=settings.database_url,
        min_size=5,   # Keep 5 connections ready
        max_size=20,  # Max 20 concurrent DB connections per worker
        max_inactive_connection_lifetime=300,
    )
    app.state.redis = aioredis.from_url(
        settings.redis_url,
        max_connections=20,
    )
    yield
    # Shutdown: close pools
    await app.state.db_pool.close()
    await app.state.redis.aclose()  # redis-py >= 5.0.1; use close() on older versions

app = FastAPI(lifespan=lifespan)

# Usage in endpoints
async def get_db(request: Request) -> asyncpg.Pool:
    return request.app.state.db_pool

@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db)):
    row = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)
Concurrency Patterns
| Pattern | Use When | Example |
|---|---|---|
| async/await | I/O-bound (DB, HTTP, Redis) | await db.fetch(...) |
| Background tasks | Fire-and-forget (emails, logs) | BackgroundTasks in FastAPI |
| Task queues | Heavy/long-running work | Celery, arq, dramatiq |
| Connection pooling | Reuse expensive connections | asyncpg pool, Redis pool |
| Worker processes | Utilize multiple CPU cores | Gunicorn with N workers |
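Because each worker process creates its own pool, the database sees the product of workers and pool size, not the pool size alone. The sketch below works through that arithmetic; the function names and the `reserved` headroom value are illustrative assumptions, and the limit of 100 used in the usage note is PostgreSQL's usual default for `max_connections`.

```python
def gunicorn_workers(cpu_cores: int) -> int:
    """Starting point used in the Gunicorn config: (2 x cores) + 1."""
    return cpu_cores * 2 + 1


def total_db_connections(workers: int, pool_max_size: int, instances: int = 1) -> int:
    """Worst case: every worker in every instance fills its own pool."""
    return workers * pool_max_size * instances


def fits_database(workers: int, pool_max_size: int, db_max_connections: int,
                  instances: int = 1, reserved: int = 10) -> bool:
    """True if the worst case stays within the server limit minus reserved headroom."""
    budget = db_max_connections - reserved
    return total_db_connections(workers, pool_max_size, instances) <= budget
```

On a 4-core host, `gunicorn_workers(4)` gives 9 workers; with `max_size=20` that is up to 180 connections, so `fits_database(9, 20, 100)` is `False`. Shrinking the per-worker pool or putting a pooler such as PgBouncer in front of the database are the usual ways out.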
Handling CPU-Bound Work in an Async API
import asyncio
from concurrent.futures import ProcessPoolExecutor

from fastapi import Request

executor = ProcessPoolExecutor(max_workers=4)

def cpu_intensive_task(data: bytes) -> dict:
    """Runs in a separate process, so it does not block the event loop."""
    # Heavy computation here (image processing, ML inference, etc.)
    # process() is a placeholder for the actual computation
    return {"result": process(data)}

@app.post("/api/v1/process")
async def process_data(request: Request):
    data = await request.body()  # Raw request bytes
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, cpu_intensive_task, data)
    return result
Q3: How do you implement rate limiting and throttling?
Answer:
Rate limiting protects your API from abuse, ensures fair usage, and prevents cascading failures under load.
graph TD
REQ["Incoming Request"]
REQ --> CHECK["Check Rate Limit<br/>(Redis counter)"]
CHECK -->|"Under limit"| PROCESS["Process Request"]
CHECK -->|"Over limit"| REJECT["429 Too Many Requests<br/>Retry-After: 60"]
subgraph Algorithms["Rate Limiting Algorithms"]
A1["Fixed Window<br/>100 req/minute"]
A2["Sliding Window<br/>Smoother distribution"]
A3["Token Bucket<br/>Allow bursts"]
A4["Leaky Bucket<br/>Constant rate"]
end
style CHECK fill:#ffce67,stroke:#333
style REJECT fill:#ff7851,stroke:#333,color:#fff
style PROCESS fill:#56cc9d,stroke:#333,color:#fff
Sliding Window Rate Limiter with Redis
import time
import uuid

from fastapi import Request, HTTPException
from redis.asyncio import Redis

class RateLimiter:
    """Sliding window rate limiter using Redis sorted sets."""

    def __init__(self, redis: Redis, requests_per_minute: int = 60):
        self.redis = redis
        self.limit = requests_per_minute
        self.window = 60  # seconds

    async def is_allowed(self, key: str) -> tuple[bool, dict]:
        now = time.time()
        window_start = now - self.window
        pipe = self.redis.pipeline()
        # Remove expired entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Add current request; the random suffix keeps same-timestamp requests distinct.
        # Rejected requests are recorded too, so a client that keeps retrying stays limited.
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        # Count requests in window
        pipe.zcard(key)
        # Set expiry on the key
        pipe.expire(key, self.window)
        _, _, count, _ = await pipe.execute()

        remaining = max(0, self.limit - count)
        headers = {
            "X-RateLimit-Limit": str(self.limit),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(int(now + self.window)),
        }
        return count <= self.limit, headers

# FastAPI dependency
async def rate_limit_dependency(request: Request):
    # Behind a proxy such as Nginx this is the proxy's address; in that setup,
    # derive the client IP from a trusted forwarded header instead.
    client_ip = request.client.host if request.client else "unknown"
    key = f"rate_limit:{client_ip}"
    limiter = RateLimiter(request.app.state.redis)
    allowed, headers = await limiter.is_allowed(key)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="Too many requests",
            headers={"Retry-After": "60", **headers},
        )
    # Stored for a middleware or response hook to copy onto the response
    request.state.rate_limit_headers = headers
Rate Limiting Strategies
| Strategy | Approach | Use Case |
|---|---|---|
| Per IP | Limit by client IP | Public APIs |
| Per user/API key | Limit by authenticated identity | SaaS APIs |
| Per endpoint | Different limits for different routes | Heavy vs light endpoints |
| Tiered | Free: 100/hr, Pro: 10000/hr | Commercial APIs |
| Adaptive | Reduce limits under high load | Self-protection |
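The diagram above also lists the token bucket algorithm, which allows short bursts while capping the average rate. The following is a minimal single-process sketch of that idea, not a replacement for the Redis-based limiter: the `TokenBucket` class and its `clock` parameter are illustrative, and a shared store would be needed to enforce limits across workers.

```python
import time
from typing import Callable


class TokenBucket:
    """In-memory token bucket: refills at `rate` tokens/second up to `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock              # injectable so the bucket can be tested
        self._tokens = float(capacity)   # start full, which allows an initial burst
        self._updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Spend `cost` tokens if available; return whether the request may proceed."""
        now = self._clock()
        elapsed = now - self._updated
        self._updated = now
        # Refill for the time that has passed, never beyond capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

With `rate=1.0` and `capacity=3`, a client can send three requests back to back, is then rejected, and regains one request per second afterwards.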
Q4: How do you optimize API latency?
Answer:
Latency optimization is a layered problem — you need to identify bottlenecks (usually I/O) and apply targeted strategies at each layer.
graph LR
subgraph Layers["Latency Optimization Layers"]
direction TB
L1["Network: CDN, compression, HTTP/2"]
L2["Application: caching, async, batching"]
L3["Database: indexes, query optimization, connection pools"]
L4["Infrastructure: horizontal scaling, edge computing"]
end
style Layers fill:#56cc9d,stroke:#333,color:#fff
Caching Strategies
import hashlib
import json
from functools import wraps

from cachetools import TTLCache
from fastapi.responses import JSONResponse
from redis.asyncio import Redis

# Layer 1: In-memory cache (per-worker, fastest)
local_cache = TTLCache(maxsize=1000, ttl=30)  # 30-second TTL

# Layer 2: Redis cache (shared across workers)
# Note: this must be a plain function; an `async def` factory cannot be used as a decorator.
def cache_response(redis: Redis, key: str, ttl: int = 300):
    """Decorator factory for caching endpoint responses in Redis."""
    # The key is fixed per decorated function; include request arguments
    # in it if results differ per resource.
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Check cache
            cached = await redis.get(key)
            if cached is not None:
                return json.loads(cached)
            # Compute result
            result = await func(*args, **kwargs)
            # Store in cache
            await redis.set(key, json.dumps(result), ex=ttl)
            return result
        return wrapper
    return decorator

# Layer 3: HTTP cache headers
@app.get("/api/v1/products/{product_id}")
async def get_product(product_id: int):
    product = await product_service.get(product_id)
    payload = product.model_dump(mode="json")  # Pydantic v2; yields JSON-safe types
    response = JSONResponse(content=payload)
    response.headers["Cache-Control"] = "public, max-age=60"
    # ETag values are quoted strings; MD5 is only a content fingerprint here
    digest = hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()
    response.headers["ETag"] = f'"{digest}"'
    return response
Database Query Optimization
from collections import defaultdict

# SLOW: N+1 query problem
async def get_users_with_orders_slow():
    rows = await db.fetch("SELECT * FROM users ORDER BY id LIMIT 100")
    users = [dict(row) for row in rows]  # asyncpg Records are read-only
    for user in users:
        # 100 separate queries!
        user["orders"] = await db.fetch(
            "SELECT * FROM orders WHERE user_id = $1", user["id"]
        )
    return users

# FAST: Single query with JOIN
# Assumes users.id and orders.id are primary keys. The FILTER clause avoids
# a [null] array for users who have no orders.
async def get_users_with_orders_fast():
    return await db.fetch("""
        SELECT u.*,
               COALESCE(json_agg(o.*) FILTER (WHERE o.id IS NOT NULL), '[]') AS orders
        FROM users u
        LEFT JOIN orders o ON o.user_id = u.id
        GROUP BY u.id
        ORDER BY u.id
        LIMIT 100
    """)

# FAST: Batch loading (two queries in total)
async def get_users_with_orders_batch():
    rows = await db.fetch("SELECT * FROM users ORDER BY id LIMIT 100")
    users = [dict(row) for row in rows]
    user_ids = [u["id"] for u in users]
    orders = await db.fetch(
        "SELECT * FROM orders WHERE user_id = ANY($1)", user_ids
    )
    # Group orders by user_id in Python
    orders_by_user = defaultdict(list)
    for order in orders:
        orders_by_user[order["user_id"]].append(order)
    for user in users:
        user["orders"] = orders_by_user[user["id"]]
    return users
Latency Optimization Checklist
| Technique | Typical Improvement | Effort |
|---|---|---|
| Add database indexes | 10-100x for queries | Low |
| Connection pooling | 5-20ms per request | Low |
| Redis caching | 50-500ms saved per cache hit | Medium |
| Response compression (gzip) | 50-80% smaller payloads | Low |
| Async I/O (avoid blocking) | Throughput, not latency | Medium |
| Pagination (limit result size) | Prevents timeout | Low |
| N+1 query elimination | 10-100x for list endpoints | Medium |
| Background tasks (defer work) | Perceived latency reduction | Medium |
| Read replicas | Distribute read load | High |
Q6: How do you protect a Python API against common security vulnerabilities?
Answer:
Production APIs must defend against the OWASP Top 10 web security risks. Python frameworks provide tools, but developers must use them correctly.
graph TD
THREATS["Common API Threats"]
THREATS --> INJ["Injection<br/>(SQL, NoSQL, Command)"]
THREATS --> AUTH["Broken Authentication"]
THREATS --> EXPO["Data Exposure<br/>(Sensitive data in responses)"]
THREATS --> MASS["Mass Assignment<br/>(Unvalidated input fields)"]
THREATS --> RATE["Lack of Rate Limiting"]
THREATS --> SSRF["SSRF<br/>(Server-Side Request Forgery)"]
subgraph Defenses["Defense Layers"]
D1["Input Validation (Pydantic)"]
D2["Parameterized Queries"]
D3["Output Filtering"]
D4["CORS Configuration"]
D5["Security Headers"]
D6["Rate Limiting"]
end
style THREATS fill:#ff7851,stroke:#333,color:#fff
style Defenses fill:#56cc9d,stroke:#333,color:#fff
SQL Injection Prevention
# VULNERABLE: String formatting in SQL
async def get_user_unsafe(username: str):
    # NEVER DO THIS: SQL injection!
    query = f"SELECT * FROM users WHERE username = '{username}'"
    return await db.fetch(query)

# Attack: username = "'; DROP TABLE users; --"

# SAFE: Parameterized queries (always use these)
async def get_user_safe(username: str):
    # The value is sent separately from the SQL text, so it is never parsed as SQL
    query = "SELECT * FROM users WHERE username = $1"
    return await db.fetchrow(query, username)

# SAFE: ORM (Tortoise ORM syntax shown; SQLAlchemy also binds parameters)
async def get_user_orm(username: str):
    return await User.filter(username=username).first()
Input Validation and Mass Assignment Protection
from pydantic import BaseModel, EmailStr, Field, field_validator
import re

class UserCreate(BaseModel):
    """Only these fields are accepted; everything else is ignored."""
    name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        # ASCII-only pattern; widen it if names with accents or other scripts must be accepted
        if not re.match(r"^[a-zA-Z\s\-']+$", v):
            raise ValueError("Name contains invalid characters")
        return v.strip()

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain an uppercase letter")
        if not re.search(r"[0-9]", v):
            raise ValueError("Password must contain a digit")
        return v

# Mass assignment protection: UserCreate does NOT have 'role' or 'is_admin'
# Even if client sends {"name": "Alice", "role": "admin"}, role is ignored
@app.post("/api/v1/users")
async def create_user(data: UserCreate):  # Only name, email, password accepted
    user = await user_service.create(data)
    return UserResponse.model_validate(user)  # Only safe fields in response
Security Headers and CORS
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

# CORS: restrict which origins can call your API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com", "https://admin.myapp.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)

# Trusted hosts: prevent Host header attacks
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["api.myapp.com"])

# Security headers middleware
@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    # The legacy XSS auditor is deprecated; OWASP recommends disabling it and relying on CSP
    response.headers["X-XSS-Protection"] = "0"
    return response
Security Checklist
| Vulnerability | Defense |
|---|---|
| SQL Injection | Parameterized queries / ORM |
| Mass Assignment | Pydantic models with explicit fields |
| Broken Auth | JWT + bcrypt + short expiry |
| Data Exposure | Response models that exclude sensitive fields |
| SSRF | Validate/whitelist URLs before fetching |
| DoS | Rate limiting + request size limits |
| XSS (if serving HTML) | Template escaping, CSP headers |
| CSRF | SameSite cookies + CSRF tokens |
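The SSRF row is the only defense in the checklist without an example, so here is a minimal allowlist check for user-supplied URLs, using only the standard library. The host names in `ALLOWED_HOSTS` and the function name are hypothetical. The check covers the URL string only; a production setup would also need to disable or re-validate redirects and guard against DNS answers that point at internal addresses.

```python
import ipaddress
from urllib.parse import urlsplit

# Hypothetical allowlist of hosts this service may call on behalf of users
ALLOWED_HOSTS = frozenset({"api.partner.example", "cdn.example.com"})


def is_safe_outbound_url(url: str, allowed_hosts: frozenset = ALLOWED_HOSTS) -> bool:
    """Accept only https URLs whose host is explicitly allowlisted."""
    try:
        parts = urlsplit(url)
        host = parts.hostname  # lowercased by urlsplit; None if missing
    except ValueError:
        return False  # malformed URL, e.g. an unterminated IPv6 literal
    if parts.scheme != "https" or not host:
        return False
    if parts.username or parts.password:
        return False  # user:pass@host forms are a common way to disguise the real host
    try:
        ipaddress.ip_address(host)
        return False  # reject raw IP literals such as 127.0.0.1 or 169.254.169.254
    except ValueError:
        pass  # not an IP literal, continue with the hostname check
    return host in allowed_hosts
```

An exact-match allowlist is deliberately strict: suffix or substring matching would accept hosts such as `api.partner.example.evil.com`.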
Q7: How do you implement background tasks and task queues?
Answer:
Not all work should happen in the request-response cycle. Background tasks handle deferred work (emails, notifications), while task queues handle heavy/long-running jobs (report generation, ML inference).
graph LR
API["API Server"]
API -->|"lightweight"| BG["Background Tasks<br/>(in-process)"]
API -->|"heavy/reliable"| QUEUE["Task Queue<br/>(Celery / arq)"]
QUEUE --> WORKER["Worker Process(es)"]
WORKER --> DB["Database"]
WORKER --> EMAIL["Email Service"]
WORKER --> ML["ML Model"]
subgraph InProcess["In-Process (FastAPI BackgroundTasks)"]
BG1["Send email"]
BG2["Write audit log"]
BG3["Update cache"]
end
subgraph Distributed["Distributed (Celery / arq)"]
Q1["Generate PDF report"]
Q2["Process video upload"]
Q3["Run ML pipeline"]
Q4["Bulk data import"]
end
style API fill:#56cc9d,stroke:#333,color:#fff
style QUEUE fill:#ffce67,stroke:#333
style WORKER fill:#6cc3d5,stroke:#333,color:#fff
FastAPI Background Tasks (Lightweight)
from fastapi import BackgroundTasks

async def send_welcome_email(email: str, name: str):
    """Runs after the response is sent, so it does not block the client."""
    await email_service.send(
        to=email,
        subject="Welcome!",
        body=f"Hi {name}, welcome to our platform!"
    )

async def log_signup(user_id: int):
    await analytics.track("user_signup", user_id=user_id)

@app.post("/api/v1/users", status_code=201)
async def create_user(data: UserCreate, background_tasks: BackgroundTasks):
    user = await user_service.create(data)
    # These run AFTER the response is returned to the client
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    background_tasks.add_task(log_signup, user.id)
    return UserResponse.model_validate(user)
    # Client gets 201 immediately and does not wait for the email
Distributed Task Queue (arq, async-native)
# tasks.py: define tasks
from arq import create_pool
from arq.connections import RedisSettings
from fastapi import Request

async def generate_report(ctx, user_id: int, report_type: str):
    """Heavy task: runs in a separate worker process."""
    data = await fetch_report_data(user_id, report_type)
    pdf = render_pdf(data)  # CPU-intensive
    url = await upload_to_s3(pdf)
    await notify_user(user_id, f"Report ready: {url}")
    return {"url": url}

async def process_image(ctx, image_id: int):
    """Image processing: CPU-bound, offloaded to worker."""
    image = await download_image(image_id)
    thumbnails = create_thumbnails(image, sizes=[128, 256, 512])
    await save_thumbnails(image_id, thumbnails)

# Worker configuration
class WorkerSettings:
    functions = [generate_report, process_image]
    redis_settings = RedisSettings(host="redis")
    max_jobs = 10

# API endpoint: enqueue task
# Assumes the pool was created once at startup (for example in the lifespan handler):
#   app.state.arq_pool = await create_pool(RedisSettings(host="redis"))
# Creating a new pool on every request would leak Redis connections.
@app.post("/api/v1/reports")
async def request_report(request: Request, user_id: int, report_type: str):
    redis = request.app.state.arq_pool
    job = await redis.enqueue_job("generate_report", user_id, report_type)
    return {"job_id": job.job_id, "status": "queued"}
When to Use What
| Approach | Use Case | Guarantees |
|---|---|---|
| BackgroundTasks | Email, logging, cache updates | Best-effort (lost if server crashes) |
| arq / Celery | Reports, heavy processing, ML | Reliable (persisted in Redis/RabbitMQ) |
| Cron / scheduler | Periodic cleanup, daily reports | Scheduled execution |
| Streaming response | Real-time progress updates | Client stays connected |
Q8: How do you handle API versioning and backward compatibility?
Answer:
APIs evolve, but breaking changes destroy client trust. Versioning strategies let you evolve without breaking existing consumers.
graph TD
subgraph Strategies["Versioning Strategies"]
URL["URL Path<br/>/api/v1/users<br/>/api/v2/users"]
HEADER["Header<br/>Accept: application/vnd.api+json;version=2"]
QUERY["Query Param<br/>/api/users?version=2"]
end
URL --> REC["✓ Most common<br/>✓ Easy to understand<br/>✓ Cacheable"]
HEADER --> ADV["✓ Clean URLs<br/>✗ Harder to test<br/>✗ Not visible in logs"]
style URL fill:#56cc9d,stroke:#333,color:#fff
style REC fill:#6cc3d5,stroke:#333,color:#fff
URL-Based Versioning (Recommended)
from fastapi import APIRouter

# Version 1
v1_router = APIRouter(prefix="/api/v1")

@v1_router.get("/users/{user_id}")
async def get_user_v1(user_id: int):
    user = await user_service.get(user_id)
    return {"id": user.id, "name": user.name, "email": user.email}

# Version 2: adds new fields, changes format
v2_router = APIRouter(prefix="/api/v2")

@v2_router.get("/users/{user_id}")
async def get_user_v2(user_id: int):
    user = await user_service.get(user_id)
    return {
        "id": user.id,
        "name": {"first": user.first_name, "last": user.last_name},
        "email": user.email,
        "created_at": user.created_at.isoformat(),
        "links": {"self": f"/api/v2/users/{user.id}"},
    }

app.include_router(v1_router)
app.include_router(v2_router)
Backward-Compatible Changes (No Version Bump Needed)
| Safe Change | Why It’s Safe |
|---|---|
| Add a new optional field to response | Existing clients ignore unknown fields |
| Add a new endpoint | No conflict with existing routes |
| Add optional query parameter | Existing calls still work without it |
| Add a new enum value to response | Clients should handle unknown values |
Breaking Changes (Require New Version)
| Breaking Change | Why It Breaks |
|---|---|
| Remove or rename a field | Clients accessing it get errors |
| Change field type (string → int) | Parsing fails |
| Make optional field required | Existing requests missing it fail |
| Change URL structure | Existing bookmarks/integrations break |
| Change error format | Client error handling breaks |
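The "safe" changes in the first table are only safe if clients are written as tolerant readers: they ignore fields they do not know and map unknown enum values to a fallback. A minimal client-side sketch follows; the `status` field and the `AccountStatus` enum are hypothetical and do not appear in the API above.

```python
from dataclasses import dataclass, fields
from enum import Enum


class AccountStatus(Enum):
    ACTIVE = "active"
    SUSPENDED = "suspended"
    UNKNOWN = "unknown"  # fallback for values added by newer API versions

    @classmethod
    def parse(cls, raw: str) -> "AccountStatus":
        try:
            return cls(raw)
        except ValueError:
            return cls.UNKNOWN


@dataclass
class User:
    id: int
    name: str
    status: AccountStatus


def parse_user(payload: dict) -> User:
    """Build a User from an API response, tolerating additive changes."""
    known = {f.name for f in fields(User)}
    # Drop fields this client version does not know about
    data = {k: v for k, v in payload.items() if k in known}
    data["status"] = AccountStatus.parse(data.get("status", "unknown"))
    return User(**data)
```

A client like this keeps working when the server adds `created_at` or a new status value, but still fails loudly (with a `TypeError`) if a required field such as `name` is removed, which is exactly the breaking case that needs a new version.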
Deprecation Strategy
from fastapi import Response

@v1_router.get("/users/{user_id}", deprecated=True)  # Shows in OpenAPI docs
async def get_user_v1(user_id: int, response: Response):
    """Deprecated: Use /api/v2/users/{user_id} instead."""
    # Sunset header (RFC 8594): the date after which the endpoint may stop responding
    response.headers["Sunset"] = "Fri, 01 Jan 2027 00:00:00 GMT"
    response.headers["Deprecation"] = "true"
    response.headers["Link"] = '</api/v2/users>; rel="successor-version"'
    # get_user_legacy is a placeholder for the existing v1 implementation
    return await get_user_legacy(user_id)
Q9: How do you implement health checks and observability?
Answer:
Production APIs need health checks (for load balancers and orchestrators), metrics (for dashboards), and distributed tracing (for debugging latency across services).
graph TD
OBS["Observability Stack"]
OBS --> HEALTH["Health Checks<br/>/health, /ready"]
OBS --> METRICS["Metrics<br/>Prometheus + Grafana"]
OBS --> TRACES["Distributed Tracing<br/>OpenTelemetry"]
OBS --> LOGS["Structured Logs<br/>JSON → ELK/Loki"]
HEALTH --> LB["Load Balancer<br/>Route traffic to healthy instances"]
HEALTH --> K8S["Kubernetes<br/>Restart unhealthy pods"]
style OBS fill:#56cc9d,stroke:#333,color:#fff
style HEALTH fill:#6cc3d5,stroke:#333,color:#fff
style METRICS fill:#ffce67,stroke:#333
Health Check Endpoints
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

class HealthResponse(BaseModel):
    status: str
    checks: dict[str, str]

@app.get("/health/live", status_code=200)
async def liveness():
    """Is the process alive? (Kubernetes liveness probe)"""
    return {"status": "alive"}

@app.get("/health/ready", response_model=HealthResponse)
async def readiness():
    """Can the service handle traffic? Check dependencies."""
    checks = {}
    # Check database
    try:
        await app.state.db_pool.fetchval("SELECT 1")
        checks["database"] = "healthy"
    except Exception:
        checks["database"] = "unhealthy"
    # Check Redis
    try:
        await app.state.redis.ping()
        checks["redis"] = "healthy"
    except Exception:
        checks["redis"] = "unhealthy"

    all_healthy = all(v == "healthy" for v in checks.values())
    if not all_healthy:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={"status": "unhealthy", "checks": checks},
        )
    return HealthResponse(status="healthy", checks=checks)
Request Metrics Middleware
import time

from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware

REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_code"],
)
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start
        # Caution: raw paths such as /users/123 create one label value per ID.
        # Prefer the route template (/users/{user_id}) to keep cardinality bounded.
        endpoint = request.url.path
        REQUEST_COUNT.labels(request.method, endpoint, response.status_code).inc()
        REQUEST_LATENCY.labels(request.method, endpoint).observe(duration)
        return response

app.add_middleware(MetricsMiddleware)

# With several Gunicorn workers, each process keeps its own counters;
# prometheus_client's multiprocess mode is needed to aggregate them.
@app.get("/metrics")
async def metrics():
    """Prometheus scrape endpoint."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
Key Metrics to Track
| Metric | What It Tells You |
|---|---|
| Request rate (req/s) | Traffic volume and trends |
| Latency percentiles (p50, p95, p99) | User experience |
| Error rate (5xx / total) | System reliability |
| DB connection pool utilization | Capacity planning |
| Queue depth / task latency | Background job health |
| Cache hit ratio | Caching effectiveness |
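Latency is tracked as percentiles rather than as an average because a few slow requests barely move the mean while dominating the tail. The sketch below computes nearest-rank percentiles from raw samples; in production Prometheus estimates these from histogram buckets instead, so treat this as an illustration of the concept with hypothetical function names.

```python
import math


def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile: smallest sample with at least p% of values at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < p <= 100:
        raise ValueError("p must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


def latency_summary(samples_ms: list[float]) -> dict[str, float]:
    """Mean plus the percentiles usually shown on a latency dashboard."""
    return {
        "mean": sum(samples_ms) / len(samples_ms),
        "p50": percentile(samples_ms, 50),
        "p95": percentile(samples_ms, 95),
        "p99": percentile(samples_ms, 99),
    }
```

For 98 requests at 10 ms plus one at 1000 ms and one at 2000 ms, the median stays at 10 ms while p99 is 1000 ms, which is the experience of the slowest users that the average would hide.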
Q10: How do you handle graceful shutdown and zero-downtime deployments?
Answer:
Production APIs must handle shutdowns without dropping in-flight requests and deploy new versions without user-visible downtime.
graph TD
subgraph Shutdown["Graceful Shutdown Flow"]
direction TB
SIG["SIGTERM received"]
SIG --> STOP["Stop accepting new requests"]
STOP --> DRAIN["Drain in-flight requests<br/>(wait for completion)"]
DRAIN --> CLEANUP["Close connections<br/>(DB pools, Redis, files)"]
CLEANUP --> EXIT["Process exits cleanly"]
end
subgraph Deploy["Zero-Downtime Deploy"]
direction TB
D1["Start new instances"]
D1 --> D2["Health check passes"]
D2 --> D3["Route traffic to new instances"]
D3 --> D4["Drain old instances"]
D4 --> D5["Stop old instances"]
end
style Shutdown fill:#6cc3d5,stroke:#333,color:#fff
style Deploy fill:#56cc9d,stroke:#333,color:#fff
Graceful Shutdown in FastAPI
import asyncio
from contextlib import asynccontextmanager

# create_db_pool, create_redis_pool and TaskQueue are placeholders for application code

@asynccontextmanager
async def lifespan(app: FastAPI):
    # STARTUP
    app.state.db_pool = await create_db_pool()
    app.state.redis = await create_redis_pool()
    app.state.task_queue = TaskQueue()
    yield  # Application runs here
    # SHUTDOWN (runs after the server receives SIGTERM and stops serving)
    # 1. Stop accepting new background tasks
    await app.state.task_queue.stop()
    # 2. Wait for in-flight background tasks to complete (with timeout)
    try:
        await asyncio.wait_for(app.state.task_queue.drain(), timeout=30)
    except asyncio.TimeoutError:
        pass  # Give up after 30s, but still close connections below
    # 3. Close external connections
    await app.state.db_pool.close()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)
Gunicorn Graceful Shutdown
# gunicorn.conf.py
graceful_timeout = 30  # Seconds workers get to finish in-flight requests after a shutdown signal
timeout = 60  # Workers silent for longer than this are killed and restarted

# Server hook: runs in the worker process just after the worker has exited
def worker_exit(server, worker):
    """Called when a worker is shutting down."""
    # Cleanup worker-specific resources
    pass
Zero-Downtime Deployment Strategies
| Strategy | How It Works | Complexity |
|---|---|---|
| Rolling update | Replace instances one at a time | Low (K8s default) |
| Blue-green | Run two full environments, switch traffic | Medium |
| Canary | Route small % of traffic to new version | High |
Kubernetes Configuration
# Deployment with graceful shutdown support
apiVersion: apps/v1
kind: Deployment
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  template:
    spec:
      terminationGracePeriodSeconds: 45  # Time for graceful shutdown
      containers:
        - name: api
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 5
          lifecycle:
            preStop:
              exec:
                command: ["sleep", "5"]  # Allow LB to deregister
Deployment Checklist
| Step | Purpose |
|---|---|
| Run database migrations first | Schema supports both old and new code |
| Deploy new version alongside old | Both must work concurrently |
| Health checks pass before routing traffic | Catch startup failures |
| Drain connections before stopping old | No dropped requests |
| Monitor error rates after deploy | Catch regressions fast |
| Have rollback plan ready | Revert within minutes if needed |
Summary Table
| # | Topic | Key Concept |
|---|---|---|
| 1 | REST API Design | Nouns for resources, proper HTTP methods/status codes, Pydantic validation |
| 2 | Production Concurrency | Async workers + connection pooling + process pool for CPU work |
| 3 | Rate Limiting | Sliding window with Redis; per-IP/user/endpoint strategies |
| 4 | Latency Optimization | Caching layers, N+1 elimination, indexes, pagination |
| 5 | Authentication | JWT + bcrypt + OAuth2; role-based authorization |
| 6 | Security | Parameterized queries, input validation, CORS, security headers |
| 7 | Background Tasks | In-process for lightweight; task queues (arq/Celery) for heavy work |
| 8 | API Versioning | URL-based versioning; additive changes are safe; deprecation headers |
| 9 | Observability | Health checks, Prometheus metrics, structured logging |
| 10 | Graceful Shutdown | SIGTERM → stop accepting → drain → cleanup → exit |
What’s Next?
This article covered production API concerns. For related content:
- Python fundamentals: Python SWE Interview QA - 1
- Advanced internals: Python SWE Interview QA - 2
- Design patterns and structure: Python SWE Interview QA - 3
- Machine learning concepts: ML Interview QA - 1
- LLM architecture: LLM Interview QA - 1