Performance Optimisation
Systematic approach to improving system performance: measure first, optimise the bottleneck, measure again. Premature optimisation is the root of all evil, but ignoring performance in production is inexcusable.
The Process
1. Measure — establish a baseline with real workloads (not synthetic)
2. Profile — find the actual bottleneck (not the assumed one)
3. Hypothesise — why is this slow?
4. Optimise — change one thing at a time
5. Measure again — did it help? Did it regress anything else?
6. Repeat
Common wrong assumptions:
- "The loop is slow" → often it's the DB call inside the loop
- "Python is slow" → often it's waiting on I/O
- "The query is slow" → often it's missing an index
- "It's the network" → often it's serialisation or N+1 queries
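Step 1 needs no special tooling; a minimal sketch of a reusable baseline timer built on time.perf_counter (names here are illustrative):

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label: str, results: dict):
    # record wall-clock elapsed seconds for the wrapped block
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

# establish a baseline with a representative workload before changing anything
timings: dict[str, float] = {}
with timed("baseline", timings):
    sum(i * i for i in range(100_000))
```

Keep the recorded numbers; step 5 compares a fresh run against them rather than against memory.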
Python Profiling
# cProfile — function-level profiling
import cProfile
import pstats
import io
def profile(func):
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        result = func(*args, **kwargs)
        pr.disable()
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
        ps.print_stats(20)  # top 20 functions by cumulative time
        print(s.getvalue())
        return result
    return wrapper

@profile
def expensive_operation():
    ...
# line_profiler — line-by-line timing
# pip install line-profiler
@profile  # injected by kernprof; run: kernprof -l -v script.py
def process_orders(orders):
    results = []
    for order in orders:  # line timing shown here
        total = sum(i.price for i in order.items)
        tax = total * 0.2
        results.append(total + tax)
    return results
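If the line timings point at the loop body, the fix can be one change folded into a single pass. A hedged sketch, with Order/Item shapes assumed to mirror the example above:

```python
from dataclasses import dataclass

@dataclass
class Item:            # hypothetical shapes matching the example above
    price: float

@dataclass
class Order:
    items: list

def process_orders_fast(orders: list) -> list[float]:
    # total + 20% tax in one expression per order: no intermediate
    # variables or per-iteration append calls inside the loop body
    return [sum(i.price for i in order.items) * 1.2 for order in orders]

orders = [Order(items=[Item(price=10.0), Item(price=5.0)])]
totals = process_orders_fast(orders)   # 15.0 total + 20% tax → 18.0
```

Re-profile after the change (step 5): the point is to confirm the hot lines moved, not to trust the rewrite on faith.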
# memory_profiler — memory usage per line
from memory_profiler import profile as mem_profile
import pandas as pd

@mem_profile
def load_large_dataset():
    return pd.read_csv("huge_file.csv")  # see exactly where memory spikes
Database Performance
# N+1 detection and fix
# BAD — N+1 query: 1 query for orders + N queries for users
orders = db.query(Order).all()
for order in orders:
    print(order.user.name)  # each triggers a SELECT
# GOOD — eager loading with JOIN
from sqlalchemy.orm import joinedload, subqueryload

orders = db.query(Order).options(joinedload(Order.user)).all()
for order in orders:
    print(order.user.name)  # no additional queries
# GOOD — batch load for large sets
orders = db.query(Order).options(subqueryload(Order.items)).all()
# Explain query plans
from sqlalchemy import text

result = db.execute(text("EXPLAIN (ANALYZE, BUFFERS) SELECT ..."))
for row in result:
    print(row[0])  # look for "Seq Scan" on large tables — add index
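The same detect-then-index loop can be reproduced end to end with the stdlib's sqlite3, whose EXPLAIN QUERY PLAN plays the role of Postgres's EXPLAIN (different syntax, same workflow):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, user_id INTEGER, status TEXT)")

def plan(sql: str) -> str:
    # flatten the plan rows into one string for inspection
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " ".join(str(r) for r in rows)

query = "SELECT * FROM orders WHERE user_id = 1"
before = plan(query)   # full table scan: plan contains "SCAN orders"
conn.execute("CREATE INDEX idx_orders_user ON orders(user_id)")
after = plan(query)    # plan now reads "SEARCH ... USING INDEX idx_orders_user"
```

This makes "did the planner pick up my index?" a cheap assertion in a unit test rather than a production surprise.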
# Index analysis — find slow queries
# In postgres: pg_stat_statements, auto_explain
# AWS RDS: Performance Insights
-- Add index for common filter patterns
CREATE INDEX CONCURRENTLY idx_orders_user_status
ON orders(user_id, status)
WHERE status != 'completed'; -- partial index — only index active orders
-- Covering index — avoid table lookup entirely
CREATE INDEX CONCURRENTLY idx_products_category_name_price
ON products(category, name, price);
-- SELECT name, price FROM products WHERE category = 'x' ORDER BY name
-- → Index-Only Scan: no heap access
Async I/O for Throughput
import asyncio
import httpx
# SLOW — sequential requests (10 requests × 200ms each = 2s)
async def fetch_all_sequential(product_ids: list[str]) -> list[dict]:
    async with httpx.AsyncClient() as client:
        results = []
        for pid in product_ids:
            resp = await client.get(f"/api/products/{pid}")
            results.append(resp.json())
        return results
# FAST — concurrent requests (10 requests × 200ms max = 200ms)
async def fetch_all_concurrent(product_ids: list[str]) -> list[dict]:
    async with httpx.AsyncClient() as client:
        tasks = [client.get(f"/api/products/{pid}") for pid in product_ids]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return [r.json() for r in responses if not isinstance(r, Exception)]
# With rate limiting — don't hammer downstream
async def fetch_with_limit(product_ids: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        async def fetch_one(pid):
            async with semaphore:
                resp = await client.get(f"/api/products/{pid}")
                return resp.json()
        return await asyncio.gather(*[fetch_one(pid) for pid in product_ids])
Caching for Latency
from functools import lru_cache
import time
# In-process cache — fastest (no network hop)
@lru_cache(maxsize=1000)
def get_exchange_rate(currency_pair: str) -> float:
    return external_api.get_rate(currency_pair)
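lru_cache also exposes hit/miss counters, so cache effectiveness can be measured rather than assumed (the rate lookup below is a stand-in for the external API call above):

```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_rate(pair: str) -> float:
    # stand-in for the external API call
    return 1.1 if pair == "EUR/USD" else 1.0

get_rate("EUR/USD")              # miss: computed and stored
get_rate("EUR/USD")              # hit: served from cache
info = get_rate.cache_info()     # CacheInfo(hits=1, misses=1, maxsize=1000, currsize=1)
get_rate.cache_clear()           # manual reset, e.g. after a known upstream change
```

A hit rate near zero means the cache is pure overhead; check cache_info before tuning maxsize.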
# TTL-aware cache
def ttl_cache(ttl_seconds: int):
    cache = {}
    def decorator(func):
        def wrapper(*args):
            key = args
            if key in cache:
                value, expires_at = cache[key]
                if time.time() < expires_at:
                    return value
            result = func(*args)
            cache[key] = (result, time.time() + ttl_seconds)
            return result
        return wrapper
    return decorator

@ttl_cache(ttl_seconds=60)
def get_product_category_tree() -> dict:
    return db.query_category_tree()  # expensive; fine to cache 60s
Response Payload Optimisation
# Return only requested fields (reduce bandwidth + serialisation cost)
@app.get("/api/products")
def list_products(fields: str | None = None):
    products = db.query(Product).all()
    if fields:
        allowed = {"id", "name", "price", "category"}
        requested = set(fields.split(",")) & allowed
        return [
            {k: getattr(p, k) for k in requested}
            for p in products
        ]
    return products
# Streaming large responses — don't buffer in memory
from fastapi.responses import StreamingResponse
import json
@app.get("/api/reports/orders")
async def stream_orders():
    async def generate():
        yield "["
        first = True
        async for order in db.stream_orders():
            if not first:
                yield ","
            yield json.dumps(order.dict())
            first = False
        yield "]"
    return StreamingResponse(generate(), media_type="application/json")
Benchmarking
# pytest-benchmark
def test_order_total_calculation_performance(benchmark):
    order = Order(items=[OrderItem(price=9.99, quantity=i) for i in range(100)])
    result = benchmark(order.calculate_total)
    assert result > 0
# benchmark.pedantic for more control
def test_db_query_performance(benchmark, db_session):
    result = benchmark.pedantic(
        target=lambda: db_session.query(Product).filter_by(category="electronics").all(),
        rounds=10,
        iterations=5,
    )
    assert len(result) > 0
Common Failure Cases
N+1 query discovered in production, not in dev
Why: dev has 10 rows; ORM's lazy loading fires one query per row; in production with 10,000 rows it fires 10,000 queries.
Detect: query logging on the endpoint (SQLAlchemy's echo=True in staging) shows hundreds of identical SELECT statements for a single request; per-request query counts grow linearly with row count.
Fix: add joinedload or subqueryload; always test with production-scale data in staging; add a query count assertion in integration tests.
Premature caching hides a correctness bug
Why: stale cached value masks a write that should have updated the data; the bug only shows after cache expiry.
Detect: data inconsistency appears intermittently, correlated with TTL intervals; disabling the cache makes the bug reproduce consistently.
Fix: fix the underlying data consistency issue first; then re-enable caching with explicit invalidation on write.
Benchmark measures the wrong thing — cold vs warm runs
Why: early iterations pay one-off costs (cold CPU and filesystem caches, lazy imports, allocator warmup; on PyPy, JIT compilation), so the reported mean includes cold-start outliers.
Detect: benchmark min is far lower than mean; first iteration is 10x slower than the rest in --benchmark-verbose output.
Fix: use benchmark.pedantic with warmup_rounds=3 to discard cold-start iterations before measuring.
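The same warmup discipline can be reproduced with stdlib timeit: run and discard warmup rounds, then report the best measured round (a sketch; pytest-benchmark does this plus statistics for you):

```python
import timeit

def bench(func, warmup_rounds: int = 3, rounds: int = 10, iterations: int = 100) -> float:
    timer = timeit.Timer(func)
    for _ in range(warmup_rounds):
        timer.timeit(number=iterations)          # discard cold-start runs
    # report the minimum round: least noise from scheduler/GC interference
    return min(timer.repeat(repeat=rounds, number=iterations)) / iterations

per_call = bench(lambda: sum(range(1000)))       # seconds per call, warm
```

Reporting the minimum rather than the mean is a deliberate choice: external noise only ever adds time, so the minimum is the cleanest estimate of the code's own cost.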
Async I/O gains eliminated by a blocking call inside the coroutine
Why: one synchronous blocking call (time.sleep, requests.get, open(...)) inside an async def blocks the entire event loop for its duration.
Detect: event loop lag metric spikes; all other coroutines stall during the blocking call; asyncio debug mode logs the slow callback.
Fix: replace blocking calls with async equivalents (asyncio.sleep, httpx.AsyncClient, aiofiles); use loop.run_in_executor for unavoidable blocking calls.
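A minimal sketch of the run_in_executor escape hatch for a blocking call that has no async equivalent (the blocking function here is a stand-in):

```python
import asyncio
import time

def legacy_blocking_io() -> str:
    time.sleep(0.1)            # stands in for requests.get / file I/O
    return "done"

async def main() -> list:
    loop = asyncio.get_running_loop()
    # offload to the default thread pool so the event loop stays responsive
    blocking = loop.run_in_executor(None, legacy_blocking_io)
    other = asyncio.sleep(0.1, result="concurrent")
    # both complete in ~0.1s total, not 0.2s: the loop was never blocked
    return await asyncio.gather(blocking, other)

results = asyncio.run(main())
```

Had legacy_blocking_io been awaited directly on the loop thread, the asyncio.sleep coroutine would have stalled behind it, which is exactly the lag-spike signature described above.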
Partial index not used because query doesn't match the WHERE clause
Why: the partial index was created with WHERE status != 'completed' but the query filters WHERE status = 'pending'; planner chooses a seq scan.
Detect: EXPLAIN shows Seq Scan despite an index existing; pg_stat_user_indexes shows zero scans on the index.
Fix: align the partial index predicate with the actual query filter; or drop it and create a full index on status.
Connections
se-hub · cs-fundamentals/caching-strategies · cs-fundamentals/database-design · cs-fundamentals/concurrency · cs-fundamentals/observability-se · technical-qa/load-testing-advanced
Open Questions
- What are the most common misapplications of this concept in production codebases?
- When should you explicitly choose not to use this pattern or technique?
Related reading