知識庫 / Spring / Spring Cloud RSS 訂閱

Spring Cloud AWS SQS v3 中消息確認

Spring Cloud
HongKong
5
11:17 AM · Dec 06 ,2025

1. 概述

確認消息是一種標準機制,在消息傳遞系統中用於通知消息代理消息已收到,且不應再次傳遞。在亞馬遜的 SQS(簡單隊列服務)中,確認消息通過在隊列中刪除消息來實現。

在本教程中,我們將探索 Spring Cloud AWS SQS v3 提供的三種默認確認模式:ON_SUCCESSMANUALALWAYS

我們將使用事件驅動的場景來演示我們的用例,並利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設置。

2. 依賴項

我們首先導入 Spring Cloud AWS Bill of Materials 以確保我們 pom.xml 中的所有依賴項相互兼容:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.1.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

我們還會添加 Core 和 SQS 啓動依賴項:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

最後,我們將添加用於測試所需的依賴項,包括 LocalStack 和 TestContainers,以及與 JUnit 5 配合使用的 awaitility 庫(用於驗證異步消息消費)和 AssertJ 庫(用於處理斷言):

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <scope>test</scope>
</dependency>

3. 設置本地測試環境

首先,我們將使用 Testcontainers 配置一個 LocalStack 測試環境以進行本地測試。

@Testcontainers
public class BaseSqsLiveTest {

    private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

    @Container
    static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
        registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
        registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
        registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
          .toString());
    }
}

雖然這種設置使得測試變得容易且可重複,但請注意,本教程中的代碼也可用於直接針對 AWS 進行目標。

4. 設置隊列名稱

默認情況下,Spring Cloud AWS SQS 會自動創建在任何 @SqsListener 註解方法中指定的隊列。 作為初始設置步驟,我們將定義隊列名稱在我們的 application.yaml 文件中:

events:
  queues:
    order-processing-retry-queue: order_processing_retry_queue
    order-processing-async-queue: order_processing_async_queue
    order-processing-no-retries-queue: order_processing_no_retries_queue
  acknowledgment:
    order-processing-no-retries-queue: ALWAYS
<div>
 <p>感謝屬性 <em >ALWAYS </em > 也會被我們的其中一位聽眾使用。</p>
 <p>我們還將向同一文件中添加幾個 <em >productIds</em>,以便在我們的示例中使用:</p>
</div>
product:
  id:
    smartphone: 123e4567-e89b-12d3-a456-426614174000
    wireless-headphones: 123e4567-e89b-12d3-a456-426614174001
    laptop: 123e4567-e89b-12d3-a456-426614174002
    tablet: 123e4567-e89b-12d3-a456-426614174004
<div>
  為了在我們的應用程序中將這些屬性作為POJO,我們將創建兩個 @ConfigurationProperties 類,一個用於隊列:
</div>
<div>
@ConfigurationProperties(prefix = "events.queues")
public class EventsQueuesProperties {

    private String orderProcessingRetryQueue;

    private String orderProcessingAsyncQueue;

    private String orderProcessingNoRetriesQueue;

    // getters and setters
}

以下是產品列表:

@ConfigurationProperties("product.id")
public class ProductIdProperties {

    private UUID smartphone;

    private UUID wirelessHeadphones;

    private UUID laptop;

    // getters and setters
}

最後,我們通過在 @Configuration 類中使用 @EnableConfigurationProperties 來啓用配置屬性。

@EnableConfigurationProperties({ EventsQueuesProperties.class, ProductIdProperties.class})
@Configuration
public class OrderProcessingConfiguration {
}
<div>
  <h1>Introduction</h1>
  <p>This document provides an overview of the new API. It covers key features, usage examples, and troubleshooting tips.</p>

  <h2>Key Features</h2>
  <ul>
    <li><strong>Authentication:</strong> OAuth 2.0</li>
    <li><strong>Data Storage:</strong> NoSQL database</li>
    <li><strong>API Versioning:</strong> Semantic versioning (SemVer)</li>
  </ul>

  <h2>Usage Examples</h2>
  <pre>
    <code>
    // Example Python code
    import requests

    response = requests.get('https://api.example.com/data')
    print(response.json())
    </code>
  </pre>
  <p>This code snippet demonstrates how to fetch data from the API using the Python requests library.</p>

  <h2>Troubleshooting</h2>
  <h3>Common Issues</h3>
  <ol>
    <li><strong>Authentication Errors:</strong> Verify your API key and token.</li>
    <li><strong>Rate Limiting:</strong>  Respect the API rate limits.</li>
    <li><strong>Network Issues:</strong> Check your internet connection.</li>
  </ol>
</div>

5. 成功處理的確認

默認確認模式針對@SqsListenersON_SUCCESS。 在該模式下,如果監聽器方法執行完畢且未拋出錯誤,則消息將被確認。
為了説明這種行為,我們將創建一個簡單的監聽器,該監聽器將接收一個OrderCreatedEvent,檢查一個InventoryService,並在請求的項目和數量有庫存的情況下,將訂單狀態更改為PROCESSED

5.1. 創建服務

讓我們首先創建一個名為OrderService 的服務,該服務將負責更新訂單狀態:

@Service
public class OrderService {

    Map<UUID, OrderStatus> ORDER_STATUS_STORAGE = new ConcurrentHashMap<>();

    public void updateOrderStatus(UUID orderId, OrderStatus status) {
        ORDER_STATUS_STORAGE.put(orderId, status);
    }

    public OrderStatus getOrderStatus(UUID orderId) {
        return ORDER_STATUS_STORAGE.getOrDefault(orderId, OrderStatus.UNKNOWN);
    }
}

然後,我們將創建 InventoryService。 我們將使用 Map 模擬存儲,並使用 ProductIdProperties 填充它,該類已自動注入來自我們的 application.yaml 文件的值:

@Service
public class InventoryService implements InitializingBean {

    private ProductIdProperties productIdProperties;

    private Map<UUID, Integer> inventory;

    public InventoryService(ProductIdProperties productIdProperties) {
        this.productIdProperties = productIdProperties;
    }

    @Override
    public void afterPropertiesSet() {
        this.inventory = new ConcurrentHashMap<>(Map.of(productIdProperties.getSmartphone(), 10,
          productIdProperties.getWirelessHeadphones(), 15,
          productIdProperties.getLaptop(), 5);
    }
}

InitializingBean 接口提供afterPropertiesSet,這是一個生命週期方法,Spring 在所有依賴項(包括我們的ProductIdProperties 依賴項)解析完成後調用它。

讓我們添加一個checkInventory 方法,該方法驗證產品庫存是否具有請求數量。如果產品不存在,它將拋出ProductNotFoundException 異常;如果產品存在但數量不足,它將拋出OutOfStockException 異常。在第二個場景中,我們還將模擬隨機補貨,以便在幾次重試後,處理最終會成功:

public void checkInventory(UUID productId, int quantity) {
    Integer stock = inventory.get(productId);
    if (stock < quantity) {
        inventory.put(productId, stock + (int) (Math.random() * 5));
        throw new OutOfStockException(
          "Product with id %s is out of stock. Quantity requested: %s ".formatted(productId, quantity));
    };
    inventory.put(productId, stock - quantity);
}

5.2. 創建監聽器

我們現在將創建我們第一個監聽器。我們將使用 @Component 註解,並通過 Spring 的構造函數依賴注入機制注入服務:

@Component
public class OrderProcessingListeners {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListeners.class);

    private InventoryService inventoryService;

    private OrderService orderService;

    public OrderProcessingListeners(InventoryService inventoryService, OrderService orderService) {
        this.inventoryService = inventoryService;
        this.orderService = orderService;
    }
}

接下來,我們來編寫監聽器方法:

@SqsListener(value = "${events.queues.order-processing-retry-queue}", id = "retry-order-processing-container", messageVisibilitySeconds = "1")
public void stockCheckRetry(OrderCreatedEvent orderCreatedEvent) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
    inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());

    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED);
    logger.info("Message processed successfully: {}", orderCreatedEvent);
}

The value 屬性通過 application.yaml 文件自動注入。由於 ON_SUCCESS 是默認確認模式,因此我們不需要在註解中指定它。

5.3. 設置測試類

為了驗證邏輯是否按預期工作,讓我們創建一個測試類:

@SpringBootTest
class OrderProcessingApplicationLiveTest extends BaseSqsLiveTest {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingApplicationLiveTest.class);

    @Autowired
    private EventsQueuesProperties eventsQueuesProperties;

    @Autowired
    private ProductIdProperties productIdProperties;

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private OrderService orderService;

    @Autowired
    private MessageListenerContainerRegistry registry;
}

我們還將添加一個名為 assertQueueIsEmpty 的方法。在其中,我們將使用自動注入的 MessageListenerContainerRegistry 來獲取容器,然後停止容器以確保它不消耗任何消息。註冊器包含所有由 @SqsListener 註解創建的容器:

private void assertQueueIsEmpty(String queueName, String containerId) {
    logger.info("Stopping container {}", containerId);
    var container = Objects
      .requireNonNull(registry.getContainerById(containerId), () -> "could not find container " + containerId);
    container.stop();
    // ...
}

容器停止後,我們將使用 SqsTemplate 來查找隊列中的消息。如果確認已成功,則不應返回任何消息。我們還將設置 pollTimeout 為大於可見性超時值的數值,以便如果在消息未被刪除的情況下,它將在指定的時間間隔內再次交付。

以下是 assertQueueIsEmpty 方法的延續:

// ...
logger.info("Checking for messages in queue {}", queueName);
var message = sqsTemplate.receive(from -> from.queue(queueName)
  .pollTimeout(Duration.ofSeconds(5)));
assertThat(message).isEmpty();
logger.info("No messages found in queue {}", queueName);

5.4. 測試

在本次測試中,我們將向隊列發送一個 <em >OrderCreatedEvent </em >,其中包含一個數量大於我們庫存的Order 。當異常通過監聽器方法時,框架會發出信號,指示消息處理失敗,並且在 `消息可見性 時間窗口結束後,消息將被重新交付。

為了加快測試速度,<strong >我們設置了messageVisibilitySeconds 為 1,但通常,此配置在隊列本身中完成,默認值為 30 秒。

我們將使用 Spring Cloud AWS 提供的自動配置 <em >SqsTemplate </em >創建事件併發送它。然後,我們將使用Awaitility 等待訂單狀態變為 `PROCESSED ,最後,我們將斷言隊列為空,這意味着確認已成功。

@Test
public void givenOnSuccessAcknowledgementMode_whenProcessingThrows_shouldRetry() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingRetryQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getLaptop(), 10));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.PROCESSED));
    assertQueueIsEmpty(queueName, "retry-order-processing-container");
}

請注意,我們正在將 containerId ,該值來自 @SqsListener 註解中指定的值,傳遞給 assertQueueIsEmpty 方法。

現在我們可以運行測試。首先,我們確保 Docker 正在運行,然後執行測試。在容器初始化日誌之後,我們應該看到應用程序的日誌消息:

Message received: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]

然後,可能會因為缺貨而出現一個或多個失敗:

Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174002 is out of stock. Quantity requested: 10 

由於我們已經添加了補貨邏輯,我們最終應該看到消息處理成功。

Message processed successfully: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]

最後,我們將確保確認已成功:

INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container retry-order-processing-container
INFO 2699 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container retry-order-processing-container stopped
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_retry_queue
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_retry_queue

請注意,“連接被拒絕”錯誤可能會在測試完成後出現——這是因為 Docker 容器在框架可以停止輪詢消息之前就停止了。我們可以安全地忽略這些錯誤。

6. 手動確認

該框架支持手動確認消息,這在我們需要對確認流程擁有更大的控制權的情況下非常有用。

6.1. 創建監聽器

為了説明這一點,我們將創建一個異步場景,其中 InventoryService 具有較慢的連接,並且我們希望在它完成之前釋放監聽器線程:

@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
    CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
      .thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
      .thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
      .thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
    logger.info("Releasing processing thread.");
}

在這種邏輯中,我們使用 Java 的 CompletableFuture 來異步執行庫存檢查。我們添加了 Acknowledge 對象到監聽器方法,並將 SqsListenerAcknowledgementMode.MANUAL 屬性設置為註解的 acknowledgementMode 屬性。該屬性是一個 String 類型,可以接受屬性佔位符和 SpEL。 Acknowledgement 對象 僅在 我們將 AcknowledgementMode 設置為 MANUAL 時可用。

請注意,在這個例子中,我們利用了 Spring Boot 的自動配置,它提供了合理的默認值,以及 @SqsListener 註解的屬性,用於在確認模式之間切換。 另一種方法是聲明一個 SqsMessageListenerContainerFactory Bean,它允許設置更復雜的配置。

6.2. 模擬慢速連接

現在,讓我們將 slowCheckInventory 方法添加到 InventoryService 類中,使用 Thread.sleep 模擬慢速連接:

public void slowCheckInventory(UUID productId, int quantity) {
    simulateBusyConnection();
    checkInventory(productId, quantity);
}

private void simulateBusyConnection() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}

6.3. 測試

接下來,我們編寫測試代碼:

@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.PROCESSED));
    assertQueueIsEmpty(queueName, "async-order-processing-container");
}

本次,我們要求查詢庫存中的數量,因此不應出現任何錯誤。

在運行測試時,我們將會看到一條日誌消息,指示消息已收到:

INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]

然後,我們將查看線程釋放消息:

INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Releasing processing thread.

這是因為我們正在異步處理和確認消息。 大約兩秒後,我們應該看到日誌表明消息已被確認:

INFO 2786 --- [onPool-worker-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged

最後,我們將查看停止容器並斷言隊列為空的日誌:

INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue

7. 成功與錯誤時的確認

我們將探討的最後一種確認模式是 始終,它會導致框架無論監聽方法是否拋出錯誤,都 確認消息

7.1. 創建監聽器

讓我們模擬一場銷售活動,在此活動期間我們的庫存有限,並且我們不想重新處理任何消息,無論發生任何故障。我們將設置確認模式為 ALWAYS,使用我們在先前定義的 application.yml 屬性中定義的:

@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
    inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());

    logger.info("Message processed: {}", orderCreatedEvent);
}

在測試中,我們將創建一個數量大於我們庫存的訂單:

7.2. 測試

This section covers the testing process for the system. Testing is a critical part of the development lifecycle and ensures that the system meets the specified requirements.

Types of Testing

  • Unit Testing: Testing individual components or modules of the system in isolation.
  • Integration Testing: Testing the interaction between different components or modules.
  • System Testing: Testing the entire system as a whole to ensure it meets the overall requirements.
  • User Acceptance Testing (UAT): Testing performed by end-users to verify that the system meets their needs and expectations.

Testing Process

  1. Test Planning: Define the scope of testing, identify test objectives, and create a test plan.
  2. Test Case Design: Develop detailed test cases based on the requirements and design specifications.
  3. Test Execution: Execute the test cases and record the results.
  4. Defect Reporting: Report any defects found during testing.
  5. Retesting: Verify that defects have been fixed.

Tools

Various testing tools are available to support the testing process. These tools can automate test execution, manage test cases, and generate reports.

@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.RECEIVED));
    assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}

現在,即使當 OutOfStockException 被拋出,消息也會被承認,並且不會對該消息進行重試:

Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue

8. 結論

在本文中,我們使用基於事件驅動的場景來展示 Spring Cloud AWS v3 SQS 集成提供的三種確認模式:ON_SUCCESS(默認)、MANUALALWAYS

我們利用了自動配置設置,並使用 @SqsListener 註解屬性來切換模式。我們還創建了實時測試,使用 Testcontainers 和 LocalStack 驗證行為。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.