1. 概述
確認消息是一種標準機制,在消息傳遞系統中用於通知消息代理消息已收到,且不應再次傳遞。在亞馬遜的 SQS(簡單隊列服務)中,確認消息通過在隊列中刪除消息來實現。
在本教程中,我們將探索 Spring Cloud AWS SQS v3 提供的三種默認確認模式:ON_SUCCESS、MANUAL 和 ALWAYS。
我們將使用事件驅動的場景來演示我們的用例,並利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設置。
2. 依賴項
我們首先導入 Spring Cloud AWS Bill of Materials 以確保我們 pom.xml 中的所有依賴項相互兼容:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>我們還會添加 Core 和 SQS 啓動依賴項:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>最後,我們將添加用於測試所需的依賴項,包括 LocalStack 和 TestContainers,以及與 JUnit 5 配合使用的 awaitility 庫(用於驗證異步消息消費)和 AssertJ 庫(用於處理斷言):
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>3. 設置本地測試環境
首先,我們將使用 Testcontainers 配置一個 LocalStack 測試環境以進行本地測試。
@Testcontainers
public class BaseSqsLiveTest {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
}
}雖然這種設置使得測試變得容易且可重複,但請注意,本教程中的代碼也可用於直接針對 AWS 進行目標。
4. 設置隊列名稱
默認情況下,Spring Cloud AWS SQS 會自動創建在任何 @SqsListener 註解方法中指定的隊列。 作為初始設置步驟,我們將定義隊列名稱在我們的 application.yaml 文件中:
events:
queues:
order-processing-retry-queue: order_processing_retry_queue
order-processing-async-queue: order_processing_async_queue
order-processing-no-retries-queue: order_processing_no_retries_queue
acknowledgment:
order-processing-no-retries-queue: ALWAYS
<div>
<p>感謝屬性 <em >ALWAYS </em > 也會被我們的其中一位聽眾使用。</p>
<p>我們還將向同一文件中添加幾個 <em >productIds</em>,以便在我們的示例中使用:</p>
</div>
product:
id:
smartphone: 123e4567-e89b-12d3-a456-426614174000
wireless-headphones: 123e4567-e89b-12d3-a456-426614174001
laptop: 123e4567-e89b-12d3-a456-426614174002
tablet: 123e4567-e89b-12d3-a456-426614174004<div>
為了在我們的應用程序中將這些屬性作為POJO,我們將創建兩個 @ConfigurationProperties 類,一個用於隊列:
</div>
<div>
@ConfigurationProperties(prefix = "events.queues")
public class EventsQueuesProperties {
private String orderProcessingRetryQueue;
private String orderProcessingAsyncQueue;
private String orderProcessingNoRetriesQueue;
// getters and setters
}以下是產品列表:
@ConfigurationProperties("product.id")
public class ProductIdProperties {
private UUID smartphone;
private UUID wirelessHeadphones;
private UUID laptop;
// getters and setters
}最後,我們通過在 @Configuration 類中使用 @EnableConfigurationProperties 來啓用配置屬性。
@EnableConfigurationProperties({ EventsQueuesProperties.class, ProductIdProperties.class})
@Configuration
public class OrderProcessingConfiguration {
}<div>
<h1>Introduction</h1>
<p>This document provides an overview of the new API. It covers key features, usage examples, and troubleshooting tips.</p>
<h2>Key Features</h2>
<ul>
<li><strong>Authentication:</strong> OAuth 2.0</li>
<li><strong>Data Storage:</strong> NoSQL database</li>
<li><strong>API Versioning:</strong> Semantic versioning (SemVer)</li>
</ul>
<h2>Usage Examples</h2>
<pre>
<code>
// Example Python code
import requests
response = requests.get('https://api.example.com/data')
print(response.json())
</code>
</pre>
<p>This code snippet demonstrates how to fetch data from the API using the Python requests library.</p>
<h2>Troubleshooting</h2>
<h3>Common Issues</h3>
<ol>
<li><strong>Authentication Errors:</strong> Verify your API key and token.</li>
<li><strong>Rate Limiting:</strong> Respect the API rate limits.</li>
<li><strong>Network Issues:</strong> Check your internet connection.</li>
</ol>
</div>
5. 成功處理的確認
5.1. 創建服務
讓我們首先創建一個名為OrderService 的服務,該服務將負責更新訂單狀態:
@Service
public class OrderService {
Map<UUID, OrderStatus> ORDER_STATUS_STORAGE = new ConcurrentHashMap<>();
public void updateOrderStatus(UUID orderId, OrderStatus status) {
ORDER_STATUS_STORAGE.put(orderId, status);
}
public OrderStatus getOrderStatus(UUID orderId) {
return ORDER_STATUS_STORAGE.getOrDefault(orderId, OrderStatus.UNKNOWN);
}
}
然後,我們將創建 InventoryService。 我們將使用 Map 模擬存儲,並使用 ProductIdProperties 填充它,該類已自動注入來自我們的 application.yaml 文件的值:
@Service
public class InventoryService implements InitializingBean {
private ProductIdProperties productIdProperties;
private Map<UUID, Integer> inventory;
public InventoryService(ProductIdProperties productIdProperties) {
this.productIdProperties = productIdProperties;
}
@Override
public void afterPropertiesSet() {
this.inventory = new ConcurrentHashMap<>(Map.of(productIdProperties.getSmartphone(), 10,
productIdProperties.getWirelessHeadphones(), 15,
productIdProperties.getLaptop(), 5);
}
}InitializingBean 接口提供afterPropertiesSet,這是一個生命週期方法,Spring 在所有依賴項(包括我們的ProductIdProperties 依賴項)解析完成後調用它。
讓我們添加一個checkInventory 方法,該方法驗證產品庫存是否具有請求數量。如果產品不存在,它將拋出ProductNotFoundException 異常;如果產品存在但數量不足,它將拋出OutOfStockException 異常。在第二個場景中,我們還將模擬隨機補貨,以便在幾次重試後,處理最終會成功:
public void checkInventory(UUID productId, int quantity) {
Integer stock = inventory.get(productId);
if (stock < quantity) {
inventory.put(productId, stock + (int) (Math.random() * 5));
throw new OutOfStockException(
"Product with id %s is out of stock. Quantity requested: %s ".formatted(productId, quantity));
};
inventory.put(productId, stock - quantity);
}5.2. 創建監聽器
我們現在將創建我們第一個監聽器。我們將使用 @Component 註解,並通過 Spring 的構造函數依賴注入機制注入服務:
@Component
public class OrderProcessingListeners {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListeners.class);
private InventoryService inventoryService;
private OrderService orderService;
public OrderProcessingListeners(InventoryService inventoryService, OrderService orderService) {
this.inventoryService = inventoryService;
this.orderService = orderService;
}
}接下來,我們來編寫監聽器方法:
@SqsListener(value = "${events.queues.order-processing-retry-queue}", id = "retry-order-processing-container", messageVisibilitySeconds = "1")
public void stockCheckRetry(OrderCreatedEvent orderCreatedEvent) {
logger.info("Message received: {}", orderCreatedEvent);
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED);
logger.info("Message processed successfully: {}", orderCreatedEvent);
}
The value 屬性通過 application.yaml 文件自動注入。由於 ON_SUCCESS 是默認確認模式,因此我們不需要在註解中指定它。
5.3. 設置測試類
為了驗證邏輯是否按預期工作,讓我們創建一個測試類:
@SpringBootTest
class OrderProcessingApplicationLiveTest extends BaseSqsLiveTest {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingApplicationLiveTest.class);
@Autowired
private EventsQueuesProperties eventsQueuesProperties;
@Autowired
private ProductIdProperties productIdProperties;
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private OrderService orderService;
@Autowired
private MessageListenerContainerRegistry registry;
}我們還將添加一個名為 assertQueueIsEmpty 的方法。在其中,我們將使用自動注入的 MessageListenerContainerRegistry 來獲取容器,然後停止容器以確保它不消耗任何消息。註冊器包含所有由 @SqsListener 註解創建的容器:
private void assertQueueIsEmpty(String queueName, String containerId) {
logger.info("Stopping container {}", containerId);
var container = Objects
.requireNonNull(registry.getContainerById(containerId), () -> "could not find container " + containerId);
container.stop();
// ...
}
容器停止後,我們將使用 SqsTemplate 來查找隊列中的消息。如果確認已成功,則不應返回任何消息。我們還將設置 pollTimeout 為大於可見性超時值的數值,以便如果在消息未被刪除的情況下,它將在指定的時間間隔內再次交付。
以下是 assertQueueIsEmpty 方法的延續:
// ...
logger.info("Checking for messages in queue {}", queueName);
var message = sqsTemplate.receive(from -> from.queue(queueName)
.pollTimeout(Duration.ofSeconds(5)));
assertThat(message).isEmpty();
logger.info("No messages found in queue {}", queueName);
5.4. 測試
在本次測試中,我們將向隊列發送一個 <em >OrderCreatedEvent </em >,其中包含一個數量大於我們庫存的Order 。當異常通過監聽器方法時,框架會發出信號,指示消息處理失敗,並且在 `消息可見性 時間窗口結束後,消息將被重新交付。
為了加快測試速度,<strong >我們設置了messageVisibilitySeconds 為 1,但通常,此配置在隊列本身中完成,默認值為 30 秒。
我們將使用 Spring Cloud AWS 提供的自動配置 <em >SqsTemplate </em >創建事件併發送它。然後,我們將使用Awaitility 等待訂單狀態變為 `PROCESSED ,最後,我們將斷言隊列為空,這意味着確認已成功。
@Test
public void givenOnSuccessAcknowledgementMode_whenProcessingThrows_shouldRetry() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingRetryQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getLaptop(), 10));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.PROCESSED));
assertQueueIsEmpty(queueName, "retry-order-processing-container");
}
請注意,我們正在將 containerId ,該值來自 @SqsListener 註解中指定的值,傳遞給 assertQueueIsEmpty 方法。
現在我們可以運行測試。首先,我們確保 Docker 正在運行,然後執行測試。在容器初始化日誌之後,我們應該看到應用程序的日誌消息:
Message received: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]然後,可能會因為缺貨而出現一個或多個失敗:
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174002 is out of stock. Quantity requested: 10
由於我們已經添加了補貨邏輯,我們最終應該看到消息處理成功。
Message processed successfully: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]最後,我們將確保確認已成功:
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container retry-order-processing-container
INFO 2699 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container retry-order-processing-container stopped
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_retry_queue
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_retry_queue
請注意,“連接被拒絕”錯誤可能會在測試完成後出現——這是因為 Docker 容器在框架可以停止輪詢消息之前就停止了。我們可以安全地忽略這些錯誤。
6. 手動確認
該框架支持手動確認消息,這在我們需要對確認流程擁有更大的控制權的情況下非常有用。
6.1. 創建監聽器
為了説明這一點,我們將創建一個異步場景,其中 InventoryService 具有較慢的連接,並且我們希望在它完成之前釋放監聽器線程:
@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
.thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
.thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
.thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
logger.info("Releasing processing thread.");
}
在這種邏輯中,我們使用 Java 的 CompletableFuture 來異步執行庫存檢查。我們添加了 Acknowledge 對象到監聽器方法,並將 SqsListenerAcknowledgementMode.MANUAL 屬性設置為註解的 acknowledgementMode 屬性。該屬性是一個 String 類型,可以接受屬性佔位符和 SpEL。 Acknowledgement 對象 僅在 我們將 AcknowledgementMode 設置為 MANUAL 時可用。
請注意,在這個例子中,我們利用了 Spring Boot 的自動配置,它提供了合理的默認值,以及 @SqsListener 註解的屬性,用於在確認模式之間切換。 另一種方法是聲明一個 SqsMessageListenerContainerFactory Bean,它允許設置更復雜的配置。
6.2. 模擬慢速連接
現在,讓我們將 slowCheckInventory 方法添加到 InventoryService 類中,使用 Thread.sleep 模擬慢速連接:
public void slowCheckInventory(UUID productId, int quantity) {
simulateBusyConnection();
checkInventory(productId, quantity);
}
private void simulateBusyConnection() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
6.3. 測試
接下來,我們編寫測試代碼:
@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.PROCESSED));
assertQueueIsEmpty(queueName, "async-order-processing-container");
}
本次,我們要求查詢庫存中的數量,因此不應出現任何錯誤。
在運行測試時,我們將會看到一條日誌消息,指示消息已收到:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]
然後,我們將查看線程釋放消息:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Releasing processing thread.
這是因為我們正在異步處理和確認消息。 大約兩秒後,我們應該看到日誌表明消息已被確認:
INFO 2786 --- [onPool-worker-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged
最後,我們將查看停止容器並斷言隊列為空的日誌:
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue7. 成功與錯誤時的確認
我們將探討的最後一種確認模式是 始終,它會導致框架無論監聽方法是否拋出錯誤,都 確認消息。
7.1. 創建監聽器
讓我們模擬一場銷售活動,在此活動期間我們的庫存有限,並且我們不想重新處理任何消息,無論發生任何故障。我們將設置確認模式為 ALWAYS,使用我們在先前定義的 application.yml 屬性中定義的:
@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
logger.info("Message received: {}", orderCreatedEvent);
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
logger.info("Message processed: {}", orderCreatedEvent);
}
在測試中,我們將創建一個數量大於我們庫存的訂單:
7.2. 測試
This section covers the testing process for the system. Testing is a critical part of the development lifecycle and ensures that the system meets the specified requirements.
Types of Testing
- Unit Testing: Testing individual components or modules of the system in isolation.
- Integration Testing: Testing the interaction between different components or modules.
- System Testing: Testing the entire system as a whole to ensure it meets the overall requirements.
- User Acceptance Testing (UAT): Testing performed by end-users to verify that the system meets their needs and expectations.
Testing Process
- Test Planning: Define the scope of testing, identify test objectives, and create a test plan.
- Test Case Design: Develop detailed test cases based on the requirements and design specifications.
- Test Execution: Execute the test cases and record the results.
- Defect Reporting: Report any defects found during testing.
- Retesting: Verify that defects have been fixed.
Tools
Various testing tools are available to support the testing process. These tools can automate test execution, manage test cases, and generate reports.
@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.RECEIVED));
assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}
現在,即使當 OutOfStockException 被拋出,消息也會被承認,並且不會對該消息進行重試:
Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue8. 結論
在本文中,我們使用基於事件驅動的場景來展示 Spring Cloud AWS v3 SQS 集成提供的三種確認模式:ON_SUCCESS(默認)、MANUAL 和 ALWAYS。
我們利用了自動配置設置,並使用 @SqsListener 註解屬性來切換模式。我們還創建了實時測試,使用 Testcontainers 和 LocalStack 驗證行為。