Messages are being lost and processed multiple times. Implement reliable SQS/SNS messaging.
The Scenario
Your messaging system has problems:
Current Issues:
├── Messages disappearing after 30 seconds
├── Same message processed 3-4 times
├── Order of messages not preserved
├── Failed messages lost forever
├── Consumer crashes mid-processing
└── No visibility into message flow
The Challenge
Implement a reliable messaging architecture using SQS and SNS with proper visibility timeout, dead letter queues, and idempotent processing.
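Before changing anything, confirm the suspected misconfiguration: the SQS default visibility timeout is 30 seconds, which lines up with both the "disappearing after 30 seconds" and the duplicate-processing symptoms, and a missing redrive policy explains the lost failures. A minimal diagnostic sketch with boto3 (the queue URL is a placeholder):

import boto3

sqs = boto3.client('sqs')

# Placeholder queue URL for the affected queue
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders-queue'

attrs = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=[
        'VisibilityTimeout',                      # default is 30 seconds
        'RedrivePolicy',                          # absent if no DLQ is configured
        'ApproximateNumberOfMessagesVisible',
        'ApproximateNumberOfMessagesNotVisible',  # currently being processed
    ]
)['Attributes']

print(f"Visibility timeout: {attrs.get('VisibilityTimeout')}s")
print(f"Redrive policy: {attrs.get('RedrivePolicy', 'NO DLQ CONFIGURED')}")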
Wrong Approach
A junior engineer might use default SQS settings, skip dead letter queues, process messages without idempotency, or ignore visibility timeout configuration. These approaches cause message loss, duplicate processing, debugging nightmares, and data inconsistency.
Right Approach
A senior engineer configures visibility timeout based on processing time, implements dead letter queues for failed messages, makes consumers idempotent, uses message deduplication, and sets up comprehensive monitoring.
Step 1: Understanding the Message Flow
Reliable Messaging Architecture:
┌───────────┐     ┌─────────────┐     ┌─────────────────────────────────┐
│ Publisher │────►│  SNS Topic  │────►│           SQS Queues            │
└───────────┘     │             │     │  ┌──────────┐  ┌─────────────┐  │
                  │ Fan-out to  │     │  │  Orders  │  │  Inventory  │  │
                  │  multiple   │     │  │  Queue   │  │    Queue    │  │
                  │ subscribers │     │  └────┬─────┘  └──────┬──────┘  │
                  └─────────────┘     │       ▼               ▼         │
                                      │  ┌──────────┐  ┌─────────────┐  │
                                      │  │  Orders  │  │  Inventory  │  │
                                      │  │   DLQ    │  │     DLQ     │  │
                                      │  └──────────┘  └─────────────┘  │
                                      └───────┬───────────────┬─────────┘
                                              ▼               ▼
                                      ┌──────────────┐  ┌─────────────┐
                                      │    Orders    │  │  Inventory  │
                                      │    Lambda    │  │   Lambda    │
                                      └──────────────┘  └─────────────┘
Step 2: Configure SQS with Dead Letter Queue
# Dead Letter Queue
resource "aws_sqs_queue" "orders_dlq" {
  name                      = "orders-dlq"
  message_retention_seconds = 1209600 # 14 days

  tags = {
    Name = "orders-dlq"
  }
}

# Main Queue
resource "aws_sqs_queue" "orders" {
  name                       = "orders-queue"
  visibility_timeout_seconds = 300    # 5 minutes (6x Lambda timeout)
  message_retention_seconds  = 345600 # 4 days
  receive_wait_time_seconds  = 20     # Long polling
  delay_seconds              = 0

  # Dead letter queue configuration
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_dlq.arn
    maxReceiveCount     = 3 # Move to DLQ after 3 failed receives
  })

  # Enable SSE
  sqs_managed_sse_enabled = true

  tags = {
    Name = "orders-queue"
  }
}

# Allow the DLQ to receive messages from the main queue
resource "aws_sqs_queue_redrive_allow_policy" "orders_dlq" {
  queue_url = aws_sqs_queue.orders_dlq.id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue"
    sourceQueueArns   = [aws_sqs_queue.orders.arn]
  })
}

# DLQ for the FIFO queue (a FIFO queue's DLQ must itself be FIFO)
resource "aws_sqs_queue" "orders_fifo_dlq" {
  name                      = "orders-dlq.fifo"
  fifo_queue                = true
  message_retention_seconds = 1209600 # 14 days
}

# FIFO Queue for ordered processing
resource "aws_sqs_queue" "orders_fifo" {
  name                        = "orders-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = false # We'll provide deduplication IDs
  deduplication_scope         = "messageGroup"
  fifo_throughput_limit       = "perMessageGroupId"
  visibility_timeout_seconds  = 300

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_fifo_dlq.arn
    maxReceiveCount     = 3
  })

  tags = {
    Name = "orders-fifo-queue"
  }
}
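If a consumer occasionally needs longer than the configured visibility timeout, it can extend the timeout on the message it is holding rather than letting the message reappear mid-processing. A minimal heartbeat sketch using boto3's change_message_visibility (the queue URL and the 120-second extension are illustrative):

import boto3

sqs = boto3.client('sqs')

QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789/orders-queue'  # placeholder

def extend_visibility(receipt_handle: str, seconds: int = 120):
    """Give the current consumer more time before SQS redelivers the message."""
    sqs.change_message_visibility(
        QueueUrl=QUEUE_URL,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=seconds,  # counted from now; SQS caps total invisibility at 12 hours
    )

A consumer would call this periodically from a long-running handler, well before the current timeout expires.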
Step 3: SNS Topic with Fan-out
# SNS Topic
resource "aws_sns_topic" "orders" {
  name = "orders-topic"

  # Enable SSE
  kms_master_key_id = "alias/aws/sns"

  tags = {
    Name = "orders-topic"
  }
}

# Subscribe SQS queues to SNS
resource "aws_sns_topic_subscription" "orders_queue" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.orders.arn

  # Filter policy - only receive certain messages
  filter_policy = jsonencode({
    event_type = ["order.created", "order.updated"]
  })

  # Enable raw message delivery (no SNS wrapper)
  raw_message_delivery = true
}

resource "aws_sns_topic_subscription" "inventory_queue" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.inventory.arn

  filter_policy = jsonencode({
    event_type = ["order.created"]
  })

  raw_message_delivery = true
}

# SQS policy to allow SNS to send messages
resource "aws_sqs_queue_policy" "orders" {
  queue_url = aws_sqs_queue.orders.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AllowSNS"
        Effect = "Allow"
        Principal = {
          Service = "sns.amazonaws.com"
        }
        Action   = "sqs:SendMessage"
        Resource = aws_sqs_queue.orders.arn
        Condition = {
          ArnEquals = {
            "aws:SourceArn" = aws_sns_topic.orders.arn
          }
        }
      }
    ]
  })
}
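With raw_message_delivery = true, the SQS body is exactly what the publisher sent. If raw delivery were left off (the default), the body would be an SNS JSON envelope and the consumer would have to unwrap it first. A defensive sketch that handles both shapes:

import json

def extract_payload(sqs_body: str) -> dict:
    """Return the publisher's message, unwrapping the SNS envelope if present."""
    parsed = json.loads(sqs_body)
    if isinstance(parsed, dict) and parsed.get('Type') == 'Notification' and 'Message' in parsed:
        # Raw delivery disabled: the real payload is a JSON string in 'Message'
        return json.loads(parsed['Message'])
    return parsed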
Step 4: Publishing Messages
import boto3
import json
from datetime import datetime

sns = boto3.client('sns')
sqs = boto3.client('sqs')

def publish_order_event(order: dict, event_type: str):
    """Publish order event to SNS with message attributes."""
    message = {
        'order_id': order['id'],
        'customer_id': order['customer_id'],
        'total': order['total'],
        'items': order['items'],
        'timestamp': datetime.utcnow().isoformat(),
    }

    response = sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789:orders-topic',
        Message=json.dumps(message),
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': event_type
            },
            'customer_tier': {
                'DataType': 'String',
                'StringValue': order.get('customer_tier', 'standard')
            }
        },
        # For FIFO topics:
        # MessageGroupId=order['customer_id'],
        # MessageDeduplicationId=f"{order['id']}-{event_type}"
    )
    return response['MessageId']

def send_to_fifo_queue(order: dict):
    """Send message to FIFO queue with deduplication."""
    response = sqs.send_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders-queue.fifo',
        MessageBody=json.dumps(order),
        MessageGroupId=order['customer_id'],  # Orders for the same customer processed in order
        MessageDeduplicationId=f"order-{order['id']}-{order['version']}"  # Prevent duplicates
    )
    return response['MessageId']
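A quick usage example; the order fields mirror what publish_order_event reads, and the values are made up:

order = {
    'id': 'ord-1001',
    'customer_id': 'cust-42',
    'total': 99.95,
    'items': [{'sku': 'widget-1', 'qty': 2}],
    'version': 1,
    'customer_tier': 'gold',
}

message_id = publish_order_event(order, 'order.created')
print(f"Published to SNS: {message_id}")

# The same order routed through the FIFO queue instead:
# resending it with the same version is deduplicated by SQS
fifo_message_id = send_to_fifo_queue(order)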
Step 5: Idempotent Consumer
import boto3
import json
from datetime import datetime, timedelta
from functools import wraps

dynamodb = boto3.resource('dynamodb')
idempotency_table = dynamodb.Table('message-idempotency')

def idempotent(func):
    """Decorator to ensure idempotent message processing."""
    @wraps(func)
    def wrapper(event, context):
        for record in event['Records']:
            message_id = record['messageId']

            # Check if already processed
            try:
                response = idempotency_table.get_item(
                    Key={'message_id': message_id}
                )
                if 'Item' in response:
                    print(f"Message {message_id} already processed, skipping")
                    continue
            except Exception as e:
                print(f"Error checking idempotency: {e}")

            # Process message
            try:
                result = func(record)

                # Mark as processed (the condition guards against a concurrent consumer)
                idempotency_table.put_item(
                    Item={
                        'message_id': message_id,
                        'processed_at': datetime.utcnow().isoformat(),
                        'result': str(result),
                        'ttl': int((datetime.utcnow() + timedelta(days=7)).timestamp())
                    },
                    ConditionExpression='attribute_not_exists(message_id)'
                )
            except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
                print(f"Message {message_id} processed by another consumer")
            except Exception as e:
                print(f"Error processing message: {e}")
                raise  # Let SQS retry

    return wrapper

@idempotent
def process_order(record):
    """Process a single order message."""
    # db and the helpers below are assumed application code
    body = json.loads(record['body'])
    order_id = body['order_id']

    # Idempotent operation - use order_id as the conflict key
    result = db.execute("""
        INSERT INTO orders (id, customer_id, total, status)
        VALUES (%s, %s, %s, 'processing')
        ON CONFLICT (id) DO NOTHING
        RETURNING id
    """, (order_id, body['customer_id'], body['total']))

    if result:
        # Order was newly inserted, process it
        process_order_items(order_id, body['items'])
        update_inventory(body['items'])
        send_confirmation_email(body['customer_id'], order_id)

    return {'order_id': order_id, 'status': 'processed'}
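Because @idempotent turns process_order into a standard Lambda handler, it can be exercised locally with a synthetic SQS event (assuming the db helpers above are stubbed or pointed at a test database; the record carries only the fields the handler reads):

fake_event = {
    'Records': [
        {
            'messageId': 'test-message-1',
            'body': json.dumps({
                'order_id': 'ord-1001',
                'customer_id': 'cust-42',
                'total': 99.95,
                'items': [{'sku': 'widget-1', 'qty': 2}],
            }),
        }
    ]
}

# Running it twice should process the order once and skip the duplicate
process_order(fake_event, None)
process_order(fake_event, None)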
resource "aws_lambda_function" "order_processor" {
function_name = "order-processor"
runtime = "python3.11"
handler = "handler.process_orders"
timeout = 50 # Less than visibility_timeout / 6
memory_size = 256
role = aws_iam_role.lambda_execution.arn
environment {
variables = {
IDEMPOTENCY_TABLE = aws_dynamodb_table.idempotency.name
}
}
dead_letter_config {
target_arn = aws_sqs_queue.lambda_dlq.arn
}
}
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = aws_sqs_queue.orders.arn
function_name = aws_lambda_function.order_processor.arn
batch_size = 10
maximum_batching_window_in_seconds = 5
# Partial batch failure reporting
function_response_types = ["ReportBatchItemFailures"]
# Scaling configuration
scaling_config {
maximum_concurrency = 10
}
}# Lambda handler with partial batch failure reporting
# Lambda handler with partial batch failure reporting
import json

def process_orders(event, context):
    """Process SQS messages with partial failure reporting."""
    # process_single_order is the application-level handler (see Step 5)
    batch_item_failures = []

    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            process_single_order(body)
        except Exception as e:
            print(f"Error processing message {record['messageId']}: {e}")
            batch_item_failures.append({
                'itemIdentifier': record['messageId']
            })

    # Return failed message IDs so only those are retried
    return {
        'batchItemFailures': batch_item_failures
    }
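Note that the batchItemFailures response is only honored because the event source mapping sets function_response_types = ["ReportBatchItemFailures"]. Without that setting, Lambda ignores the return value, and a single failed message causes the entire batch of 10 to be redelivered, including the 9 that succeeded.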
Step 7: DLQ Processing and Monitoring
import boto3
import json

sqs = boto3.client('sqs')
cloudwatch = boto3.client('cloudwatch')

def process_dlq():
    """Process messages from the dead letter queue."""
    dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders-dlq'

    while True:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
            AttributeNames=['All'],
            MessageAttributeNames=['All']
        )

        messages = response.get('Messages', [])
        if not messages:
            break

        for message in messages:
            body = json.loads(message['Body'])
            attributes = message.get('Attributes', {})

            print(f"DLQ Message: {message['MessageId']}")
            print(f"  Receive Count: {attributes.get('ApproximateReceiveCount')}")
            print(f"  First Received: {attributes.get('ApproximateFirstReceiveTimestamp')}")
            print(f"  Body: {body}")

            # Analyze and fix the issue, then either:
            # 1. Reprocess manually
            # 2. Redrive to the source queue
            # 3. Archive for investigation

            # Delete from DLQ after handling
            sqs.delete_message(
                QueueUrl=dlq_url,
                ReceiptHandle=message['ReceiptHandle']
            )

def redrive_dlq_messages():
    """Redrive messages from the DLQ back to the source queue."""
    sqs.start_message_move_task(
        SourceArn='arn:aws:sqs:us-east-1:123456789:orders-dlq',
        DestinationArn='arn:aws:sqs:us-east-1:123456789:orders-queue',
        MaxNumberOfMessagesPerSecond=10
    )
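The redrive runs asynchronously, so it is worth checking progress before assuming the DLQ is drained. A small sketch using list_message_move_tasks (same account and ARNs as above; the response field names follow the SQS message-move-task API):

def check_redrive_progress(source_arn: str):
    """Print the status of recent DLQ redrive (message move) tasks."""
    response = sqs.list_message_move_tasks(
        SourceArn=source_arn,
        MaxResults=10
    )
    for task in response.get('Results', []):
        moved = task.get('ApproximateNumberOfMessagesMoved', 0)
        total = task.get('ApproximateNumberOfMessagesToMove', 0)
        print(f"{task['Status']}: {moved}/{total} messages moved")

check_redrive_progress('arn:aws:sqs:us-east-1:123456789:orders-dlq')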
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
alarm_name = "orders-dlq-has-messages"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 300
statistic = "Sum"
threshold = 0
alarm_description = "Messages in DLQ indicate processing failures"
alarm_actions = [aws_sns_topic.alerts.arn]
dimensions = {
QueueName = aws_sqs_queue.orders_dlq.name
}
}
resource "aws_cloudwatch_metric_alarm" "queue_age" {
alarm_name = "orders-queue-old-messages"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "ApproximateAgeOfOldestMessage"
namespace = "AWS/SQS"
period = 300
statistic = "Maximum"
threshold = 3600 # 1 hour
alarm_description = "Messages sitting in queue too long"
alarm_actions = [aws_sns_topic.alerts.arn]
dimensions = {
QueueName = aws_sqs_queue.orders.name
}
} SQS/SNS Best Practices
SQS/SNS Best Practices
| Setting | Recommended Value | Reason |
|---|---|---|
| Visibility Timeout | 6x processing time | Message stays hidden through retries instead of being redelivered mid-processing |
| Long Polling | 20 seconds | Fewer empty receives, lower request cost |
| DLQ maxReceiveCount | 3-5 | Enough retries for transient failures without delaying alerts |
| Message Retention | 4-14 days | Time to diagnose and redrive failures |
| Batch Size | 10 | Amortizes invocation overhead across messages |
Practice Question
Why should visibility timeout be set to at least 6 times the Lambda function timeout?