Concurrency

Running multiple tasks that overlap in time. Concurrency is about structure (managing many tasks); parallelism is about execution (running on multiple CPUs simultaneously). They solve different problems.

Concurrency vs Parallelism

Concurrency: one cook, multiple dishes — switches between tasks
Parallelism: multiple cooks — each works on a dish simultaneously

Python:
  CPU-bound (ML, number crunching) → multiprocessing (bypasses GIL)
  I/O-bound (HTTP, DB, files) → asyncio or threading (GIL released during I/O)
  Mixed → run asyncio + ProcessPoolExecutor (see the sketch after this list)

Go: goroutines + channels handle both naturally (no GIL)
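
A minimal sketch of the mixed case, assuming a hypothetical CPU-bound score() function: run_in_executor hands the heavy work to a process pool while the event loop stays free for I/O.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def score(n: int) -> int:
    return sum(i * i for i in range(n))   # stand-in for real CPU-bound work

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # CPU work runs in worker processes; awaiting doesn't block the loop
        results = await asyncio.gather(
            *[loop.run_in_executor(pool, score, n) for n in (10_000, 20_000)]
        )
    print(results)

if __name__ == "__main__":   # required for multiprocessing on spawn platforms
    asyncio.run(main())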

Python asyncio

import asyncio
import httpx

# async/await — concurrent I/O without threads
async def fetch_user(client: httpx.AsyncClient, user_id: str) -> dict:
    response = await client.get(f"/users/{user_id}")
    response.raise_for_status()
    return response.json()

async def fetch_all_users(user_ids: list[str]) -> list[dict]:
    async with httpx.AsyncClient(base_url="https://api.example.com") as client:
        tasks = [fetch_user(client, uid) for uid in user_ids]
        # Run all concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

    users, errors = [], []
    for r in results:
        if isinstance(r, Exception):
            errors.append(r)
        else:
            users.append(r)

    if errors:
        raise ExceptionGroup("Some user fetches failed", errors)   # Python 3.11+
    return users

# Run from synchronous entry point
asyncio.run(fetch_all_users(["u1", "u2", "u3"]))
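
On Python 3.11+, asyncio.TaskGroup is an alternative sketch for the same fan-out, reusing fetch_user from above; unlike gather(return_exceptions=True), the first failure cancels the remaining tasks and the group itself raises an ExceptionGroup.

async def fetch_all_users_tg(user_ids: list[str]) -> list[dict]:
    async with httpx.AsyncClient(base_url="https://api.example.com") as client:
        async with asyncio.TaskGroup() as tg:   # Python 3.11+
            tasks = [tg.create_task(fetch_user(client, uid)) for uid in user_ids]
        # exiting the group waits for all tasks; failures raise ExceptionGroup
    return [t.result() for t in tasks]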

Semaphore — Limit Concurrency

# Prevent hammering an API with too many concurrent requests
async def fetch_with_limit(urls: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(client: httpx.AsyncClient, url: str) -> dict:
        async with semaphore:  # blocks if max_concurrent requests already in flight
            response = await client.get(url)
            return response.json()

    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*[fetch_one(client, url) for url in urls])

Thread Safety and Race Conditions

# Race condition: two threads read-increment-write the same value
import threading

counter = 0

def unsafe_increment():
    global counter
    for _ in range(100_000):
        counter += 1   # NOT atomic: read → add → write (3 operations)

threads = [threading.Thread(target=unsafe_increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)   # likely < 1,000,000: updates lost to the race

# Fix: Lock
lock = threading.Lock()
counter = 0

def safe_increment():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1

# Fix: avoid shared state entirely by passing work through thread-safe queues
from queue import Queue

work_queue: Queue = Queue()
result_queue: Queue = Queue()

def worker():
    # each thread pulls from one queue and pushes to another; nothing is shared
    while (item := work_queue.get()) is not None:
        result_queue.put(item * 2)   # stand-in for real per-item work

asyncio.Queue — Producer/Consumer

async def producer(queue: asyncio.Queue, items: list) -> None:
    for item in items:
        await queue.put(item)
    await queue.put(None)   # sentinel

async def consumer(queue: asyncio.Queue, worker_id: int) -> list:
    results = []
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)   # pass sentinel to next consumer
            break
        result = await process(item)   # process() = your async per-item work
        results.append(result)
        queue.task_done()
    return results

async def pipeline(items: list, workers: int = 5) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=workers * 2)

    producer_task = asyncio.create_task(producer(queue, items))
    consumer_tasks = [asyncio.create_task(consumer(queue, i)) for i in range(workers)]

    await producer_task
    results = await asyncio.gather(*consumer_tasks)
    return [r for batch in results for r in batch]

Python Multiprocessing (CPU-bound)

from concurrent.futures import ProcessPoolExecutor
import numpy as np

def compute_chunk(data_chunk: np.ndarray) -> float:
    return np.sum(data_chunk ** 2)   # CPU-intensive

def parallel_compute(data: np.ndarray, workers: int = 4) -> float:
    chunks = np.array_split(data, workers)

    # on spawn platforms (macOS/Windows) call this under if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(compute_chunk, chunks))

    return sum(results)

Go Goroutines and Channels

// Fan-out/fan-in pattern
func fetchAllUsers(userIDs []string) []User {
    results := make(chan User, len(userIDs))

    for _, id := range userIDs {
        go func(uid string) {
            user := fetchUser(uid)   // runs concurrently
            results <- user
        }(id)
    }

    users := make([]User, 0, len(userIDs))
    for range userIDs {
        users = append(users, <-results)
    }
    return users
}

// sync.WaitGroup — wait for goroutines to finish
var wg sync.WaitGroup
var mu sync.Mutex
results := make([]int, 0)

for _, n := range numbers {
    wg.Add(1)
    go func(n int) {
        defer wg.Done()
        result := heavyCompute(n)
        mu.Lock()
        results = append(results, result)
        mu.Unlock()
    }(n)
}
wg.Wait()

Deadlock

# Deadlock: Thread A holds lock1 and waits for lock2
#           Thread B holds lock2 and waits for lock1

# Prevent: always acquire locks in the same order
LOCK_ORDER = [lock1, lock2]   # document and enforce acquisition order

# Or use timeout
if lock.acquire(timeout=5):
    try:
        ...
    finally:
        lock.release()
else:
    raise TimeoutError("Could not acquire lock in 5s")
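
A minimal sketch of canonical ordering, assuming two module-level locks; sorting by id() gives every thread the same acquisition order, so the circular wait can't form.

import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def update_both() -> None:
    first, second = sorted([lock_a, lock_b], key=id)   # canonical order
    with first, second:   # released in reverse order on exit
        ...   # touch both protected resources safely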

Common Failure Cases

Race condition survives code review and only appears under load
Why: the read-modify-write sequence looks atomic in single-threaded review, but CPython can hand the GIL to another thread between bytecode instructions (default switch interval about 5 ms), exposing the window under high concurrency.
Detect: counter or balance inconsistency only visible with 10+ concurrent requests; add a stress test with threading.Barrier to synchronise thread starts.
Fix: use threading.Lock around read-modify-write; prefer atomic data structures (queue.Queue) over shared mutable state.
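
A minimal stress-test sketch, assuming the unsafe_increment example above; the Barrier releases all threads at once to maximise contention and surface the race.

import threading

counter = 0
THREADS = 10
barrier = threading.Barrier(THREADS)

def hammer():
    global counter
    barrier.wait()   # all threads start incrementing at the same instant
    for _ in range(100_000):
        counter += 1

threads = [threading.Thread(target=hammer) for _ in range(THREADS)]
for t in threads: t.start()
for t in threads: t.join()
assert counter == THREADS * 100_000, f"lost updates: counter={counter}"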

asyncio task swallows exception silently
Why: asyncio.create_task() called without keeping the returned Task; when the task raises, the exception sits unretrieved on the task object and is only logged as a warning at garbage collection, which is easy to miss.
Detect: asyncio: Future exception was never retrieved warning in logs; expected side effect (write, notification) never occurred.
Fix: always store task references; await them (asyncio.gather, or asyncio.TaskGroup on 3.11+) or add a done-callback that reports the exception.
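
A minimal sketch of the done-callback fix, assuming a hypothetical flaky() background coroutine:

import asyncio
import logging

def report_crash(task: asyncio.Task) -> None:
    # runs when the task finishes; surfaces exceptions that would
    # otherwise sit unretrieved on the task object
    if not task.cancelled() and task.exception() is not None:
        logging.error("background task failed", exc_info=task.exception())

async def flaky() -> None:
    raise RuntimeError("boom")   # hypothetical failing background job

async def main() -> None:
    task = asyncio.create_task(flaky())
    task.add_done_callback(report_crash)   # and keep the reference alive
    await asyncio.sleep(0.1)

asyncio.run(main())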

Deadlock between two locks acquired in different orders
Why: Thread A holds lock1 and waits for lock2; Thread B holds lock2 and waits for lock1; both block forever.
Detect: application freezes under specific concurrency pattern; threading.enumerate() shows both threads in BLOCKED state.
Fix: document and enforce a canonical lock acquisition order across the codebase; acquire sorted([lock1, lock2], key=id) so every thread takes them in the same order (see the sketch in the Deadlock section above).

ProcessPoolExecutor hangs on KeyboardInterrupt
Why: Ctrl-C is delivered to the whole process group; workers can die mid-task while the pool's management thread keeps waiting on them, blocking the main process's shutdown.
Detect: Ctrl-C does not kill the process; ps aux shows zombie worker processes.
Fix: call executor.shutdown(wait=False, cancel_futures=True) in a finally block; or pass an initializer that makes workers ignore SIGINT so only the main process handles Ctrl-C (sketch below).
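
A minimal sketch of the initializer approach, assuming the compute_chunk function from the multiprocessing section above:

import signal
from concurrent.futures import ProcessPoolExecutor

def ignore_sigint() -> None:
    signal.signal(signal.SIGINT, signal.SIG_IGN)   # workers defer Ctrl-C to the main process

def run(chunks: list) -> list:
    executor = ProcessPoolExecutor(max_workers=4, initializer=ignore_sigint)
    try:
        return list(executor.map(compute_chunk, chunks))
    finally:
        # on KeyboardInterrupt this avoids hanging on live workers
        executor.shutdown(wait=False, cancel_futures=True)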

Semaphore permits leak, eventually blocking all coroutines
Why: a manual await semaphore.acquire() whose release() is skipped on an exception path leaks a permit; each leaked permit permanently lowers the effective concurrency limit.
Detect: over time fewer and fewer concurrent operations proceed; the semaphore's internal _value drifts below its initial count.
Fix: prefer async with semaphore: (the context manager releases even on exceptions); if you must acquire manually, pair acquire() with release() in a finally block (sketch below).
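
A minimal sketch of the safe manual pattern, assuming a hypothetical do_work() coroutine:

import asyncio

semaphore = asyncio.Semaphore(10)

async def guarded() -> None:
    await semaphore.acquire()
    try:
        await do_work()          # hypothetical; may raise
    finally:
        semaphore.release()      # permit returned on every path

# equivalent and safer:
async def guarded_ctx() -> None:
    async with semaphore:        # releases automatically, even on exceptions
        await do_work()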

Connections

se-hub · cs-fundamentals/distributed-systems · cs-fundamentals/performance-optimisation-se · cs-fundamentals/database-transactions · python/ecosystem · llms/ae-hub

Open Questions

  • What are the most common misapplications of this concept in production codebases?
  • When should you explicitly choose not to use this pattern or technique?