Pub/Sub messages are processed out of order and some are lost. Implement reliable messaging.

The Scenario

Your order processing system uses Pub/Sub but has issues:

Publisher: Order created → Order updated → Order shipped
Subscriber receives: Order shipped → Order created → Order updated

Issues observed:
- Messages processed out of order
- Some messages processed twice (duplicate charges!)
- Messages occasionally lost (customers never notified)
- Dead letter queue growing with failed messages

The Challenge

Implement reliable Pub/Sub messaging with ordering guarantees, exactly-once processing, and proper error handling.

Wrong Approach

A junior engineer might add timestamps and sort messages in the subscriber, increase the acknowledgment deadline indefinitely, disable retries to prevent duplicates, or use a single subscriber to force ordering. Each fails in its own way: timestamp sorting breaks under clock skew and late arrivals, an unbounded ack deadline only hides stuck messages, disabling retries guarantees loss, and a single subscriber caps throughput.

Right Approach

A senior engineer uses Pub/Sub ordering keys for related messages, implements idempotent processing with deduplication, configures appropriate acknowledgment deadlines and retry policies, and sets up dead letter topics with alerting for failed messages.

Step 1: Enable Message Ordering

# Create the topic (ordering is a property of the subscription and the
# publisher client, not the topic itself)
gcloud pubsub topics create orders

# Create subscription with ordering
gcloud pubsub subscriptions create orders-processor \
  --topic=orders \
  --enable-message-ordering \
  --ack-deadline=60

# Publisher: Use ordering key for related messages
import json
from datetime import datetime, timezone

from google.cloud import pubsub_v1

# Ordering must also be enabled on the publisher client, otherwise
# publishing with an ordering_key raises an error
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True
    )
)
topic_path = publisher.topic_path('project', 'orders')

def publish_order_event(order_id: str, event_type: str, data: dict):
    """Publish order event with ordering guarantee."""
    message_data = json.dumps({
        'order_id': order_id,
        'event_type': event_type,
        'data': data,
        'timestamp': datetime.now(timezone.utc).isoformat()
    }).encode('utf-8')

    # ordering_key ensures messages with same key are delivered in order
    future = publisher.publish(
        topic_path,
        message_data,
        ordering_key=order_id  # All events for same order are ordered
    )

    return future.result()

# Usage:
publish_order_event('order-123', 'created', {'items': [...]})
publish_order_event('order-123', 'updated', {'status': 'paid'})
publish_order_event('order-123', 'shipped', {'tracking': 'UPS123'})
# These will be delivered in order!

Step 2: Implement Idempotent Processing

import hashlib
import json

from google.cloud import firestore

db = firestore.Client()

def process_message_idempotently(message):
    """Process message exactly once using deduplication."""

    # Pub/Sub's message_id is stable across redeliveries of one message;
    # hash the payload instead to also catch duplicate publishes:
    # hashlib.sha256(message.data).hexdigest()
    message_id = message.message_id

    # Check if already processed
    doc_ref = db.collection('processed_messages').document(message_id)

    @firestore.transactional
    def process_in_transaction(transaction):
        doc = doc_ref.get(transaction=transaction)

        if doc.exists:
            print(f"Message {message_id} already processed, skipping")
            return False

        # Process the message (process_order_event is your business
        # logic - shown here as a placeholder)
        data = json.loads(message.data.decode('utf-8'))
        result = process_order_event(data)

        # Mark as processed
        transaction.set(doc_ref, {
            'processed_at': firestore.SERVER_TIMESTAMP,
            'result': result
        })

        return True

    transaction = db.transaction()
    return process_in_transaction(transaction)


def callback(message):
    try:
        # Ack whether the message was newly processed or a duplicate -
        # in both cases it must not be redelivered
        process_message_idempotently(message)
        message.ack()
    except Exception as e:
        print(f"Error processing message: {e}")
        message.nack()  # Will be redelivered
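
One gap in the sketch above: the processed_messages collection grows without bound. A minimal cleanup, assuming a Firestore TTL policy is enabled on an expire_at field, is to write an expiry alongside the dedup marker:

from datetime import datetime, timedelta, timezone

# Inside process_in_transaction, write the marker with an expiry.
# Assumes a Firestore TTL policy on 'expire_at'; redeliveries happen
# within days, so a 7-day window is a comfortable margin.
transaction.set(doc_ref, {
    'processed_at': firestore.SERVER_TIMESTAMP,
    'result': result,
    'expire_at': datetime.now(timezone.utc) + timedelta(days=7),
})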

Step 3: Configure Dead Letter Topic

# Terraform configuration
resource "google_pubsub_topic" "orders" {
  name = "orders"
}

resource "google_pubsub_topic" "orders_dead_letter" {
  name = "orders-dead-letter"
}

resource "google_pubsub_subscription" "orders_processor" {
  name  = "orders-processor"
  topic = google_pubsub_topic.orders.name

  # Enable message ordering
  enable_message_ordering = true

  # Acknowledgment deadline
  ack_deadline_seconds = 60

  # Retry policy
  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"  # 10 minutes max
  }

  # Dead letter policy
  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.orders_dead_letter.id
    max_delivery_attempts = 5
  }

  # Message retention
  message_retention_duration = "604800s"  # 7 days

  # Expiration (never expire)
  expiration_policy {
    ttl = ""
  }
}

# Grant Pub/Sub permission to publish to dead letter topic
resource "google_pubsub_topic_iam_member" "dead_letter_publisher" {
  topic  = google_pubsub_topic.orders_dead_letter.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:service-${var.project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}

# Subscription to process dead letters
resource "google_pubsub_subscription" "dead_letter_processor" {
  name  = "orders-dead-letter-processor"
  topic = google_pubsub_topic.orders_dead_letter.name

  # Keep dead letters longer for investigation
  message_retention_duration = "2592000s"  # 30 days
}
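
The dead letter subscription is only useful if something inspects it. Pub/Sub stamps forwarded messages with CloudPubSubDeadLetterSource* attributes recording where each failure came from and how many deliveries were attempted. A minimal triage sketch (the project ID is a placeholder; persistence is left as a TODO):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    'project', 'orders-dead-letter-processor'
)

def inspect_dead_letter(message):
    """Log why a message was dead-lettered, then ack it."""
    attrs = message.attributes
    print(
        f"Dead letter from {attrs.get('CloudPubSubDeadLetterSourceSubscription')}, "
        f"delivery attempts: {attrs.get('CloudPubSubDeadLetterSourceDeliveryCount')}"
    )
    # TODO: persist the payload for replay and page the on-call
    message.ack()

future = subscriber.subscribe(subscription_path, callback=inspect_dead_letter)
future.result()  # Block and process dead letters as they arrive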

Step 4: Implement Robust Subscriber

import json
import signal
import sys
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1


# Application-specific error types used to route failures below
class TransientError(Exception):
    """Temporary failure (e.g. a downstream timeout) - safe to retry."""


class PermanentError(Exception):
    """Unrecoverable failure (e.g. a validation error) - do not retry."""

class OrderProcessor:
    def __init__(self):
        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscription_path = self.subscriber.subscription_path(
            'project', 'orders-processor'
        )
        self.streaming_pull_future = None

    def callback(self, message):
        """Process message with proper error handling."""
        try:
            # Extend the ack deadline for long processing (the streaming
            # client also auto-extends leases, so this is belt-and-braces)
            message.modify_ack_deadline(60)

            data = json.loads(message.data.decode('utf-8'))
            order_id = message.ordering_key

            print(f"Processing {data['event_type']} for order {order_id}")

            # Process with idempotency (delegates to the Firestore
            # deduplication logic from Step 2)
            if self.process_idempotently(message.message_id, data):
                message.ack()
                print(f"Successfully processed message {message.message_id}")
            else:
                message.ack()  # Already processed
                print(f"Duplicate message {message.message_id}, skipped")

        except json.JSONDecodeError as e:
            # Invalid message format - send to dead letter
            print(f"Invalid JSON, sending to dead letter: {e}")
            message.nack()

        except TransientError as e:
            # Temporary failure - retry
            print(f"Transient error, will retry: {e}")
            message.nack()

        except PermanentError as e:
            # Permanent failure - don't retry
            print(f"Permanent error, sending to dead letter: {e}")
            message.ack()  # Ack to prevent infinite retries
            self.publish_to_dead_letter(message, str(e))

    def start(self):
        """Start the subscriber with flow control."""
        flow_control = pubsub_v1.types.FlowControl(
            max_messages=100,        # Max outstanding messages
            max_bytes=10 * 1024 * 1024,  # 10 MB
        )

        self.streaming_pull_future = self.subscriber.subscribe(
            self.subscription_path,
            callback=self.callback,
            flow_control=flow_control,
        )

        print(f"Listening on {self.subscription_path}")

        # Handle shutdown gracefully
        def shutdown(signum, frame):
            print("Shutting down...")
            self.streaming_pull_future.cancel()
            sys.exit(0)

        signal.signal(signal.SIGTERM, shutdown)
        signal.signal(signal.SIGINT, shutdown)

        try:
            self.streaming_pull_future.result()
        except TimeoutError:
            self.streaming_pull_future.cancel()
            self.streaming_pull_future.result()

if __name__ == "__main__":
    processor = OrderProcessor()
    processor.start()

Step 5: Monitor and Alert

# Alert on dead letter queue growth
resource "google_monitoring_alert_policy" "dead_letter_alert" {
  display_name = "Pub/Sub Dead Letter Queue Growing"

  conditions {
    display_name = "Dead letter messages > 100"

    condition_threshold {
      filter = <<-EOT
        resource.type="pubsub_subscription" AND
        resource.labels.subscription_id="orders-dead-letter-processor" AND
        metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"
      EOT

      comparison      = "COMPARISON_GT"
      threshold_value = 100
      duration        = "300s"

      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }

  notification_channels = [google_monitoring_notification_channel.email.id]
}

# Alert on oldest unacked message
resource "google_monitoring_alert_policy" "message_age_alert" {
  display_name = "Pub/Sub Messages Backing Up"

  conditions {
    display_name = "Oldest message > 5 minutes"

    condition_threshold {
      filter = <<-EOT
        resource.type="pubsub_subscription" AND
        resource.labels.subscription_id="orders-processor" AND
        metric.type="pubsub.googleapis.com/subscription/oldest_unacked_message_age"
      EOT

      comparison      = "COMPARISON_GT"
      threshold_value = 300  # 5 minutes
      duration        = "60s"
    }
  }
}
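
The same metrics can also be checked programmatically, which is useful in runbooks or pre-deploy checks. A sketch using the Cloud Monitoring client (the project ID is a placeholder):

import time

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {"start_time": {"seconds": now - 300}, "end_time": {"seconds": now}}
)

# Read the current backlog for the orders-processor subscription
results = client.list_time_series(
    request={
        "name": "projects/my-project",
        "filter": (
            'resource.type="pubsub_subscription" AND '
            'resource.labels.subscription_id="orders-processor" AND '
            'metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"'
        ),
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)

for series in results:
    for point in series.points:
        print(f"Undelivered messages: {point.value.int64_value}")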

Step 6: Handle Ordering Key Failures

from google.cloud.pubsub_v1.publisher.exceptions import (
    PublishToPausedOrderingKeyException,
)

def publish_with_ordering_recovery(publisher, topic, order_id, data):
    """Publish with automatic recovery from ordering failures."""
    try:
        future = publisher.publish(
            topic,
            data.encode('utf-8'),
            ordering_key=order_id
        )
        return future.result(timeout=30)

    except PublishToPausedOrderingKeyException:
        # After a failed publish, the client pauses the ordering key and
        # rejects further publishes on it, so later messages cannot jump
        # ahead of the failed one
        print(f"Resuming ordering key {order_id}")
        publisher.resume_publish(topic, order_id)

        # Retry the publish
        future = publisher.publish(
            topic,
            data.encode('utf-8'),
            ordering_key=order_id
        )
        return future.result(timeout=30)
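
A hypothetical usage, replaying an order's lifecycle after a transient outage (publisher and topic_path as created in Step 1):

import json

for event_type in ('created', 'updated', 'shipped'):
    publish_with_ordering_recovery(
        publisher, topic_path, 'order-123',
        json.dumps({'event_type': event_type})
    )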

Message Delivery Guarantees

Configuration    Ordering       Duplicates           Use Case
-------------    --------       ----------           --------
Default          No guarantee   Possible             Independent events
Ordering key     Within key     Possible             Related events
+ Idempotency    Within key     Prevented            Financial transactions
+ Exactly-once   Within key     Prevented (native)   Critical processing
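
The last row refers to Pub/Sub's native exactly-once delivery, enabled per subscription. A brief sketch, assuming google-cloud-pubsub 2.13+, where acks become confirmable operations:

# Enable on the subscription first, e.g.:
#   gcloud pubsub subscriptions update orders-processor \
#     --enable-exactly-once-delivery
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

def callback(message):
    process_message_idempotently(message)  # dedup is still a sane backstop
    try:
        # With exactly-once delivery, ack_with_response() returns a
        # future that confirms the ack was durably recorded
        message.ack_with_response().result()
    except sub_exceptions.AcknowledgeError as e:
        # Ack failed (e.g. deadline expired); the message is redelivered
        print(f"Ack failed: {e.error_code}")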

Pub/Sub Best Practices

  1. Use ordering keys for related messages (same order, user, etc.)
  2. Implement idempotent processing - assume duplicates will happen
  3. Configure dead letter topics - don't lose failed messages
  4. Set appropriate ack deadlines - match your processing time
  5. Use flow control - prevent subscriber overload

Practice Question

Why does enabling message ordering in Pub/Sub only guarantee order for messages with the same ordering key?