Python SWE Interview QA - 5

10 Python software engineering interview questions focused on async programming, await semantics, I/O-bound vs CPU-bound workloads, threads, processes, GIL, and production best practices.
Author
Published

02 June 2026

Keywords

Python async programming, async await interview, I/O bound vs CPU bound, thread vs process Python, Python GIL, asyncio best practices, aiohttp concurrency, ProcessPoolExecutor, ThreadPoolExecutor, async reliability patterns, real world async applications

Introduction

This is Part 5 of our SWE Interview QA series, focused on async programming in real systems: async/await, I/O-bound vs CPU-bound workloads, threads vs processes, GIL behavior, and production best practices.

For fundamentals, see Part 1. For advanced internals, see Part 2. For architecture and production APIs, see Part 3 and Part 4.


Q1: What exactly do async and await do in Python?

Answer:

async def defines a coroutine function. Calling it does not run it immediately; it returns a coroutine object. await tells the event loop: “pause this coroutine here and run other ready work until this awaited operation completes.”

graph TD
    linkStyle default stroke:#000,color:#000
    A["Coroutine A starts"] --> B["await network call"]
    B --> C["Event loop switches to<br/>coroutine B/C/D"]
    C --> D["I/O completes"]
    D --> E["Coroutine A resumes"]

    style A fill:#56cc9d,stroke:#333,color:#fff
    style C fill:#6cc3d5,stroke:#333,color:#fff
    style E fill:#ffce67,stroke:#333

Minimal Mental Model

Keyword Meaning
async def Defines coroutine function
await x Suspend current coroutine until x completes
Event loop Scheduler that runs coroutines cooperatively
Task Scheduled coroutine managed by the loop
import asyncio
import time

async def io_step(name: str, delay: float) -> str:
    print(f"{name}: start")
    await asyncio.sleep(delay)
    print(f"{name}: end")
    return f"{name} done"

async def main() -> None:
    start = time.perf_counter()

    # Sequential awaits: B starts only after A completes.
    print(await io_step("A", 1.0))
    print(await io_step("B", 1.0))

    elapsed = time.perf_counter() - start
    print(f"elapsed={elapsed:.2f}s (about 2.0s => not parallel)")

asyncio.run(main())

Objective of this snippet: show that async/await does not automatically mean parallel execution. With sequential await, work still runs one-by-one.

Expected behavior:

  • A starts and finishes first.
  • B starts only after A is fully done.
  • Total time is about 2 seconds for two 1-second waits, which proves this is sequential.

Parallelism is introduced later (Q2) by scheduling multiple coroutines first (for example with asyncio.gather()), then awaiting them together.


Q2: Why is await in a loop often slow, and how do you make it concurrent?

Answer:

for ...: await ... is typically sequential. You only start the next request after the current one finishes. To overlap waits, schedule all work first, then await together with asyncio.gather() (or TaskGroup in Python 3.11+).

graph LR
    linkStyle default stroke:#000,color:#000
    subgraph Sequential
        direction TB
        S1["Request 1<br/>wait 200ms"] --> S2["Request 2<br/>wait 200ms"]
        S2 --> S3["Request 3<br/>wait 200ms"]
        S3 --> S10["... Request 10<br/>Total ≈ 2000ms"]
    end

    subgraph Concurrent
        direction TB
        C1["Request 1"] 
        C2["Request 2"]
        C3["Request 3"]
        C10["... Request 10"]
        C1 --> G["gather() waits for all<br/>Total ≈ 200ms"]
        C2 --> G
        C3 --> G
        C10 --> G
    end

    style S10 fill:#ff7851,stroke:#333,color:#fff
    style G fill:#56cc9d,stroke:#333,color:#fff
    style Sequential fill:#f8f9fa,stroke:#333,color:#000
    style Concurrent fill:#f8f9fa,stroke:#333,color:#000

import asyncio
import aiohttp
import time

URLS = [
    "https://en.wikipedia.org/wiki/Python_(programming_language)",
    "https://en.wikipedia.org/wiki/Artificial_intelligence",
    "https://en.wikipedia.org/wiki/Machine_learning",
    "https://en.wikipedia.org/wiki/Data_science",
    "https://en.wikipedia.org/wiki/Web_scraping",
    "https://en.wikipedia.org/wiki/RESTful_API",
    "https://en.wikipedia.org/wiki/Network_socket",
    "https://en.wikipedia.org/wiki/Concurrent_computing",
    "https://en.wikipedia.org/wiki/Event_loop",
    "https://en.wikipedia.org/wiki/Distributed_computing",
]

HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AsyncInterviewDemo/1.0",
    "Accept-Language": "en-US,en;q=0.9",
}

async def fetch_text(session: aiohttp.ClientSession, url: str) -> int:
    async with session.get(
        url,
        headers=HEADERS,
        timeout=aiohttp.ClientTimeout(total=10),
    ) as resp:
        resp.raise_for_status()
        text = await resp.text()
        return len(text)

async def sequential() -> float:
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        for url in URLS:
            _ = await fetch_text(session, url)
    return time.perf_counter() - start

async def concurrent() -> float:
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_text(session, u) for u in URLS]
        _ = await asyncio.gather(*tasks)
    return time.perf_counter() - start

async def main() -> None:
    t1 = await sequential()
    t2 = await concurrent()
    print(f"sequential={t1:.2f}s concurrent={t2:.2f}s speedup={t1 / t2:.1f}x")

asyncio.run(main())

Q3: What is the difference between I/O-bound and CPU-bound work?

Answer:

  • I/O-bound: mostly waiting on network, database, filesystem, sockets.
  • CPU-bound: mostly computing (compression, image transforms, parsing huge files, ML inference/preprocessing).

graph LR
    linkStyle default stroke:#000,color:#000
    IO["I/O-bound"] --> IO2["Most time waiting"]
    CPU["CPU-bound"] --> CPU2["Most time calculating"]

    IO2 --> ASYNC["Best first choice:<br/>asyncio / threads"]
    CPU2 --> PROC["Best first choice:<br/>processes / native code"]

    style IO fill:#56cc9d,stroke:#333,color:#fff
    style CPU fill:#ff7851,stroke:#333,color:#fff
    style ASYNC fill:#6cc3d5,stroke:#333,color:#fff
    style PROC fill:#ffce67,stroke:#333

Workload Best Fit Why
1,000 HTTP requests asyncio + async client Overlap waiting efficiently
Blocking SDK/file calls ThreadPoolExecutor Keeps event loop responsive
Image/video transform ProcessPoolExecutor True parallel CPU execution
Numerical ops in NumPy Threads/processes may both help Many C libs release GIL

Q4: Thread vs Process vs Async: how do you choose in production?

Answer:

Use this production selection rule:

  1. If the operation is naturally async (HTTP, DB async driver), use asyncio.
  2. If operation is blocking I/O and no async API exists, use threads.
  3. If operation is heavy CPU, use processes.

graph TD
    linkStyle default stroke:#000,color:#000
    TASK["New task arrives"] --> Q1{"Is it naturally<br/>async I/O?"}
    Q1 -->|Yes| ASYNC["Use asyncio"]
    Q1 -->|No| Q2{"Is it blocking I/O<br/>with no async API?"}
    Q2 -->|Yes| THREAD["Use ThreadPoolExecutor"]
    Q2 -->|No| Q3{"Is it CPU-heavy?"}
    Q3 -->|Yes| PROC["Use ProcessPoolExecutor"]
    Q3 -->|No| SYNC["Simple sync code<br/>is enough"]

    style ASYNC fill:#56cc9d,stroke:#333,color:#fff
    style THREAD fill:#6cc3d5,stroke:#333,color:#fff
    style PROC fill:#ff7851,stroke:#333,color:#fff
    style SYNC fill:#ffce67,stroke:#333

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import time
import requests

thread_pool = ThreadPoolExecutor(max_workers=16)
proc_pool = ProcessPoolExecutor(max_workers=4)

def blocking_http(url: str) -> int:
    return len(requests.get(url, timeout=10).text)

def cpu_heavy(n: int) -> int:
    return sum(i * i for i in range(n))

async def baseline_sequential() -> float:
    """Run blocking I/O + CPU in the main thread sequentially."""
    start = time.perf_counter()
    _ = blocking_http("https://en.wikipedia.org/wiki/Artificial_intelligence")
    _ = cpu_heavy(50_000_000)
    return time.perf_counter() - start

async def offloaded_mixed() -> float:
    """Offload blocking I/O to thread, CPU to process — overlap both."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()

    io_task = loop.run_in_executor(thread_pool, blocking_http, "https://en.wikipedia.org/wiki/Artificial_intelligence")
    cpu_task = loop.run_in_executor(proc_pool, cpu_heavy, 50_000_000)
    _ = await asyncio.gather(io_task, cpu_task)

    return time.perf_counter() - start

async def main() -> None:
    t_seq = await baseline_sequential()
    t_off = await offloaded_mixed()
    print(f"sequential={t_seq:.2f}s  offloaded={t_off:.2f}s  speedup={t_seq / t_off:.1f}x")

asyncio.run(main())

Why asyncio.get_running_loop() matters: run_in_executor is a method on the event loop object, not a standalone function. You call get_running_loop() inside an async function to get a reference to the loop that is currently driving the coroutine. This gives you access to loop.run_in_executor(...), which submits blocking work to a thread or process pool and returns an awaitable — so the event loop stays free to handle other tasks while the executor does the heavy lifting. Without the loop reference you cannot bridge sync executors into the async world.

What if you use ThreadPoolExecutor/ProcessPoolExecutor directly? You can call executor.submit(func, *args) or pool.map(func, items) without the loop — this is exactly what Q5 does and it works fine there because Q5 has no event loop; it is pure sync code. But inside an async function (like Q4’s offloaded_mixed), calling pool.map(...) or future.result() blocks the entire event loop until the work finishes. That means:

  1. The I/O task and CPU task would run one after the other (no overlap).
  2. While they run, every other coroutine in your program is frozen — no HTTP handling, no DB queries, nothing.

With loop.run_in_executor(), both tasks are dispatched simultaneously and the event loop stays free. gather(io_task, cpu_task) then awaits both at once, so I/O wait and CPU work overlap — cutting total time from t_io + t_cpu to roughly max(t_io, t_cpu).

Rule of thumb: use pool.map() / pool.submit() in pure sync code (scripts, CLI tools, Q5-style benchmarks). Use loop.run_in_executor() whenever you are inside an async context and need to keep the event loop responsive.

How do you know you have an event loop? If your function is declared with async def, you are inside an event loop. The loop was started by asyncio.run(main()) at the top level, and every async def function called from there runs on that loop. It does not matter whether you have one task or ten — any async def function has a running loop. The reason you reach for run_in_executor is when that async function needs to call blocking code (I/O SDK, CPU work) without freezing the loop. So the decision is:

  • One blocking call, no other concurrent work? You could still use run_in_executor to keep the loop alive for other coroutines elsewhere in your program.
  • Multiple blocking calls that can overlap? Definitely use run_in_executor + gather — this is Q4’s pattern.
  • Pure sync script with no event loop at all? Use pool.map() / pool.submit() directly — this is Q5’s pattern.

Q5: How does the GIL affect concurrency choices in Python?

Answer:

GIL (Global Interpreter Lock) is a CPython mechanism that allows only one thread to execute Python bytecode at a time in a single process.

graph TD
    linkStyle default stroke:#000,color:#000
    GIL["GIL: one thread<br/>executes bytecode at a time"]
    GIL --> IO["I/O wait →<br/>thread releases GIL"]
    GIL --> CPU["CPU loop →<br/>thread holds GIL"]
    IO --> SCALE_OK["Other threads run<br/>while waiting ✓"]
    CPU --> SCALE_BAD["Other threads blocked<br/>no real parallelism ✗"]
    SCALE_BAD --> FIX["Solution:<br/>ProcessPoolExecutor"]

    style GIL fill:#ffce67,stroke:#333
    style SCALE_OK fill:#56cc9d,stroke:#333,color:#fff
    style SCALE_BAD fill:#ff7851,stroke:#333,color:#fff
    style FIX fill:#6cc3d5,stroke:#333,color:#fff

GIL implications

Scenario Thread scaling
Pure Python CPU loops Poor scaling due to GIL
I/O waits Good (threads release GIL while waiting)
C extensions that release GIL Can scale better
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_work(n: int) -> int:
    """Pure Python CPU loop — GIL-bound."""
    return sum(i * i for i in range(n))

N = 10_000_000
TASKS = 4

def baseline_sequential() -> float:
    start = time.perf_counter()
    for _ in range(TASKS):
        cpu_work(N)
    return time.perf_counter() - start

def with_threads() -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=TASKS) as pool:
        list(pool.map(cpu_work, [N] * TASKS))
    return time.perf_counter() - start

def with_processes() -> float:
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=TASKS) as pool:
        list(pool.map(cpu_work, [N] * TASKS))
    return time.perf_counter() - start

if __name__ == "__main__":
    t_seq = baseline_sequential()
    t_thr = with_threads()
    t_proc = with_processes()
    print(f"sequential={t_seq:.2f}s  threads={t_thr:.2f}s  processes={t_proc:.2f}s")
    print(f"thread_speedup={t_seq / t_thr:.1f}x  process_speedup={t_seq / t_proc:.1f}x")

Expected output: threads give ~1x speedup (no gain due to GIL), processes give ~3–4x speedup (true parallelism across cores).


Q6: What are the async best practices for production code?

Answer:

  1. Use timeouts everywhere (network, DB, queue).
  2. Bound concurrency with semaphores to avoid overload.
  3. Reuse clients/sessions (aiohttp.ClientSession, DB pools).
  4. Prefer structured concurrency (TaskGroup) for lifecycle safety.
  5. Never call blocking APIs directly in async paths.

graph TD
    linkStyle default stroke:#000,color:#000
    REQ["Incoming requests"] --> SEM["Semaphore gate<br/>(max 20 concurrent)"]
    SEM --> SESSION["Shared ClientSession<br/>(connection reuse)"]
    SESSION --> TIMEOUT["Per-request timeout<br/>(8s hard limit)"]
    TIMEOUT --> OK["Success → return data"]
    TIMEOUT --> FAIL["Timeout / error →<br/>graceful fallback"]
    SEM --> TG["TaskGroup manages<br/>all task lifecycles"]
    TG --> CANCEL["On failure →<br/>cancel siblings cleanly"]

    style SEM fill:#56cc9d,stroke:#333,color:#fff
    style SESSION fill:#6cc3d5,stroke:#333,color:#fff
    style TIMEOUT fill:#ffce67,stroke:#333
    style FAIL fill:#ff7851,stroke:#333,color:#fff
    style TG fill:#56cc9d,stroke:#333,color:#fff

How they work together: requests enter through a semaphore (rule 2) that caps active work. Each request uses a shared session (rule 3) with a timeout (rule 1). All tasks are grouped under a TaskGroup (rule 4) so if something goes wrong, cleanup is automatic. Blocking calls never enter this path directly (rule 5); they go through run_in_executor instead.

Why the semaphore matters: without it, if you have 10,000 URLs, asyncio.gather will fire 10,000 connections at once. This exhausts file descriptors, overwhelms upstream servers, and triggers rate-limits or connection resets. A semaphore like Semaphore(20) means only 20 requests are in-flight at any moment; the rest wait their turn without consuming resources. It is the simplest form of client-side backpressure.

import asyncio
import time
import aiohttp

URLS = [f"https://httpbin.org/delay/1" for _ in range(20)]

SEM = asyncio.Semaphore(20)

async def safe_fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with SEM:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=8)) as r:
                r.raise_for_status()
                return await r.text()
        except asyncio.TimeoutError:
            return "timeout"
        except aiohttp.ClientError as exc:
            return f"client_error: {exc}"

async def baseline_sequential(urls: list[str]) -> float:
    """Fetch URLs one-by-one without semaphore or TaskGroup."""
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        for url in urls:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=8)) as r:
                    await r.text()
            except Exception:
                pass
    return time.perf_counter() - start

async def run_with_best_practices(urls: list[str]) -> float:
    """Fetch with semaphore + TaskGroup + shared session + timeout."""
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(safe_fetch(session, u)) for u in urls]
        _ = [t.result() for t in tasks]
    return time.perf_counter() - start

async def main() -> None:
    t_seq = await baseline_sequential(URLS)
    t_best = await run_with_best_practices(URLS)
    print(f"sequential={t_seq:.2f}s  best_practices={t_best:.2f}s  speedup={t_seq / t_best:.1f}x")

asyncio.run(main())

Q7: Real use case: high-throughput event enrichment pipeline with asyncio + processes

Answer:

A common architecture is:

  • Async layer handles message ingestion and external I/O.
  • CPU-heavy enrichment/scoring runs in a process pool.

graph LR
    linkStyle default stroke:#000,color:#000
    INGEST["Async ingestion<br/>(event loop)"] --> DISPATCH["Dispatch events<br/>to process pool"]
    DISPATCH --> W1["Worker process 1<br/>enrich_event()"]
    DISPATCH --> W2["Worker process 2<br/>enrich_event()"]
    DISPATCH --> W3["Worker process 3<br/>enrich_event()"]
    DISPATCH --> W4["Worker process 4<br/>enrich_event()"]
    W1 --> GATHER["await gather()<br/>collect all results"]
    W2 --> GATHER
    W3 --> GATHER
    W4 --> GATHER
    GATHER --> OUT["Return enriched<br/>event list"]

    style INGEST fill:#56cc9d,stroke:#333,color:#fff
    style DISPATCH fill:#6cc3d5,stroke:#333,color:#fff
    style W1 fill:#ff7851,stroke:#333,color:#fff
    style W2 fill:#ff7851,stroke:#333,color:#fff
    style W3 fill:#ff7851,stroke:#333,color:#fff
    style W4 fill:#ff7851,stroke:#333,color:#fff
    style GATHER fill:#ffce67,stroke:#333

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from hashlib import sha256

pool = ProcessPoolExecutor(max_workers=4)

def enrich_event(payload: str) -> dict[str, str | int]:
    # Simulate CPU-heavy parsing/scoring work.
    digest = payload
    for _ in range(50_000):
        digest = sha256(digest.encode()).hexdigest()
    return {"payload": payload, "score": int(digest[:8], 16) % 1000}

def baseline_sequential(events: list[str]) -> list[dict[str, str | int]]:
    """Process events one-by-one in the main process (no parallelism)."""
    return [enrich_event(e) for e in events]

async def process_events_parallel(events: list[str]) -> list[dict[str, str | int]]:
    """Dispatch events to process pool and collect results concurrently."""
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(pool, enrich_event, event)
        for event in events
    ]
    return await asyncio.gather(*tasks)

async def main() -> None:
    events = [f"order_{i}" for i in range(8)]

    # Baseline: sequential in main process
    start = time.perf_counter()
    _ = baseline_sequential(events)
    t_seq = time.perf_counter() - start

    # Parallel: dispatched to 4 worker processes
    start = time.perf_counter()
    _ = await process_events_parallel(events)
    t_par = time.perf_counter() - start

    print(f"sequential={t_seq:.2f}s  parallel={t_par:.2f}s  speedup={t_seq / t_par:.1f}x")

asyncio.run(main())

Step-by-step explanation:

  1. ProcessPoolExecutor(max_workers=4) — creates 4 separate OS processes. Each has its own GIL, so they truly run CPU work in parallel.
  2. enrich_event() — a pure function that does heavy hashing (simulates scoring/parsing). It runs entirely inside a worker process, not in the main event loop.
  3. baseline_sequential() — processes all events one-by-one in the main process. This is the slowest path because only one CPU core is used at a time.
  4. loop.run_in_executor(pool, enrich_event, event) — submits one event to the process pool and returns an awaitable. The event loop stays free to handle other I/O while this runs.
  5. The list comprehension creates one awaitable per event, so all events are dispatched to the pool at once.
  6. await asyncio.gather(*tasks) — waits for all worker processes to finish and collects results in order.
  7. main() compares both approaches: with 8 events and 4 workers, parallel should be roughly 2–4x faster than sequential.

What is the event loop?

The event loop is the central scheduler in asyncio. Think of it as a single-threaded coordinator that:

  • Maintains a queue of ready-to-run coroutines.
  • Runs one coroutine until it hits an await, then switches to the next ready coroutine.
  • Monitors I/O file descriptors (sockets, pipes) and wakes up coroutines when their I/O completes.
  • Never does CPU-heavy work itself — it only orchestrates.

In this Q7 example, the event loop dispatches CPU work to separate processes via run_in_executor, then suspends at await gather(...). While waiting, it could handle other tasks (e.g., incoming HTTP requests, DB queries). When all process workers finish, gather resumes and returns the results.

Why this works well:

  • The event loop never blocks on CPU work; it just dispatches and waits.
  • Up to 4 events are enriched in true parallel (one per process).
  • If you also have network I/O (e.g., fetching metadata), the event loop can handle those concurrently while processes crunch numbers.
  • ProcessPoolExecutor completely bypasses the GIL because each worker is a separate process with its own Python interpreter.

Q8: How do cancellation, retries, and backpressure work in async systems?

Answer:

Production async systems fail under overload unless they enforce backpressure and clean cancellation.

graph LR
    linkStyle default stroke:#000,color:#000
    PROD["Producer<br/>(fast)"] --> Q["Bounded Queue<br/>maxsize=1000"]
    Q -->|"queue full →<br/>producer waits"| PROD
    Q --> W1["Worker 1"]
    Q --> W2["Worker 2"]
    Q --> W3["Worker ..."]
    Q --> W8["Worker 8"]
    W1 --> DONE["task_done()"]
    W8 --> DONE
    DONE -->|"all done"| CANCEL["Cancel workers<br/>+ gather(return_exceptions)"]

    style Q fill:#ffce67,stroke:#333
    style PROD fill:#6cc3d5,stroke:#333,color:#fff
    style CANCEL fill:#ff7851,stroke:#333,color:#fff

How backpressure works: when the queue is full (maxsize=1000), await queue.put() suspends the producer until a worker frees a slot. This prevents unbounded memory growth and keeps the system stable under load.

How does backpressure differ from a semaphore? Both limit concurrency, but they solve different problems at different layers:

Semaphore (Q6) Bounded Queue (Q8)
What it limits Number of tasks executing concurrently Number of items buffered between producer and consumer
Where it sits Around the work itself (wrap the call) Between the producer and workers
Producer behavior Producer creates all tasks immediately; semaphore gates execution Producer is suspended when queue is full — cannot even enqueue more
Memory model All tasks exist in memory; only N run at once Only maxsize items + N workers exist at any time
Best for Capping concurrent outbound connections (HTTP, DB) Decoupling a fast producer from slow consumers

In short: a semaphore says “only N tasks may be active at once” but all tasks are already scheduled. A bounded queue says “only N items may be waiting at once” — the producer itself is slowed down, which is true end-to-end backpressure. Use a semaphore when you already have a fixed batch of work (e.g., 100 URLs). Use a bounded queue when work arrives continuously and you need to throttle the source (e.g., streaming events from Kafka).

import asyncio
import time

async def simulate_io_work(item: str) -> None:
    await asyncio.sleep(0.01)  # simulate fast I/O per item

# --- Baseline: no backpressure, fire all at once ---
async def baseline_unbounded(items: list[str]) -> float:
    start = time.perf_counter()
    tasks = [asyncio.create_task(simulate_io_work(i)) for i in items]
    await asyncio.gather(*tasks)
    return time.perf_counter() - start

# --- Best practice: bounded queue + worker pool ---
async def with_backpressure(items: list[str], num_workers: int = 8) -> float:
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=100)

    async def producer() -> None:
        for item in items:
            await queue.put(item)
        for _ in range(num_workers):
            await queue.put(None)  # poison pill

    async def worker() -> None:
        while True:
            item = await queue.get()
            if item is None:
                queue.task_done()
                break
            try:
                await simulate_io_work(item)
            finally:
                queue.task_done()

    start = time.perf_counter()
    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await producer()
    await queue.join()
    await asyncio.gather(*workers)
    return time.perf_counter() - start

async def main() -> None:
    items = [f"event_{i}" for i in range(2000)]

    t_unbounded = await baseline_unbounded(items)
    t_bounded = await with_backpressure(items)

    print(f"unbounded={t_unbounded:.2f}s  bounded={t_bounded:.2f}s")
    print(f"Peak tasks: unbounded={len(items)}  bounded=8 workers + 100 queue slots")

asyncio.run(main())

Key takeaway: both finish in similar wall-clock time, but the bounded version uses far less memory (max 108 concurrent items vs 2000 tasks at once). Under real load with heavier payloads, unbounded approaches cause OOM or file descriptor exhaustion.

Best-practice notes:

  • Use bounded queues for predictable memory.
  • Make operations idempotent for safe retries.
  • Handle CancelledError correctly in long-running tasks.

Q9: What production system designs use async, threads, and processes together?

Answer:

Pattern A: API Aggregator (I/O-heavy)

  • Async fan-out to 10-30 upstream APIs.
  • Timeouts + circuit breakers + fallback cache.

Pattern B: Log Ingestion + Parsing (mixed)

  • Async network ingestion.
  • Thread pool for blocking parser library.
  • Process pool for expensive normalization/compression.

Pattern C: Media Platform

  • Async upload and metadata API.
  • Process workers for CPU-heavy feature extraction/scoring.
  • Queue-driven architecture for reliability and retries.

graph TD
    linkStyle default stroke:#000,color:#000
    API["Async API Layer"] --> Q["Queue"]
    Q --> TP["Thread Workers<br/>blocking I/O adapters"]
    Q --> PP["Process Workers<br/>CPU-heavy transforms"]
    API --> CACHE["Redis Cache"]
    API --> DB["Database"]

    style API fill:#56cc9d,stroke:#333,color:#fff
    style Q fill:#ffce67,stroke:#333
    style PP fill:#ff7851,stroke:#333,color:#fff


Q10: What is the strongest concise answer for concurrency strategy in Python?

Answer:

A strong concise answer:

“I classify workload first: I/O-bound vs CPU-bound. For I/O-bound concurrency I prefer asyncio with timeouts, bounded concurrency, and pooled clients. For blocking I/O dependencies I offload to a ThreadPoolExecutor. For CPU-heavy workloads I use ProcessPoolExecutor or native libraries that release the GIL. In production I add observability, retries, cancellation safety, and backpressure so the system degrades gracefully under load.”

One-page decision matrix

Question If Yes If No
Is it mostly waiting on I/O? asyncio Next question
Is library blocking and cannot be replaced? thread pool Next question
Is it CPU-heavy Python code? process pool Simple sync code may be enough

Summary Table

# Topic Key Concept
1 async/await Cooperative scheduling at await points
2 Sequential vs concurrent await in loop is sequential; gather/TaskGroup overlaps waits
3 I/O vs CPU bound Pick model by bottleneck type
4 Thread vs process vs async Async for I/O, thread for blocking I/O, process for CPU
5 GIL behavior GIL limits CPU scaling in threads for pure Python code
6 Async best practices Timeouts, pools, bounded concurrency, structured concurrency
7 Event pipeline use case Async orchestration + process pool for CPU-heavy enrichment
8 Reliability patterns Backpressure, cancellation, retries
9 Real architectures Aggregators, ingestion pipelines, media systems
10 Concurrency strategy framing Explain workload-first strategy clearly

Real-World Use Case: Food Delivery Platform

Assume one backend powers order placement, restaurant sync, driver dispatch, payment updates, and analytics.

Task in Application Workload Type Best Practice Choice Why
Fan-out to restaurant, pricing, and ETA services per order I/O-bound asyncio + TaskGroup + per-call timeout Concurrent network waits with safer lifecycle management
Call legacy blocking payment SDK Blocking I/O ThreadPoolExecutor via run_in_executor Prevents blocking the event loop
Compute route batches and optimization hints CPU-bound ProcessPoolExecutor True parallelism for heavy Python compute
Write order state transitions to DB I/O-bound Async DB client + connection pool High throughput with controlled resource reuse
Push notifications to customer/driver apps I/O-bound burst traffic asyncio + bounded semaphore Caps concurrency and protects upstreams
Consume delivery events from queue Mixed Bounded asyncio.Queue + worker tasks Built-in backpressure and predictable memory use
Retry transient downstream failures Reliability concern Idempotent handlers + exponential backoff Safe retry behavior without duplicate side effects
Graceful deploy/shutdown during peak traffic Lifecycle concern Structured cancellation (TaskGroup, CancelledError handling) Prevents orphan work and partial state updates

This is the production-ready framing: classify each task by bottleneck, then apply the smallest concurrency model that solves that bottleneck safely.

"""
Food Delivery Platform — concurrency patterns in one service.

Demonstrates: asyncio fan-out, ThreadPoolExecutor for blocking SDK,
ProcessPoolExecutor for CPU work, bounded semaphore, backpressure queue,
structured cancellation, and retry with exponential backoff.
"""

import asyncio
import time
import random
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from hashlib import sha256

# --- Executor pools (shared across the service lifetime) ---
thread_pool = ThreadPoolExecutor(max_workers=8)
proc_pool = ProcessPoolExecutor(max_workers=4)

# --- Simulated external services ---

async def call_restaurant_service(order_id: str) -> dict:
    """Async I/O: fan-out to restaurant availability API."""
    await asyncio.sleep(random.uniform(0.05, 0.15))
    return {"order": order_id, "restaurant": "confirmed"}

async def call_pricing_service(order_id: str) -> dict:
    """Async I/O: get dynamic pricing."""
    await asyncio.sleep(random.uniform(0.05, 0.1))
    return {"order": order_id, "price": round(random.uniform(8.0, 25.0), 2)}

async def call_eta_service(order_id: str) -> dict:
    """Async I/O: estimated delivery time."""
    await asyncio.sleep(random.uniform(0.05, 0.12))
    return {"order": order_id, "eta_minutes": random.randint(15, 45)}

def blocking_payment_sdk(order_id: str, amount: float) -> dict:
    """Blocking I/O: legacy payment SDK with no async API."""
    time.sleep(random.uniform(0.1, 0.2))  # simulates blocking network call
    return {"order": order_id, "payment": "success", "charged": amount}

def compute_route_optimization(waypoints: list[str]) -> dict:
    """CPU-bound: heavy route computation."""
    digest = ",".join(waypoints)
    for _ in range(20_000):
        digest = sha256(digest.encode()).hexdigest()
    return {"route_hash": digest[:12], "waypoints": len(waypoints)}

async def write_order_state(order_id: str, state: str) -> None:
    """Async I/O: DB write (simulated)."""
    await asyncio.sleep(0.02)

# --- Pattern 1: Fan-out with TaskGroup + timeout ---

async def place_order(order_id: str) -> dict:
    """Fan-out to restaurant, pricing, and ETA concurrently."""
    async with asyncio.TaskGroup() as tg:
        t_rest = tg.create_task(
            asyncio.wait_for(call_restaurant_service(order_id), timeout=2.0)
        )
        t_price = tg.create_task(
            asyncio.wait_for(call_pricing_service(order_id), timeout=2.0)
        )
        t_eta = tg.create_task(
            asyncio.wait_for(call_eta_service(order_id), timeout=2.0)
        )
    return {
        "restaurant": t_rest.result(),
        "pricing": t_price.result(),
        "eta": t_eta.result(),
    }

# --- Pattern 2: Blocking SDK via thread pool ---

async def process_payment(order_id: str, amount: float) -> dict:
    """Offload blocking payment SDK to thread pool."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        thread_pool, blocking_payment_sdk, order_id, amount
    )

# --- Pattern 3: CPU-heavy work via process pool ---

async def optimize_routes(orders: list[str]) -> list[dict]:
    """Dispatch route optimization to process pool."""
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(proc_pool, compute_route_optimization, [o, "warehouse", "customer"])
        for o in orders
    ]
    return await asyncio.gather(*tasks)

# --- Pattern 4: Bounded semaphore for notification burst ---

NOTIFY_SEM = asyncio.Semaphore(10)

async def send_notification(user_id: str, message: str) -> str:
    """Push notification with bounded concurrency."""
    async with NOTIFY_SEM:
        await asyncio.sleep(random.uniform(0.02, 0.05))
        return f"notified:{user_id}"

async def notify_all_drivers(driver_ids: list[str], message: str) -> list[str]:
    """Send to many drivers without overwhelming push service."""
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(send_notification(d, message)) for d in driver_ids]
    return [t.result() for t in tasks]

# --- Pattern 5: Bounded queue for event consumption ---

async def consume_delivery_events(num_events: int, num_workers: int = 4) -> int:
    """Process delivery events with backpressure."""
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=50)
    processed = 0

    async def producer():
        for i in range(num_events):
            await queue.put(f"delivery_event_{i}")
        for _ in range(num_workers):
            await queue.put(None)

    async def worker():
        nonlocal processed
        while True:
            item = await queue.get()
            if item is None:
                queue.task_done()
                break
            await asyncio.sleep(0.005)  # simulate processing
            processed += 1
            queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await producer()
    await queue.join()
    await asyncio.gather(*workers)
    return processed

# --- Pattern 6: Retry with exponential backoff ---

async def retry_with_backoff(coro_factory, max_retries: int = 3) -> any:
    """Retry a coroutine with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await coro_factory()
        except Exception as exc:
            if attempt == max_retries:
                raise
            wait = 0.1 * (2 ** attempt)
            await asyncio.sleep(wait)

# --- Baseline: everything sequential, no concurrency ---

async def baseline_sequential() -> float:
    """Run all operations sequentially — no fan-out, no pools, no overlap."""
    start = time.perf_counter()

    # 1. Call services one-by-one (no TaskGroup)
    _ = await call_restaurant_service("ORD-001")
    pricing = await call_pricing_service("ORD-001")
    _ = await call_eta_service("ORD-001")

    # 2. Blocking payment in main thread (blocks event loop)
    _ = blocking_payment_sdk("ORD-001", pricing["price"])

    # 3. DB write
    await write_order_state("ORD-001", "confirmed")

    # 4. Route optimization sequentially in main process
    for o in ["ORD-001", "ORD-002", "ORD-003", "ORD-004"]:
        _ = compute_route_optimization([o, "warehouse", "customer"])

    # 5. Notifications one-by-one
    for d in [f"driver_{i}" for i in range(20)]:
        await asyncio.sleep(random.uniform(0.02, 0.05))

    # 6. Events one-by-one
    for i in range(200):
        await asyncio.sleep(0.005)

    return time.perf_counter() - start

# --- Optimized: all concurrency patterns applied ---

async def optimized_concurrent() -> float:
    """Run with fan-out, executors, semaphore, and backpressure."""
    start = time.perf_counter()

    # 1. Fan-out (TaskGroup + timeout)
    order = await place_order("ORD-001")

    # 2. Payment (thread pool)
    _ = await process_payment("ORD-001", order["pricing"]["price"])

    # 3. DB write (async)
    await write_order_state("ORD-001", "confirmed")

    # 4. Routes (process pool)
    _ = await optimize_routes(["ORD-001", "ORD-002", "ORD-003", "ORD-004"])

    # 5. Notifications (bounded semaphore)
    drivers = [f"driver_{i}" for i in range(20)]
    _ = await notify_all_drivers(drivers, "New order nearby!")

    # 6. Events (backpressure queue)
    _ = await consume_delivery_events(200)

    return time.perf_counter() - start

# --- Main: compare baseline vs optimized ---

async def main() -> None:
    print("=" * 50)
    print("BASELINE (sequential, no concurrency)")
    print("=" * 50)
    t_baseline = await baseline_sequential()
    print(f"Baseline elapsed: {t_baseline:.2f}s\n")

    print("=" * 50)
    print("OPTIMIZED (fan-out, executors, semaphore, queue)")
    print("=" * 50)
    t_optimized = await optimized_concurrent()
    print(f"Optimized elapsed: {t_optimized:.2f}s\n")

    print("=" * 50)
    print(f"RESULT: baseline={t_baseline:.2f}s  optimized={t_optimized:.2f}s  "
          f"speedup={t_baseline / t_optimized:.1f}x")
    print("=" * 50)

asyncio.run(main())

What’s Next?

For related content:

TipEnjoyed this article?

If this article helped you, your support helps us deliver more useful content. Here are a few ways to support our work: