Questions
Messages are lost and processed out of order in Service Bus. Implement reliable messaging.
The Scenario
Your order processing system is losing messages:
Service Bus Configuration:
├── Namespace: Standard tier
├── Queue: orders-queue
├── Current issues:
│   ├── ~5% of orders never processed
│   ├── Some orders processed multiple times
│   ├── Dead letter queue has 10,000 messages
│   └── No monitoring or alerting configured
└── Message flow: Web API → Queue → Order Processor → Database
The business is losing revenue due to unprocessed orders.
The Challenge
Implement reliable messaging with proper error handling, dead letter management, duplicate detection, and monitoring to achieve zero message loss.
A junior engineer might use auto-complete mode without proper error handling, ignore the dead letter queue, skip duplicate detection, or use ReceiveAndDelete mode for performance. These approaches cause message loss, duplicate processing, and silent failures.
A senior engineer implements PeekLock with explicit completion, configures dead letter handling with monitoring, enables duplicate detection, implements idempotent message handlers, and sets up proper retry policies with circuit breakers.
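The steps below cover the retry side in detail. As a minimal sketch of pairing a retry policy with a circuit breaker (using the Polly package; the thresholds and variable names are illustrative, not taken from the scenario), the send path can be wrapped like this:

// Sketch: retry transient failures, but stop calling a broken dependency
// Assumes 'sender' is a ServiceBusSender and 'message' a ServiceBusMessage (see Step 3)
var retry = Policy
    .Handle<ServiceBusException>(ex => ex.IsTransient)
    .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

var breaker = Policy
    .Handle<ServiceBusException>()
    .CircuitBreakerAsync(
        exceptionsAllowedBeforeBreaking: 5,          // open after 5 consecutive failures
        durationOfBreak: TimeSpan.FromSeconds(30));  // fail fast for 30s, then probe again

// The breaker sits inside the retry, so every attempt counts toward opening the circuit
var resilience = Policy.WrapAsync(retry, breaker);

await resilience.ExecuteAsync(() => sender.SendMessageAsync(message));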
Step 1: Understand Message Loss Scenarios
Why Messages Get Lost:
┌──────────────────────────────────────────────────────────┐
│ 1. ReceiveAndDelete Mode                                 │
│    Message deleted immediately on receive                │
│    If processor crashes before completing work = LOST    │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ 2. PeekLock Timeout                                      │
│    Lock expires before processing completes              │
│    Message becomes visible again = DUPLICATES            │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ 3. MaxDeliveryCount Exceeded                             │
│    Failed processing > 10 times (default)                │
│    Message moved to Dead Letter Queue                    │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ 4. TTL Expired                                           │
│    Message not processed within TimeToLive               │
│    Message discarded or dead-lettered                    │
└──────────────────────────────────────────────────────────┘

Step 2: Configure Queue Properly
// Service Bus namespace and queue with reliability settings
param location string = resourceGroup().location

resource serviceBusNamespace 'Microsoft.ServiceBus/namespaces@2022-10-01-preview' = {
  name: 'sb-orders-${uniqueString(resourceGroup().id)}'
  location: location
  sku: {
    name: 'Premium' // Premium for production (dedicated capacity, zone redundancy, geo-DR)
    tier: 'Premium'
    capacity: 1
  }
  properties: {
    zoneRedundant: true // High availability
  }
}
resource ordersQueue 'Microsoft.ServiceBus/namespaces/queues@2022-10-01-preview' = {
  parent: serviceBusNamespace
  name: 'orders-queue'
  properties: {
    // Reliability settings
    requiresDuplicateDetection: true
    duplicateDetectionHistoryTimeWindow: 'PT10M' // 10 minute window

    // Dead letter configuration
    deadLetteringOnMessageExpiration: true
    maxDeliveryCount: 10

    // Message settings
    defaultMessageTimeToLive: 'P14D' // 14 days
    lockDuration: 'PT5M' // 5 minute lock (extend for long processing)

    // Ordering
    requiresSession: false // Enable if order matters

    // Size
    maxSizeInMegabytes: 5120 // 5 GB
    enablePartitioning: false // Disable for ordering guarantees
  }
}
// Dead letter queue alert
// Assumes an Action Group resource named 'actionGroup' is declared elsewhere in the template
resource dlqAlert 'Microsoft.Insights/metricAlerts@2018-03-01' = {
  name: 'orders-dlq-alert'
  location: 'global'
  properties: {
    description: 'Alert when dead letter queue has messages'
    severity: 2
    enabled: true
    scopes: [serviceBusNamespace.id]
    evaluationFrequency: 'PT5M'
    windowSize: 'PT15M'
    criteria: {
      'odata.type': 'Microsoft.Azure.Monitor.SingleResourceMultipleMetricCriteria'
      allOf: [
        {
          name: 'DeadLetteredMessages'
          metricName: 'DeadletteredMessages'
          dimensions: [
            {
              name: 'EntityName'
              operator: 'Include'
              values: ['orders-queue']
            }
          ]
          operator: 'GreaterThan'
          threshold: 0
          timeAggregation: 'Total'
        }
      ]
    }
    actions: [
      {
        actionGroupId: actionGroup.id
      }
    ]
  }
}

Step 3: Implement Reliable Message Sender
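The sender and processors in the next steps assume a single ServiceBusClient registered in dependency injection. A minimal registration sketch (the connection string name is an assumption for illustration):

// Program.cs (sketch): one shared ServiceBusClient for all senders and processors
builder.Services.AddSingleton(sp =>
    new ServiceBusClient(builder.Configuration.GetConnectionString("ServiceBus")));
builder.Services.AddSingleton<ReliableOrderSender>();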
// Reliable message sender with retry and duplicate detection
// (the retry policy below uses the Polly NuGet package)
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Polly;

public class ReliableOrderSender
{
    private readonly ServiceBusSender _sender;
    private readonly ILogger<ReliableOrderSender> _logger;

    public ReliableOrderSender(ServiceBusClient client, ILogger<ReliableOrderSender> logger)
    {
        _sender = client.CreateSender("orders-queue");
        _logger = logger;
    }

    public async Task SendOrderAsync(Order order)
    {
        var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(order))
        {
            // Unique ID for duplicate detection
            MessageId = $"order-{order.Id}-{order.Version}",

            // Business correlation
            CorrelationId = order.CorrelationId,

            // Content type for deserialization
            ContentType = "application/json",

            // Custom properties for filtering/routing
            ApplicationProperties =
            {
                ["OrderType"] = order.Type,
                ["Priority"] = order.Priority,
                ["CustomerId"] = order.CustomerId
            },

            // Schedule for later if needed
            // ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddMinutes(5)
        };

        // Retry policy for transient failures
        var retryPolicy = Policy
            .Handle<ServiceBusException>(ex => ex.IsTransient)
            .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                (ex, delay, attempt, ctx) =>
                {
                    _logger.LogWarning(ex,
                        "Send attempt {Attempt} failed, retrying in {Delay}ms",
                        attempt, delay.TotalMilliseconds);
                });

        await retryPolicy.ExecuteAsync(async () =>
        {
            await _sender.SendMessageAsync(message);
            _logger.LogInformation("Order {OrderId} sent successfully", order.Id);
        });
    }
    // Batch sending for high throughput
    public async Task SendOrderBatchAsync(IEnumerable<Order> orders)
    {
        // Not declared with 'using' because the batch is replaced when it fills up
        ServiceBusMessageBatch batch = await _sender.CreateMessageBatchAsync();
        try
        {
            foreach (var order in orders)
            {
                var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(order))
                {
                    MessageId = $"order-{order.Id}-{order.Version}"
                };

                if (!batch.TryAddMessage(message))
                {
                    // Batch is full, send it and create a new batch
                    await _sender.SendMessagesAsync(batch);
                    batch.Dispose();
                    batch = await _sender.CreateMessageBatchAsync();

                    if (!batch.TryAddMessage(message))
                    {
                        throw new InvalidOperationException($"Message too large for batch: {order.Id}");
                    }
                }
            }

            if (batch.Count > 0)
            {
                await _sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }
}

Step 4: Implement Reliable Message Processor
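The processor below runs as a hosted BackgroundService. Registering it (together with the dead letter processor from Step 6) is one line each in Program.cs, assuming the ServiceBusClient registration shown earlier:

// Program.cs (sketch): start the queue processors with the host
builder.Services.AddHostedService<OrderProcessor>();
builder.Services.AddHostedService<DeadLetterProcessor>();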
// Reliable processor with proper error handling
public class OrderProcessor : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OrderProcessor> _logger;

    public OrderProcessor(ServiceBusClient client, IServiceProvider serviceProvider,
        ILogger<OrderProcessor> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;

        _processor = client.CreateProcessor("orders-queue", new ServiceBusProcessorOptions
        {
            // PeekLock mode (default) - message is locked, not deleted, until settled
            ReceiveMode = ServiceBusReceiveMode.PeekLock,

            // Disable auto-complete so completion is always explicit in the handler
            AutoCompleteMessages = false,

            // Process multiple messages concurrently
            MaxConcurrentCalls = 10,

            // Auto-renew lock for long processing
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(30),

            // Prefetch for performance
            PrefetchCount = 20
        });
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += ProcessMessageAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;

        await _processor.StartProcessingAsync(stoppingToken);

        // Keep running until cancellation
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }

    private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
    {
        var orderId = args.Message.MessageId;
        var deliveryCount = args.Message.DeliveryCount;

        _logger.LogInformation(
            "Processing order {OrderId}, attempt {DeliveryCount}",
            orderId, deliveryCount);

        try
        {
            var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToArray());

            using var scope = _serviceProvider.CreateScope();
            var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();

            // Process with idempotency check
            await orderService.ProcessOrderAsync(order);

            // Explicitly complete the message
            await args.CompleteMessageAsync(args.Message);

            _logger.LogInformation("Order {OrderId} completed successfully", orderId);
        }
        catch (DuplicateOrderException)
        {
            // Already processed - complete the message to remove it
            _logger.LogWarning("Duplicate order {OrderId}, completing message", orderId);
            await args.CompleteMessageAsync(args.Message);
        }
        catch (InvalidOrderException ex)
        {
            // Business validation failed - dead letter immediately
            _logger.LogError(ex, "Invalid order {OrderId}, dead-lettering", orderId);
            await args.DeadLetterMessageAsync(args.Message,
                deadLetterReason: "InvalidOrder",
                deadLetterErrorDescription: ex.Message);
        }
        catch (TransientException ex) when (deliveryCount < 5)
        {
            // Transient error - abandon to retry
            _logger.LogWarning(ex,
                "Transient error processing {OrderId}, abandoning for retry", orderId);
            await args.AbandonMessageAsync(args.Message);
        }
        catch (Exception ex)
        {
            // Unexpected error - decide based on delivery count
            if (deliveryCount >= 10)
            {
                _logger.LogError(ex,
                    "Max delivery count reached for {OrderId}, dead-lettering", orderId);
                await args.DeadLetterMessageAsync(args.Message,
                    deadLetterReason: "MaxDeliveryCountExceeded",
                    deadLetterErrorDescription: ex.Message);
            }
            else
            {
                _logger.LogWarning(ex,
                    "Error processing {OrderId}, abandoning for retry", orderId);
                await args.AbandonMessageAsync(args.Message);
            }
        }
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Service Bus error. Source: {Source}, Entity: {Entity}",
            args.ErrorSource, args.EntityPath);
        return Task.CompletedTask;
    }
}

Step 5: Implement Idempotent Processing
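The service below relies on a couple of supporting types the original snippet doesn't show. A minimal sketch of what they might look like (the shapes are assumptions; only the members actually used are included):

// Sketch of supporting types assumed by OrderService (illustrative shapes)
public class ProcessedMessage
{
    public string MessageId { get; set; } = default!;
    public DateTime ProcessedAt { get; set; }
}

public class DuplicateOrderException : Exception
{
    public DuplicateOrderException(object orderId)
        : base($"Order {orderId} was already processed") { }
}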
// Idempotent order processing to handle duplicates
public class OrderService : IOrderService
{
    private readonly ApplicationDbContext _context;

    public OrderService(ApplicationDbContext context)
    {
        _context = context;
    }

    public async Task ProcessOrderAsync(Order order)
    {
        // Check if already processed (idempotency key)
        var existingOrder = await _context.Orders
            .FirstOrDefaultAsync(o => o.Id == order.Id);

        if (existingOrder != null)
        {
            if (existingOrder.Version >= order.Version)
            {
                throw new DuplicateOrderException(order.Id);
            }

            // Newer version - update
            _context.Entry(existingOrder).CurrentValues.SetValues(order);
        }
        else
        {
            // New order
            _context.Orders.Add(order);
        }

        // Use database transaction for atomicity
        using var transaction = await _context.Database.BeginTransactionAsync();
        try
        {
            await _context.SaveChangesAsync();

            // Record processing in idempotency table
            await _context.ProcessedMessages.AddAsync(new ProcessedMessage
            {
                MessageId = $"order-{order.Id}-{order.Version}",
                ProcessedAt = DateTime.UtcNow
            });
            await _context.SaveChangesAsync();

            await transaction.CommitAsync();
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

Step 6: Dead Letter Queue Processing
// Process dead letter queue for investigation and retry
public class DeadLetterProcessor : BackgroundService
{
    private readonly ServiceBusReceiver _dlqReceiver;
    private readonly ServiceBusSender _sender;
    private readonly ILogger<DeadLetterProcessor> _logger;

    public DeadLetterProcessor(ServiceBusClient client, ILogger<DeadLetterProcessor> logger)
    {
        _logger = logger;

        // Create receiver for dead letter sub-queue
        _dlqReceiver = client.CreateReceiver(
            "orders-queue",
            new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter,
                ReceiveMode = ServiceBusReceiveMode.PeekLock
            });

        _sender = client.CreateSender("orders-queue");
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var messages = await _dlqReceiver.ReceiveMessagesAsync(
                maxMessages: 10,
                maxWaitTime: TimeSpan.FromSeconds(30),
                cancellationToken: stoppingToken);

            foreach (var message in messages)
            {
                var reason = message.DeadLetterReason;
                var description = message.DeadLetterErrorDescription;

                _logger.LogWarning(
                    "Dead letter message {MessageId}. Reason: {Reason}, Description: {Description}",
                    message.MessageId, reason, description);

                // Decide whether to retry or discard
                if (ShouldRetry(reason))
                {
                    // Create new message for retry
                    var retryMessage = new ServiceBusMessage(message.Body)
                    {
                        MessageId = $"{message.MessageId}-retry",
                        ApplicationProperties = { ["RetryCount"] = GetRetryCount(message) + 1 }
                    };

                    await _sender.SendMessageAsync(retryMessage);
                    _logger.LogInformation("Retrying message {MessageId}", message.MessageId);
                }
                else
                {
                    // Log for manual investigation
                    // (LogToInvestigationTableAsync is an app-specific persistence hook, not shown here)
                    await LogToInvestigationTableAsync(message);
                }

                await _dlqReceiver.CompleteMessageAsync(message);
            }

            await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
        }
    }

    private bool ShouldRetry(string reason)
    {
        return reason switch
        {
            "TransientError" => true,
            "MaxDeliveryCountExceeded" => true, // May have been a temporary issue
            "InvalidOrder" => false, // Business error, won't succeed
            _ => false
        };
    }

    // Read the retry counter stamped onto previously retried messages (0 if absent)
    private static int GetRetryCount(ServiceBusReceivedMessage message) =>
        message.ApplicationProperties.TryGetValue("RetryCount", out var value)
            ? Convert.ToInt32(value)
            : 0;
}

Message Flow Summary
Reliable Message Flow:

                                    ┌─────────────────────────────┐
                                    │      Dead Letter Queue      │
                                    │  (Monitor + Alert + Retry)  │
                                    └──────────────▲──────────────┘
                                                   │ Max delivery count exceeded
                                                   │ or explicit dead-letter
┌─────────┐   ┌─────────┐   ┌─────────┐   ┌────────┴────────┐   ┌─────────┐
│   API   │──▶│  Queue  │──▶│ PeekLock│──▶│    Processor    │──▶│Database │
│ (Send)  │   │ (dedupe)│   │         │   │  (Idempotent)   │   │         │
└─────────┘   └────┬────┘   └────┬────┘   └────────┬────────┘   └─────────┘
                   ▲             ▲                 │
                   │             │    Complete     │
                   │             └─────────────────┤
                   │                               │
                   └───────── Abandon (retry) ─────┘

Service Bus Reliability Checklist
| Setting | Value | Purpose |
|---|---|---|
| ReceiveMode | PeekLock | Don’t delete until processed |
| DuplicateDetection | Enabled | Prevent duplicate sends |
| MaxDeliveryCount | 10 | Retry before dead-letter |
| LockDuration | 5 min | Match processing time |
| DeadLetterOnExpiration | true | Don’t lose expired messages |
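Beyond the metric alert in Step 2, queue and dead letter depths can also be polled from code for dashboards or health checks. A small sketch using the administration client (the namespace name is a placeholder; DefaultAzureCredential requires the Azure.Identity package):

// Sketch: poll queue depths for a health check or dashboard
var adminClient = new ServiceBusAdministrationClient(
    "sb-orders-example.servicebus.windows.net", new DefaultAzureCredential());

QueueRuntimeProperties props = await adminClient.GetQueueRuntimePropertiesAsync("orders-queue");

Console.WriteLine($"Active: {props.ActiveMessageCount}, " +
                  $"Dead-lettered: {props.DeadLetterMessageCount}, " +
                  $"Scheduled: {props.ScheduledMessageCount}");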
Practice Question
Why should you use PeekLock mode instead of ReceiveAndDelete mode for critical messages?