Python SWE Interview QA - 4

10 Python software engineering interview questions on REST APIs, concurrency in production, latency optimization, and application security.
Published: 21 May 2026

Keywords

Python REST API, FastAPI interview, Python concurrency production, API security Python, rate limiting, authentication JWT OAuth, latency optimization, connection pooling, caching strategies, async Python production, OWASP Python, API design best practices

Introduction

This is Part 4 of our Python SWE Interview QA series, focused on building production REST APIs — covering API design, concurrency under load, latency optimization, and security hardening. These are the questions that separate backend engineers who ship to production from those who only build prototypes.

For foundational Python topics, see Python SWE Interview QA - 1. For advanced internals, see Part 2. For design patterns and code structure, see Part 3.


Q1: How do you design a RESTful API in Python and what are the best practices?

Answer:

A well-designed REST API follows consistent conventions for resources, HTTP methods, status codes, and response formats — making it predictable and self-documenting.

graph TD
    CLIENT["Client (Frontend / Mobile)"]
    CLIENT -->|"GET /users/123"| API["REST API (FastAPI / Flask)"]
    API -->|"200 OK + JSON body"| CLIENT

    API --> AUTH["Authentication<br/>JWT / OAuth2"]
    API --> VALID["Validation<br/>Pydantic / Marshmallow"]
    API --> SERVICE["Service Layer<br/>Business Logic"]
    SERVICE --> DB["Database"]
    SERVICE --> CACHE["Cache (Redis)"]

    style API fill:#56cc9d,stroke:#333,color:#fff
    style AUTH fill:#ff7851,stroke:#333,color:#fff
    style SERVICE fill:#6cc3d5,stroke:#333,color:#fff

RESTful URL Design

| Action | HTTP Method | URL | Status Code |
| --- | --- | --- | --- |
| List users | GET | /api/v1/users | 200 |
| Get one user | GET | /api/v1/users/{id} | 200 / 404 |
| Create user | POST | /api/v1/users | 201 |
| Update user (full) | PUT | /api/v1/users/{id} | 200 / 404 |
| Update user (partial) | PATCH | /api/v1/users/{id} | 200 / 404 |
| Delete user | DELETE | /api/v1/users/{id} | 204 / 404 |
| User’s orders | GET | /api/v1/users/{id}/orders | 200 |

FastAPI Implementation

from fastapi import FastAPI, HTTPException, Depends, Query, status
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime

app = FastAPI(title="User Service", version="1.0.0")

# Request/Response models (validation + documentation)
class UserCreate(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr
    role: str = "user"

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    role: str
    created_at: datetime

    model_config = {"from_attributes": True}

class PaginatedResponse(BaseModel):
    items: list[UserResponse]
    total: int
    page: int
    page_size: int

# Endpoints
@app.get("/api/v1/users", response_model=PaginatedResponse)
async def list_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    role: str | None = None,
):
    """List users with pagination and optional filtering."""
    users, total = await user_service.list_users(
        page=page, page_size=page_size, role=role
    )
    return PaginatedResponse(
        items=users, total=total, page=page, page_size=page_size
    )

@app.post("/api/v1/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: UserCreate):
    """Create a new user."""
    user = await user_service.create_user(user_data)
    return user

@app.get("/api/v1/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Get a single user by ID."""
    user = await user_service.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

API Design Best Practices

| Practice | Why |
| --- | --- |
| Use nouns for resources (/users), not verbs (/getUsers) | HTTP method already conveys the action |
| Version your API (/api/v1/...) | Non-breaking evolution |
| Use proper HTTP status codes | Clients can handle responses generically |
| Paginate list endpoints | Prevent unbounded responses |
| Use Pydantic for request/response models | Auto-validation + OpenAPI docs |
| Return consistent error format | {"detail": "...", "code": "..."} |
| Use plural nouns (/users, not /user) | Consistent collection semantics |
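
The "consistent error format" practice can be sketched with one helper that every failure path goes through. This is a minimal illustration using only the standard library; the function name `error_body` and the extra `status` and `title` fields are assumptions added for the example, not part of any framework.

```python
from http import HTTPStatus


def error_body(status: int, code: str, detail: str) -> dict:
    """Build the single error envelope that every endpoint returns on failure."""
    return {
        "status": status,                      # numeric HTTP status
        "title": HTTPStatus(status).phrase,    # standard reason phrase
        "code": code,                          # stable, machine-readable identifier
        "detail": detail,                      # human-readable explanation
    }


not_found = error_body(404, "user_not_found", "User 123 does not exist")
print(not_found)
```

Clients can then branch on `code` instead of parsing `detail`, which leaves the wording of messages free to change.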

Q2: How do you handle concurrency in a production Python API?

Answer:

Production APIs must handle thousands of concurrent requests. Python achieves this through a combination of async I/O, worker processes, and connection pooling.

graph TD
    LB["Load Balancer (Nginx)"]
    LB --> W1["Uvicorn Worker 1<br/>(async event loop)"]
    LB --> W2["Uvicorn Worker 2<br/>(async event loop)"]
    LB --> W3["Uvicorn Worker 3<br/>(async event loop)"]
    LB --> W4["Uvicorn Worker 4<br/>(async event loop)"]

    W1 --> POOL["Connection Pool<br/>(asyncpg / aioredis)"]
    W2 --> POOL
    W3 --> POOL
    W4 --> POOL

    POOL --> DB["PostgreSQL"]
    POOL --> REDIS["Redis"]

    style LB fill:#56cc9d,stroke:#333,color:#fff
    style POOL fill:#ffce67,stroke:#333
    style DB fill:#6cc3d5,stroke:#333,color:#fff

Production Deployment Architecture

# gunicorn.conf.py — Production server configuration
import multiprocessing

# Common starting point: (2 × CPU cores) + 1; tune under realistic load
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"  # Async workers
bind = "0.0.0.0:8000"
keepalive = 120
timeout = 30
graceful_timeout = 30
max_requests = 1000        # Restart worker after N requests (prevent memory leaks)
max_requests_jitter = 50   # Randomize restarts to avoid thundering herd

Connection Pooling

from contextlib import asynccontextmanager
import asyncpg
import redis.asyncio as aioredis
from fastapi import Depends, FastAPI, HTTPException, Request

# Application lifespan — create pools once at startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create connection pools
    app.state.db_pool = await asyncpg.create_pool(
        dsn=settings.database_url,
        min_size=5,          # Keep 5 connections ready
        max_size=20,         # Max 20 concurrent DB connections per worker
        max_inactive_connection_lifetime=300,
    )
    app.state.redis = aioredis.from_url(
        settings.redis_url,
        max_connections=20,
    )
    yield
    # Shutdown: close pools
    await app.state.db_pool.close()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

# Usage in endpoints
async def get_db(request: Request) -> asyncpg.Pool:
    return request.app.state.db_pool

@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db)):
    row = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    if not row:
        raise HTTPException(status_code=404)
    return dict(row)

Concurrency Patterns

| Pattern | Use When | Example |
| --- | --- | --- |
| async/await | I/O-bound (DB, HTTP, Redis) | await db.fetch(...) |
| Background tasks | Fire-and-forget (emails, logs) | BackgroundTasks in FastAPI |
| Task queues | Heavy/long-running work | Celery, arq, dramatiq |
| Connection pooling | Reuse expensive connections | asyncpg pool, Redis pool |
| Worker processes | Utilize multiple CPU cores | Gunicorn with N workers |
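
To make the interaction between async I/O and a bounded pool concrete, here is a small standard-library sketch. An `asyncio.Semaphore` stands in for a connection pool of size 5, and `asyncio.sleep` stands in for a database call; both are simplifications chosen for the example, not how asyncpg is implemented.

```python
import asyncio


async def fetch(i: int, limiter: asyncio.Semaphore, stats: dict) -> int:
    async with limiter:                       # "acquire a connection"
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)             # stand-in for a DB or HTTP call
        stats["active"] -= 1
        return i * 2


async def main():
    limiter = asyncio.Semaphore(5)            # behaves like a pool of size 5
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(fetch(i, limiter, stats) for i in range(20))
    )
    return results, stats["peak"]


results, peak = asyncio.run(main())
print(f"{len(results)} calls finished, at most {peak} in flight at once")
```

All 20 calls are started on one event loop, but no more than five hold a "connection" at a time. A real pool applies the same backpressure when `max_size` is reached.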

Handling CPU-Bound Work in an Async API

import asyncio
from concurrent.futures import ProcessPoolExecutor
from fastapi import Request

executor = ProcessPoolExecutor(max_workers=4)

def cpu_intensive_task(data: bytes) -> dict:
    """Runs in a separate process, so it doesn't block the event loop."""
    # Heavy computation here (image processing, ML inference, etc.)
    return {"result": process(data)}  # process() is a placeholder for your own function

@app.post("/api/v1/process")
async def process_data(request: Request):
    data = await request.body()  # Raw request bytes
    loop = asyncio.get_running_loop()  # get_event_loop() is deprecated inside coroutines
    result = await loop.run_in_executor(executor, cpu_intensive_task, data)
    return result

Q3: How do you implement rate limiting and throttling?

Answer:

Rate limiting protects your API from abuse, ensures fair usage, and prevents cascading failures under load.

graph TD
    REQ["Incoming Request"]
    REQ --> CHECK["Check Rate Limit<br/>(Redis counter)"]
    CHECK -->|"Under limit"| PROCESS["Process Request"]
    CHECK -->|"Over limit"| REJECT["429 Too Many Requests<br/>Retry-After: 60"]

    subgraph Algorithms["Rate Limiting Algorithms"]
        A1["Fixed Window<br/>100 req/minute"]
        A2["Sliding Window<br/>Smoother distribution"]
        A3["Token Bucket<br/>Allow bursts"]
        A4["Leaky Bucket<br/>Constant rate"]
    end

    style CHECK fill:#ffce67,stroke:#333
    style REJECT fill:#ff7851,stroke:#333,color:#fff
    style PROCESS fill:#56cc9d,stroke:#333,color:#fff

Sliding Window Rate Limiter with Redis

import time
import uuid
from fastapi import Request, HTTPException
from redis.asyncio import Redis

class RateLimiter:
    """Sliding window rate limiter using Redis sorted sets."""

    def __init__(self, redis: Redis, requests_per_minute: int = 60):
        self.redis = redis
        self.limit = requests_per_minute
        self.window = 60  # seconds

    async def is_allowed(self, key: str) -> tuple[bool, dict]:
        now = time.time()
        window_start = now - self.window
        pipe = self.redis.pipeline()

        # Remove expired entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Add current request (unique member, so requests with the same timestamp don't overwrite each other)
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        # Count requests in window
        pipe.zcard(key)
        # Set expiry on the key
        pipe.expire(key, self.window)

        _, _, count, _ = await pipe.execute()

        remaining = max(0, self.limit - count)
        headers = {
            "X-RateLimit-Limit": str(self.limit),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(int(now + self.window)),
        }

        return count <= self.limit, headers

# FastAPI middleware / dependency
async def rate_limit_dependency(request: Request):
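    # Behind a reverse proxy, request.client.host is the proxy's address; use the forwarded client IP there
    client_ip = request.client.host if request.client else "unknown"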
    key = f"rate_limit:{client_ip}"
    limiter = RateLimiter(request.app.state.redis)
    allowed, headers = await limiter.is_allowed(key)

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="Too many requests",
            headers={"Retry-After": "60", **headers},
        )
    # Stash headers on request.state; a response middleware must copy them onto the response
    request.state.rate_limit_headers = headers

Rate Limiting Strategies

| Strategy | Approach | Use Case |
| --- | --- | --- |
| Per IP | Limit by client IP | Public APIs |
| Per user/API key | Limit by authenticated identity | SaaS APIs |
| Per endpoint | Different limits for different routes | Heavy vs light endpoints |
| Tiered | Free: 100/hr, Pro: 10000/hr | Commercial APIs |
| Adaptive | Reduce limits under high load | Self-protection |
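
The diagram above lists token bucket as the algorithm that allows bursts. A minimal in-memory sketch of it follows. It is per-process only (a shared limiter would keep this state in Redis), and the class name and injectable `clock` parameter are choices made for the example so that the demo is deterministic.

```python
import time
from typing import Callable


class TokenBucket:
    """Token bucket: refills at `rate` tokens/second up to `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)   # start full, so an initial burst is allowed
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# Deterministic demo with a fake clock
fake_now = [0.0]
bucket = TokenBucket(rate=1.0, capacity=3, clock=lambda: fake_now[0])
burst = [bucket.allow() for _ in range(4)]        # three pass, the fourth is rejected
fake_now[0] += 2.0                                # two seconds pass, two tokens refill
after_wait = [bucket.allow() for _ in range(3)]   # two pass, the third is rejected
```

Compared with the sliding window limiter, the bucket tolerates a short burst up to `capacity` while still enforcing the average `rate` over time.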

Q4: How do you optimize API latency?

Answer:

Latency optimization is a layered problem — you need to identify bottlenecks (usually I/O) and apply targeted strategies at each layer.

graph LR
    subgraph Layers["Latency Optimization Layers"]
        direction TB
        L1["Network: CDN, compression, HTTP/2"]
        L2["Application: caching, async, batching"]
        L3["Database: indexes, query optimization, connection pools"]
        L4["Infrastructure: horizontal scaling, edge computing"]
    end

    style Layers fill:#56cc9d,stroke:#333,color:#fff

Caching Strategies

import hashlib
import json
from functools import wraps
from redis.asyncio import Redis

# Layer 1: In-memory cache (per-worker, fastest)
from cachetools import TTLCache
local_cache = TTLCache(maxsize=1000, ttl=30)  # 30-second TTL

# Layer 2: Redis cache (shared across workers)
def cache_response(redis: Redis, key: str, ttl: int = 300):
    """Decorator factory for caching endpoint responses in Redis."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Check cache
            cached = await redis.get(key)
            if cached is not None:
                return json.loads(cached)

            # Compute result
            result = await func(*args, **kwargs)

            # Store in cache
            await redis.set(key, json.dumps(result), ex=ttl)
            return result
        return wrapper
    return decorator

# Layer 3: HTTP cache headers
from fastapi.responses import JSONResponse

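@app.get("/api/v1/products/{product_id}")
async def get_product(product_id: int):
    product = await product_service.get(product_id)
    body = product.model_dump(mode="json")  # Pydantic v2; converts datetimes etc. to JSON-safe types
    response = JSONResponse(content=body)
    response.headers["Cache-Control"] = "public, max-age=60"
    # ETag values are quoted strings; MD5 serves only as a fingerprint here, not for security
    etag = hashlib.md5(json.dumps(body, sort_keys=True).encode()).hexdigest()
    response.headers["ETag"] = f'"{etag}"'
    return response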

Database Query Optimization

from collections import defaultdict

# SLOW: N+1 query problem
async def get_users_with_orders_slow():
    rows = await db.fetch("SELECT * FROM users LIMIT 100")
    users = [dict(r) for r in rows]  # asyncpg Records are read-only; copy to dicts
    for user in users:
        # 100 separate queries!
        user["orders"] = await db.fetch(
            "SELECT * FROM orders WHERE user_id = $1", user["id"]
        )
    return users

# FAST: Single query with JOIN
# Note: json_agg yields [null] for users without orders; filter that out if it matters
async def get_users_with_orders_fast():
    return await db.fetch("""
        SELECT u.*, json_agg(o.*) AS orders
        FROM users u
        LEFT JOIN orders o ON o.user_id = u.id
        GROUP BY u.id
        LIMIT 100
    """)

# FAST: Batch loading with a single ANY($1) query
async def get_users_with_orders_batch():
    rows = await db.fetch("SELECT * FROM users LIMIT 100")
    users = [dict(r) for r in rows]
    user_ids = [u["id"] for u in users]
    orders = await db.fetch(
        "SELECT * FROM orders WHERE user_id = ANY($1)", user_ids
    )
    # Group orders by user_id in Python
    orders_by_user = defaultdict(list)
    for order in orders:
        orders_by_user[order["user_id"]].append(order)
    for user in users:
        user["orders"] = orders_by_user[user["id"]]
    return users

Latency Optimization Checklist

| Technique | Typical Improvement | Effort |
| --- | --- | --- |
| Add database indexes | 10-100x for queries | Low |
| Connection pooling | 5-20ms per request | Low |
| Redis caching | 50-500ms saved per cache hit | Medium |
| Response compression (gzip) | 50-80% smaller payloads | Low |
| Async I/O (avoid blocking) | Throughput, not latency | Medium |
| Pagination (limit result size) | Prevents timeout | Low |
| N+1 query elimination | 10-100x for list endpoints | Medium |
| Background tasks (defer work) | Perceived latency reduction | Medium |
| Read replicas | Distribute read load | High |
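
The compression row is easy to check with the standard library. The sketch below compresses a synthetic JSON list; the payload shape is invented for the demo, and real savings depend on how repetitive your responses are.

```python
import gzip
import json

# Synthetic list response: repetitive keys compress well
payload = json.dumps(
    [{"id": i, "name": f"user-{i}", "role": "user", "active": True} for i in range(500)]
).encode("utf-8")

compressed = gzip.compress(payload, compresslevel=6)
ratio = len(compressed) / len(payload)
print(f"{len(payload)} bytes -> {len(compressed)} bytes ({ratio:.0%} of original)")

# Round trip: the client recovers the exact bytes
restored = gzip.decompress(compressed)
```

In a FastAPI app the same effect usually comes from `app.add_middleware(GZipMiddleware, minimum_size=1000)` (imported from `fastapi.middleware.gzip`) or from the reverse proxy, rather than from compressing by hand.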

Q5: How do you implement authentication and authorization in a Python API?

Answer:

Authentication verifies identity (“who are you?”). Authorization checks permissions (“what can you do?”). Production APIs commonly use JWTs issued through OAuth2 flows.

graph TD
    LOGIN["POST /auth/login<br/>(email + password)"]
    LOGIN --> VERIFY["Verify credentials<br/>(bcrypt hash check)"]
    VERIFY --> TOKEN["Issue JWT Token<br/>(access + refresh)"]
    TOKEN --> CLIENT["Client stores token"]

    CLIENT -->|"Authorization: Bearer <token>"| API["Protected Endpoint"]
    API --> DECODE["Decode & verify JWT"]
    DECODE --> AUTHZ["Check permissions<br/>(role-based / resource-based)"]
    AUTHZ --> RESPONSE["Return data"]

    style LOGIN fill:#6cc3d5,stroke:#333,color:#fff
    style DECODE fill:#ffce67,stroke:#333
    style AUTHZ fill:#ff7851,stroke:#333,color:#fff

JWT Authentication with FastAPI

from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Configuration
SECRET_KEY = settings.secret_key  # From environment, never hardcoded!
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

class TokenPayload(BaseModel):
    sub: str          # Subject (user ID)
    exp: datetime     # Expiration
    role: str         # User role

def create_access_token(user_id: int, role: str) -> str:
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    payload = {"sub": str(user_id), "exp": expire, "role": role}
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

# Dependency: extract and verify current user from JWT
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError):
        # Bad signature, expired token, or a missing/non-numeric "sub" claim
        raise credentials_exception

    user = await user_service.get_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user

Role-Based Authorization

from enum import Enum
from functools import wraps

class Role(str, Enum):
    USER = "user"
    ADMIN = "admin"
    MODERATOR = "moderator"

def require_role(*allowed_roles: Role):
    """Dependency that checks user role."""
    async def role_checker(current_user: User = Depends(get_current_user)):
        if current_user.role not in allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions",
            )
        return current_user
    return role_checker

# Usage
@app.delete("/api/v1/users/{user_id}")
async def delete_user(
    user_id: int,
    admin: User = Depends(require_role(Role.ADMIN)),
):
    """Only admins can delete users."""
    await user_service.delete(user_id)
    return {"status": "deleted"}

@app.get("/api/v1/admin/stats")
async def admin_stats(
    user: User = Depends(require_role(Role.ADMIN, Role.MODERATOR)),
):
    """Admins and moderators can view stats."""
    return await stats_service.get_dashboard()

Auth Best Practices

| Practice | Why |
| --- | --- |
| Hash passwords with bcrypt (cost factor ≥ 12) | Slow hashing resists brute force |
| Short-lived access tokens (15-30 min) | Limits window of compromise |
| Use refresh tokens for long sessions | Access token rotation |
| Store secrets in environment variables | Never in code or git |
| Validate token on every request | Stateless verification |
| Use HTTPS always | Prevent token interception |
| Implement token revocation (blacklist) | Force logout / compromised tokens |
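
To show what "validate token on every request" involves, here is a standard-library sketch of HS256 signing and verification. It is for illustration only: the helper names are invented, it checks only the signature and `exp`, and production code should keep using a maintained JWT library as in the example above.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(payload: dict, secret: bytes) -> str:
    """Return header.payload.signature, each part base64url-encoded."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        _b64url_encode(json.dumps(obj, separators=(",", ":")).encode("utf-8"))
        for obj in (header, payload)
    ]
    signing_input = ".".join(parts)
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url_encode(signature)}"


def verify(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Check signature and expiry; raise ValueError on any failure."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = hmac.new(
        secret, f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if payload["exp"] <= current:
        raise ValueError("token expired")
    return payload


secret = b"demo-secret"   # demo value only; load real secrets from the environment
token = sign({"sub": "42", "role": "user", "exp": 2_000}, secret)
claims = verify(token, secret, now=1_000)
```

The payload is only encoded, not encrypted, so anyone holding the token can read the claims. The signature prevents tampering, which is why the role claim can be trusted after verification.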

Q6: How do you protect a Python API against common security vulnerabilities?

Answer:

Production APIs must defend against the OWASP Top 10 web security risks. Python frameworks provide tools, but developers must use them correctly.

graph TD
    THREATS["Common API Threats"]
    THREATS --> INJ["Injection<br/>(SQL, NoSQL, Command)"]
    THREATS --> AUTH["Broken Authentication"]
    THREATS --> EXPO["Data Exposure<br/>(Sensitive data in responses)"]
    THREATS --> MASS["Mass Assignment<br/>(Unvalidated input fields)"]
    THREATS --> RATE["Lack of Rate Limiting"]
    THREATS --> SSRF["SSRF<br/>(Server-Side Request Forgery)"]

    subgraph Defenses["Defense Layers"]
        D1["Input Validation (Pydantic)"]
        D2["Parameterized Queries"]
        D3["Output Filtering"]
        D4["CORS Configuration"]
        D5["Security Headers"]
        D6["Rate Limiting"]
    end

    style THREATS fill:#ff7851,stroke:#333,color:#fff
    style Defenses fill:#56cc9d,stroke:#333,color:#fff

SQL Injection Prevention

# VULNERABLE: String formatting in SQL
async def get_user_unsafe(username: str):
    # NEVER DO THIS — SQL injection!
    query = f"SELECT * FROM users WHERE username = '{username}'"
    return await db.fetch(query)
    # Attack: username = "'; DROP TABLE users; --"

# SAFE: Parameterized queries (always use these)
async def get_user_safe(username: str):
    query = "SELECT * FROM users WHERE username = $1"
    return await db.fetchrow(query, username)
    # Parameters are sent separately from the SQL text, so they are never parsed as SQL

# SAFE: ORM query builder (Tortoise ORM syntax shown; SQLAlchemy works similarly)
async def get_user_orm(username: str):
    return await User.filter(username=username).first()

Input Validation and Mass Assignment Protection

from pydantic import BaseModel, EmailStr, Field, field_validator
import re

class UserCreate(BaseModel):
    """Only these fields are accepted — everything else is ignored."""
    name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        if not re.match(r"^[a-zA-Z\s\-']+$", v):
            raise ValueError("Name contains invalid characters")
        return v.strip()

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain an uppercase letter")
        if not re.search(r"[0-9]", v):
            raise ValueError("Password must contain a digit")
        return v

# Mass assignment protection: UserCreate does NOT have 'role' or 'is_admin'
# Even if client sends {"name": "Alice", "role": "admin"}, role is ignored
@app.post("/api/v1/users")
async def create_user(data: UserCreate):  # Only name, email, password accepted
    user = await user_service.create(data)
    return UserResponse.model_validate(user)  # Only safe fields in response

Security Headers and CORS

from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

# CORS: restrict which origins can call your API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com", "https://admin.myapp.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)

# Trusted hosts: prevent Host header attacks
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["api.myapp.com"])

# Security headers middleware
@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    # X-XSS-Protection is deprecated and ignored by modern browsers; "0" disables the legacy filter
    response.headers["X-XSS-Protection"] = "0"
    return response

Security Checklist

| Vulnerability | Defense |
| --- | --- |
| SQL Injection | Parameterized queries / ORM |
| Mass Assignment | Pydantic models with explicit fields |
| Broken Auth | JWT + bcrypt + short expiry |
| Data Exposure | Response models that exclude sensitive fields |
| SSRF | Validate/whitelist URLs before fetching |
| DoS | Rate limiting + request size limits |
| XSS (if serving HTML) | Template escaping, CSP headers |
| CSRF | SameSite cookies + CSRF tokens |
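
The SSRF row has no example above, so here is a minimal allowlist check built on the standard library. The hostnames in `ALLOWED_HOSTS` and the function name are hypothetical. The check covers only the URL itself, so redirects and DNS rebinding still need to be handled where the request is actually made.

```python
from urllib.parse import urlsplit

# Hypothetical allowlist of hosts the service may fetch from
ALLOWED_HOSTS = {"api.partner.example", "cdn.partner.example"}


def is_safe_url(url: str) -> bool:
    """Accept only https URLs whose host is explicitly allowlisted."""
    try:
        parts = urlsplit(url)
        host = parts.hostname          # lowercased, without userinfo or port
    except ValueError:                 # e.g. malformed IPv6 literal
        return False
    if parts.scheme != "https" or host is None:
        return False
    return host in ALLOWED_HOSTS


checks = {
    "https://api.partner.example/v1/data": is_safe_url("https://api.partner.example/v1/data"),
    "http://api.partner.example/v1/data": is_safe_url("http://api.partner.example/v1/data"),
    "https://169.254.169.254/latest/meta-data": is_safe_url("https://169.254.169.254/latest/meta-data"),
    "https://api.partner.example@evil.example/": is_safe_url("https://api.partner.example@evil.example/"),
}
```

An allowlist is preferred over a blocklist of private ranges because anything not explicitly listed is rejected, including internal metadata addresses and the userinfo trick in the last case.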

Q7: How do you implement background tasks and task queues?

Answer:

Not all work should happen in the request-response cycle. Background tasks handle deferred work (emails, notifications), while task queues handle heavy/long-running jobs (report generation, ML inference).

graph LR
    API["API Server"]
    API -->|"lightweight"| BG["Background Tasks<br/>(in-process)"]
    API -->|"heavy/reliable"| QUEUE["Task Queue<br/>(Celery / arq)"]

    QUEUE --> WORKER["Worker Process(es)"]
    WORKER --> DB["Database"]
    WORKER --> EMAIL["Email Service"]
    WORKER --> ML["ML Model"]

    subgraph InProcess["In-Process (FastAPI BackgroundTasks)"]
        BG1["Send email"]
        BG2["Write audit log"]
        BG3["Update cache"]
    end

    subgraph Distributed["Distributed (Celery / arq)"]
        Q1["Generate PDF report"]
        Q2["Process video upload"]
        Q3["Run ML pipeline"]
        Q4["Bulk data import"]
    end

    style API fill:#56cc9d,stroke:#333,color:#fff
    style QUEUE fill:#ffce67,stroke:#333
    style WORKER fill:#6cc3d5,stroke:#333,color:#fff

FastAPI Background Tasks (Lightweight)

from fastapi import BackgroundTasks

async def send_welcome_email(email: str, name: str):
    """Runs after response is sent — doesn't block the client."""
    await email_service.send(
        to=email,
        subject="Welcome!",
        body=f"Hi {name}, welcome to our platform!"
    )

async def log_signup(user_id: int):
    await analytics.track("user_signup", user_id=user_id)

@app.post("/api/v1/users", status_code=201)
async def create_user(data: UserCreate, background_tasks: BackgroundTasks):
    user = await user_service.create(data)

    # These run AFTER the response is returned to the client
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    background_tasks.add_task(log_signup, user.id)

    return UserResponse.model_validate(user)
    # Client gets 201 immediately — doesn't wait for email

Distributed Task Queue (arq — async-native)

# tasks.py — define tasks
from arq import create_pool
from arq.connections import RedisSettings

async def generate_report(ctx, user_id: int, report_type: str):
    """Heavy task — runs in a separate worker process."""
    data = await fetch_report_data(user_id, report_type)
    pdf = render_pdf(data)  # CPU-intensive
    url = await upload_to_s3(pdf)
    await notify_user(user_id, f"Report ready: {url}")
    return {"url": url}

async def process_image(ctx, image_id: int):
    """Image processing — CPU-bound, offloaded to worker."""
    image = await download_image(image_id)
    thumbnails = create_thumbnails(image, sizes=[128, 256, 512])
    await save_thumbnails(image_id, thumbnails)

# Worker configuration
class WorkerSettings:
    functions = [generate_report, process_image]
    redis_settings = RedisSettings(host="redis")
    max_jobs = 10

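# API endpoint: enqueue task
# Reuse one arq pool created at startup (assumed here to be stored on app.state.arq_pool,
# e.g. via `await create_pool(RedisSettings(host="redis"))` in the lifespan handler);
# opening a new pool on every request wastes Redis connections.
from fastapi import Request

@app.post("/api/v1/reports")
async def request_report(request: Request, user_id: int, report_type: str):
    job = await request.app.state.arq_pool.enqueue_job("generate_report", user_id, report_type)
    return {"job_id": job.job_id, "status": "queued"}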

When to Use What

| Approach | Use Case | Guarantees |
| --- | --- | --- |
| BackgroundTasks | Email, logging, cache updates | Best-effort (lost if server crashes) |
| arq / Celery | Reports, heavy processing, ML | Reliable (persisted in Redis/RabbitMQ) |
| Cron / scheduler | Periodic cleanup, daily reports | Scheduled execution |
| Streaming response | Real-time progress updates | Client stays connected |
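
The producer/worker shape behind a task queue can be sketched in-process with `asyncio.Queue`. This is a toy model: jobs live only in memory and are lost on a crash, which is exactly the gap that arq or Celery close by persisting jobs in a broker. The worker names and the sleep are stand-ins.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, done: list) -> None:
    while True:
        job = await queue.get()
        try:
            await asyncio.sleep(0.01)          # stand-in for slow work
            done.append((name, job))
        finally:
            queue.task_done()                  # mark the job finished even on error


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)   # bounded, so producers get backpressure
    done: list = []
    workers = [asyncio.create_task(worker(f"w{i}", queue, done)) for i in range(3)]
    for job_id in range(10):
        await queue.put(job_id)                # the "enqueue" step
    await queue.join()                         # wait until every job is processed
    for task in workers:
        task.cancel()                          # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return done


done = asyncio.run(main())
print(f"processed {len(done)} jobs")
```

The API process only enqueues and returns, while a fixed number of workers bounds how much heavy work runs at once.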

Q8: How do you handle API versioning and backward compatibility?

Answer:

APIs evolve, but breaking changes destroy client trust. Versioning strategies let you evolve without breaking existing consumers.

graph TD
    subgraph Strategies["Versioning Strategies"]
        URL["URL Path<br/>/api/v1/users<br/>/api/v2/users"]
        HEADER["Header<br/>Accept: application/vnd.api+json;version=2"]
        QUERY["Query Param<br/>/api/users?version=2"]
    end

    URL --> REC["✓ Most common<br/>✓ Easy to understand<br/>✓ Cacheable"]
    HEADER --> ADV["✓ Clean URLs<br/>✗ Harder to test<br/>✗ Not visible in logs"]

    style URL fill:#56cc9d,stroke:#333,color:#fff
    style REC fill:#6cc3d5,stroke:#333,color:#fff

Backward-Compatible Changes (No Version Bump Needed)

| Safe Change | Why It’s Safe |
| --- | --- |
| Add a new optional field to response | Existing clients ignore unknown fields |
| Add a new endpoint | No conflict with existing routes |
| Add optional query parameter | Existing calls still work without it |
| Add a new enum value to response | Clients should handle unknown values |
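
These changes are only safe if clients are written as tolerant readers. The sketch below shows one way a client might do that with standard-library dataclasses and enums. The `User` shape, the `UNKNOWN` fallback member, and the `avatar_url` field are assumptions made up for the example.

```python
from dataclasses import dataclass, fields
from enum import Enum


class Role(str, Enum):
    USER = "user"
    ADMIN = "admin"
    UNKNOWN = "unknown"   # fallback for values added after this client shipped

    @classmethod
    def _missing_(cls, value):
        # Called by Enum when the value matches no member
        return cls.UNKNOWN


@dataclass
class User:
    id: int
    name: str
    role: Role

    @classmethod
    def from_api(cls, data: dict) -> "User":
        known = {f.name for f in fields(cls)}
        kwargs = {k: v for k, v in data.items() if k in known}   # drop unknown fields
        kwargs["role"] = Role(kwargs["role"])
        return cls(**kwargs)


# The server has since added a field and a role this client has never seen
user = User.from_api(
    {"id": 1, "name": "Alice", "role": "auditor", "avatar_url": "https://example.com/a.png"}
)
```

A client that instead passes the whole payload to a strict constructor would break on the new field, which turns an additive server change into a breaking one.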

Breaking Changes (Require New Version)

| Breaking Change | Why It Breaks |
| --- | --- |
| Remove or rename a field | Clients accessing it get errors |
| Change field type (string → int) | Parsing fails |
| Make optional field required | Existing requests missing it fail |
| Change URL structure | Existing bookmarks/integrations break |
| Change error format | Client error handling breaks |

Deprecation Strategy

from fastapi import APIRouter, Response

v1_router = APIRouter(prefix="/api/v1")

@v1_router.get("/users/{user_id}", deprecated=True)  # Shows in OpenAPI docs
async def get_user_v1(user_id: int, response: Response):
    """Deprecated: Use /api/v2/users/{user_id} instead."""
    # FastAPI injects `response`; headers set on it are merged into the final response
    response.headers["Sunset"] = "Fri, 01 Jan 2027 00:00:00 GMT"
    response.headers["Deprecation"] = "true"
    response.headers["Link"] = '</api/v2/users>; rel="successor-version"'
    return await get_user_legacy(user_id)

Q9: How do you implement health checks and observability?

Answer:

Production APIs need health checks (for load balancers and orchestrators), metrics (for dashboards), and distributed tracing (for debugging latency across services).

graph TD
    OBS["Observability Stack"]
    OBS --> HEALTH["Health Checks<br/>/health, /ready"]
    OBS --> METRICS["Metrics<br/>Prometheus + Grafana"]
    OBS --> TRACES["Distributed Tracing<br/>OpenTelemetry"]
    OBS --> LOGS["Structured Logs<br/>JSON → ELK/Loki"]

    HEALTH --> LB["Load Balancer<br/>Route traffic to healthy instances"]
    HEALTH --> K8S["Kubernetes<br/>Restart unhealthy pods"]

    style OBS fill:#56cc9d,stroke:#333,color:#fff
    style HEALTH fill:#6cc3d5,stroke:#333,color:#fff
    style METRICS fill:#ffce67,stroke:#333

Health Check Endpoints

from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

class HealthResponse(BaseModel):
    status: str
    checks: dict[str, str]

@app.get("/health/live", status_code=200)
async def liveness():
    """Is the process alive? (Kubernetes liveness probe)"""
    return {"status": "alive"}

@app.get("/health/ready", response_model=HealthResponse)
async def readiness():
    """Can the service handle traffic? Check dependencies."""
    checks = {}

    # Check database
    try:
        await app.state.db_pool.fetchval("SELECT 1")
        checks["database"] = "healthy"
    except Exception:
        checks["database"] = "unhealthy"

    # Check Redis
    try:
        await app.state.redis.ping()
        checks["redis"] = "healthy"
    except Exception:
        checks["redis"] = "unhealthy"

    all_healthy = all(v == "healthy" for v in checks.values())

    if not all_healthy:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={"status": "unhealthy", "checks": checks},
        )
    return HealthResponse(status="healthy", checks=checks)

Request Metrics Middleware

import time
from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware

REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_code"],
)
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

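        # Label by route template (e.g. /users/{user_id}) when available, not the raw path,
        # so per-ID URLs don't create unbounded label cardinality
        route = request.scope.get("route")
        endpoint = getattr(route, "path", request.url.path)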
        REQUEST_COUNT.labels(request.method, endpoint, response.status_code).inc()
        REQUEST_LATENCY.labels(request.method, endpoint).observe(duration)

        return response

app.add_middleware(MetricsMiddleware)

@app.get("/metrics")
async def metrics():
    """Prometheus scrape endpoint."""
    return Response(content=generate_latest(), media_type="text/plain")

Key Metrics to Track

| Metric | What It Tells You |
| --- | --- |
| Request rate (req/s) | Traffic volume and trends |
| Latency percentiles (p50, p95, p99) | User experience |
| Error rate (5xx / total) | System reliability |
| DB connection pool utilization | Capacity planning |
| Queue depth / task latency | Background job health |
| Cache hit ratio | Caching effectiveness |
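
Why percentiles rather than averages? The sketch below computes nearest-rank percentiles over synthetic latencies in which 2% of requests are slow. The sample distribution is invented for the demo. In production these numbers normally come from the Prometheus histogram rather than from raw samples.

```python
import math
import random


def percentile(sorted_values: list, p: float) -> float:
    """Nearest-rank percentile: smallest sample with at least p% of samples at or below it."""
    if not sorted_values:
        raise ValueError("no samples")
    rank = max(1, math.ceil(p * len(sorted_values) / 100))
    return sorted_values[rank - 1]


random.seed(7)
# 980 fast requests (20-80 ms) and 20 slow outliers (1-3 s)
latencies = sorted(
    [random.uniform(0.02, 0.08) for _ in range(980)]
    + [random.uniform(1.0, 3.0) for _ in range(20)]
)

p50, p95, p99 = (percentile(latencies, p) for p in (50, 95, 99))
mean = sum(latencies) / len(latencies)
print(f"mean={mean:.3f}s p50={p50:.3f}s p95={p95:.3f}s p99={p99:.3f}s")
```

The median and p95 stay in the fast range while p99 lands among the outliers. That tail is what one request in a hundred experiences, and the mean blurs it.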

Q10: How do you handle graceful shutdown and zero-downtime deployments?

Answer:

Production APIs must handle shutdowns without dropping in-flight requests and deploy new versions without user-visible downtime.

graph TD
    subgraph Shutdown["Graceful Shutdown Flow"]
        direction TB
        SIG["SIGTERM received"]
        SIG --> STOP["Stop accepting new requests"]
        STOP --> DRAIN["Drain in-flight requests<br/>(wait for completion)"]
        DRAIN --> CLEANUP["Close connections<br/>(DB pools, Redis, files)"]
        CLEANUP --> EXIT["Process exits cleanly"]
    end

    subgraph Deploy["Zero-Downtime Deploy"]
        direction TB
        D1["Start new instances"]
        D1 --> D2["Health check passes"]
        D2 --> D3["Route traffic to new instances"]
        D3 --> D4["Drain old instances"]
        D4 --> D5["Stop old instances"]
    end

    style Shutdown fill:#6cc3d5,stroke:#333,color:#fff
    style Deploy fill:#56cc9d,stroke:#333,color:#fff
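
The "stop accepting, then drain" steps in the shutdown flow can be sketched with plain `asyncio`. `InFlightTracker` is a hypothetical helper written for illustration; ASGI servers such as Uvicorn and Gunicorn already do this bookkeeping for HTTP requests, so treat it as a model of the idea and not as something to add to a FastAPI app.

```python
import asyncio


class InFlightTracker:
    """Counts active requests so shutdown can wait for them to finish."""

    def __init__(self) -> None:
        self._active = 0
        self._idle = asyncio.Event()
        self._idle.set()  # No requests yet, so we start idle
        self.accepting = True

    def start_request(self) -> bool:
        """Return False once shutdown has begun (the caller should reply 503)."""
        if not self.accepting:
            return False
        self._active += 1
        self._idle.clear()
        return True

    def finish_request(self) -> None:
        self._active -= 1
        if self._active == 0:
            self._idle.set()

    async def shutdown(self, timeout: float) -> bool:
        """Stop accepting, then wait up to `timeout` seconds for the drain.

        Returns True if every in-flight request finished in time.
        """
        self.accepting = False
        try:
            await asyncio.wait_for(self._idle.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False
```

On Unix, a SIGTERM handler registered with `loop.add_signal_handler(signal.SIGTERM, ...)` could schedule `shutdown()`; closing DB pools and other connections would follow once it returns.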

Graceful Shutdown in FastAPI

import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

logger = logging.getLogger(__name__)

# create_db_pool, create_redis_pool, and TaskQueue are application-specific helpers.

@asynccontextmanager
async def lifespan(app: FastAPI):
    # STARTUP
    app.state.db_pool = await create_db_pool()
    app.state.redis = await create_redis_pool()
    app.state.task_queue = TaskQueue()

    yield  # Application runs here

    # SHUTDOWN (runs after the server receives SIGTERM and stops serving)
    try:
        # 1. Stop accepting new background tasks
        await app.state.task_queue.stop()

        # 2. Wait for in-flight background tasks to complete (with timeout)
        await asyncio.wait_for(
            app.state.task_queue.drain(),
            timeout=30,  # Give up waiting after 30s
        )
    except asyncio.TimeoutError:
        logger.warning("Background tasks did not drain within 30s")
    finally:
        # 3. Close external connections even if draining timed out
        await app.state.db_pool.close()
        await app.state.redis.close()

app = FastAPI(lifespan=lifespan)
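
The lifespan handler relies on a `TaskQueue` object with `stop()` and `drain()` methods that is not shown here. One possible shape for it, as a minimal in-process sketch (the `submit` method and the internals are assumptions, not the article's implementation):

```python
import asyncio
from typing import Any, Callable, Coroutine


class TaskQueue:
    """Hypothetical in-process queue exposing stop() and drain()."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()
        self._accepting = True

    def submit(self, job: Callable[[], Coroutine[Any, Any, None]]) -> None:
        """Schedule a coroutine function; must be called from a running loop."""
        if not self._accepting:
            raise RuntimeError("queue is shutting down")
        task = asyncio.create_task(job())
        self._tasks.add(task)  # Keep a strong reference until the task is done
        task.add_done_callback(self._tasks.discard)

    async def stop(self) -> None:
        """Reject new work; tasks that are already running continue."""
        self._accepting = False

    async def drain(self) -> None:
        """Wait for every outstanding task, ignoring individual failures."""
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
```

With this design, wrapping `drain()` in `asyncio.wait_for` means a timeout cancels the remaining tasks, because cancelling a `gather` cancels its children. Work that must survive a restart belongs in an external queue.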

Gunicorn Graceful Shutdown

# gunicorn.conf.py
graceful_timeout = 30  # Seconds workers get to finish in-flight requests after a shutdown/restart signal
timeout = 60           # Workers silent for longer than this are killed and restarted

# Server hook: runs in the worker process just after the worker has exited
def worker_exit(server, worker):
    """Called just after a worker has exited."""
    # Cleanup worker-specific resources
    pass

Zero-Downtime Deployment Strategies

| Strategy | How It Works | Complexity |
| --- | --- | --- |
| Rolling update | Replace instances one at a time | Low (K8s default) |
| Blue-green | Run two full environments, switch traffic | Medium |
| Canary | Route small % of traffic to new version | High |
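
As an illustration of the canary row, the sketch below assigns each user to a version deterministically, so the same user always sees the same version during a rollout. `pick_version` and its parameters are hypothetical; in practice the split is usually configured at the load balancer or service mesh, not in application code.

```python
import hashlib


def pick_version(user_id: str, canary_percent: int) -> str:
    """Deterministically route a user to 'canary' or 'stable'."""
    # A stable hash keeps the assignment consistent across processes and
    # restarts (the built-in hash() is salted per process for strings).
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") % 100  # 0..99
    return "canary" if bucket < canary_percent else "stable"


# Roughly 10% of users should land on the canary
share = sum(pick_version(f"user-{i}", 10) == "canary" for i in range(10_000)) / 10_000
print(f"canary share: {share:.1%}")
```

Raising `canary_percent` step by step only ever moves users from stable to canary, which keeps the rollout monotonic for any given user.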

Kubernetes Configuration

# Deployment with graceful shutdown support
apiVersion: apps/v1
kind: Deployment
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  template:
    spec:
      terminationGracePeriodSeconds: 45  # Must cover the preStop sleep plus the app's drain time
      containers:
        - name: api
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 5
          lifecycle:
            preStop:
              exec:
                command: ["sleep", "5"]  # Allow LB to deregister
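
The timing values in this manifest interact. Kubernetes starts the grace-period clock when the pod is marked for termination, the `preStop` hook runs inside that window, and SIGTERM is sent only after the hook returns. A small worked check, assuming the 30-second drain timeout from the Gunicorn and FastAPI examples (the function name is illustrative):

```python
def shutdown_headroom(
    termination_grace_s: float,
    pre_stop_s: float,
    drain_timeout_s: float,
) -> float:
    """Seconds left before SIGKILL once preStop and the app's drain both finish.

    A negative result means the pod can be killed while still draining.
    """
    return termination_grace_s - (pre_stop_s + drain_timeout_s)


# Values from the manifest above plus a 30s application drain timeout
headroom = shutdown_headroom(termination_grace_s=45, pre_stop_s=5, drain_timeout_s=30)
print(f"headroom: {headroom}s")  # headroom: 10s
```

If the drain timeout is raised, `terminationGracePeriodSeconds` should be raised with it; otherwise the kubelet sends SIGKILL before the application has finished cleaning up.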

Deployment Checklist

| Step | Purpose |
| --- | --- |
| Run database migrations first | Schema supports both old and new code |
| Deploy new version alongside old | Both must work concurrently |
| Health checks pass before routing traffic | Catch startup failures |
| Drain connections before stopping old | No dropped requests |
| Monitor error rates after deploy | Catch regressions fast |
| Have rollback plan ready | Revert within minutes if needed |

Summary Table

| # | Topic | Key Concept |
| --- | --- | --- |
| 1 | REST API Design | Nouns for resources, proper HTTP methods/status codes, Pydantic validation |
| 2 | Production Concurrency | Async workers + connection pooling + process pool for CPU work |
| 3 | Rate Limiting | Sliding window with Redis; per-IP/user/endpoint strategies |
| 4 | Latency Optimization | Caching layers, N+1 elimination, indexes, pagination |
| 5 | Authentication | JWT + bcrypt + OAuth2; role-based authorization |
| 6 | Security | Parameterized queries, input validation, CORS, security headers |
| 7 | Background Tasks | In-process for lightweight; task queues (arq/Celery) for heavy work |
| 8 | API Versioning | URL-based versioning; additive changes are safe; deprecation headers |
| 9 | Observability | Health checks, Prometheus metrics, structured logging |
| 10 | Graceful Shutdown | SIGTERM → stop accepting → drain → cleanup → exit |

What’s Next?

This article covered production API concerns. For related content: