Message Queues

Asynchronous communication between services. Queues decouple producers from consumers: the producer doesn't wait for the consumer; the consumer processes at its own pace.

Queues vs Pub/Sub vs Streaming

Queue (point-to-point):
  One message → one consumer (competing consumers)
  Consumer acknowledges → message deleted
  Use: task distribution, work queues, background jobs
  Tools: SQS, RabbitMQ queues

Pub/Sub (one-to-many):
  One message → many consumers (each gets their own copy)
  Publisher doesn't know consumers
  Messages often ephemeral (no long-term storage)
  Use: broadcast notifications, event fan-out
  Tools: SNS, RabbitMQ exchanges (fanout), Google Pub/Sub

Streaming (ordered log):
  Messages retained on disk; consumers track their position
  Consumers can replay from any offset
  Multiple consumer groups, each with their own position
  Use: event sourcing, audit logs, analytics, high-throughput processing
  Tools: Kafka, Kinesis, Pulsar
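
The three delivery models above can be contrasted with a small in-memory sketch. It is a toy model, not a broker: the class names WorkQueue, Topic, and Log are hypothetical, and acknowledgements, persistence, and networking are all left out.

```python
from collections import deque


class WorkQueue:
    """Point-to-point: each message is delivered to exactly one consumer."""

    def __init__(self):
        self._messages = deque()

    def send(self, msg):
        self._messages.append(msg)

    def receive(self):
        # Taking the message removes it; a real broker waits for an ack first.
        return self._messages.popleft() if self._messages else None


class Topic:
    """Pub/sub: every current subscriber gets its own copy; nothing is retained."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        inbox = deque()
        self._subscribers.append(inbox)
        return inbox

    def publish(self, msg):
        for inbox in self._subscribers:
            inbox.append(msg)


class Log:
    """Streaming: messages are retained; each consumer group tracks an offset."""

    def __init__(self):
        self._records = []
        self._offsets = {}

    def append(self, msg):
        self._records.append(msg)

    def poll(self, group):
        offset = self._offsets.get(group, 0)
        if offset >= len(self._records):
            return None
        self._offsets[group] = offset + 1
        return self._records[offset]

    def seek(self, group, offset):
        self._offsets[group] = offset  # replay from any retained offset
```

In this model a subscriber that joins a Topic late sees nothing published earlier, while a new group on a Log starts at offset 0 and can read everything retained.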

RabbitMQ — Python (pika)

# producer.py
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(exchange="orders", exchange_type="direct", durable=True)
channel.queue_declare(queue="order_processing", durable=True)
channel.queue_bind(exchange="orders", queue="order_processing", routing_key="new")

# Publish with persistent delivery mode (survives broker restart)
channel.basic_publish(
    exchange="orders",
    routing_key="new",
    body=json.dumps({"order_id": "ord-123", "user_id": "u-001", "total": 49.99}),
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
        content_type="application/json",
    )
)
connection.close()

# consumer.py
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.basic_qos(prefetch_count=1)  # at most one unacknowledged message per consumer

def fulfill_order(order):
    # Placeholder for the application's business logic
    print(f"Fulfilling {order['order_id']}")

def process_order(ch, method, properties, body):
    try:
        order = json.loads(body)  # inside the try so malformed JSON is rejected too
        fulfill_order(order)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # success: remove from queue
    except Exception as e:
        print(f"Failed to process order: {e}")
        # requeue=False dead-letters the message only if the queue declares a
        # dead-letter exchange; without one the broker discards the message
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue="order_processing", on_message_callback=process_order)
channel.start_consuming()

Dead Letter Queue Pattern

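# RabbitMQ: route failed messages to a DLQ automatically
# The dead-letter exchange and a queue bound to it must exist, or dead-lettered
# messages are dropped. "order_processing.dlq" is an illustrative name.
channel.exchange_declare(exchange="orders.dlx", exchange_type="fanout", durable=True)
channel.queue_declare(queue="order_processing.dlq", durable=True)
channel.queue_bind(exchange="orders.dlx", queue="order_processing.dlq")

# Arguments must match any earlier declaration of this queue, otherwise the
# broker closes the channel with PRECONDITION_FAILED.
channel.queue_declare(
    queue="order_processing",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "orders.dlx",   # rejected or expired messages go here
        "x-message-ttl": 30000,                   # messages waiting in the queue over 30s expire
    }
)
# RabbitMQ has no "x-max-retries" queue argument. Count retries from the x-death
# header on dead-lettered messages, or set x-delivery-limit on a quorum queue.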

# SQS — built-in DLQ support
# In Terraform:
resource "aws_sqs_queue" "dlq" {
  name = "order-processing-dlq"
  message_retention_seconds = 1209600  # 14 days
}

resource "aws_sqs_queue" "main" {
  name = "order-processing"
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3   # after 3 failed attempts → DLQ
  })
}

AWS SQS — Python (boto3)

import boto3
import json
import time

sqs = boto3.client("sqs", region_name="eu-west-1")
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789/order-processing"

# Producer
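def enqueue_order(order_id: str, total: float):
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"order_id": order_id, "total": total}),
        # FIFO queues only (queue name must end in ".fifo"). SQS rejects
        # MessageDeduplicationId on a standard queue such as the one above.
        # MessageGroupId="orders",          # ordering scope within the FIFO queue
        # MessageDeduplicationId=order_id,  # drops duplicates sent within 5 minutes
    )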

# Consumer — long polling for efficiency
def consume_orders():
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,          # long poll: wait up to 20s for messages
            VisibilityTimeout=60,        # process within 60s or message reappears
        )

        messages = response.get("Messages", [])
        for message in messages:
            body = json.loads(message["Body"])
            try:
                process_order(body)
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=message["ReceiptHandle"],
                )
            except Exception as e:
                print(f"Failed: {e}")
                # Don't delete — message will reappear after VisibilityTimeout
                # After maxReceiveCount failures → moves to DLQ

Kafka — Python (confluent-kafka)

# producer.py
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})

def delivery_report(err, msg):
    if err:
        raise RuntimeError(f"Message delivery failed: {err}")

producer.produce(
    topic="orders",
    key="order-123",                    # same key → same partition → ordering
    value='{"order_id": "123"}',
    callback=delivery_report,
)
producer.flush()  # wait for all messages to be delivered

# consumer.py — multiple consumer groups can independently read the same topic
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "fulfilment-service",
    "auto.offset.reset": "earliest",    # start from beginning if no prior offset
    "enable.auto.commit": False,        # manual commit = at-least-once processing
})

consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        process_order(msg.value())
        consumer.commit(msg)            # commit offset after successful processing
finally:
    consumer.close()
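
The "same key → same partition → ordering" comment in the producer can be illustrated with a small sketch. The function partition_for is hypothetical and uses CRC32 only for illustration; real Kafka clients apply their own hash function, so the partition numbers here will not match a real cluster.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition deterministically (illustrative hash only)."""
    return zlib.crc32(key) % num_partitions


# Every event for one order lands on one partition, so a single consumer
# reads them in the order they were produced.
events = [b"order-123", b"order-456", b"order-123"]
assignments = [partition_for(key, 6) for key in events]
```

Because the mapping depends on the partition count, adding partitions to a topic can move a key to a different partition, which is worth keeping in mind when ordering per key matters.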

Choosing the Right Tool

SQS:       Simple, managed, pay-per-request. Good for AWS-native apps, task queues.
           Max retention 14 days. Standard queues offer best-effort ordering and
           at-least-once delivery; FIFO queues add ordering per message group.

RabbitMQ:  Flexible routing (exchanges/bindings), acknowledgements, plugins.
           Good for complex routing rules, when you need message priorities.
           Requires infrastructure management.

Kafka:     Replayable log, ordered within each partition. High throughput
           (millions of messages/sec across a cluster).
           Consumer groups can process the same stream independently.
           Good for event sourcing, analytics pipelines, audit logs.
           Retention configurable (days to forever).

SNS+SQS:  Fan-out: SNS publishes → multiple SQS queues subscribe independently.
           Classic pattern for event-driven microservices in AWS.

Common Failure Cases

Message deleted before processing is complete (at-most-once loss)
  Why: SQS's delete_message is called immediately on receipt, or RabbitMQ's basic_ack fires before the handler returns; a crash after deletion but before the work finishes loses the message permanently.
  Detect: job counts in the DB are lower than enqueue counts; the discrepancy grows over time.
  Fix: delete/ack only after the handler returns successfully; let the visibility timeout expire on failure so the message reappears for retry.

Visibility timeout shorter than processing time causes duplicate delivery
  Why: SQS makes any message not deleted within its VisibilityTimeout visible again; if processing takes longer than the timeout, a second consumer receives the same message and processes it concurrently.
  Detect: duplicate rows or double charges correlate with long-running jobs; both consumers log successful processing of the same message_id.
  Fix: set VisibilityTimeout comfortably above the worst-case processing time (for Lambda consumers, AWS recommends at least 6x the function timeout); extend it programmatically (change_message_visibility) for jobs with unpredictable duration.
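
Extending the timeout programmatically can be sketched as a background heartbeat. This is a minimal sketch under stated assumptions: process_with_heartbeat, handler, and interval are hypothetical names, sqs is assumed to be a boto3 SQS client, and change_message_visibility and delete_message are the only client calls used.

```python
import threading


def process_with_heartbeat(sqs, queue_url, message, handler, timeout=60, interval=30):
    """Run handler(message) while a background thread keeps extending the
    message's visibility timeout, so a slow job is not redelivered mid-flight."""
    done = threading.Event()

    def heartbeat():
        # Event.wait returns False on timeout and True once `done` is set.
        while not done.wait(interval):
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
                VisibilityTimeout=timeout,
            )

    thread = threading.Thread(target=heartbeat, daemon=True)
    thread.start()
    try:
        handler(message)
    finally:
        done.set()      # stop the heartbeat whether the handler succeeded or not
        thread.join()
    # Reached only if the handler did not raise: delete after successful processing.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
```

The interval should stay well below the timeout so that one delayed heartbeat does not let the message become visible. If the handler raises, the message is left undeleted and reappears once the last extension expires.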

No DLQ configured: poison messages loop forever
  Why: a malformed or permanently-failing message that cannot be processed keeps cycling back to the queue and consuming retries, blocking throughput for other messages.
  Detect: the same message ID appears in logs repeatedly for hours; consumer retry counter resets instead of giving up.
  Fix: configure a dead-letter queue with maxReceiveCount=3 so persistently-failing messages are quarantined for investigation rather than retried indefinitely.
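
The effect of a receive-count limit can be sketched with an in-memory simulation. It is a simplification, not SQS itself: drain and the message dictionary layout are hypothetical, and a failed message is moved to the DLQ after its third failed attempt, matching the maxReceiveCount=3 setting above.

```python
from collections import deque

MAX_RECEIVE_COUNT = 3  # mirrors maxReceiveCount in the redrive policy


def drain(queue, dlq, handler, max_receive_count=MAX_RECEIVE_COUNT):
    """Process until the queue is empty, quarantining messages that keep failing."""
    while queue:
        message = queue.popleft()
        message["receive_count"] += 1
        try:
            handler(message["body"])
        except Exception:
            if message["receive_count"] >= max_receive_count:
                dlq.append(message)    # quarantine for investigation
            else:
                queue.append(message)  # retry later, behind other messages
```

Without the receive-count check, the failing message would be appended back forever and the loop would never terminate, which is the poison-message behaviour described above.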

Kafka offset committed ahead of processing, skipping messages after a rebalance
  Why: if the offset for a message is committed (by auto-commit or an early manual commit) before the handler finishes, and the partition is then reassigned to another consumer during a rebalance, the new consumer resumes from the committed offset and never sees the unprocessed messages.
  Detect: consumer lag drops to zero but some expected records are missing from the DB.
  Fix: commit offsets only after the handler returns; set partition.assignment.strategy to cooperative-sticky so fewer partitions are revoked per rebalance cycle.
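
Why the commit order matters can be shown with a small simulation. It models a partition as a list and a crash or revocation as a RuntimeError raised by the handler; consume and flaky_handler are hypothetical helpers, not part of any Kafka client.

```python
def consume(records, committed, handler, commit_first):
    """Read from the committed offset and return the new committed offset.

    The handler may raise RuntimeError to simulate a crash or a revoked
    partition in the middle of processing.
    """
    offset = committed
    try:
        while offset < len(records):
            if commit_first:
                committed = offset + 1  # offset stored before the work is done
            handler(records[offset])
            committed = offset + 1      # offset stored after the work is done
            offset += 1
    except RuntimeError:
        pass  # the consumer dies; the next owner resumes from `committed`
    return committed


def flaky_handler(seen, crash_on):
    """Build a handler that fails once on `crash_on`, then succeeds."""
    crashed = []

    def handler(record):
        if record == crash_on and not crashed:
            crashed.append(record)
            raise RuntimeError("simulated crash")
        seen.append(record)

    return handler
```

With commit_first=True the record that was in flight during the crash is skipped by the next owner; with commit_first=False it is delivered again, which is the at-least-once behaviour the consumer code above aims for.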

Connections

se-hub · cs-fundamentals/event-driven-architecture · cs-fundamentals/distributed-systems · cloud/aws-sqs-sns · cs-fundamentals/microservices-patterns · llms/ae-hub

Open Questions

  • What are the most common misapplications of this concept in production codebases?
  • When should you explicitly choose not to use this pattern or technique?