Questions
Pub/Sub messages are processed out of order and some are lost. Implement reliable messaging.
The Scenario
Your order processing system uses Pub/Sub but has issues:
Publisher sends: Order created → Order updated → Order shipped
Subscriber receives: Order shipped → Order created → Order updated
Issues observed:
- Messages processed out of order
- Some messages processed twice (duplicate charges!)
- Messages occasionally lost (customers never notified)
- Dead letter queue growing with failed messages
The Challenge
Implement reliable Pub/Sub messaging with ordering guarantees, exactly-once processing, and proper error handling.
A junior engineer might add timestamps and sort messages in the subscriber, increase the acknowledgment deadline indefinitely, disable retries to prevent duplicates, or use a single subscriber to force ordering. These approaches are complex and error-prone, risk message loss, or simply don't scale.
A senior engineer uses Pub/Sub ordering keys for related messages, implements idempotent processing with deduplication, configures appropriate acknowledgment deadlines and retry policies, and sets up dead letter topics with alerting for failed messages.
Step 1: Enable Message Ordering
# Create the topic (ordering is controlled by publisher ordering keys and the
# subscription setting, not by a topic-level flag)
gcloud pubsub topics create orders
# Create subscription with ordering
gcloud pubsub subscriptions create orders-processor \
--topic=orders \
--enable-message-ordering \
--ack-deadline=60

# Publisher: Use ordering key for related messages
import json
from datetime import datetime
from google.cloud import pubsub_v1
# Ordering must also be enabled on the publisher client, otherwise publishing
# with an ordering_key raises an error
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path('project', 'orders')
def publish_order_event(order_id: str, event_type: str, data: dict):
"""Publish order event with ordering guarantee."""
message_data = json.dumps({
'order_id': order_id,
'event_type': event_type,
'data': data,
'timestamp': datetime.utcnow().isoformat()
}).encode('utf-8')
# ordering_key ensures messages with same key are delivered in order
future = publisher.publish(
topic_path,
message_data,
ordering_key=order_id # All events for same order are ordered
)
return future.result()
# Usage:
publish_order_event('order-123', 'created', {'items': [...]})
publish_order_event('order-123', 'updated', {'status': 'paid'})
publish_order_event('order-123', 'shipped', {'tracking': 'UPS123'})
# These will be delivered in order!

Step 2: Implement Idempotent Processing
import hashlib
import json
from google.cloud import firestore
db = firestore.Client()
def process_message_idempotently(message):
"""Process message exactly once using deduplication."""
# Generate unique message ID
message_id = message.message_id
# Or create your own: hashlib.sha256(message.data).hexdigest()
# Check if already processed
doc_ref = db.collection('processed_messages').document(message_id)
@firestore.transactional
def process_in_transaction(transaction):
doc = doc_ref.get(transaction=transaction)
if doc.exists:
print(f"Message {message_id} already processed, skipping")
return False
# Process the message
data = json.loads(message.data.decode('utf-8'))
result = process_order_event(data)
# Mark as processed
transaction.set(doc_ref, {
'processed_at': firestore.SERVER_TIMESTAMP,
'result': result
})
return True
transaction = db.transaction()
return process_in_transaction(transaction)
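# The business-logic handler called above is not defined in the original; a
# minimal sketch with assumed event types follows. Note that Firestore may
# retry the transaction on contention, so any side effects here (charging,
# notifications) should themselves be idempotent.
def process_order_event(data: dict) -> str:
    """Apply business logic for a single order event (illustrative only)."""
    event_type = data['event_type']
    order_id = data['order_id']
    if event_type == 'created':
        return f"order {order_id} created"   # e.g. create the order record
    elif event_type == 'updated':
        return f"order {order_id} updated"   # e.g. update status, charge customer
    elif event_type == 'shipped':
        return f"order {order_id} shipped"   # e.g. send shipping notification
    raise ValueError(f"Unknown event type: {event_type}")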
def callback(message):
try:
if process_message_idempotently(message):
message.ack()
else:
message.ack() # Already processed, still ack
except Exception as e:
print(f"Error processing message: {e}")
message.nack() # Will be redelivered

Step 3: Configure Dead Letter Topic
# Terraform configuration
resource "google_pubsub_topic" "orders" {
name = "orders"
}
resource "google_pubsub_topic" "orders_dead_letter" {
name = "orders-dead-letter"
}
resource "google_pubsub_subscription" "orders_processor" {
name = "orders-processor"
topic = google_pubsub_topic.orders.name
# Enable message ordering
enable_message_ordering = true
# Acknowledgment deadline
ack_deadline_seconds = 60
# Retry policy
retry_policy {
minimum_backoff = "10s"
maximum_backoff = "600s" # 10 minutes max
}
# Dead letter policy
dead_letter_policy {
dead_letter_topic = google_pubsub_topic.orders_dead_letter.id
max_delivery_attempts = 5
}
# Message retention
message_retention_duration = "604800s" # 7 days
# Expiration (never expire)
expiration_policy {
ttl = ""
}
}
# Grant Pub/Sub permission to publish to dead letter topic
resource "google_pubsub_topic_iam_member" "dead_letter_publisher" {
topic = google_pubsub_topic.orders_dead_letter.name
role = "roles/pubsub.publisher"
member = "serviceAccount:service-${var.project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}
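# Dead lettering also requires the Pub/Sub service agent to have subscriber
# permissions on the source subscription (this resource is an addition to the
# original example)
resource "google_pubsub_subscription_iam_member" "dead_letter_subscriber" {
  subscription = google_pubsub_subscription.orders_processor.name
  role         = "roles/pubsub.subscriber"
  member       = "serviceAccount:service-${var.project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}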
# Subscription to process dead letters
resource "google_pubsub_subscription" "dead_letter_processor" {
name = "orders-dead-letter-processor"
topic = google_pubsub_topic.orders_dead_letter.name
# Keep dead letters longer for investigation
message_retention_duration = "2592000s" # 30 days
}

Step 4: Implement Robust Subscriber
import json
import signal
import sys
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# Application-defined exception types used below to separate retryable from
# non-retryable failures
class TransientError(Exception):
    """Temporary failure (e.g. downstream timeout); safe to retry."""
class PermanentError(Exception):
    """Permanent failure (e.g. validation error); retrying will not help."""
class OrderProcessor:
def __init__(self):
self.subscriber = pubsub_v1.SubscriberClient()
self.subscription_path = self.subscriber.subscription_path(
'project', 'orders-processor'
)
self.streaming_pull_future = None
def callback(self, message):
"""Process message with proper error handling."""
try:
# Extend ack deadline for long processing
message.modify_ack_deadline(60)
data = json.loads(message.data.decode('utf-8'))
order_id = message.ordering_key
print(f"Processing {data['event_type']} for order {order_id}")
# Process with idempotency
if self.process_idempotently(message.message_id, data):
message.ack()
print(f"Successfully processed message {message.message_id}")
else:
message.ack() # Already processed
print(f"Duplicate message {message.message_id}, skipped")
except json.JSONDecodeError as e:
# Invalid message format: nack so the dead letter policy forwards it
# after max delivery attempts
print(f"Invalid JSON, nacking (will be dead-lettered after retries): {e}")
message.nack()
except TransientError as e:
# Temporary failure - retry
print(f"Transient error, will retry: {e}")
message.nack()
except PermanentError as e:
# Permanent failure - don't retry
print(f"Permanent error, sending to dead letter: {e}")
message.ack() # Ack to prevent infinite retries
self.publish_to_dead_letter(message, str(e))
def start(self):
"""Start the subscriber with flow control."""
flow_control = pubsub_v1.types.FlowControl(
max_messages=100, # Max outstanding messages
max_bytes=10 * 1024 * 1024, # 10 MB
)
self.streaming_pull_future = self.subscriber.subscribe(
self.subscription_path,
callback=self.callback,
flow_control=flow_control,
)
print(f"Listening on {self.subscription_path}")
# Handle shutdown gracefully
def shutdown(signum, frame):
print("Shutting down...")
self.streaming_pull_future.cancel()
sys.exit(0)
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
try:
self.streaming_pull_future.result()
except TimeoutError:
self.streaming_pull_future.cancel()
self.streaming_pull_future.result()
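    # The two helpers referenced in callback() are not defined in the original;
    # minimal sketches with assumed behavior follow. A production version would
    # reuse the transactional Firestore deduplication from Step 2.
    def process_idempotently(self, message_id: str, data: dict) -> bool:
        """Return True if processed now, False if already processed (duplicate)."""
        from google.cloud import firestore  # local import keeps the sketch self-contained
        db = firestore.Client()
        doc_ref = db.collection('processed_messages').document(message_id)
        if doc_ref.get().exists:
            return False
        # ... apply business logic for data['event_type'] here ...
        doc_ref.set({'processed_at': firestore.SERVER_TIMESTAMP})
        return True
    def publish_to_dead_letter(self, message, reason: str):
        """Manually forward a permanently failed message to the dead letter topic."""
        publisher = pubsub_v1.PublisherClient()
        dead_letter_topic = publisher.topic_path('project', 'orders-dead-letter')
        # Keep the original payload and record why processing failed
        publisher.publish(dead_letter_topic, message.data, error_reason=reason)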
if __name__ == "__main__":
processor = OrderProcessor()
processor.start()

Step 5: Monitor and Alert
# Alert on dead letter queue growth
resource "google_monitoring_alert_policy" "dead_letter_alert" {
display_name = "Pub/Sub Dead Letter Queue Growing"
conditions {
display_name = "Dead letter messages > 100"
condition_threshold {
filter = <<-EOT
resource.type="pubsub_subscription" AND
resource.labels.subscription_id="orders-dead-letter-processor" AND
metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"
EOT
comparison = "COMPARISON_GT"
threshold_value = 100
duration = "300s"
aggregations {
alignment_period = "60s"
per_series_aligner = "ALIGN_MEAN"
}
}
}
notification_channels = [google_monitoring_notification_channel.email.id]
}
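# The email channel referenced above is not defined in the original snippet; a
# minimal sketch (the address is a placeholder)
resource "google_monitoring_notification_channel" "email" {
  display_name = "On-call email"
  type         = "email"
  labels = {
    email_address = "oncall@example.com"
  }
}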
# Alert on oldest unacked message
resource "google_monitoring_alert_policy" "message_age_alert" {
display_name = "Pub/Sub Messages Backing Up"
conditions {
display_name = "Oldest message > 5 minutes"
condition_threshold {
filter = <<-EOT
resource.type="pubsub_subscription" AND
resource.labels.subscription_id="orders-processor" AND
metric.type="pubsub.googleapis.com/subscription/oldest_unacked_message_age"
EOT
comparison = "COMPARISON_GT"
threshold_value = 300 # 5 minutes
duration = "60s"
}
}
notification_channels = [google_monitoring_notification_channel.email.id]
}

Step 6: Handle Ordering Key Failures
from google.api_core.exceptions import FailedPrecondition
def publish_with_ordering_recovery(publisher, topic, order_id, data):
"""Publish with automatic recovery from ordering failures."""
try:
future = publisher.publish(
topic,
data.encode('utf-8'),
ordering_key=order_id
)
return future.result(timeout=30)
except FailedPrecondition as e:
# Ordering key is paused due to previous failure
print(f"Resuming ordering key {order_id}")
publisher.resume_publish(topic, order_id)
# Retry the publish
future = publisher.publish(
topic,
data.encode('utf-8'),
ordering_key=order_id
)
return future.result(timeout=30)

Message Delivery Guarantees
| Configuration | Ordering | Duplicates | Use Case |
|---|---|---|---|
| Default | No guarantee | Possible | Independent events |
| Ordering key | Within key | Possible | Related events |
| + Idempotency | Within key | Prevented | Financial transactions |
| + Exactly-once | Within key | Prevented (native) | Critical processing |
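The "Exactly-once" row refers to Pub/Sub's native exactly-once delivery, which is a subscription-level setting rather than application code. A minimal sketch of enabling it on the subscription from Step 1:
# Enable native exactly-once delivery on the subscription
gcloud pubsub subscriptions update orders-processor \
  --enable-exactly-once-delivery
With exactly-once delivery enabled, the Python client can confirm acknowledgments with message.ack_with_response() instead of message.ack().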
Pub/Sub Best Practices
- Use ordering keys for related messages (same order, user, etc.)
- Implement idempotent processing - assume duplicates will happen
- Configure dead letter topics - don’t lose failed messages
- Set appropriate ack deadlines - match your processing time
- Use flow control - prevent subscriber overload
Practice Question
Why does enabling message ordering in Pub/Sub only guarantee order for messages with the same ordering key?