All Articles
Cloud & DevOps
15 min read

Mastering AWS SQS: Building Reliable Message Queues

Deep dive into AWS SQS for building resilient, scalable message queue systems with dead letter queues, FIFO ordering, and batch processing.

Mastering AWS SQS: Building Reliable Message Queues
DP

Dibyank Padhy

Engineering Manager & Full Stack Developer

Mastering AWS SQS: Building Reliable Message Queues

Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. In this comprehensive guide, we'll explore advanced patterns for building production-ready queue systems.

Standard vs FIFO Queues

Understanding when to use each queue type is crucial:

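Standard queues offer nearly unlimited throughput, at-least-once delivery, and best-effort ordering. FIFO queues provide exactly-once processing and preserve message order within each message group, but by default they are limited to 300 API calls per second per action (3,000 messages per second when batching 10 messages per call). High-throughput mode, which the FIFO example below enables through DeduplicationScope and FifoThroughputLimit, raises that limit.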

typescript
import {
  SQSClient,
  CreateQueueCommand,
  SendMessageCommand,
  SendMessageBatchCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
  GetQueueAttributesCommand
} from '@aws-sdk/client-sqs';

// Shared SQS client for the examples in this file; every request it sends targets the us-east-1 region.
const sqsClient = new SQSClient({ region: 'us-east-1' });

/**
 * Creates a standard queue together with its dead letter queue (DLQ).
 *
 * The DLQ is created first (named `${queueName}-dlq`, 14-day retention) so its
 * ARN can be referenced from the main queue's redrive policy. Messages that
 * are received 3 times without being deleted are moved to the DLQ.
 *
 * @param queueName Name of the main queue.
 * @returns The URLs of the main queue and of the DLQ.
 * @throws Error when SQS does not return the DLQ URL, the DLQ ARN, or the main
 *   queue URL. Without this check a missing ARN would silently produce a
 *   redrive policy with no `deadLetterTargetArn`.
 */
async function createQueueWithDLQ(queueName: string) {
  const dlqName = `${queueName}-dlq`;

  // First, create the Dead Letter Queue
  const dlqResponse = await sqsClient.send(new CreateQueueCommand({
    QueueName: dlqName,
    Attributes: {
      MessageRetentionPeriod: '1209600', // 14 days
    },
  }));
  const dlqUrl = dlqResponse.QueueUrl;
  if (!dlqUrl) {
    throw new Error(`SQS did not return a queue URL for dead letter queue "${dlqName}"`);
  }

  // The redrive policy needs the DLQ's ARN, which CreateQueue does not return
  const dlqAttributes = await sqsClient.send(new GetQueueAttributesCommand({
    QueueUrl: dlqUrl,
    AttributeNames: ['QueueArn'],
  }));
  const dlqArn = dlqAttributes.Attributes?.QueueArn;
  if (!dlqArn) {
    // JSON.stringify drops undefined values, so fail loudly instead of
    // sending a redrive policy that has no target.
    throw new Error(`SQS did not return an ARN for dead letter queue "${dlqName}"`);
  }

  // Create main queue with DLQ redrive policy
  const mainQueueResponse = await sqsClient.send(new CreateQueueCommand({
    QueueName: queueName,
    Attributes: {
      DelaySeconds: '0',
      MessageRetentionPeriod: '345600', // 4 days
      VisibilityTimeout: '30',
      RedrivePolicy: JSON.stringify({
        deadLetterTargetArn: dlqArn,
        maxReceiveCount: 3, // Move to DLQ after 3 failed attempts
      }),
    },
  }));
  const mainQueueUrl = mainQueueResponse.QueueUrl;
  if (!mainQueueUrl) {
    throw new Error(`SQS did not return a queue URL for queue "${queueName}"`);
  }

  return { mainQueueUrl, dlqUrl };
}

/**
 * Creates a FIFO queue for ordered processing.
 *
 * `.fifo` is appended to `queueName` because FIFO queue names must end with
 * that suffix. Content-based deduplication is enabled, and deduplication and
 * throughput limits are scoped per message group.
 *
 * @param queueName Queue name without the `.fifo` suffix.
 * @returns The URL reported by SQS for the created queue.
 */
async function createFIFOQueue(queueName: string) {
  const fifoAttributes = {
    FifoQueue: 'true',
    // Deduplication ID is derived from the message content
    ContentBasedDeduplication: 'true',
    // Deduplicate within each message group rather than across the queue
    DeduplicationScope: 'messageGroup',
    // Apply the throughput quota per message group (higher throughput)
    FifoThroughputLimit: 'perMessageGroupId',
  };

  const createCommand = new CreateQueueCommand({
    QueueName: queueName + '.fifo',
    Attributes: fifoAttributes,
  });

  const { QueueUrl } = await sqsClient.send(createCommand);
  return QueueUrl;
}

Batch Processing for High Throughput

Batch operations significantly improve throughput and reduce costs. Here's an optimized batch processor:

typescript
/**
 * A message buffered by BatchMessageProcessor before it is sent to SQS as one
 * entry of a SendMessageBatch request.
 */
interface QueueMessage<T> {
  id: string; // Used as the batch entry Id and to match failed entries back to this message
  body: T; // Payload; serialized with JSON.stringify to form the message body
  attributes?: Record<string, string>; // Each pair is sent as a String-typed message attribute
  groupId?: string; // For FIFO queues; sent as MessageGroupId
  deduplicationId?: string; // Sent as MessageDeduplicationId
}

/**
 * Buffers outgoing messages and sends them to SQS in batches of up to 10.
 *
 * A batch is sent as soon as 10 messages are buffered, or `flushIntervalMs`
 * after the first message was buffered, whichever happens first.
 *
 * Failure handling:
 * - If the batch request itself throws, the batch is put back at the front of
 *   the buffer (in its original order) and the error is rethrown, so no
 *   message is lost.
 * - Entries that SQS reports as failed are retried in their original order,
 *   at most `maxSendAttempts` times in total. Entries flagged `SenderFault`
 *   are not retried. Messages that cannot be delivered are logged, dropped
 *   from the buffer, and reported by a rejected `flush()`.
 */
class BatchMessageProcessor<T> {
  private buffer: Array<{ message: QueueMessage<T>; attempts: number }> = [];
  private readonly maxBatchSize = 10; // SQS limit
  private readonly flushInterval: number;
  private readonly maxSendAttempts: number;
  private flushTimer: NodeJS.Timeout | null = null;

  /**
   * @param queueUrl URL of the destination queue.
   * @param client SQS client used to send batches.
   * @param flushIntervalMs Maximum time a message waits in the buffer before a flush is triggered.
   * @param maxSendAttempts Total number of times an entry is sent before it is given up on; values below 1 are treated as 1.
   */
  constructor(
    private queueUrl: string,
    private client: SQSClient,
    flushIntervalMs: number = 1000,
    maxSendAttempts: number = 3
  ) {
    this.flushInterval = flushIntervalMs;
    this.maxSendAttempts = maxSendAttempts >= 1 ? Math.floor(maxSendAttempts) : 1;
  }

  /**
   * Buffers a message. Sends immediately when the buffer reaches the batch
   * size, otherwise makes sure a delayed flush is scheduled.
   *
   * @throws Whatever `flush()` throws when this call triggers an immediate flush.
   */
  async add(message: QueueMessage<T>): Promise<void> {
    this.buffer.push({ message, attempts: 0 });

    if (this.buffer.length >= this.maxBatchSize) {
      await this.flush();
    } else if (!this.flushTimer) {
      this.flushTimer = setTimeout(() => {
        // Nobody awaits a timer-triggered flush, so handle its rejection here
        // instead of leaving an unhandled promise rejection.
        this.flush().catch(error => console.error('Scheduled flush failed:', error));
      }, this.flushInterval);
    }
  }

  /**
   * Sends everything currently buffered, batch by batch.
   *
   * @throws The client's error when a batch request fails as a whole (the
   *   batch stays buffered), or an Error listing the ids of messages that
   *   could not be delivered after all attempts.
   */
  async flush(): Promise<void> {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }

    const undeliverableIds: string[] = [];

    while (this.buffer.length > 0) {
      const batch = this.buffer.splice(0, this.maxBatchSize);

      let response;
      try {
        response = await this.client.send(new SendMessageBatchCommand({
          QueueUrl: this.queueUrl,
          Entries: batch.map(({ message }) => this.toEntry(message)),
        }));
      } catch (error: unknown) {
        // Keep the messages: a later flush can retry them.
        this.buffer.unshift(...batch);
        throw error;
      }

      // Handle partial failures
      const failed = response.Failed ?? [];
      if (failed.length === 0) continue;
      console.error('Failed messages:', failed);

      const failureById = new Map(failed.map(failure => [failure.Id, failure] as const));
      const retryable: Array<{ message: QueueMessage<T>; attempts: number }> = [];
      const dropped: string[] = [];

      // Walk the batch (not the failure list) so retries keep their original order.
      for (const entry of batch) {
        const failure = failureById.get(entry.message.id);
        if (!failure) continue;

        entry.attempts++;
        if (failure.SenderFault || entry.attempts >= this.maxSendAttempts) {
          dropped.push(entry.message.id);
        } else {
          retryable.push(entry);
        }
      }

      if (dropped.length > 0) {
        console.error('Dropping undeliverable messages:', dropped);
        undeliverableIds.push(...dropped);
      }

      // Add back to front for retry
      this.buffer.unshift(...retryable);
    }

    if (undeliverableIds.length > 0) {
      throw new Error(
        `Failed to send ${undeliverableIds.length} message(s) to ${this.queueUrl}: ${undeliverableIds.join(', ')}`
      );
    }
  }

  /** Flushes any buffered messages; call before shutting down. */
  async close(): Promise<void> {
    await this.flush();
  }

  /** Converts a buffered message into a SendMessageBatch request entry. */
  private toEntry(message: QueueMessage<T>) {
    const attributes = Object.entries(message.attributes ?? {});

    return {
      Id: message.id,
      MessageBody: JSON.stringify(message.body),
      MessageAttributes: attributes.length > 0
        ? Object.fromEntries(
            attributes.map(([key, value]) => [key, { DataType: 'String', StringValue: value }])
          )
        : undefined,
      MessageGroupId: message.groupId,
      MessageDeduplicationId: message.deduplicationId,
    };
  }
}

// Usage: build a processor for the example queue, reusing the shared SQS client
const processor = new BatchMessageProcessor<{ userId: string; action: string }>(
  'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  sqsClient,
);

// Hand a signup event to the processor; batching is handled for you
const signupMessage = {
  id: '1',
  body: { userId: 'user-123', action: 'signup' },
};
await processor.add(signupMessage);

Consumer with Graceful Shutdown

The following consumer handles termination signals and processes messages reliably:

typescript
/**
 * Async callback that SQSConsumer invokes with the JSON-parsed body of each
 * received message. If the returned promise rejects, the consumer logs the
 * error and does not delete the message.
 */
type MessageHandler<T> = (message: T) => Promise<void>;

/**
 * Long-polling SQS consumer with bounded concurrency and graceful shutdown.
 *
 * Each received message body is JSON-parsed and passed to the handler. A
 * message is deleted only after the handler resolves; on any error it is left
 * in the queue and becomes visible again after its visibility timeout.
 *
 * Constructing a consumer registers SIGTERM and SIGINT listeners on the
 * process. On the first signal the consumer stops polling, waits for the poll
 * loop and all in-flight handlers to finish, then calls `process.exit(0)`.
 * Because an in-flight long poll is waited for, shutdown can take up to the
 * poll's 20 second wait time plus handler time.
 */
class SQSConsumer<T> {
  private isRunning = false;
  private activeProcessing = 0;
  private activePollLoops = 0;
  private shuttingDown = false;
  private readonly maxConcurrency: number;

  /**
   * @param queueUrl URL of the queue to consume.
   * @param client SQS client used for receive and delete calls.
   * @param handler Invoked with the parsed body of every message.
   * @param options.maxConcurrency Maximum number of messages processed at
   *   once (default 10). Fractions are rounded down.
   * @throws RangeError when `maxConcurrency` is below 1.
   */
  constructor(
    private queueUrl: string,
    private client: SQSClient,
    private handler: MessageHandler<T>,
    options: { maxConcurrency?: number } = {}
  ) {
    // `||` (not `??`) is deliberate: 0 and NaN keep falling back to the default.
    const requested = options.maxConcurrency || 10;
    if (!(requested >= 1)) {
      // A limit below 1 would make the poll loop wait forever.
      throw new RangeError(`maxConcurrency must be at least 1, received ${requested}`);
    }
    // SQS requires an integer MaxNumberOfMessages.
    this.maxConcurrency = Math.floor(requested);
    this.setupGracefulShutdown();
  }

  /** Registers the SIGTERM/SIGINT handlers that drain the consumer and exit. */
  private setupGracefulShutdown() {
    const shutdown = async (signal: string): Promise<void> => {
      // A second signal must not start a second, concurrent shutdown.
      if (this.shuttingDown) return;
      this.shuttingDown = true;

      console.log(`Received ${signal}, starting graceful shutdown...`);
      this.isRunning = false;

      // Wait for active processing to complete. The poll loop is waited for
      // as well: an in-flight receive may still deliver messages, and exiting
      // before they are handled would cut their handlers off mid-way.
      while (this.activePollLoops > 0 || this.activeProcessing > 0) {
        console.log(`Waiting for ${this.activeProcessing} messages to complete...`);
        await this.sleep(1000);
      }

      console.log('Graceful shutdown complete');
      process.exit(0);
    };

    process.on('SIGTERM', () => void shutdown('SIGTERM'));
    process.on('SIGINT', () => void shutdown('SIGINT'));
  }

  /**
   * Runs the poll loop until `stop()` is called or a shutdown signal arrives.
   * Receive errors are logged and retried after 5 seconds; they do not reject
   * the returned promise.
   */
  async start(): Promise<void> {
    this.isRunning = true;
    this.activePollLoops++;
    console.log(`Starting SQS consumer for ${this.queueUrl}`);

    try {
      while (this.isRunning) {
        // Wait if at max concurrency
        while (this.activeProcessing >= this.maxConcurrency && this.isRunning) {
          await this.sleep(100);
        }

        if (!this.isRunning) break;

        try {
          const response = await this.client.send(new ReceiveMessageCommand({
            QueueUrl: this.queueUrl,
            // Never ask for more messages than there are free processing slots
            MaxNumberOfMessages: Math.min(10, this.maxConcurrency - this.activeProcessing),
            WaitTimeSeconds: 20, // Long polling
            AttributeNames: ['All'],
            MessageAttributeNames: ['All'],
          }));

          // Don't await - let them process in parallel. processMessage
          // handles its own errors; the catch is only a safety net.
          for (const message of response.Messages ?? []) {
            this.processMessage(message).catch(console.error);
          }
        } catch (error: unknown) {
          console.error('Error receiving messages:', error);
          // Back off before retrying, but do not delay a pending stop.
          if (this.isRunning) {
            await this.sleep(5000);
          }
        }
      }
    } finally {
      this.activePollLoops--;
    }
  }

  /**
   * Parses, handles and deletes one message. Never rejects: failures are
   * logged and the message is left for redelivery.
   */
  private async processMessage(
    message: { MessageId?: string; Body?: string; ReceiptHandle?: string }
  ): Promise<void> {
    // Incremented synchronously so the poll loop sees the slot as taken
    // before it computes the next MaxNumberOfMessages.
    this.activeProcessing++;

    try {
      if (message.Body === undefined) {
        throw new Error('Message has no body');
      }
      const body = JSON.parse(message.Body) as T;
      await this.handler(body);

      // Delete message after successful processing
      await this.client.send(new DeleteMessageCommand({
        QueueUrl: this.queueUrl,
        ReceiptHandle: message.ReceiptHandle,
      }));
    } catch (error: unknown) {
      console.error(`Error processing message ${message.MessageId}:`, error);
      // Message will return to queue after visibility timeout
    } finally {
      this.activeProcessing--;
    }
  }

  /** Asks the poll loop to stop; in-flight messages still run to completion. */
  stop(): void {
    this.isRunning = false;
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage: a handler that logs each user action
const logUserAction = async (message: { userId: string; action: string }): Promise<void> => {
  console.log(`Processing: ${message.userId} - ${message.action}`);
  // Your processing logic here
};

// Consumer for the example queue, limited to five messages in flight
const consumer = new SQSConsumer<{ userId: string; action: string }>(
  'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  sqsClient,
  logUserAction,
  { maxConcurrency: 5 },
);

// Start polling; the returned promise is intentionally not awaited
consumer.start();

Dead Letter Queue Processing

Implement a DLQ processor to handle failed messages:

typescript
/**
 * Envelope pairing a failed message with metadata about its failures.
 *
 * NOTE(review): nothing in the visible code constructs or reads this type;
 * the field meanings below are inferred from their names — confirm against
 * the producer before relying on them.
 */
interface DLQMessage<T> {
  originalMessage: T; // presumably the payload as originally sent
  failureReason: string; // presumably a human-readable description of the last failure
  attemptCount: number; // presumably how many processing attempts were made
  firstFailedAt: string; // presumably a timestamp string; format not shown here
  lastFailedAt: string; // presumably a timestamp string; format not shown here
}

/**
 * Drains a dead letter queue (DLQ), either message by message with a caller
 * supplied decision function or by moving everything back to the main queue.
 *
 * A message is deleted from the DLQ only after it has been handled
 * successfully, so a failure never loses a message.
 */
class DLQProcessor<T> {
  /**
   * @param dlqUrl URL of the dead letter queue to read from.
   * @param mainQueueUrl URL of the queue that retried messages are sent to.
   * @param client SQS client used for all calls.
   */
  constructor(
    private dlqUrl: string,
    private mainQueueUrl: string,
    private client: SQSClient
  ) {}

  /**
   * Receives messages from the DLQ and applies the analyzer's decision:
   * - `retry`: resend the body to the main queue (tagged with `RetryFromDLQ`
   *   and `OriginalMessageId` attributes), then delete from the DLQ.
   * - `discard`: log and delete from the DLQ.
   * - `manual`: leave in the DLQ.
   *
   * Messages whose body is not valid JSON, or that lack a body or receipt
   * handle, are logged and counted as `manual` instead of aborting the run.
   * Messages left in the DLQ are remembered, so they are counted once even if
   * they become visible again during the run, and the loop ends once a poll
   * returns nothing new.
   *
   * @param analyzer Decides what to do with each parsed message body.
   * @returns How many messages were retried, discarded, and left for manual review.
   * @throws Errors from the analyzer or from SQS calls; the message being
   *   processed at that point stays in the DLQ.
   */
  async processDeadLetters(
    analyzer: (message: T) => Promise<'retry' | 'discard' | 'manual'>
  ): Promise<{ retried: number; discarded: number; manual: number }> {
    const stats = { retried: 0, discarded: 0, manual: 0 };
    const keptInDlq = new Set<string>();

    while (true) {
      const response = await this.client.send(new ReceiveMessageCommand({
        QueueUrl: this.dlqUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 5,
        AttributeNames: ['ApproximateReceiveCount'],
      }));

      const pending = (response.Messages ?? []).filter(
        message => !keptInDlq.has(message.MessageId ?? '')
      );
      if (pending.length === 0) break;

      for (const message of pending) {
        const { Body: rawBody, ReceiptHandle: receiptHandle, MessageId: messageId } = message;

        // Without a body there is nothing to analyze; without a receipt
        // handle the message could not be deleted after a retry.
        const parsed = rawBody === undefined || receiptHandle === undefined
          ? undefined
          : this.parseBody(rawBody, messageId);
        const decision = parsed === undefined ? 'manual' : await analyzer(parsed.value);

        if (decision === 'manual' || rawBody === undefined || receiptHandle === undefined) {
          // Leave in DLQ for manual review
          stats.manual++;
          keptInDlq.add(messageId ?? '');
          continue; // Don't delete
        }

        if (decision === 'retry') {
          // Send back to main queue
          await this.client.send(new SendMessageCommand({
            QueueUrl: this.mainQueueUrl,
            MessageBody: rawBody,
            MessageAttributes: {
              RetryFromDLQ: { DataType: 'String', StringValue: 'true' },
              OriginalMessageId: { DataType: 'String', StringValue: messageId ?? 'unknown' },
            },
          }));
          stats.retried++;
        } else {
          // Log and delete
          console.log(`Discarding message: ${messageId}`);
          stats.discarded++;
        }

        // Delete from DLQ
        await this.client.send(new DeleteMessageCommand({
          QueueUrl: this.dlqUrl,
          ReceiptHandle: receiptHandle,
        }));
      }
    }

    return stats;
  }

  /**
   * Moves messages from the DLQ back to the main queue in batches of up to 10.
   *
   * Only messages whose batch entry was accepted by the main queue are
   * deleted from the DLQ and counted. Entries that fail are logged, stay in
   * the DLQ, and are not attempted again during this run. The loop ends when
   * a poll returns nothing new; polling uses no wait time, so an empty
   * response does not guarantee the DLQ is empty.
   *
   * @returns The number of messages successfully moved to the main queue.
   * @throws Errors from SQS calls; messages of the batch in progress that
   *   were not yet deleted remain in the DLQ.
   */
  async redriveAll(): Promise<number> {
    let count = 0;
    const failedKeys = new Set<string>();

    while (true) {
      const response = await this.client.send(new ReceiveMessageCommand({
        QueueUrl: this.dlqUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 0,
      }));

      const candidates = (response.Messages ?? []).flatMap(msg =>
        msg.Body !== undefined &&
        msg.ReceiptHandle !== undefined &&
        !failedKeys.has(msg.MessageId ?? '')
          ? [{ key: msg.MessageId ?? '', body: msg.Body, receiptHandle: msg.ReceiptHandle }]
          : []
      );
      if (candidates.length === 0) break;

      const sendResult = await this.client.send(new SendMessageBatchCommand({
        QueueUrl: this.mainQueueUrl,
        Entries: candidates.map((candidate, idx) => ({
          Id: idx.toString(),
          MessageBody: candidate.body,
        })),
      }));

      if (sendResult.Failed && sendResult.Failed.length > 0) {
        console.error('Failed to redrive messages:', sendResult.Failed);
      }

      // A batch request can succeed as a whole while individual entries
      // fail, so only entries listed as successful may leave the DLQ.
      const acceptedIds = new Set((sendResult.Successful ?? []).map(entry => entry.Id));
      const delivered = candidates.filter((candidate, idx) => {
        const accepted = acceptedIds.has(idx.toString());
        if (!accepted) failedKeys.add(candidate.key);
        return accepted;
      });

      // Delete from DLQ
      await Promise.all(delivered.map(candidate =>
        this.client.send(new DeleteMessageCommand({
          QueueUrl: this.dlqUrl,
          ReceiptHandle: candidate.receiptHandle,
        }))
      ));

      count += delivered.length;
    }

    return count;
  }

  /**
   * Parses a raw message body as JSON. Returns the value wrapped in an
   * object (so a parsed `null` is distinguishable from failure), or
   * `undefined` after logging when the body is not valid JSON.
   */
  private parseBody(rawBody: string, messageId: string | undefined): { value: T } | undefined {
    try {
      return { value: JSON.parse(rawBody) as T };
    } catch (error: unknown) {
      console.error(`Unparsable body in DLQ message ${messageId}, leaving it for manual review:`, error);
      return undefined;
    }
  }
}

Conclusion

AWS SQS provides a powerful foundation for building distributed systems. By implementing proper batching, consumer patterns, and dead letter queue handling, you can build highly reliable message-driven architectures that scale effortlessly.

Found this helpful? Share it with others

DP

About the Author

Dibyank Padhy is an Engineering Manager & Full Stack Developer with 7+ years of experience building scalable software solutions. Passionate about cloud architecture, team leadership, and AI integration.