DeployU

Messages are lost and processed out of order in Service Bus. Implement reliable messaging.

The Scenario

Your order processing system is losing messages:

Service Bus Configuration:
├── Namespace: Standard tier
├── Queue: orders-queue
├── Current issues:
│   ├── ~5% of orders never processed
│   ├── Some orders processed multiple times
│   ├── Dead letter queue has 10,000 messages
│   └── No monitoring or alerting configured
└── Message flow: Web API → Queue → Order Processor → Database

The business is losing revenue due to unprocessed orders.

The Challenge

Implement reliable messaging with proper error handling, dead letter management, duplicate detection, and monitoring to achieve zero message loss.

Wrong Approach

A junior engineer might use auto-complete mode without proper error handling, ignore the dead letter queue, skip duplicate detection, or use ReceiveAndDelete mode for performance. These approaches cause message loss, duplicate processing, and silent failures.

Right Approach

A senior engineer uses PeekLock with explicit settlement, enables duplicate detection, makes message handlers idempotent, configures dead-letter handling with monitoring and alerting, sets up retry policies with exponential backoff, and uses sessions when messages for the same key must be processed in order.

Step 1: Understand Message Loss Scenarios

Why Messages Get Lost or Duplicated:
┌─────────────────────────────────────────────────────────────────┐
│ 1. ReceiveAndDelete Mode                                        │
│    Message deleted immediately on receive                       │
│    If processor crashes before completing work = LOST           │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 2. PeekLock Timeout                                             │
│    Lock expires before processing completes                     │
│    Message becomes visible again = DUPLICATES                   │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 3. MaxDeliveryCount Exceeded                                    │
│    Delivery count reaches MaxDeliveryCount (default 10)         │
│    Message moved to Dead Letter Queue                           │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 4. TTL Expired                                                  │
│    Message not processed within TimeToLive                      │
│    Discarded unless deadLetteringOnMessageExpiration is on      │
└─────────────────────────────────────────────────────────────────┘
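To make these scenarios concrete, here is a toy, in-memory model of how settlement decides a message's fate. It is not the Azure.Messaging.ServiceBus API: ToyQueue, ToyMessage and ToyReceiveMode are hypothetical names, and a crash or lock expiry is represented by calling Abandon. It covers scenarios 1 and 3 and the redelivery behind scenario 2; TTL expiry is left out.

```csharp
#nullable enable
using System.Collections.Generic;

// Toy in-memory model of queue settlement semantics.
// NOT the Azure.Messaging.ServiceBus API: every name here is hypothetical.
public enum ToyReceiveMode { PeekLock, ReceiveAndDelete }

public sealed class ToyMessage
{
    public ToyMessage(string id) => Id = id;
    public string Id { get; }
    public int DeliveryCount { get; set; }
}

public sealed class ToyQueue
{
    private readonly Queue<ToyMessage> _available = new();
    private readonly Dictionary<string, ToyMessage> _locked = new();
    private readonly int _maxDeliveryCount;

    public ToyQueue(int maxDeliveryCount = 10) => _maxDeliveryCount = maxDeliveryCount;

    public List<ToyMessage> DeadLetters { get; } = new();
    public int AvailableCount => _available.Count;
    public int LockedCount => _locked.Count;

    public void Send(string id) => _available.Enqueue(new ToyMessage(id));

    public ToyMessage? Receive(ToyReceiveMode mode)
    {
        if (_available.Count == 0) return null;
        var message = _available.Dequeue();
        message.DeliveryCount++;
        // PeekLock: the broker keeps the message (locked) until it is settled.
        // ReceiveAndDelete: the broker forgets it as soon as it is handed out.
        if (mode == ToyReceiveMode.PeekLock) _locked[message.Id] = message;
        return message;
    }

    // Successful processing: remove the message for good.
    public void Complete(ToyMessage message) => _locked.Remove(message.Id);

    // Failure, crash or lock expiry: the message becomes available again,
    // or moves to the dead-letter queue once MaxDeliveryCount is reached.
    public void Abandon(ToyMessage message)
    {
        // Nothing to return under ReceiveAndDelete: the broker already deleted it.
        if (!_locked.Remove(message.Id)) return;
        if (message.DeliveryCount >= _maxDeliveryCount) DeadLetters.Add(message);
        else _available.Enqueue(message);
    }
}
```

In this model a crash under ReceiveAndDelete leaves no trace of the message anywhere, while the same crash under PeekLock puts it back on the queue with a higher delivery count, until the count limit routes it to the dead-letter list.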

Step 2: Configure Queue Properly

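param location string = resourceGroup().location  // assumed parameter; not defined in the original snippet

// Service Bus namespace and queue with reliability settings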
resource serviceBusNamespace 'Microsoft.ServiceBus/namespaces@2022-10-01-preview' = {
  name: 'sb-orders-${uniqueString(resourceGroup().id)}'
  location: location
  sku: {
    name: 'Premium'  // Premium for production (partitioning, geo-DR)
    tier: 'Premium'
    capacity: 1
  }
  properties: {
    zoneRedundant: true  // High availability
  }
}

resource ordersQueue 'Microsoft.ServiceBus/namespaces/queues@2022-10-01-preview' = {
  parent: serviceBusNamespace
  name: 'orders-queue'
  properties: {
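    // Reliability settings (duplicate detection can only be enabled when the queue is created)
    requiresDuplicateDetection: true
    duplicateDetectionHistoryTimeWindow: 'PT10M'  // 10 minute window

    // Dead letter configuration
    deadLetteringOnMessageExpiration: true
    maxDeliveryCount: 10

    // Message settings
    defaultMessageTimeToLive: 'P14D'  // 14 days
    lockDuration: 'PT5M'  // 5 minutes is the maximum; longer work relies on client-side lock renewal

    // Ordering: set true (and give each message a SessionId) for FIFO processing per session.
    // Like duplicate detection, this cannot be changed after the queue is created.
    requiresSession: false

    // Size
    maxSizeInMegabytes: 5120  // 5GB
    enablePartitioning: false  // Ordered processing comes from sessions, not from this flag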
  }
}

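// Assumed: an existing action group that receives the alert notifications
param actionGroupName string

resource actionGroup 'Microsoft.Insights/actionGroups@2023-01-01' existing = {
  name: actionGroupName
}

// Dead letter queue alert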
resource dlqAlert 'Microsoft.Insights/metricAlerts@2018-03-01' = {
  name: 'orders-dlq-alert'
  location: 'global'
  properties: {
    description: 'Alert when dead letter queue has messages'
    severity: 2
    enabled: true
    scopes: [serviceBusNamespace.id]
    evaluationFrequency: 'PT5M'
    windowSize: 'PT15M'
    criteria: {
      'odata.type': 'Microsoft.Azure.Monitor.SingleResourceMultipleMetricCriteria'
      allOf: [
        {
          name: 'DeadLetteredMessages'
          metricName: 'DeadletteredMessages'
          dimensions: [
            {
              name: 'EntityName'
              operator: 'Include'
              values: ['orders-queue']
            }
          ]
          operator: 'GreaterThan'
          threshold: 0
          timeAggregation: 'Maximum'  // gauge metric: current DLQ depth, so Total over the window is misleading
        }
      ]
    }
    actions: [
      {
        actionGroupId: actionGroup.id
      }
    ]
  }
}
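The requiresSession comment deserves a closer look, because nothing else in this configuration addresses the out-of-order symptom in the title: with MaxConcurrentCalls = 10 in Step 4, competing handlers can finish in any order. In Service Bus, ordered processing comes from sessions: the queue is created with requiresSession: true, the sender sets ServiceBusMessage.SessionId (for example to the customer ID, an assumed choice), and a processor created with ServiceBusClient.CreateSessionProcessor handles each session's messages in sequence. The sketch below is a hypothetical, SDK-free model of that guarantee; ToyOrderMessage and SessionOrdering are illustrative names.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Toy model of session-based ordering (NOT the SDK): messages that share a
// SessionId are handled one at a time in send order, while different
// sessions run in parallel.
public sealed record ToyOrderMessage(string SessionId, int Sequence);

public static class SessionOrdering
{
    public static async Task<IReadOnlyDictionary<string, List<int>>> ProcessAsync(
        IEnumerable<ToyOrderMessage> sentInOrder)
    {
        var processed = new ConcurrentDictionary<string, List<int>>();

        // GroupBy keeps the original (send) order inside each group.
        var sessions = sentInOrder.GroupBy(m => m.SessionId);

        // One worker per session: sequential inside a session, parallel across sessions.
        var workers = sessions.Select(session => Task.Run(async () =>
        {
            var log = processed.GetOrAdd(session.Key, _ => new List<int>());
            foreach (var message in session)
            {
                await Task.Delay(1);          // stand-in for real work
                log.Add(message.Sequence);    // only this worker touches this list
            }
        }));

        await Task.WhenAll(workers);
        return processed;
    }
}
```

The trade-off is throughput: parallelism is bounded by the number of active sessions rather than by MaxConcurrentCalls, so a key with many messages (one busy customer, say) is processed strictly one message at a time.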

Step 3: Implement Reliable Message Sender

// Reliable message sender with retry and duplicate detection
public class ReliableOrderSender
{
    private readonly ServiceBusSender _sender;
    private readonly ILogger<ReliableOrderSender> _logger;

    public ReliableOrderSender(ServiceBusClient client, ILogger<ReliableOrderSender> logger)
    {
        _sender = client.CreateSender("orders-queue");
        _logger = logger;
    }

    public async Task SendOrderAsync(Order order)
    {
        var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(order))
        {
            // Unique ID for duplicate detection
            MessageId = $"order-{order.Id}-{order.Version}",

            // Business correlation
            CorrelationId = order.CorrelationId,

            // Content type for deserialization
            ContentType = "application/json",

            // Custom properties for filtering/routing
            ApplicationProperties =
            {
                ["OrderType"] = order.Type,
                ["Priority"] = order.Priority,
                ["CustomerId"] = order.CustomerId
            },

            // Schedule for later if needed
            // ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddMinutes(5)
        };

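        // Retry policy for transient failures (Polly). ServiceBusClient already retries
        // transient errors according to ServiceBusClientOptions.RetryOptions, so this
        // outer policy adds attempts on top of those. Resending the same message reuses
        // its MessageId, so duplicate detection drops a copy whose first send succeeded.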
        var retryPolicy = Policy
            .Handle<ServiceBusException>(ex => ex.IsTransient)
            .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                (ex, delay, attempt, ctx) =>
                {
                    _logger.LogWarning(ex,
                        "Send attempt {Attempt} failed, retrying in {Delay}ms",
                        attempt, delay.TotalMilliseconds);
                });

        await retryPolicy.ExecuteAsync(async () =>
        {
            await _sender.SendMessageAsync(message);
            _logger.LogInformation("Order {OrderId} sent successfully", order.Id);
        });
    }

    // Batch sending for high throughput
    public async Task SendOrderBatchAsync(IEnumerable<Order> orders)
    {
        // Not a 'using' declaration: the variable is reassigned when a batch fills up
        ServiceBusMessageBatch batch = await _sender.CreateMessageBatchAsync();
        try
        {
            foreach (var order in orders)
            {
                var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(order))
                {
                    // Same ID scheme as single sends, so duplicate detection still applies
                    MessageId = $"order-{order.Id}-{order.Version}",
                    ContentType = "application/json"
                };

                if (!batch.TryAddMessage(message))
                {
                    // Batch is full: send it, then swap in a new batch and dispose the old one
                    await _sender.SendMessagesAsync(batch);
                    var sentBatch = batch;
                    batch = await _sender.CreateMessageBatchAsync();
                    sentBatch.Dispose();

                    if (!batch.TryAddMessage(message))
                    {
                        throw new InvalidOperationException($"Message too large for batch: {order.Id}");
                    }
                }
            }

            if (batch.Count > 0)
            {
                await _sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }
}
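The sender's deterministic MessageId is what lets the queue's duplicate detection absorb retries: if a send times out after the broker already stored the message, the retry carries the same ID and is dropped. This hypothetical, in-memory model (ToyDuplicateDetector and OrderMessageIds are illustrative names, and integer order IDs are assumed) mimics that behavior with a 10-minute window measured from the first accepted send, a simplification of the broker's history.

```csharp
using System;
using System.Collections.Generic;

// Toy model of broker-side duplicate detection (NOT the SDK): a message whose
// MessageId was already accepted within the history window is silently dropped.
public sealed class ToyDuplicateDetector
{
    private readonly TimeSpan _window;
    private readonly Dictionary<string, DateTimeOffset> _firstSeen = new();

    public ToyDuplicateDetector(TimeSpan window) => _window = window;

    // Returns true if the message is enqueued, false if dropped as a duplicate.
    public bool TryAccept(string messageId, DateTimeOffset now)
    {
        if (_firstSeen.TryGetValue(messageId, out var seenAt) && now - seenAt < _window)
        {
            return false;
        }
        _firstSeen[messageId] = now;
        return true;
    }
}

public static class OrderMessageIds
{
    // Same scheme as ReliableOrderSender: deterministic per order version, so a
    // retried send carries the same MessageId as the original attempt.
    public static string For(int orderId, int version) => $"order-{orderId}-{version}";
}
```

Outside the window the broker accepts the same MessageId again, so detection at the queue narrows the duplicate problem without removing it; the consumer still has to be idempotent.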

Step 4: Implement Reliable Message Processor

// Reliable processor with proper error handling
public class OrderProcessor : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OrderProcessor> _logger;

    public OrderProcessor(ServiceBusClient client, IServiceProvider serviceProvider,
        ILogger<OrderProcessor> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;

        _processor = client.CreateProcessor("orders-queue", new ServiceBusProcessorOptions
        {
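            // PeekLock mode (default): the message stays locked until it is settled
            ReceiveMode = ServiceBusReceiveMode.PeekLock,

            // AutoCompleteMessages defaults to true; disable it so the handler
            // settles every message explicitly (complete, abandon, or dead-letter)
            AutoCompleteMessages = false,

            // Process multiple messages concurrently (no ordering across messages)
            MaxConcurrentCalls = 10,

            // Auto-renew lock for long processing
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(30),

            // Prefetch for performance; prefetched messages are already locked,
            // so keep the buffer small relative to throughput
            PrefetchCount = 20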
        });
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += ProcessMessageAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;

        await _processor.StartProcessingAsync(stoppingToken);

        try
        {
            // Keep running until the host signals shutdown
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        finally
        {
            // Stop receiving; in-flight handlers are awaited before this returns
            await _processor.StopProcessingAsync();
        }
    }

    private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
    {
        var orderId = args.Message.MessageId;
        var deliveryCount = args.Message.DeliveryCount;

        _logger.LogInformation(
            "Processing order {OrderId}, attempt {DeliveryCount}",
            orderId, deliveryCount);

        try
        {
            var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToArray())
                ?? throw new JsonException("Message body deserialized to null");

            using var scope = _serviceProvider.CreateScope();
            var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();

            // Process with idempotency check
            await orderService.ProcessOrderAsync(order);

            // Explicitly complete the message
            await args.CompleteMessageAsync(args.Message);

            _logger.LogInformation("Order {OrderId} completed successfully", orderId);
        }
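        catch (DuplicateOrderException)
        {
            // Already processed - complete the message to remove it
            _logger.LogWarning("Duplicate order {OrderId}, completing message", orderId);
            await args.CompleteMessageAsync(args.Message);
        }
        catch (InvalidOrderException ex)
        {
            // Business validation failed - dead letter immediately
            _logger.LogError(ex, "Invalid order {OrderId}, dead-lettering", orderId);
            await args.DeadLetterMessageAsync(args.Message,
                deadLetterReason: "InvalidOrder",
                deadLetterErrorDescription: ex.Message);
        }
        catch (JsonException ex)
        {
            // Malformed payload (poison message) - retrying cannot fix it
            _logger.LogError(ex, "Unreadable order {OrderId}, dead-lettering", orderId);
            await args.DeadLetterMessageAsync(args.Message,
                deadLetterReason: "DeserializationFailed",
                deadLetterErrorDescription: ex.Message);
        }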
        catch (TransientException ex) when (deliveryCount < 5)
        {
            // Transient error - abandon to retry
            _logger.LogWarning(ex,
                "Transient error processing {OrderId}, abandoning for retry", orderId);
            await args.AbandonMessageAsync(args.Message);
        }
        catch (Exception ex)
        {
            // Unexpected error - decide based on delivery count
            // (10 matches the queue's maxDeliveryCount; keep the two values in sync)
            if (deliveryCount >= 10)
            {
                _logger.LogError(ex,
                    "Max delivery count reached for {OrderId}, dead-lettering", orderId);
                await args.DeadLetterMessageAsync(args.Message,
                    deadLetterReason: "MaxDeliveryCountExceeded",
                    deadLetterErrorDescription: ex.Message);
            }
            else
            {
                _logger.LogWarning(ex,
                    "Error processing {OrderId}, abandoning for retry", orderId);
                await args.AbandonMessageAsync(args.Message);
            }
        }
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Service Bus error. Source: {Source}, Entity: {Entity}",
            args.ErrorSource, args.EntityPath);
        return Task.CompletedTask;
    }
}
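The catch blocks above encode a settlement policy that is easy to get subtly wrong; for example, a TransientException on delivery 5 or later no longer matches its own filter and falls through to the generic handler. A minimal sketch, assuming you want that policy unit-testable apart from the SDK: the hypothetical SettlementPolicy.Decide maps a failure kind and the message's DeliveryCount to the settlement the processor performs.

```csharp
// Hypothetical, SDK-free restatement of OrderProcessor's catch blocks as a
// pure function, so the retry/dead-letter policy can be tested on its own.
public enum Settlement { Complete, Abandon, DeadLetter }

public enum FailureKind { None, Duplicate, Invalid, Transient, Unexpected }

public static class SettlementPolicy
{
    public const int TransientRetryLimit = 5;   // mirrors "when (deliveryCount < 5)"
    public const int MaxDeliveryCount = 10;     // mirrors the queue's maxDeliveryCount

    public static Settlement Decide(FailureKind failure, int deliveryCount) => failure switch
    {
        FailureKind.None => Settlement.Complete,
        FailureKind.Duplicate => Settlement.Complete,   // already applied, just remove it
        FailureKind.Invalid => Settlement.DeadLetter,   // business rule, will not pass later
        FailureKind.Transient when deliveryCount < TransientRetryLimit => Settlement.Abandon,
        // Transient errors past the limit fall through to the generic rule,
        // exactly like the catch order in ProcessMessageAsync
        _ => deliveryCount >= MaxDeliveryCount ? Settlement.DeadLetter : Settlement.Abandon
    };
}
```

Keeping the decision pure also makes the coupling to the queue's maxDeliveryCount explicit in one constant instead of a literal buried in a catch block.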

Step 5: Implement Idempotent Processing

// Idempotent order processing to handle duplicates
public class OrderService : IOrderService
{
    private readonly ApplicationDbContext _context;

    public OrderService(ApplicationDbContext context)
    {
        _context = context;
    }

    public async Task ProcessOrderAsync(Order order)
    {
        // Same key the sender uses as MessageId
        var messageId = $"order-{order.Id}-{order.Version}";

        // Begin the transaction BEFORE reading, so the check and the write are one unit.
        // Disposing an uncommitted transaction rolls it back.
        await using var transaction = await _context.Database.BeginTransactionAsync();

        // Check if this exact message was already processed (idempotency key)
        if (await _context.ProcessedMessages.AnyAsync(m => m.MessageId == messageId))
        {
            throw new DuplicateOrderException(order.Id);
        }

        var existingOrder = await _context.Orders
            .FirstOrDefaultAsync(o => o.Id == order.Id);

        if (existingOrder != null)
        {
            if (existingOrder.Version >= order.Version)
            {
                // Same or older version - nothing to apply
                throw new DuplicateOrderException(order.Id);
            }
            // Newer version - update
            _context.Entry(existingOrder).CurrentValues.SetValues(order);
        }
        else
        {
            // New order
            _context.Orders.Add(order);
        }

        // Record processing in the idempotency table in the same SaveChanges call.
        // Assumes a unique index on ProcessedMessages.MessageId, so a concurrent
        // duplicate fails here instead of being applied twice.
        _context.ProcessedMessages.Add(new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTime.UtcNow
        });

        await _context.SaveChangesAsync();
        await transaction.CommitAsync();
    }
}
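The idempotency contract can be checked without a database. In this sketch the class and record names are hypothetical: a HashSet stands in for the ProcessedMessages table, a Dictionary for Orders, and a lock for the transaction. Applying the same message twice, or an older version after a newer one, changes nothing.

```csharp
#nullable enable
using System.Collections.Generic;

// In-memory stand-in for OrderService (hypothetical names, not EF Core).
public sealed record ToyOrder(int Id, int Version, decimal Total);

public sealed class ToyIdempotentOrderStore
{
    private readonly object _gate = new();
    private readonly HashSet<string> _processedMessageIds = new();  // "ProcessedMessages"
    private readonly Dictionary<int, ToyOrder> _orders = new();     // "Orders"

    public int ApplyCount { get; private set; }

    // Returns true if the order was applied, false if it was a duplicate or stale.
    public bool Process(ToyOrder order)
    {
        var messageId = $"order-{order.Id}-{order.Version}";
        lock (_gate)  // plays the role of the database transaction
        {
            if (_processedMessageIds.Contains(messageId)) return false;
            if (_orders.TryGetValue(order.Id, out var existing) && existing.Version >= order.Version)
                return false;

            _orders[order.Id] = order;            // insert, or update to a newer version
            _processedMessageIds.Add(messageId);  // recorded in the same critical section
            ApplyCount++;
            return true;
        }
    }

    public ToyOrder? Get(int id)
    {
        lock (_gate)
        {
            return _orders.TryGetValue(id, out var order) ? order : null;
        }
    }
}
```

The property that matters is that "apply the change" and "record the message ID" happen atomically; if they could be separated, a crash between them would either lose the record (and reapply later) or lose the change.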

Step 6: Dead Letter Queue Processing

// Process dead letter queue for investigation and retry
public class DeadLetterProcessor : BackgroundService
{
    // Cap resubmissions so a message that keeps failing cannot cycle forever
    private const int MaxDlqRetries = 3;

    private readonly ServiceBusReceiver _dlqReceiver;
    private readonly ServiceBusSender _sender;
    private readonly ILogger<DeadLetterProcessor> _logger;

    public DeadLetterProcessor(ServiceBusClient client, ILogger<DeadLetterProcessor> logger)
    {
        _logger = logger;

        // Create receiver for dead letter sub-queue
        _dlqReceiver = client.CreateReceiver(
            "orders-queue",
            new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter,
                ReceiveMode = ServiceBusReceiveMode.PeekLock
            });

        _sender = client.CreateSender("orders-queue");
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var messages = await _dlqReceiver.ReceiveMessagesAsync(
                maxMessages: 10,
                maxWaitTime: TimeSpan.FromSeconds(30),
                cancellationToken: stoppingToken);

            foreach (var message in messages)
            {
                var reason = message.DeadLetterReason;
                var description = message.DeadLetterErrorDescription;
                var retryCount = GetRetryCount(message);

                _logger.LogWarning(
                    "Dead letter message {MessageId}. Reason: {Reason}, Description: {Description}, Retries: {RetryCount}",
                    message.MessageId, reason, description, retryCount);

                // Decide whether to retry or park for investigation
                if (ShouldRetry(reason) && retryCount < MaxDlqRetries)
                {
                    // Copy body, CorrelationId, ContentType and ApplicationProperties
                    // from the dead-lettered message, then tag the new attempt
                    var retryMessage = new ServiceBusMessage(message)
                    {
                        MessageId = $"{message.MessageId}-retry",
                        ApplicationProperties = { ["RetryCount"] = retryCount + 1 }
                    };

                    await _sender.SendMessageAsync(retryMessage, stoppingToken);
                    _logger.LogInformation("Retrying message {MessageId}", message.MessageId);
                }
                else
                {
                    // Log for manual investigation
                    await LogToInvestigationTableAsync(message);
                }

                // Complete only after the resend or log succeeded; a crash in between
                // can resend twice, which the idempotent handler absorbs
                await _dlqReceiver.CompleteMessageAsync(message, stoppingToken);
            }

            // Pause only when the DLQ is empty, so a backlog drains continuously
            if (messages.Count == 0)
            {
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }
        }
    }

    private static bool ShouldRetry(string reason)
    {
        return reason switch
        {
            "TransientError" => true,
            "MaxDeliveryCountExceeded" => true,  // May have been a temporary issue
            "InvalidOrder" => false,              // Business error, won't succeed
            _ => false
        };
    }

    private static int GetRetryCount(ServiceBusReceivedMessage message) =>
        message.ApplicationProperties.TryGetValue("RetryCount", out var value) && value is int count
            ? count
            : 0;

    // Hypothetical placeholder: persist the message for manual review (table, blob, ticket, ...)
    private Task LogToInvestigationTableAsync(ServiceBusReceivedMessage message) =>
        Task.CompletedTask;
}
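The DLQ retry decision is worth bounding: a message dead-lettered with MaxDeliveryCountExceeded that fails again after resubmission would otherwise cycle between the queue and the DLQ indefinitely. Here is a hypothetical, SDK-free sketch of a capped decision; DlqTriage is an illustrative name and the cap of 3 is an assumption, not a Service Bus setting. Reasons it does not recognize, such as the TTLExpiredException reason the broker sets on expired messages, are parked for investigation.

```csharp
#nullable enable

// Hypothetical, SDK-free version of the DeadLetterProcessor decision: resubmit
// only retryable reasons, and only a bounded number of times.
public enum DlqAction { Resubmit, Investigate }

public static class DlqTriage
{
    public const int MaxDlqRetries = 3;  // assumed cap, not a broker setting

    public static DlqAction Decide(string? reason, int retryCount)
    {
        bool retryable = reason is "TransientError" or "MaxDeliveryCountExceeded";
        return retryable && retryCount < MaxDlqRetries
            ? DlqAction.Resubmit
            : DlqAction.Investigate;
    }
}
```

Because the retry count travels with the message as an application property, the cap survives process restarts without any extra state in the DLQ processor.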

Message Flow Summary

Reliable Message Flow:
                              ┌─────────────────────────────┐
                              │     Dead Letter Queue       │
                              │  (Monitor + Alert + Retry)  │
                              └──────────────▲──────────────┘
                                             │ Max Delivery
                                             │ or Explicit DLQ
┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────────────┐   ┌─────────┐
│   API   │──▶│  Queue  │──▶│ PeekLock│──▶│    Processor    │──▶│Database │
│ (Send)  │   │         │   │         │   │ (Idempotent)    │   │         │
└─────────┘   └─────────┘   └────┬────┘   └────────┬────────┘   └─────────┘
     │              ▲            │                 │
     │              │            │   Complete      │
     │              └────────────┴─────────────────┘
     │                           │
     │        Duplicate          │   Abandon (retry)
     └──────Detection────────────┘

Service Bus Reliability Checklist

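Setting                 Value               Purpose
ReceiveMode             PeekLock            Don’t delete until processed
AutoCompleteMessages    false               Settle every message explicitly
DuplicateDetection      Enabled             Drop duplicate sends within the window
MaxDeliveryCount        10                  Retry before dead-lettering
LockDuration            5 min (maximum)     Cover processing time; renew the lock beyond it
DeadLetterOnExpiration  true                Don’t lose expired messages
RequiresSession         When order matters  FIFO processing per SessionId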

Practice Question

Why should you use PeekLock mode instead of ReceiveAndDelete mode for critical messages?