
Messages are being lost and processed multiple times. Implement reliable SQS/SNS messaging.


The Scenario

Your messaging system has problems:

Current Issues:
├── Messages disappearing after 30 seconds
├── Same message processed 3-4 times
├── Order of messages not preserved
├── Failed messages lost forever
├── Consumer crashes mid-processing
└── No visibility into message flow
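
Before changing any settings, it helps to confirm what the queue is actually configured to do. A minimal boto3 sketch (the queue URL is a placeholder for your own):

import boto3

sqs = boto3.client('sqs')

# Inspect the settings behind the symptoms above
attrs = sqs.get_queue_attributes(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders-queue',
    AttributeNames=['VisibilityTimeout', 'RedrivePolicy', 'MessageRetentionPeriod']
)['Attributes']

# The SQS default visibility timeout is 30 seconds, which matches "disappearing after 30 seconds"
print(f"Visibility timeout: {attrs.get('VisibilityTimeout')}s")
print(f"Redrive policy: {attrs.get('RedrivePolicy', 'NONE - failed messages are simply lost')}")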

The Challenge

Implement a reliable messaging architecture using SQS and SNS with proper visibility timeout, dead letter queues, and idempotent processing.

Wrong Approach

A junior engineer might use default SQS settings, skip dead letter queues, process messages without idempotency, or ignore visibility timeout configuration. These approaches cause message loss, duplicate processing, debugging nightmares, and data inconsistency.

Right Approach

A senior engineer configures visibility timeout based on processing time, implements dead letter queues for failed messages, makes consumers idempotent, uses message deduplication, and sets up comprehensive monitoring.

Step 1: Understanding the Message Flow

Reliable Messaging Architecture:
┌─────────────┐     ┌─────────────┐     ┌─────────────────────────────────┐
│  Publisher  │────►│    SNS      │────►│           SQS Queues            │
└─────────────┘     │   Topic     │     │  ┌─────────┐    ┌────────────┐  │
                    │             │     │  │ Orders  │    │ Inventory  │  │
                    │ Fan-out to  │────►│  │  Queue  │    │   Queue    │  │
                    │  multiple   │     │  └────┬────┘    └─────┬──────┘  │
                    │subscribers  │     │       │               │         │
                    └─────────────┘     │       ▼               ▼         │
                                        │  ┌─────────┐    ┌────────────┐  │
                                        │  │ Orders  │    │ Inventory  │  │
                                        │  │   DLQ   │    │    DLQ     │  │
                                        │  └─────────┘    └────────────┘  │
                                        └─────────────────────────────────┘
                                                │               │
                                                ▼               ▼
                                        ┌─────────────┐ ┌─────────────────┐
                                        │   Orders    │ │    Inventory    │
                                        │   Lambda    │ │     Lambda      │
                                        └─────────────┘ └─────────────────┘

Each Lambda consumes from its own main queue; when a message fails more deliveries than the queue's maxReceiveCount allows, SQS moves it to the matching DLQ for inspection or redrive.

Step 2: Configure SQS with Dead Letter Queue

# Dead Letter Queue
resource "aws_sqs_queue" "orders_dlq" {
  name                      = "orders-dlq"
  message_retention_seconds = 1209600  # 14 days

  tags = {
    Name = "orders-dlq"
  }
}

# Main Queue
resource "aws_sqs_queue" "orders" {
  name                       = "orders-queue"
  visibility_timeout_seconds = 300  # 5 minutes (6x Lambda timeout)
  message_retention_seconds  = 345600  # 4 days
  receive_wait_time_seconds  = 20  # Long polling
  delay_seconds              = 0

  # Dead letter queue configuration
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_dlq.arn
    maxReceiveCount     = 3  # Move to DLQ after 3 failures
  })

  # Enable SSE
  sqs_managed_sse_enabled = true

  tags = {
    Name = "orders-queue"
  }
}

# Restrict which queues may use this DLQ as a redrive target
resource "aws_sqs_queue_redrive_allow_policy" "orders_dlq" {
  queue_url = aws_sqs_queue.orders_dlq.id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue"
    sourceQueueArns   = [aws_sqs_queue.orders.arn]
  })
}

# FIFO Dead Letter Queue (a FIFO queue's DLQ must itself be FIFO)
resource "aws_sqs_queue" "orders_fifo_dlq" {
  name                      = "orders-dlq.fifo"
  fifo_queue                = true
  message_retention_seconds = 1209600  # 14 days

  tags = {
    Name = "orders-fifo-dlq"
  }
}

# FIFO Queue for ordered processing
resource "aws_sqs_queue" "orders_fifo" {
  name                        = "orders-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = false  # We'll provide deduplication IDs
  deduplication_scope         = "messageGroup"
  fifo_throughput_limit       = "perMessageGroupId"
  visibility_timeout_seconds  = 300

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_fifo_dlq.arn
    maxReceiveCount     = 3
  })

  tags = {
    Name = "orders-fifo-queue"
  }
}
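
If a consumer occasionally needs longer than the visibility timeout for a single message, it can extend the timeout for just that message instead of raising the queue-wide default. A minimal sketch (queue URL and receipt handle are placeholders):

import boto3

sqs = boto3.client('sqs')

def extend_visibility(queue_url: str, receipt_handle: str, seconds: int = 600):
    """Give the in-flight message more time before SQS redelivers it."""
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=seconds  # counted from now; total in-flight time caps at 12 hours
    )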

Step 3: SNS Topic with Fan-out

# SNS Topic
resource "aws_sns_topic" "orders" {
  name = "orders-topic"

  # Enable SSE
  kms_master_key_id = "alias/aws/sns"

  tags = {
    Name = "orders-topic"
  }
}

# Subscribe SQS queues to SNS
resource "aws_sns_topic_subscription" "orders_queue" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.orders.arn

  # Filter policy - only receive certain messages
  filter_policy = jsonencode({
    event_type = ["order.created", "order.updated"]
  })

  # Enable raw message delivery (no SNS wrapper)
  raw_message_delivery = true
}

resource "aws_sns_topic_subscription" "inventory_queue" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.inventory.arn

  filter_policy = jsonencode({
    event_type = ["order.created"]
  })

  raw_message_delivery = true
}

# SQS policy to allow SNS to send messages
resource "aws_sqs_queue_policy" "orders" {
  queue_url = aws_sqs_queue.orders.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "AllowSNS"
        Effect    = "Allow"
        Principal = {
          Service = "sns.amazonaws.com"
        }
        Action   = "sqs:SendMessage"
        Resource = aws_sqs_queue.orders.arn
        Condition = {
          ArnEquals = {
            "aws:SourceArn" = aws_sns_topic.orders.arn
          }
        }
      }
    ]
  })
}
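
raw_message_delivery = true means the queue receives your JSON payload directly. With the default (false), SQS receives an SNS envelope instead, and the consumer has to unwrap it. A sketch that tolerates both shapes:

import json

def extract_payload(record: dict) -> dict:
    """Return the publisher's message whether or not raw delivery is enabled."""
    body = json.loads(record['body'])
    if body.get('Type') == 'Notification' and 'Message' in body:
        # raw_message_delivery = false: body is the SNS envelope
        return json.loads(body['Message'])
    return body  # raw_message_delivery = true: body is the payload itself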

Step 4: Publishing Messages

import boto3
import json
from datetime import datetime

sns = boto3.client('sns')
sqs = boto3.client('sqs')

def publish_order_event(order: dict, event_type: str):
    """Publish order event to SNS with message attributes."""
    message = {
        'order_id': order['id'],
        'customer_id': order['customer_id'],
        'total': order['total'],
        'items': order['items'],
        'timestamp': datetime.utcnow().isoformat(),
    }

    response = sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789:orders-topic',
        Message=json.dumps(message),
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': event_type
            },
            'customer_tier': {
                'DataType': 'String',
                'StringValue': order.get('customer_tier', 'standard')
            }
        },
        # For FIFO topics
        # MessageGroupId=order['customer_id'],
        # MessageDeduplicationId=f"{order['id']}-{event_type}"
    )

    return response['MessageId']

def send_to_fifo_queue(order: dict):
    """Send message to FIFO queue with deduplication."""
    response = sqs.send_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders-queue.fifo',
        MessageBody=json.dumps(order),
        MessageGroupId=order['customer_id'],  # Orders for same customer processed in order
        MessageDeduplicationId=f"order-{order['id']}-{order['version']}"  # Prevent duplicates
    )

    return response['MessageId']
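
When a publisher emits many events at once, batching reduces API calls. A sketch using the same (placeholder) topic ARN; publish_batch accepts at most 10 entries per call:

def publish_order_events(orders: list, event_type: str):
    """Publish up to 10 order events in a single SNS call."""
    response = sns.publish_batch(
        TopicArn='arn:aws:sns:us-east-1:123456789:orders-topic',
        PublishBatchRequestEntries=[
            {
                'Id': str(i),  # must be unique within the batch
                'Message': json.dumps(order),
                'MessageAttributes': {
                    'event_type': {'DataType': 'String', 'StringValue': event_type}
                }
            }
            for i, order in enumerate(orders[:10])
        ]
    )
    return response.get('Failed', [])  # entries the caller should retry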

Step 5: Idempotent Consumer

import boto3
import json
from datetime import datetime, timedelta
from functools import wraps

dynamodb = boto3.resource('dynamodb')
idempotency_table = dynamodb.Table('message-idempotency')

def idempotent(func):
    """Decorator to ensure idempotent message processing."""
    @wraps(func)
    def wrapper(event, context):
        for record in event['Records']:
            message_id = record['messageId']

            # Check if already processed
            try:
                response = idempotency_table.get_item(
                    Key={'message_id': message_id}
                )
                if 'Item' in response:
                    print(f"Message {message_id} already processed, skipping")
                    continue
            except Exception as e:
                # Fail open: risk a duplicate rather than drop the message
                print(f"Error checking idempotency: {e}")

            # Process message
            try:
                result = func(record)

                # Mark as processed
                idempotency_table.put_item(
                    Item={
                        'message_id': message_id,
                        'processed_at': datetime.utcnow().isoformat(),
                        'result': str(result),
                        'ttl': int((datetime.utcnow() + timedelta(days=7)).timestamp())
                    },
                    ConditionExpression='attribute_not_exists(message_id)'
                )

            except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
                print(f"Message {message_id} processed by another consumer")

            except Exception as e:
                print(f"Error processing message: {e}")
                raise  # Let SQS retry

    return wrapper

@idempotent
def process_order(record):
    """Process a single order message."""
    body = json.loads(record['body'])
    order_id = body['order_id']

    # Idempotent write - order_id is the natural idempotency key
    # ('db' is assumed to be an existing PostgreSQL connection wrapper)
    result = db.execute("""
        INSERT INTO orders (id, customer_id, total, status)
        VALUES (%s, %s, %s, 'processing')
        ON CONFLICT (id) DO NOTHING
        RETURNING id
    """, (order_id, body['customer_id'], body['total']))

    if result:
        # Order was inserted, process it
        process_order_items(order_id, body['items'])
        update_inventory(body['items'])
        send_confirmation_email(body['customer_id'], order_id)

    return {'order_id': order_id, 'status': 'processed'}
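
One caveat: between the decorator's get_item check and its final put_item, two consumers could still process the same message concurrently. A stricter variant (a sketch against the same table) reserves the key atomically before doing any work:

def try_reserve(message_id: str) -> bool:
    """Atomically claim a message; False means another consumer already owns it."""
    try:
        idempotency_table.put_item(
            Item={
                'message_id': message_id,
                'status': 'in_progress',
                'ttl': int((datetime.utcnow() + timedelta(days=7)).timestamp())
            },
            ConditionExpression='attribute_not_exists(message_id)'
        )
        return True
    except idempotency_table.meta.client.exceptions.ConditionalCheckFailedException:
        return False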

Step 6: Lambda Consumer with Batch Processing

resource "aws_lambda_function" "order_processor" {
  function_name = "order-processor"
  runtime       = "python3.11"
  handler       = "handler.process_orders"
  timeout       = 50  # visibility_timeout (300s) is 6x this, per AWS guidance
  memory_size   = 256

  role = aws_iam_role.lambda_execution.arn

  environment {
    variables = {
      IDEMPOTENCY_TABLE = aws_dynamodb_table.idempotency.name
    }
  }

  dead_letter_config {
    target_arn = aws_sqs_queue.lambda_dlq.arn
  }
}

resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn                   = aws_sqs_queue.orders.arn
  function_name                      = aws_lambda_function.order_processor.arn
  batch_size                         = 10
  maximum_batching_window_in_seconds = 5

  # Partial batch failure reporting
  function_response_types = ["ReportBatchItemFailures"]

  # Scaling configuration
  scaling_config {
    maximum_concurrency = 10
  }
}

# Lambda handler (handler.py) with partial batch failure reporting
import json

def process_orders(event, context):
    """Process SQS messages with partial failure reporting."""
    batch_item_failures = []

    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            process_single_order(body)  # your idempotent order logic (see Step 5)

        except Exception as e:
            print(f"Error processing message {record['messageId']}: {e}")
            batch_item_failures.append({
                'itemIdentifier': record['messageId']
            })

    # Return failed items for retry
    return {
        'batchItemFailures': batch_item_failures
    }
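
A quick local smoke test (with the real order logic stubbed out and a hypothetical payload) shows how a failing record surfaces in the response while the rest of the batch succeeds:

def process_single_order(body):
    # Stub standing in for the idempotent logic of Step 5
    print(f"processing {body['order_id']}")

fake_event = {'Records': [
    {'messageId': 'm-1', 'body': json.dumps({'order_id': 'o-1'})},
    {'messageId': 'm-2', 'body': 'not-json'},  # fails json.loads and is reported
]}
print(process_orders(fake_event, None))
# -> {'batchItemFailures': [{'itemIdentifier': 'm-2'}]}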

Step 7: DLQ Processing and Monitoring

import boto3
import json

sqs = boto3.client('sqs')

def process_dlq():
    """Process messages from dead letter queue."""
    dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders-dlq'

    while True:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
            AttributeNames=['All'],
            MessageAttributeNames=['All']
        )

        messages = response.get('Messages', [])
        if not messages:
            break

        for message in messages:
            body = json.loads(message['Body'])
            attributes = message.get('Attributes', {})

            print(f"DLQ Message: {message['MessageId']}")
            print(f"  Receive Count: {attributes.get('ApproximateReceiveCount')}")
            print(f"  First Received: {attributes.get('ApproximateFirstReceiveTimestamp')}")
            print(f"  Body: {body}")

            # Analyze and fix the issue, then either:
            # 1. Reprocess manually
            # 2. Redrive to source queue
            # 3. Archive for investigation

            # Delete from DLQ after handling
            sqs.delete_message(
                QueueUrl=dlq_url,
                ReceiptHandle=message['ReceiptHandle']
            )

def redrive_dlq_messages():
    """Redrive messages from DLQ back to source queue."""
    sqs.start_message_move_task(
        SourceArn='arn:aws:sqs:us-east-1:123456789:orders-dlq',
        DestinationArn='arn:aws:sqs:us-east-1:123456789:orders-queue',
        MaxNumberOfMessagesPerSecond=10
    )
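
The redrive runs asynchronously, so you can poll its progress with the companion API (a sketch, same placeholder ARN):

def check_redrive_progress():
    """Print the status of recent DLQ redrive tasks."""
    tasks = sqs.list_message_move_tasks(
        SourceArn='arn:aws:sqs:us-east-1:123456789:orders-dlq'
    )
    for task in tasks['Results']:
        moved = task.get('ApproximateNumberOfMessagesMoved', 0)
        print(f"{task['Status']}: {moved} messages moved")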

# CloudWatch Alarms
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
  alarm_name          = "orders-dlq-has-messages"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "Messages in DLQ indicate processing failures"
  alarm_actions       = [aws_sns_topic.alerts.arn]

  dimensions = {
    QueueName = aws_sqs_queue.orders_dlq.name
  }
}

resource "aws_cloudwatch_metric_alarm" "queue_age" {
  alarm_name          = "orders-queue-old-messages"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "ApproximateAgeOfOldestMessage"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Maximum"
  threshold           = 3600  # 1 hour
  alarm_description   = "Messages sitting in queue too long"
  alarm_actions       = [aws_sns_topic.alerts.arn]

  dimensions = {
    QueueName = aws_sqs_queue.orders.name
  }
}

SQS/SNS Best Practices

Setting            | Value              | Reason
-------------------|--------------------|-----------------------------------
Visibility Timeout | 6x processing time | Allows retries without duplicates
Long Polling       | 20 seconds         | Reduces empty responses
DLQ Max Receive    | 3-5                | Balances retries and alerting
Message Retention  | 4-14 days          | Time to fix issues
Batch Size         | 10                 | Efficient processing

Practice Question

Why should visibility timeout be set to at least 6 times the Lambda function timeout?