Contents
- Dependency & Auto-Configuration
- Basic @SqsListener
- Batch Message Processing
- Visibility Timeout & Acknowledgement
- Dead Letter Queue (DLQ)
- FIFO Queues & Message Groups
- Sending Messages with SqsTemplate
- Idempotent Consumer Pattern
- Local Testing with LocalStack
Spring Cloud AWS 3.x aligns with Spring Boot 3 and uses the AWS SDK v2. The spring-cloud-aws-starter-sqs auto-configures the SqsAsyncClient, message converters, and the listener container factory.
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>3.2.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>io.awspring.cloud</groupId>
        <artifactId>spring-cloud-aws-starter-sqs</artifactId>
    </dependency>
</dependencies>
# application.yml
spring:
  cloud:
    aws:
      region:
        static: us-east-1
      credentials:
        access-key: ${AWS_ACCESS_KEY_ID}
        secret-key: ${AWS_SECRET_ACCESS_KEY}
      sqs:
        listener:
          max-concurrent-messages: 10  # parallel messages per listener
          max-messages-per-poll: 10    # SQS ReceiveMessage batch size (max 10)
          poll-timeout: 20s            # long polling timeout
@SqsListener annotates a method that is invoked once per SQS message. Spring Cloud AWS deserialises the message body to the method parameter type using Jackson by default. Successful method return automatically deletes the message from the queue.
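import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.SqsHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    private static final Logger log = LoggerFactory.getLogger(OrderListener.class);

    private final OrderService orderService;
    private final NotificationService notificationService;

    public OrderListener(OrderService orderService, NotificationService notificationService) {
        this.orderService = orderService;
        this.notificationService = notificationService;
    }

    // Simplest form: Jackson deserialises the JSON body to Order
    @SqsListener("order-processing-queue")
    public void processOrder(Order order) {
        log.info("Processing order: {}", order.getId());
        orderService.process(order);
        // method returns normally, so the message is deleted from the queue
    }

    // Access message headers (message ID, system attributes such as SenderId)
    @SqsListener("notification-queue")
    public void handleNotification(
            @Payload Notification notification,
            @Header(SqsHeaders.MessageSystemAttributes.SQS_SENDER_ID) String senderId,
            Message<Notification> rawMessage) {
        // Spring Cloud AWS 3.x exposes the SQS message ID as the standard Spring "id" header
        log.info("Received from {}, messageId: {}", senderId, rawMessage.getHeaders().getId());
        notificationService.send(notification);
    }
}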
If the listener method throws an exception, Spring Cloud AWS does not delete the message. The message becomes visible again after the visibility timeout expires, so it is retried automatically until its receive count exceeds the queue's maxReceiveCount, at which point SQS moves it to the DLQ (if one is configured).
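The retry rule above can be modelled without any AWS dependency. What follows is a minimal sketch of the counting logic only, not of SQS itself: `RedeliverySketch`, `deliver`, and `Outcome` are hypothetical names, and the wait for the visibility timeout between attempts is left out.

```java
import java.util.function.IntPredicate;

/** Toy model of SQS redelivery: only the receive-count rule, no real queue. */
final class RedeliverySketch {

    enum Outcome { DELETED, DEAD_LETTERED }

    /**
     * @param maxReceiveCount the redrive-policy limit (illustrative value chosen by the caller)
     * @param handler         returns true when processing succeeds on the given
     *                        receive attempt (1-based), false when the listener throws
     */
    static Outcome deliver(int maxReceiveCount, IntPredicate handler) {
        for (int receiveCount = 1; receiveCount <= maxReceiveCount; receiveCount++) {
            if (handler.test(receiveCount)) {
                // Listener returned normally: the message is deleted.
                return Outcome.DELETED;
            }
            // Listener failed: the message stays in the queue and becomes
            // visible again once the visibility timeout expires.
        }
        // The receive count would now exceed the limit: move to the DLQ.
        return Outcome.DEAD_LETTERED;
    }

    public static void main(String[] args) {
        // Fails once, succeeds on the second delivery.
        System.out.println(deliver(3, attempt -> attempt == 2));
        // Poison message: never succeeds.
        System.out.println(deliver(3, attempt -> false));
    }
}
```

With a limit of 3, a handler that succeeds on its second attempt ends in `DELETED`, while one that never succeeds is seen three times and then ends in `DEAD_LETTERED`.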
Batch listeners receive a List of messages in a single invocation, up to the max-messages-per-poll setting (max 10). This is more efficient for high-throughput scenarios because it reduces the number of listener container iterations and allows bulk database or API operations.
@Component
public class EventBatchListener {

    private static final Logger log = LoggerFactory.getLogger(EventBatchListener.class);

    private final AnalyticsService analyticsService;

    public EventBatchListener(AnalyticsService analyticsService) {
        this.analyticsService = analyticsService;
    }

    // Batch listeners take a BatchAcknowledgement rather than an Acknowledgement
    @SqsListener(value = "analytics-events-queue",
            acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL)
    public void processBatch(List<Message<AnalyticsEvent>> messages,
                             BatchAcknowledgement<AnalyticsEvent> acknowledgement) {
        List<AnalyticsEvent> events = messages.stream()
                .map(Message::getPayload)
                .collect(Collectors.toList());
        try {
            analyticsService.bulkInsert(events); // single DB round-trip
            acknowledgement.acknowledge();       // acknowledge every message in the batch
        } catch (Exception e) {
            // Not acknowledged: the whole batch stays in the queue for retry
            log.error("Batch failed; messages will reappear after the visibility timeout", e);
        }
    }
}
When SQS delivers a message to a consumer, it becomes invisible to other consumers for the visibility timeout period (default 30 seconds). If the consumer does not delete the message (acknowledge it) before the timeout, SQS makes it visible again for redelivery. This is the mechanism behind at-least-once delivery.
- Set the visibility timeout longer than your worst-case processing time — if your handler takes up to 10 s, set at least 60 s.
- Use manual acknowledgement to control exactly when a message is deleted, especially when processing involves multiple external calls.
- For long-running handlers, extend the visibility timeout programmatically using SqsAsyncClient.changeMessageVisibility() to avoid premature redelivery.
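@Component
public class LongRunningListener {

    private final SqsAsyncClient sqsClient;
    private final JobService jobService;
    // Shared scheduler for the visibility heartbeats
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public LongRunningListener(SqsAsyncClient sqsClient, JobService jobService) {
        this.sqsClient = sqsClient;
        this.jobService = jobService;
    }

    @SqsListener(value = "long-job-queue", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL)
    public void processLongJob(Job job, Message<Job> message, Acknowledgement ack) {
        String receiptHandle = message.getHeaders()
                .get(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, String.class);
        // Read the queue URL from the message headers instead of hard-coding it
        String queueUrl = message.getHeaders()
                .get(SqsHeaders.SQS_QUEUE_URL_HEADER, String.class);

        // Every 45 s, reset the visibility timeout to 60 s from that moment.
        // Assumes the queue's own visibility timeout is longer than 45 s.
        ScheduledFuture<?> extender = scheduler.scheduleAtFixedRate(() ->
                sqsClient.changeMessageVisibility(r -> r
                        .queueUrl(queueUrl)
                        .receiptHandle(receiptHandle)
                        .visibilityTimeout(60)),
                45, 45, TimeUnit.SECONDS);
        try {
            jobService.execute(job); // may take minutes
            ack.acknowledge();
        } finally {
            extender.cancel(false);
        }
    }

    @PreDestroy
    void stopScheduler() {
        scheduler.shutdown();
    }
}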
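The heartbeat pattern in the listener above can be separated from SQS and tested on its own. This is a minimal sketch, assuming the visibility call itself is passed in as a `Runnable`; `VisibilityHeartbeat` and `runWithHeartbeat` are hypothetical names, and a real listener would normally share one scheduler rather than create one per call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Runs a job while periodically invoking a "keep the message invisible" callback. */
final class VisibilityHeartbeat {

    /**
     * @param job              the long-running work
     * @param extendVisibility callback that would call changeMessageVisibility (assumption:
     *                         supplied by the caller; if it throws, later heartbeats stop)
     * @param periodMillis     delay before the first heartbeat and between heartbeats
     */
    static void runWithHeartbeat(Runnable job, Runnable extendVisibility, long periodMillis) {
        // One scheduler per call keeps the sketch self-contained.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(
                extendVisibility, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            job.run();
        } finally {
            // Stop extending whether the job succeeded or threw.
            heartbeat.cancel(false);
            scheduler.shutdown();
        }
    }
}
```

The period has to be shorter than the visibility timeout being set, as in the listener's 45 s period against a 60 s timeout, so that a heartbeat always lands before the message would become visible again.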
A Dead Letter Queue (DLQ) receives messages whose receive count exceeds the source queue's maxReceiveCount without the message being deleted. Configuring a DLQ is essential for production: without one, a poison message is redelivered over and over until the queue's retention period expires (4 days by default), consuming processing capacity the whole time. Set up an alarm on DLQ depth to get notified when messages start failing.
# CloudFormation / SAM: queue with DLQ wired via RedrivePolicy
Resources:
  OrderQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: order-processing-queue
      VisibilityTimeout: 60
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt OrderDLQ.Arn
        maxReceiveCount: 3  # move to DLQ after 3 failed attempts
  OrderDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: order-processing-queue-dlq
      MessageRetentionPeriod: 1209600  # 14 days
  DLQAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: OrderDLQ-NotEmpty
      MetricName: ApproximateNumberOfMessagesVisible
      Namespace: AWS/SQS
      Dimensions:
        - Name: QueueName
          Value: order-processing-queue-dlq
      Statistic: Maximum  # a metric alarm needs a statistic and a period
      Period: 300         # seconds
      Threshold: 1
      ComparisonOperator: GreaterThanOrEqualToThreshold
      EvaluationPeriods: 1
      AlarmActions:
        - !Ref AlertSNSTopic  # SNS topic assumed to be defined elsewhere in the template
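// DLQ consumer: logs, stores to DB, and sends an alert
@Component
public class OrderDlqListener {

    private static final Logger log = LoggerFactory.getLogger(OrderDlqListener.class);

    private final FailedMessageRepository failedMessageRepository;
    private final AlertService alertService;

    public OrderDlqListener(FailedMessageRepository failedMessageRepository, AlertService alertService) {
        this.failedMessageRepository = failedMessageRepository;
        this.alertService = alertService;
    }

    @SqsListener("order-processing-queue-dlq")
    public void handleDeadLetter(String rawBody, Message<String> message) {
        // Spring Cloud AWS 3.x exposes the SQS message ID as the standard Spring "id" header
        String messageId = String.valueOf(message.getHeaders().getId());
        log.error("DLQ message received, id: {}, body: {}", messageId, rawBody);
        failedMessageRepository.save(new FailedMessage(messageId, rawBody, Instant.now()));
        alertService.sendDlqAlert(messageId, rawBody);
        // Message is acknowledged (deleted from the DLQ) after this method returns
    }
}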
FIFO (First-In-First-Out) queues guarantee ordering within a message group and exactly-once processing: a message sent again with the same deduplication ID inside the 5-minute deduplication interval is accepted but not delivered a second time. Messages in the same group are delivered in order; different groups are processed in parallel. Use FIFO queues when processing order within a logical entity matters (e.g., all events for the same customer ID).
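The ordering rule can be pictured as one ordered lane per message group. The sketch below is an in-memory illustration only, not how SQS is implemented; `MessageGroupSketch`, `Msg`, and `partitionByGroup` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Splits an arrival sequence into per-group lanes, keeping arrival order inside each lane. */
final class MessageGroupSketch {

    /** A message reduced to its group ID and body (illustrative type). */
    record Msg(String groupId, String body) {}

    static Map<String, List<String>> partitionByGroup(List<Msg> arrivals) {
        // LinkedHashMap keeps groups in first-seen order for readable output.
        Map<String, List<String>> lanes = new LinkedHashMap<>();
        for (Msg msg : arrivals) {
            lanes.computeIfAbsent(msg.groupId(), id -> new ArrayList<>()).add(msg.body());
        }
        return lanes;
    }

    public static void main(String[] args) {
        List<Msg> arrivals = List.of(
                new Msg("customer-1", "created"),
                new Msg("customer-2", "created"),
                new Msg("customer-1", "updated"));
        // Each lane is consumed strictly in order; lanes are independent of each other.
        System.out.println(partitionByGroup(arrivals));
    }
}
```

Events for `customer-1` stay in the order `created`, `updated`, while `customer-2` can be handled concurrently because nothing orders it relative to `customer-1`.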
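@Component
public class CustomerEventListener {

    private final CustomerEventService customerEventService;

    public CustomerEventListener(CustomerEventService customerEventService) {
        this.customerEventService = customerEventService;
    }

    // FIFO queue names must end with .fifo
    @SqsListener("customer-events.fifo")
    public void handleCustomerEvent(CustomerEvent event) {
        // Messages with the same MessageGroupId (here, the customer ID) arrive in order
        customerEventService.apply(event);
    }
}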
// Sending to a FIFO queue: MessageGroupId is required; MessageDeduplicationId is required
// unless content-based deduplication is enabled on the queue
@Service
public class CustomerEventPublisher {

    private final SqsTemplate sqsTemplate;

    public CustomerEventPublisher(SqsTemplate sqsTemplate) {
        this.sqsTemplate = sqsTemplate;
    }

    public void publish(CustomerEvent event) {
        sqsTemplate.send(to -> to
                .queue("customer-events.fifo")
                .payload(event)
                .messageGroupId(String.valueOf(event.getCustomerId())) // ordering key
                .messageDeduplicationId(event.getEventId())            // deduplication key
        );
    }
}
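By default, FIFO queues support 300 API calls per second per action, which is 300 messages/second unbatched or 3,000 messages/second with batches of 10. For higher throughput, enable high-throughput mode on the FIFO queue and spread traffic across many message groups, or use standard queues with application-level ordering.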
SqsTemplate is the high-level send API from Spring Cloud AWS 3.x. It serialises the payload to JSON, sets content-type headers, and supports deferred queue URL resolution by name.
@Service
public class OrderPublisher {

    private final SqsTemplate sqsTemplate;

    public OrderPublisher(SqsTemplate sqsTemplate) {
        this.sqsTemplate = sqsTemplate;
    }

    // Send a single message
    public void submitOrder(Order order) {
        sqsTemplate.send("order-processing-queue", order);
    }

    // Send with custom headers (mapped to SQS message attributes: typed metadata
    // delivered alongside the body)
    public void submitPriorityOrder(Order order) {
        sqsTemplate.send(to -> to
                .queue("order-processing-queue")
                .payload(order)
                .header("Priority", "HIGH")
                .header("Source", "web-checkout")
                .delaySeconds(0)
        );
    }

    // Send a batch to reduce API calls (SQS accepts at most 10 messages per batch request)
    public void submitBatch(List<Order> orders) {
        // sendMany takes Spring Message objects, so wrap each payload with MessageBuilder
        List<Message<Order>> messages = orders.stream()
                .map(order -> MessageBuilder.withPayload(order).build())
                .collect(Collectors.toList());
        sqsTemplate.sendMany("order-processing-queue", messages);
    }
}
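Because an SQS batch request carries at most 10 messages, a caller holding a longer list needs to split it before sending. Whether SqsTemplate splits oversized collections itself is not covered here, so this sketch assumes it does not; `BatchChunker` and `chunk` are hypothetical helper names.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits a list into consecutive chunks no larger than a given size. */
final class BatchChunker {

    /** Maximum number of entries SQS accepts in one batch request. */
    static final int SQS_MAX_BATCH_SIZE = 10;

    static <T> List<List<T>> chunk(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }
}
```

Each chunk returned by `BatchChunker.chunk(orders, BatchChunker.SQS_MAX_BATCH_SIZE)` could then be handed to a batch send such as `submitBatch`, giving one API call per 10 messages.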
SQS standard queues guarantee at-least-once delivery: a message may occasionally be delivered more than once, and it is always redelivered when a consumer fails to delete it before the visibility timeout expires. Your consumer must be idempotent: processing the same message twice must produce the same result as processing it once. The standard approach is a deduplication table.
@Component
public class IdempotentOrderListener {

    private static final Logger log = LoggerFactory.getLogger(IdempotentOrderListener.class);

    private final ProcessedMessageRepository processedRepo;
    private final OrderService orderService;

    public IdempotentOrderListener(ProcessedMessageRepository processedRepo, OrderService orderService) {
        this.processedRepo = processedRepo;
        this.orderService = orderService;
    }

    @SqsListener("order-processing-queue")
    @Transactional
    public void processOrder(Order order, Message<Order> message) {
        // Spring Cloud AWS 3.x exposes the SQS message ID as the standard Spring "id" header
        String messageId = String.valueOf(message.getHeaders().getId());
        // Check if already processed to prevent duplicate processing
        if (processedRepo.existsByMessageId(messageId)) {
            log.info("Duplicate message {}, skipping", messageId);
            return; // message will be acknowledged and deleted
        }
        orderService.process(order);
        // Mark as processed in the same transaction as the business logic
        processedRepo.save(new ProcessedMessage(messageId, Instant.now()));
    }
}
@Entity
public class ProcessedMessage {
    @Id
    private String messageId;
    private Instant processedAt;
    // constructor, getters...
}
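The check-then-save sequence in the listener leaves a small window in which two concurrent deliveries of the same message can both pass the `existsByMessageId` check. One common way to close it is to make the claim atomic, which in a database usually means relying on the primary-key or unique constraint on `messageId`. The sketch below shows the same idea with an in-memory set standing in for the table; `InMemoryIdempotentConsumer` and `handle` are hypothetical names, and the set is not shared across application instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Runs a handler at most once per message ID, using an atomic claim. */
final class InMemoryIdempotentConsumer<T> {

    // Stand-in for the deduplication table; add() is atomic per key.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> handler;

    InMemoryIdempotentConsumer(Consumer<T> handler) {
        this.handler = handler;
    }

    /** @return true if the handler ran, false if this ID was already claimed */
    boolean handle(String messageId, T payload) {
        // Claim first: only one caller can add a given ID successfully.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Release the claim so a redelivery can retry the work.
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

Calling `handle("m-1", order)` twice runs the handler once and returns `false` the second time, while a handler failure releases the ID so that the redelivered message is processed again.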
LocalStack runs AWS services locally in Docker, including SQS. Combined with Testcontainers, you can write integration tests that create real queues, send messages, and verify your listeners — without an AWS account or internet access.
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
// await() comes from Awaitility, which must be on the test classpath
import static org.awaitility.Awaitility.await;

@SpringBootTest
@Testcontainers
class OrderListenerIntegrationTest {

    @Container
    static LocalStackContainer localstack = new LocalStackContainer(
            DockerImageName.parse("localstack/localstack:3.5"))
            .withServices(LocalStackContainer.Service.SQS);

    // Point Spring Cloud AWS at the container instead of real AWS
    @DynamicPropertySource
    static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.sqs.endpoint",
                () -> localstack.getEndpointOverride(LocalStackContainer.Service.SQS));
        registry.add("spring.cloud.aws.credentials.access-key", () -> "test");
        registry.add("spring.cloud.aws.credentials.secret-key", () -> "test");
        registry.add("spring.cloud.aws.region.static", () -> localstack.getRegion());
    }

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private OrderRepository orderRepository;

    @BeforeEach
    void createQueue() throws Exception {
        localstack.execInContainer("awslocal", "sqs", "create-queue",
                "--queue-name", "order-processing-queue");
    }

    @Test
    void listenerProcessesOrder() throws Exception {
        Order order = new Order(UUID.randomUUID(), "item-99", BigDecimal.TEN);
        sqsTemplate.send("order-processing-queue", order);
        // Wait up to 10 seconds for the listener to process the message
        await().atMost(10, TimeUnit.SECONDS)
                .until(() -> orderRepository.existsById(order.getId()));
    }
}