Event-Driven Architecture
Systems that communicate by producing and consuming events rather than direct calls. Temporal decoupling: producer doesn't wait for consumer; spatial decoupling: producer doesn't know who consumes.
Core Patterns
Event notification:
Something happened. Consumer decides what to do. No data in event.
Example: OrderPlaced { order_id: "abc" }
Consumer must call back to get order details → tight coupling risk
Event-carried state transfer:
Event contains all data consumers need. No callbacks.
Example: OrderPlaced { order_id: "abc", user_id: "u1", items: [...], total: 99.99 }
Larger events, but fully decoupled consumption
Event sourcing:
State is derived by replaying events. Current state = fold over event log.
Append-only event store. No UPDATE/DELETE. Complete audit trail.
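The "fold over the event log" idea can be sketched with `functools.reduce`; the event shapes and state keys here are illustrative, not part of the schemas defined later:

```python
from functools import reduce

# Hypothetical minimal event stream for one order aggregate
events = [
    {"event_type": "order.placed", "payload": {"total": 99.99}},
    {"event_type": "order.payment_received", "payload": {}},
]

def apply(state: dict, event: dict) -> dict:
    # Each event transitions the state; unknown events leave it unchanged
    if event["event_type"] == "order.placed":
        return {**state, "status": "pending", "total": event["payload"]["total"]}
    if event["event_type"] == "order.payment_received":
        return {**state, "status": "paid"}
    return state

# Current state = fold(apply, event_log, initial_state)
state = reduce(apply, events, {"status": "new", "total": 0.0})
# state == {'status': 'paid', 'total': 99.99}
```

Replaying the same log always yields the same state, which is what makes the append-only store the single source of truth.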
CQRS + event sourcing:
Write side: commands → events → event store
Read side: projections built by consuming the event stream
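A read-side projection can be sketched as a consumer that folds the event stream into a query-optimized view. The class and field names below (`OrderSummaryProjection`, `by_status`, `revenue`) are illustrative, not from the schemas above:

```python
from collections import defaultdict

class OrderSummaryProjection:
    """Read model: order count per status and total revenue, built from events."""
    def __init__(self):
        self.by_status: dict[str, int] = defaultdict(int)
        self.revenue: float = 0.0
        self.status_of: dict[str, str] = {}  # aggregate_id -> current status

    def handle(self, event: dict):
        etype, agg_id = event["event_type"], event["aggregate_id"]
        if etype == "order.placed":
            self.status_of[agg_id] = "pending"
            self.by_status["pending"] += 1
            self.revenue += event["payload"]["total"]
        elif etype == "order.payment_received":
            self._move(agg_id, "paid")
        elif etype == "order.cancelled":
            self._move(agg_id, "cancelled")

    def _move(self, agg_id: str, new_status: str):
        old = self.status_of.get(agg_id)
        if old:
            self.by_status[old] -= 1
        self.status_of[agg_id] = new_status
        self.by_status[new_status] += 1

proj = OrderSummaryProjection()
proj.handle({"event_type": "order.placed", "aggregate_id": "abc",
             "payload": {"total": 99.99}})
proj.handle({"event_type": "order.payment_received", "aggregate_id": "abc",
             "payload": {}})
# proj.by_status["paid"] == 1; proj.revenue == 99.99
```

Because the projection is derived purely from the stream, it can be dropped and rebuilt from the event store at any time.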
Event Schema Design
```python
# events/models.py
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import uuid

@dataclass
class Event:
    event_id: str
    event_type: str
    aggregate_id: str
    aggregate_type: str
    version: int
    timestamp: datetime
    payload: dict[str, Any]
    metadata: dict[str, Any]  # correlation_id, causation_id, user_id

def make_event(event_type: str, aggregate_id: str, aggregate_type: str,
               version: int, payload: dict) -> Event:
    return Event(
        event_id=str(uuid.uuid4()),
        event_type=event_type,
        aggregate_id=aggregate_id,
        aggregate_type=aggregate_type,
        version=version,
        timestamp=datetime.now(timezone.utc),  # utcnow() is deprecated
        payload=payload,
        metadata={},
    )
```
```python
# Example event types
ORDER_PLACED = "order.placed"
ORDER_PAYMENT_RECEIVED = "order.payment_received"
ORDER_FULFILLMENT_STARTED = "order.fulfillment_started"
ORDER_SHIPPED = "order.shipped"
ORDER_CANCELLED = "order.cancelled"
```
Kafka Producer and Consumer (Python)
```python
# events/producer.py
import json

from confluent_kafka import Producer

from events.models import Event

class EventPublisher:
    def __init__(self, bootstrap_servers: str):
        self.producer = Producer({"bootstrap.servers": bootstrap_servers})

    def publish(self, topic: str, event: Event, key: str | None = None):
        self.producer.produce(
            topic=topic,
            key=(key or event.aggregate_id).encode(),
            value=json.dumps({
                "event_id": event.event_id,
                "event_type": event.event_type,
                "aggregate_id": event.aggregate_id,
                "version": event.version,
                "timestamp": event.timestamp.isoformat(),
                "payload": event.payload,
            }),
            headers={"correlation-id": event.metadata.get("correlation_id", "")},
            callback=self._delivery_report,
        )
        self.producer.flush()  # synchronous for clarity; batch flushes in production

    def _delivery_report(self, err, msg):
        if err:
            raise RuntimeError(f"Event delivery failed: {err}")
```
```python
# events/consumer.py
import json
from typing import Callable

from confluent_kafka import Consumer, KafkaError, KafkaException

class EventConsumer:
    def __init__(self, bootstrap_servers: str, group_id: str, topics: list[str]):
        self.consumer = Consumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # manual commit after processing
        })
        self.consumer.subscribe(topics)
        self.handlers: dict[str, Callable] = {}

    def register_handler(self, event_type: str, handler: Callable):
        self.handlers[event_type] = handler

    def run(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise KafkaException(msg.error())
            event = json.loads(msg.value())
            handler = self.handlers.get(event["event_type"])
            if handler:
                handler(event)
            self.consumer.commit(msg)  # commit only after successful processing
```
Event Sourcing — Aggregate
```python
# domain/order.py
from dataclasses import dataclass, field, asdict

from events.models import make_event

@dataclass
class Order:
    id: str
    status: str = "pending"
    items: list = field(default_factory=list)
    total: float = 0.0
    version: int = 0
    _uncommitted_events: list = field(default_factory=list, repr=False)

    # Event sourcing: apply events (as dicts) to rebuild state
    def apply(self, event: dict):
        handlers = {
            "order.placed": self._on_placed,
            "order.payment_received": self._on_payment_received,
            "order.cancelled": self._on_cancelled,
        }
        handler = handlers.get(event["event_type"])
        if handler:
            handler(event["payload"])
        self.version += 1

    def _on_placed(self, payload):
        self.items = payload["items"]
        self.total = payload["total"]
        self.status = "pending"

    def _on_payment_received(self, payload):
        self.status = "paid"

    def _on_cancelled(self, payload):
        self.status = "cancelled"

    # Command handlers — validate, then raise events
    # (make_event returns an Event dataclass; asdict() converts it to the
    # dict shape that apply() and reconstitute() consume)
    def place(self, items: list, total: float):
        if self.version > 0:
            raise ValueError("Order already exists")
        event = asdict(make_event("order.placed", self.id, "order",
                                  self.version + 1,
                                  {"items": items, "total": total}))
        self.apply(event)
        self._uncommitted_events.append(event)

    def cancel(self, reason: str):
        if self.status not in ("pending", "paid"):
            raise ValueError(f"Cannot cancel order in status {self.status}")
        event = asdict(make_event("order.cancelled", self.id, "order",
                                  self.version + 1, {"reason": reason}))
        self.apply(event)
        self._uncommitted_events.append(event)

    @classmethod
    def reconstitute(cls, events: list) -> "Order":
        order = cls(id=events[0]["aggregate_id"])
        for event in events:
            order.apply(event)
        return order
```
Outbox Pattern — Guaranteed Delivery
```python
# Publish events atomically with DB changes — no lost events on crash.
# Order and OutboxEvent are SQLAlchemy models; db_session is a Session.
import json

def place_order(order_data: dict, db_session):
    with db_session.begin():
        # Write business data
        order = Order(**order_data)
        db_session.add(order)
        # Write event to outbox in same transaction
        outbox_event = OutboxEvent(
            event_type="order.placed",
            aggregate_id=str(order.id),
            payload=json.dumps({"order_id": str(order.id), "total": order.total}),
        )
        db_session.add(outbox_event)
    # Transaction committed atomically — DB has both order AND outbox entry

# Separate outbox relay process: polls the outbox table, forwards to the broker
import asyncio

async def outbox_relay():
    while True:
        events = db.query(OutboxEvent).filter_by(published=False).limit(100).all()
        for event in events:
            publisher.publish("orders", event)
            event.published = True
        db.commit()
        await asyncio.sleep(0.1)
```
Dead Letter Queue Handling
```python
# Consumer with DLQ fallback
import time

def process_with_dlq(event: dict, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            process_event(event)
            return
        except Exception as e:
            if attempt == max_retries - 1:
                dlq_publisher.publish("orders.dlq", {**event, "error": str(e)})
                logger.error(f"Event sent to DLQ after {max_retries} attempts",
                             exc_info=True)
            else:
                time.sleep(2 ** attempt)  # exponential backoff
```
Common Failure Cases
Lost events on service crash (no outbox pattern)
Why: the service writes to the database and then publishes to the broker as two separate operations; a crash between them leaves the DB updated but the event never sent.
Detect: downstream consumers show gaps in event sequences; data in the DB doesn't match what consumers have processed.
Fix: use the outbox pattern — write the event to an outbox table in the same DB transaction, then relay it to the broker asynchronously.
Consumer commits offset before processing succeeds
Why: auto-commit (or committing immediately on receipt) marks the message as processed even if the handler throws an exception, causing silent data loss.
Detect: events appear consumed in Kafka/SQS but the expected side effects (DB writes, emails) are missing.
Fix: set enable.auto.commit=False and commit only after the handler returns successfully; route failures to the DLQ after N retries.
Missing idempotency key causes duplicate processing
Why: at-least-once delivery means the same event can arrive twice (network retry, requeue after visibility timeout); if the handler is not idempotent, it processes the duplicate.
Detect: double charges, duplicate email sends, or inflated row counts that correlate with retry events.
Fix: store the event_id in a processed-events table and skip events whose ID has already been seen.
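The dedup check can be sketched with an in-memory seen-set; in production it is a DB table with a UNIQUE(event_id) constraint, updated in the same transaction as the handler's side effects. `process_event` and the event shape are hypothetical:

```python
# Idempotent consumer: skip events whose event_id was already processed
processed_ids: set[str] = set()
side_effects: list[str] = []

def process_event(event: dict):
    side_effects.append(event["event_id"])  # stand-in for the real side effect

def handle_idempotent(event: dict) -> bool:
    """Returns True if processed, False if skipped as a duplicate."""
    if event["event_id"] in processed_ids:
        return False  # at-least-once redelivery — skip
    process_event(event)
    processed_ids.add(event["event_id"])
    return True

evt = {"event_id": "e-1", "event_type": "order.placed", "payload": {}}
assert handle_idempotent(evt) is True
assert handle_idempotent(evt) is False  # redelivery is a no-op
assert side_effects == ["e-1"]          # side effect ran exactly once
```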
Event schema change breaks consumers
Why: a producer adds a required field or renames one without coordinating with consumers, causing deserialization errors across all downstream services.
Detect: consumers start throwing KeyError / deserialization exceptions after a producer deploy.
Fix: use Schema Registry (Avro / Protobuf) with compatibility enforcement set to BACKWARD; never rename or remove required fields without a deprecation cycle.
Consumer group rebalance causes processing stalls
Why: adding or removing consumers in a Kafka consumer group triggers a rebalance; during rebalance, no partition is being consumed, causing visible lag spikes.
Detect: consumer lag metric spikes periodically in Grafana, correlating with deployment events.
Fix: use cooperative (incremental) rebalancing (partition.assignment.strategy=cooperative-sticky) to reduce stop-the-world pauses.
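With confluent-kafka, the strategy is set in the consumer config; a sketch, assuming a placeholder broker address and group name (cooperative-sticky requires librdkafka >= 1.6):

```python
from confluent_kafka import Consumer

# Cooperative (incremental) rebalancing: only reassigned partitions pause,
# instead of the whole group stopping while ownership is renegotiated
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",   # placeholder address
    "group.id": "order-processors",          # placeholder group name
    "partition.assignment.strategy": "cooperative-sticky",
    "enable.auto.commit": False,
})
```

All members of the group must move to the cooperative strategy; mixing eager and cooperative assignors in one group is not supported without a rolling two-step migration.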
Connections
se-hub · cs-fundamentals/microservices-patterns · cs-fundamentals/distributed-systems · cloud/aws-sqs-sns · cloud/aws-step-functions · llms/ae-hub
Open Questions
- What are the most common misapplications of this concept in production codebases?
- When should you explicitly choose not to use this pattern or technique?
Related reading