Mastering AWS SQS: Building Reliable Message Queues
Deep dive into AWS SQS for building resilient, scalable message queue systems with dead letter queues, FIFO ordering, and batch processing.
Dibyank Padhy
Engineering Manager & Full Stack Developer
Mastering AWS SQS: Building Reliable Message Queues
Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. In this comprehensive guide, we'll explore advanced patterns for building production-ready queue systems.
Standard vs FIFO Queues
Understanding when to use each queue type is crucial:
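Standard queues offer maximum throughput (nearly unlimited transactions per second), at-least-once delivery, and best-effort ordering. FIFO queues guarantee exactly-once processing and strict ordering within each message group, but by default are limited to 300 API calls per second (3,000 messages per second with batching); high-throughput mode, which the FIFO example below enables, raises that limit.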
import {
SQSClient,
CreateQueueCommand,
SendMessageCommand,
SendMessageBatchCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
GetQueueAttributesCommand
} from '@aws-sdk/client-sqs';
// Shared SQS client used by the examples below; it is configured for us-east-1.
const sqsClient = new SQSClient({
  region: 'us-east-1',
});
/**
 * Creates a standard queue together with a companion dead letter queue (DLQ).
 *
 * The DLQ is named `${queueName}-dlq` and retains messages for 14 days. The
 * main queue retains messages for 4 days, uses a 30 second visibility timeout,
 * and carries a redrive policy that moves a message to the DLQ after
 * `maxReceiveCount` failed attempts.
 *
 * @param queueName Name of the main queue.
 * @param maxReceiveCount Failed attempts allowed before a message is moved to
 *   the DLQ. Defaults to 3.
 * @returns The URLs of the main queue and of the DLQ.
 * @throws Error when SQS returns no URL or no ARN for the DLQ; in that case the
 *   main queue is not created, because its redrive policy would have no target.
 */
async function createQueueWithDLQ(queueName: string, maxReceiveCount: number = 3) {
  const dlqName = `${queueName}-dlq`;

  // The DLQ has to exist first: the main queue refers to it by ARN.
  const dlqResponse = await sqsClient.send(new CreateQueueCommand({
    QueueName: dlqName,
    Attributes: {
      MessageRetentionPeriod: '1209600', // 14 days
    },
  }));
  const dlqUrl = dlqResponse.QueueUrl;
  if (!dlqUrl) {
    throw new Error(`SQS returned no QueueUrl for dead letter queue "${dlqName}"`);
  }

  const dlqAttributes = await sqsClient.send(new GetQueueAttributesCommand({
    QueueUrl: dlqUrl,
    AttributeNames: ['QueueArn'],
  }));
  const dlqArn = dlqAttributes.Attributes?.QueueArn;
  if (!dlqArn) {
    // JSON.stringify silently drops undefined values, so without this check the
    // main queue would be created with a redrive policy that has no target.
    throw new Error(`SQS returned no QueueArn for dead letter queue "${dlqName}"`);
  }

  const mainQueueResponse = await sqsClient.send(new CreateQueueCommand({
    QueueName: queueName,
    Attributes: {
      DelaySeconds: '0',
      MessageRetentionPeriod: '345600', // 4 days
      VisibilityTimeout: '30',
      RedrivePolicy: JSON.stringify({
        deadLetterTargetArn: dlqArn,
        maxReceiveCount,
      }),
    },
  }));

  return {
    mainQueueUrl: mainQueueResponse.QueueUrl,
    dlqUrl,
  };
}
/**
 * Creates a FIFO queue for ordered processing.
 *
 * The queue uses content-based deduplication, scopes deduplication to the
 * message group, and applies the throughput limit per message group id.
 *
 * @param queueName Queue name, with or without the `.fifo` suffix.
 * @returns The URL of the created queue as reported by SQS.
 */
async function createFIFOQueue(queueName: string) {
  // FIFO queue names must end with ".fifo". Append the suffix only when it is
  // missing so that a caller passing "orders.fifo" does not get "orders.fifo.fifo".
  const fifoSuffix = '.fifo';
  const fifoQueueName = queueName.endsWith(fifoSuffix)
    ? queueName
    : `${queueName}${fifoSuffix}`;

  const response = await sqsClient.send(new CreateQueueCommand({
    QueueName: fifoQueueName,
    Attributes: {
      FifoQueue: 'true',
      ContentBasedDeduplication: 'true', // Auto-generate deduplication ID from content
      DeduplicationScope: 'messageGroup', // Deduplication per message group
      FifoThroughputLimit: 'perMessageGroupId', // Higher throughput
    },
  }));
  return response.QueueUrl;
}
Batch Processing for High Throughput
Batch operations significantly improve throughput and reduce costs. Here's an optimized batch processor:
/**
 * One message queued for sending through BatchMessageProcessor.
 */
interface QueueMessage<T> {
  /** Entry id within a batch; also used to match failed entries back to their message. */
  id: string;
  /** Payload; it is JSON-serialized to form the message body. */
  body: T;
  /** Optional attributes; each value is sent as a `String` message attribute. */
  attributes?: Record<string, string>;
  /** Message group id (for FIFO queues). */
  groupId?: string;
  /** Explicit deduplication id, forwarded as-is when present. */
  deduplicationId?: string;
}
/**
 * Buffers messages and sends them to one queue in batches of up to 10.
 *
 * A batch is sent as soon as 10 messages are buffered, or when the flush
 * interval elapses after the first buffered message, whichever comes first.
 * Messages that SQS reports as failed are retried a bounded number of times;
 * no message is dropped from the buffer unless SQS accepted it.
 */
class BatchMessageProcessor<T> {
  private buffer: QueueMessage<T>[] = [];
  private readonly maxBatchSize = 10; // SQS limit
  private readonly flushInterval: number;
  private readonly maxRetries: number;
  private flushTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param queueUrl URL of the destination queue.
   * @param client SQS client used to send the batches.
   * @param flushIntervalMs Longest time a message waits in the buffer before a
   *   timer-driven flush. Defaults to 1000 ms.
   * @param maxRetries How many times in a row one flush() re-sends entries that
   *   SQS reported as failed before giving up. Defaults to 3.
   */
  constructor(
    private queueUrl: string,
    private client: SQSClient,
    flushIntervalMs: number = 1000,
    maxRetries: number = 3
  ) {
    this.flushInterval = flushIntervalMs;
    this.maxRetries = maxRetries;
  }

  /**
   * Buffers a message. Flushes immediately once the buffer holds a full batch;
   * otherwise makes sure a timer-driven flush is scheduled.
   *
   * @throws Whatever flush() throws, when this call triggers an immediate flush.
   */
  async add(message: QueueMessage<T>): Promise<void> {
    this.buffer.push(message);

    if (this.buffer.length >= this.maxBatchSize) {
      await this.flush();
    } else if (!this.flushTimer) {
      this.flushTimer = setTimeout(() => {
        // Nobody awaits a timer callback, so a failed flush must be handled
        // here or it becomes an unhandled promise rejection.
        this.flush().catch((error: unknown) => {
          console.error('Scheduled flush failed:', error);
        });
      }, this.flushInterval);
    }
  }

  /**
   * Sends everything currently buffered, batch by batch.
   *
   * @throws The client's error when a send request itself fails, or an Error
   *   when entries are still being rejected after `maxRetries` retries. In
   *   both cases the unsent messages stay at the front of the buffer, in their
   *   original order, so a later flush() can try again.
   */
  async flush(): Promise<void> {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }

    let consecutiveFailedRounds = 0;
    while (this.buffer.length > 0) {
      const batch = this.buffer.splice(0, this.maxBatchSize);
      const response = await this.sendBatch(batch);

      const failed = response.Failed ?? [];
      if (failed.length === 0) {
        consecutiveFailedRounds = 0;
        continue;
      }
      console.error('Failed messages:', failed);

      // Re-queue the failed entries at the front, keeping their relative order.
      const failedIds = new Set(failed.map((entry: { Id?: string }) => entry.Id));
      const toRetry = batch.filter((msg) => failedIds.has(msg.id));
      if (toRetry.length === 0) continue;
      this.buffer.unshift(...toRetry);

      consecutiveFailedRounds++;
      if (consecutiveFailedRounds > this.maxRetries) {
        // Without this bound a permanently rejected entry would be re-sent forever.
        const ids = toRetry.map((msg) => msg.id).join(', ');
        throw new Error(
          `Giving up on ${toRetry.length} message(s) after ${this.maxRetries} retries: ${ids}`
        );
      }
    }
  }

  /** Flushes whatever is still buffered. */
  async close(): Promise<void> {
    await this.flush();
  }

  /** Sends one batch; if the request throws, puts the batch back before rethrowing. */
  private async sendBatch(batch: QueueMessage<T>[]) {
    const command = new SendMessageBatchCommand({
      QueueUrl: this.queueUrl,
      Entries: batch.map((msg) => ({
        Id: msg.id,
        MessageBody: JSON.stringify(msg.body),
        MessageAttributes: msg.attributes
          ? Object.fromEntries(
              Object.entries(msg.attributes).map(([key, value]) => [
                key,
                { DataType: 'String', StringValue: value },
              ])
            )
          : undefined,
        MessageGroupId: msg.groupId,
        MessageDeduplicationId: msg.deduplicationId,
      })),
    });

    try {
      return await this.client.send(command);
    } catch (error) {
      this.buffer.unshift(...batch);
      throw error;
    }
  }
}
// Example: a batching producer for user-action events
type UserActionPayload = { userId: string; action: string };
const producerQueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue';
const processor = new BatchMessageProcessor<UserActionPayload>(producerQueueUrl, sqsClient);

// add() only buffers the message; it goes out once ten messages have
// accumulated or when the flush interval (1 second by default) elapses.
const signupMessage: QueueMessage<UserActionPayload> = {
  id: '1',
  body: { userId: 'user-123', action: 'signup' },
};
await processor.add(signupMessage);
Consumer with Graceful Shutdown
The consumer below handles termination signals (SIGTERM and SIGINT) and lets in-flight messages finish before the process exits:
/**
 * Callback that SQSConsumer invokes with the JSON-parsed body of each message.
 * If the returned promise rejects, the consumer logs the error and does not
 * delete the message.
 */
type MessageHandler<T> = (
  message: T
) => Promise<void>;
/**
 * Long-polling consumer for a single queue.
 *
 * Each received message body is JSON-parsed and passed to the handler; the
 * message is deleted only after the handler resolves. At most `maxConcurrency`
 * messages (default 10) are in flight at any time.
 *
 * Constructing a consumer installs SIGTERM and SIGINT handlers that stop
 * polling, wait for in-flight messages to finish, and then call
 * `process.exit(0)`.
 */
class SQSConsumer<T> {
  private isRunning = false;
  private activeProcessing = 0;
  private readonly maxConcurrency: number;

  constructor(
    private queueUrl: string,
    private client: SQSClient,
    private handler: MessageHandler<T>,
    options: { maxConcurrency?: number } = {}
  ) {
    this.maxConcurrency = options.maxConcurrency || 10;
    this.setupGracefulShutdown();
  }

  /** Registers the signal handlers that drain in-flight work before exiting. */
  private setupGracefulShutdown() {
    const drainThenExit = async (signal: string) => {
      console.log(`Received ${signal}, starting graceful shutdown...`);
      this.isRunning = false;

      // Poll once per second until every in-flight message has settled.
      while (this.activeProcessing > 0) {
        console.log(`Waiting for ${this.activeProcessing} messages to complete...`);
        await this.pause(1000);
      }

      console.log('Graceful shutdown complete');
      process.exit(0);
    };

    for (const signal of ['SIGTERM', 'SIGINT'] as const) {
      process.on(signal, () => drainThenExit(signal));
    }
  }

  /**
   * Runs the receive loop until stop() is called or a shutdown signal arrives.
   * Receive errors are logged and followed by a 5 second pause; they do not
   * end the loop.
   */
  async start(): Promise<void> {
    this.isRunning = true;
    console.log(`Starting SQS consumer for ${this.queueUrl}`);

    while (this.isRunning) {
      // Hold off polling while every concurrency slot is taken.
      while (this.activeProcessing >= this.maxConcurrency && this.isRunning) {
        await this.pause(100);
      }
      if (!this.isRunning) break;

      try {
        const freeSlots = this.maxConcurrency - this.activeProcessing;
        const receive = new ReceiveMessageCommand({
          QueueUrl: this.queueUrl,
          MaxNumberOfMessages: Math.min(10, freeSlots),
          WaitTimeSeconds: 20, // Long polling
          AttributeNames: ['All'],
          MessageAttributeNames: ['All'],
        });
        const response = await this.client.send(receive);

        if (response.Messages) {
          // Deliberately not awaited: the messages are handled in parallel
          // while the loop goes back to polling.
          const inFlight = response.Messages.map((msg) => this.processMessage(msg));
          Promise.all(inFlight).catch(console.error);
        }
      } catch (error) {
        console.error('Error receiving messages:', error);
        await this.pause(5000);
      }
    }
  }

  /**
   * Handles one message: parse, hand to the handler, then delete. Any failure
   * is logged and the message is left undeleted.
   */
  private async processMessage(message: any): Promise<void> {
    this.activeProcessing += 1;
    try {
      const payload: T = JSON.parse(message.Body);
      await this.handler(payload);

      // Acknowledge only after the handler has succeeded.
      const acknowledge = new DeleteMessageCommand({
        QueueUrl: this.queueUrl,
        ReceiptHandle: message.ReceiptHandle,
      });
      await this.client.send(acknowledge);
    } catch (error) {
      console.error(`Error processing message ${message.MessageId}:`, error);
      // Message will return to queue after visibility timeout
    } finally {
      this.activeProcessing -= 1;
    }
  }

  /** Asks the receive loop to end; a receive already in progress still completes. */
  stop(): void {
    this.isRunning = false;
  }

  /** Resolves after `ms` milliseconds. */
  private pause(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
// Example: consume user-action messages with at most five in flight at once
type ConsumedUserAction = { userId: string; action: string };

const logUserAction: MessageHandler<ConsumedUserAction> = async (message) => {
  console.log(`Processing: ${message.userId} - ${message.action}`);
  // Your processing logic here
};

const consumer = new SQSConsumer<ConsumedUserAction>(
  'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  sqsClient,
  logUserAction,
  { maxConcurrency: 5 },
);

// Not awaited: start() keeps running its receive loop until the consumer is stopped.
consumer.start();
Dead Letter Queue Processing
Implement a DLQ processor to handle failed messages:
/**
 * Envelope describing a message that ended up in a dead letter queue.
 *
 * NOTE(review): nothing in the visible code constructs or reads this type, so
 * the field descriptions below follow the field names only; confirm against
 * real usage.
 */
interface DLQMessage<T> {
  /** The message as it was originally sent. */
  originalMessage: T;
  /** Why processing failed. */
  failureReason: string;
  /** Number of processing attempts made. */
  attemptCount: number;
  /** When the first failure occurred; presumably a timestamp string, format not specified here. */
  firstFailedAt: string;
  /** When the most recent failure occurred; presumably in the same format as `firstFailedAt`. */
  lastFailedAt: string;
}
/**
 * Works through the messages of a dead letter queue (DLQ), either one by one
 * under the control of a caller-supplied analyzer or by sending everything
 * back to the main queue.
 */
class DLQProcessor<T> {
  /**
   * @param dlqUrl URL of the dead letter queue to read from.
   * @param mainQueueUrl URL of the queue that retried messages are sent to.
   * @param client SQS client used for all requests.
   */
  constructor(
    private dlqUrl: string,
    private mainQueueUrl: string,
    private client: SQSClient
  ) {}

  /**
   * Receives DLQ messages until a receive comes back empty and applies the
   * analyzer's decision to each one:
   * - `'retry'`: send the body to the main queue, then delete from the DLQ;
   * - `'discard'`: log and delete from the DLQ;
   * - `'manual'`: leave the message in the DLQ.
   *
   * A message whose body is missing or is not valid JSON is logged, counted as
   * `manual`, and left in the DLQ, so that one malformed message cannot abort
   * the run (and every later run) before the remaining messages are handled.
   *
   * @param analyzer Decides what to do with each parsed message body.
   * @returns How many messages were retried, discarded, and left for manual review.
   */
  async processDeadLetters(
    analyzer: (message: T) => Promise<'retry' | 'discard' | 'manual'>
  ): Promise<{ retried: number; discarded: number; manual: number }> {
    const stats = { retried: 0, discarded: 0, manual: 0 };

    while (true) {
      const response = await this.client.send(new ReceiveMessageCommand({
        QueueUrl: this.dlqUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 5,
        AttributeNames: ['ApproximateReceiveCount'],
      }));
      if (!response.Messages || response.Messages.length === 0) break;

      for (const message of response.Messages) {
        const rawBody = message.Body;
        const parsed = rawBody === undefined ? undefined : this.tryParse(rawBody);
        if (rawBody === undefined || parsed === undefined) {
          console.error(
            `Cannot parse body of message ${message.MessageId}; leaving it in the DLQ for manual review`
          );
          stats.manual++;
          continue;
        }

        const decision = await analyzer(parsed.value);
        switch (decision) {
          case 'retry':
            // Send back to main queue
            await this.client.send(new SendMessageCommand({
              QueueUrl: this.mainQueueUrl,
              MessageBody: rawBody,
              MessageAttributes: {
                RetryFromDLQ: { DataType: 'String', StringValue: 'true' },
                OriginalMessageId: { DataType: 'String', StringValue: message.MessageId! },
              },
            }));
            stats.retried++;
            break;
          case 'discard':
            // Log and delete
            console.log(`Discarding message: ${message.MessageId}`);
            stats.discarded++;
            break;
          case 'manual':
            // Leave in DLQ for manual review
            stats.manual++;
            continue; // Don't delete
        }

        // Delete from DLQ
        await this.client.send(new DeleteMessageCommand({
          QueueUrl: this.dlqUrl,
          ReceiptHandle: message.ReceiptHandle!,
        }));
      }
    }

    return stats;
  }

  /**
   * Sends every message in the DLQ back to the main queue, ten at a time,
   * until a receive comes back empty.
   *
   * A message is deleted from the DLQ only when the main queue accepted it.
   * Entries that the batch send reports as failed are logged, left in the DLQ,
   * and not counted.
   *
   * @returns The number of messages moved to the main queue.
   */
  async redriveAll(): Promise<number> {
    let count = 0;

    while (true) {
      const response = await this.client.send(new ReceiveMessageCommand({
        QueueUrl: this.dlqUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 0,
      }));
      if (!response.Messages || response.Messages.length === 0) break;

      const sendResult = await this.client.send(new SendMessageBatchCommand({
        QueueUrl: this.mainQueueUrl,
        Entries: response.Messages.map((msg, idx) => ({
          Id: idx.toString(),
          MessageBody: msg.Body!,
        })),
      }));

      // A batch send can succeed as a request while individual entries fail;
      // deleting those entries from the DLQ would lose them for good.
      const failedEntries = sendResult.Failed ?? [];
      if (failedEntries.length > 0) {
        console.error('Failed to redrive messages:', failedEntries);
      }
      const failedIds = new Set(failedEntries.map((entry: { Id?: string }) => entry.Id));
      const delivered = response.Messages.filter((_, idx) => !failedIds.has(idx.toString()));

      // Delete from DLQ
      await Promise.all(delivered.map((msg) =>
        this.client.send(new DeleteMessageCommand({
          QueueUrl: this.dlqUrl,
          ReceiptHandle: msg.ReceiptHandle!,
        }))
      ));
      count += delivered.length;
    }

    return count;
  }

  /** Parses a JSON body; returns undefined instead of throwing on invalid JSON. */
  private tryParse(raw: string): { value: T } | undefined {
    try {
      const value: T = JSON.parse(raw);
      return { value };
    } catch {
      return undefined;
    }
  }
}
Conclusion
AWS SQS provides a powerful foundation for building distributed systems. By implementing proper batching, consumer patterns, and dead letter queue handling, you can build highly reliable message-driven architectures that scale effortlessly.