Parallel Test Execution
Running tests concurrently to compress feedback time — without introducing isolation failures.
Why Parallelism Is Hard
Tests pass in serial but fail in parallel → almost always a shared state problem.
Shared state that causes parallel failures:
- Same database row modified by two tests simultaneously
- Global singleton (cached object, registry, counter) mutated by multiple workers
- Same test user or account used across test files
- Files written to the same path by parallel workers
- Environment variables mutated by one test, read by another
- Fixed ports (server starts on :8000, two workers conflict)
Rule: a test that can't be run twice simultaneously is not actually isolated.
Fixing parallelism means fixing isolation — the parallelism just surfaces the bug.
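The rule above can be demonstrated with a minimal sketch (the function names and the in-process "workers" are illustrative, not from a real suite): a module-level cache makes results depend on what other workers have done, while namespacing every key by worker id removes the conflict — the same idea the worker-scoped fixtures below apply to databases and ports.

```python
# Illustrative sketch: shared mutable state vs. worker-namespaced state.
# "worker_id" here is a plain argument standing in for the xdist fixture.

_shared_cache: dict[str, int] = {}  # module-level: visible to every caller


def register_user_shared(name: str) -> int:
    """Unsafe: two concurrent workers using the same name collide."""
    _shared_cache[name] = _shared_cache.get(name, 0) + 1
    return _shared_cache[name]


def register_user_namespaced(name: str, worker_id: str) -> int:
    """Safe: each worker writes under its own key, so runs never interfere."""
    key = f"{worker_id}:{name}"
    _shared_cache[key] = _shared_cache.get(key, 0) + 1
    return _shared_cache[key]


# Two "workers" reusing the same test user:
assert register_user_shared("alice") == 1
assert register_user_shared("alice") == 2   # second worker sees the first's write

assert register_user_namespaced("alice", "gw0") == 1
assert register_user_namespaced("alice", "gw1") == 1  # isolated: no interference
```

The namespaced version can be run twice simultaneously and still pass — which is exactly the isolation test the rule describes.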
pytest-xdist
```shell
pip install pytest-xdist

# Run with N workers (auto = one per CPU core)
pytest -n auto tests/
pytest -n 4 tests/
pytest -n 4 --dist=loadscope tests/   # group by module (good for shared fixtures)
pytest -n 4 --dist=loadfile tests/    # group by file (simplest isolation)
```

```python
# conftest.py — worker-aware fixtures
import os

import pytest


@pytest.fixture(scope="session")
def worker_id(request) -> str:
    """Returns 'master' (no -n) or 'gw0', 'gw1', ... with xdist."""
    return getattr(request.config, "workerinput", {}).get("workerid", "master")


@pytest.fixture(scope="session")
def db_url(worker_id: str) -> str:
    """Each worker gets its own database schema."""
    if worker_id == "master":
        schema = "test"
    else:
        schema = f"test_{worker_id}"  # e.g. test_gw0, test_gw1
    base_url = os.environ["DATABASE_URL"]
    return f"{base_url}?options=-c search_path={schema}"


@pytest.fixture(scope="session", autouse=True)
def setup_worker_schema(db_url: str, worker_id: str):
    """Create and seed schema for this worker, drop on teardown."""
    import psycopg2

    schema = f"test_{worker_id}" if worker_id != "master" else "test"
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    conn.autocommit = True
    cur = conn.cursor()
    cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
    # Run migrations into this schema (project-specific helper)
    run_migrations(schema)
    yield
    cur.execute(f"DROP SCHEMA {schema} CASCADE")
    conn.close()
```

Playwright Sharding
```shell
# Split the test suite into N shards — each shard runs independently on its own runner
# Shard 1 of 4
npx playwright test --shard=1/4
# Shard 2 of 4
npx playwright test --shard=2/4
```

```yaml
# .github/workflows/e2e-sharded.yml
name: E2E Sharded
on: [push]
jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false  # don't cancel sibling shards on first failure
      matrix:
        shard: [1, 2, 3, 4]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20
      - run: npm ci
      - run: npx playwright install --with-deps
      - name: Run shard ${{ matrix.shard }}
        # --reporter=blob writes blob-report/, which the merge job consumes
        run: npx playwright test --shard=${{ matrix.shard }}/4 --reporter=blob
      - uses: actions/upload-artifact@v4
        if: always()
        with:
          name: blob-report-${{ matrix.shard }}
          path: blob-report/
          retention-days: 1
  merge-reports:
    needs: test
    if: always()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20
      - run: npm ci
      - uses: actions/download-artifact@v4
        with:
          path: all-blob-reports/
          pattern: blob-report-*
          merge-multiple: true
      - name: Merge reports
        run: npx playwright merge-reports --reporter=html ./all-blob-reports
      - uses: actions/upload-artifact@v4
        with:
          name: html-report
          path: playwright-report/
```

pytest-xdist with Playwright
```python
# conftest.py — parallel Playwright with isolated browser contexts
import pytest
from playwright.sync_api import Browser, sync_playwright


@pytest.fixture(scope="session")
def browser(worker_id: str):
    """One browser process per xdist worker."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        yield browser
        browser.close()


@pytest.fixture
def page(browser: Browser, worker_id: str):
    """Fresh context per test — each test gets an isolated storage state."""
    context = browser.new_context(
        base_url="http://localhost:3000",
        # Each worker uses a different port if running local servers
    )
    page = context.new_page()
    yield page
    page.close()
    context.close()
```

Port Management for Parallel Services
```python
import os
import socket
import subprocess
import time

import pytest


def find_free_port() -> int:
    with socket.socket() as s:
        s.bind(("", 0))
        return s.getsockname()[1]


def wait_for_port(port: int, timeout: float = 10.0) -> None:
    """Poll until the server accepts connections, or time out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=0.5):
                return
        except OSError:
            time.sleep(0.1)
    raise TimeoutError(f"port {port} never opened")


@pytest.fixture(scope="session")
def app_port(worker_id: str) -> int:
    """Assign a unique port per xdist worker."""
    base_port = 8000
    if worker_id == "master":
        return base_port
    # gw0 → 8000, gw1 → 8001, etc.
    worker_num = int(worker_id.replace("gw", ""))
    return base_port + worker_num


@pytest.fixture(scope="session")
def app_server(app_port: int, db_url: str):
    """Start a test server on a worker-specific port."""
    proc = subprocess.Popen(
        ["uvicorn", "app:app", "--port", str(app_port), "--host", "127.0.0.1"],
        env={**os.environ, "DATABASE_URL": db_url},  # per-worker DB from the db_url fixture
    )
    wait_for_port(app_port)
    yield f"http://127.0.0.1:{app_port}"
    proc.terminate()
```

Measuring Speedup
Amdahl's Law for test suites: Speedup = 1 / (S + (1 − S)/N), where S is the fraction of the suite that must run serially and N is the number of workers.
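A quick worked check of Amdahl's Law makes the ceiling concrete (the 10% serial fraction below is an illustrative assumption, not a measurement):

```python
def amdahl_speedup(serial_fraction: float, workers: int) -> float:
    """Speedup = 1 / (S + (1 - S) / N)."""
    return 1 / (serial_fraction + (1 - serial_fraction) / workers)


# With 10% of the suite forced to run serially (session setup, migrations):
assert round(amdahl_speedup(0.10, 4), 2) == 3.08   # 4 workers: ~3.1×, not 4×
assert round(amdahl_speedup(0.10, 8), 2) == 4.71   # 8 workers: ~4.7×, not 8×
# Even with infinite workers, speedup caps at 1/S = 10×.
```

This is why shrinking the serial fraction (faster session fixtures, one-time migrations) often buys more than adding workers.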
```python
# Practical measurement
import subprocess
import time


def measure_parallel_speedup(test_path: str) -> dict:
    results = {}
    for workers in [1, 2, 4, 8]:
        start = time.time()
        subprocess.run(
            ["pytest", test_path, f"-n={workers}", "-q", "--tb=no"],
            capture_output=True,
        )
        results[workers] = time.time() - start
    serial_time = results[1]
    return {
        n: {"time": t, "speedup": serial_time / t, "efficiency": (serial_time / t) / n}
        for n, t in results.items()
    }


# Typical results:
# 1 worker:  300s (serial baseline)
# 4 workers:  90s (3.3× speedup, 83% efficiency)
# 8 workers:  55s (5.5× speedup, 69% efficiency — diminishing returns)
```

Common Failure Cases
Tests pass serially but fail under -n auto without a clear reason
Why: a shared global (module-level cache, singleton registry, or environment variable mutation) is written by one worker and read in a corrupt state by another.
Detect: run with -n 2 and --dist=no alternately to identify which tests conflict; add -p no:randomly to eliminate ordering as a factor.
Fix: move shared state into worker-scoped fixtures or use worker_id to namespace all shared resources.
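A minimal sketch of that namespacing fix for on-disk state (the helper name and path layout are illustrative; in conftest.py this would be wrapped in a session-scoped fixture taking worker_id):

```python
# Hypothetical sketch: namespace every shared on-disk resource by worker id.
import os
import tempfile


def worker_scratch_dir(worker_id: str) -> str:
    """Per-worker scratch path: gw0 and gw1 can never clobber each other's files."""
    path = os.path.join(tempfile.gettempdir(), f"suite-scratch-{worker_id}")
    os.makedirs(path, exist_ok=True)
    return path


assert worker_scratch_dir("gw0") != worker_scratch_dir("gw1")
```

The same key-by-worker_id pattern applies to cache keys, registry entries, and environment-variable prefixes.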
Port collision when multiple workers start the same server
Why: each xdist worker starts the app on port 8000 and only the first worker succeeds; subsequent workers fail immediately with AddressAlreadyInUse.
Detect: all but one worker report a server startup failure; the winning worker's tests pass.
Fix: derive the port from worker_id (e.g., 8000 + worker_num) or use find_free_port() and pass it through fixtures.
Playwright shards produce no merged report when one shard fails
Why: the merge-reports job requires all blob-report-* artifacts to exist; if a shard fails mid-run and uploads no artifact, the merge step errors.
Detect: the merge job fails with "artifact not found" even though other shards completed.
Fix: set if: always() on both the artifact upload step and the merge job so partial reports are still collected; gate the pipeline on the test job's result rather than on the merge step.
Session-scoped fixtures run once per worker instead of once globally
Why: xdist workers are separate processes, so scope="session" creates one instance per worker, not one globally — expensive setup (e.g., DB migration) runs N times.
Detect: DB migration logs appear multiple times in CI output; setup time scales linearly with worker count.
Fix: use scope="session" with autouse=True in conftest.py and a file lock (filelock library) to ensure only one worker performs the migration.
Connections
tqa-hub · technical-qa/playwright-advanced · technical-qa/test-architecture · technical-qa/flaky-test-management · qa/continuous-testing · qa/end-to-end-testing
Open Questions
- What is the most common failure mode when implementing this at scale?
- How does this testing approach need to adapt for distributed or microservice architectures?