1. 概述
AWS SQS 中的 FIFO(先進先出)隊列旨在確保消息按發送的順序處理,並且每個消息僅傳遞一次。
Spring Cloud AWS v3 支持此功能,通過易於使用的抽象,允許開發人員使用最少的樣板代碼來處理 FIFO 隊列的特性,如消息排序和去重。
在本教程中,我們將探索 FIFO 隊列在金融交易處理系統中的三個實用用例:
- 確保同一賬户內的交易消息嚴格排序
- 在保持每個賬户的 FIFO 語義的前提下,並行處理來自不同賬户的交易
- 在處理失敗的情況下處理消息重試,確保重試尊重原始消息順序
我們將通過設置事件驅動應用程序並創建實時測試來驗證行為符合預期,利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設置。
2. 依賴項
為了開始,我們將管理依賴項並確保版本兼容性,通過導入 Spring Cloud AWS Bill of Materials (BOM) 來實現:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>接下來,我們添加必要的 Spring Cloud AWS Starter 組件,用於核心功能和 SQS 集成:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>我們還會包含 Spring Boot Web Starter。 由於我們使用了 Spring Cloud AWS BOM,因此無需指定其版本:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>最後,為了測試,我們將添加 LocalStack 和 TestContainers 的依賴項,以及與 JUnit 5、Awaitility(用於異步操作驗證)和 Spring Boot Test Starter(版本 3.4.0)一起使用:
// 依賴項配置示例
// 依賴項配置示例
// 依賴項配置示例
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3. 設置本地測試環境
接下來,我們將使用 Testcontainers 和 LocalStack 配置我們的本地測試環境。 我們將創建一個 SqsLiveTestConfiguration 類:
@Configuration
public class SqsLiveTestConfiguration {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";
@Bean
@ServiceConnection
LocalStackContainer localStackContainer() {
return new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
}
}
在本課程中,我們聲明 LocalStack 測試容器為 Spring Bean,並使用 @ServiceConnection 註解來處理我們的 Wiring。
4. 設置隊列名稱
我們將定義我們的 SQS 隊列名稱,並在 application.yml 文件中利用 Spring Boot 的配置外部化功能。
events:
queues:
fifo:
transactions-queue: "transactions-queue.fifo"
slow-queue: "slow-queue.fifo"
failure-queue: "failure-queue.fifo"這種結構將我們的隊列名稱組織在一個分層結構中,方便我們在應用程序代碼中進行管理和訪問。所有隊列名稱必須以 .fifo 後綴結尾,用於 FIFO 隊列。
5. 設置應用程序
讓我們通過使用一個交易微服務來闡述這些概念。 該服務將處理 TransactionEvent 消息,這些消息代表必須在每個賬户中保持有序的財務交易。
首先,我們定義我們的 Transaction 實體:
public record Transaction(UUID transactionId, UUID accountId, double amount, TransactionType type) {}伴隨着一個 交易類型枚舉:
public enum TransactionType {
DEPOSIT,
WITHDRAW
}接下來,我們創建 TransactionEvent:
public record TransactionEvent(UUID transactionId, UUID accountId, double amount, TransactionType type) {
public Transaction toEntity() {
return new Transaction(transactionId, accountId, amount, type);
}
}TransactionService 類負責處理邏輯併為測試目的維護一個模擬的倉庫。
@Service
public class TransactionService {
private static final Logger logger = LoggerFactory.getLogger(TransactionService.class);
private final ConcurrentHashMap<UUID, List<Transaction>> processedTransactions =
new ConcurrentHashMap<>();
public void processTransaction(Transaction transaction) {
logger.info("Processing transaction: {} for account {}",
transaction.transactionId(), transaction.accountId());
processedTransactions.computeIfAbsent(transaction.accountId(), k -> new ArrayList<>())
.add(transaction);
}
public List<Transaction> getProcessedTransactionsByAccount(UUID accountId) {
return processedTransactions.getOrDefault(accountId, new ArrayList<>());
}
}6. 按順序處理事件
在我們的第一個場景中,我們將創建一個監聽器,用於處理事件,並創建一個測試,以斷言我們接收到的事件與發送的順序相同。 我們將使用 @RepeatedTest 標註來運行測試 100 次,以確保其一致性,並觀察在標準 SQS 隊列和 FIFO 隊列中運行時的行為。
6.1. 創建監聽器
讓我們創建一個第一個監聽器,用於接收和處理事件的順序。 我們將使用 <em @SqsListener </em> 標註,利用 Spring 的佔位符解析功能,從 <em application.yml </em> 文件中解析隊列名稱:
@Component
public class TransactionListener {
private final TransactionService transactionService;
public TransactionListener(TransactionService transactionService) {
this.transactionService = transactionService;
}
@SqsListener("${events.queues.fifo.transactions-queue}")
public void processTransaction(TransactionEvent transactionEvent) {
transactionService.processTransaction(transactionEvent.toEntity());
}
}請注意,無需進行任何進一步的設置。在幕後,框架會檢測隊列類型為 FIFO,並做出所有必要的調整,以確保監聽方法接收到的消息按正確的順序。
6.2. 創建測試
讓我們創建一個測試,以斷言接收消息的順序與發送順序完全一致。我們從一個擴展了之前創建的 BaseSqsLiveTest 的測試套件開始:
@SpringBootTest
public class SpringCloudAwsSQSTransactionProcessingTest extends BaseSqsLiveTest {
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private TransactionService transactionService;
@Value("${events.queues.fifo.transactions-queue}")
String transactionsQueue;
@Test
void givenTransactionsFromSameAccount_whenSend_shouldReceiveInOrder() {
var accountId = UUID.randomUUID();
var transactions = List.of(createDeposit(accountId, 100.0),
createWithdraw(accountId, 50.0), createDeposit(accountId, 25.0));
var messages = createTransactionMessages(accountId, transactions);
sqsTemplate.sendMany(transactionsQueue, messages);
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId),
isEqual(eventsToEntities(transactions)));
}
}
在本測試中,我們利用了 SqsTemplate 的 sendMany() 方法,該方法允許我們在同一批次中發送最多 10 條消息。然後,我們等待最多 5 秒來接收這些消息。
我們還將創建一些輔助方法,以幫助我們保持測試邏輯的整潔。 sendMany() 方法期望接收一個 List<Pojo>, 因此 createTransactionMessages() 方法將每個 accountId 的交易映射到一個消息:
private List<Message<TransactionEvent>> createTransactionMessages(UUID accountId,
Collection<TransactionEvent> transactions) {
return transactions.stream()
.map(transaction -> MessageBuilder.withPayload(transaction)
.setHeader(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER,
accountId.toString())
.build())
.toList();
}
在 SQS FIFO 中,MessageGroupId 屬性用於指示哪些消息應分組在一起並按順序接收。 在我們的場景中,我們必須確保一個 Account 的交易按順序處理,但不需要在 Account 之間進行排序,因此我們將 accountId 作為 MessageGroupId 使用。 為此,我們可以使用 SqsHeaders 中的標頭,框架將它們映射到 SQS 消息屬性:
其餘的輔助方法是簡單的方法,用於將事件映射到交易,並創建 TransactionEvents:
private List<Transaction> eventsToEntities(List<TransactionEvent> transactionEvents) {
return transactionEvents.stream()
.map(TransactionEvent::toEntity)
.toList();
}
private TransactionEvent createWithdraw(UUID accountId1, double amount) {
return new TransactionEvent(UUID.randomUUID(), accountId1, amount, TransactionType.WITHDRAW);
}
private TransactionEvent createDeposit(UUID accountId1, double amount) {
return new TransactionEvent(UUID.randomUUID(), accountId1, amount, TransactionType.DEPOSIT);
}
6.3. 運行測試
當運行測試時,我們會看到測試通過並生成與以下類似的日誌,交易按照我們聲明的順序進行:
TransactionService : Processing transaction: DEPOSIT:100.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
TransactionService : Processing transaction: WITHDRAW:50.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
TransactionService : Processing transaction: DEPOSIT:25.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
如果我們仍然不確定,並且想確保這不是巧合,我們可以添加 RepeatableTest 註解,從而運行測試 100 次
@RepeatedTest(100)
void givenTransactionsFromSameAccount_whenSend_shouldReceiveInOrder() {
// ...test remains the same
}
所有 100 個運行應該以相同的順序同時通過,日誌順序保持一致。
為了進行額外的驗證,我們使用標準隊列而不是 FIFO 隊列,並驗證其行為。
為此,我們需要從隊列名稱中移除 .fifo 後綴,在 application.yml 中進行操作。
transactions-queue: "transactions-queue"
接下來,我們將註釋掉在 createTransactionMessages() 方法中添加 MessageId 標頭的代碼,因為 Standard SQS 隊列不支持該屬性:
// .setHeader(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, accountId.toString())現在,讓我們再次運行測試 100 次。您會注意到,測試有時會通過,這僅僅因為消息恰好以預期的順序到達,但其他時候它會失敗,因為標準隊列中消息排序沒有保證。
在總結本節之前,讓我們撤銷這些更改,並在隊列中添加 .fifo 後綴,移除 @RepeatedTest 註解,並取消註釋 MessageGroupId 代碼。
7. 並行處理多個消息組
在 SQS FIFO 中,為了最大化消息消費吞吐量,可以並行處理來自不同消息組的消息,同時保持消息組內的消息順序。 Spring Cloud AWS SQS 默認支持此行為,無需進行任何額外的配置。
為了説明此行為,我們向 TransactionService 添加一個模擬慢連接的方法:
public void simulateSlowProcessing(Transaction transaction) {
try {
processTransaction(transaction);
Thread.sleep(Thread.sleep(100));
logger.info("Transaction processing completed: {}:{} for account {}",
transaction.type(), transaction.amount(), transaction.accountId());
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
throw new RuntimeException(e);
}
}
緩慢的連接將幫助我們確認不同賬户的消息正在並行處理,同時在每個賬户內保持交易順序。
現在,讓我們創建一個監聽器,該監聽器將使用 TransactionListener 類中的新方法:
@SqsListener("${events.queues.fifo.slow-queue}")
public void processParallelTransaction(TransactionEvent transactionEvent) {
transactionService.simulateSlowProcessing(transactionEvent.toEntity());
}
最後,我們創建一個測試來驗證其行為:
@Test
void givenTransactionsFromDifferentAccounts_whenSend_shouldProcessInParallel() {
var accountId1 = UUID.randomUUID();
var accountId2 = UUID.randomUUID();
var account1Transactions = List.of(createDeposit(accountId1, 100.0),
createWithdraw(accountId1, 50.0), createDeposit(accountId1, 25.0));
var account2Transactions = List.of(createDeposit(accountId2, 50.0),
createWithdraw(accountId2, 25.0), createDeposit(accountId2, 50.0));
var allMessages = Stream.concat(createTransactionMessages(accountId1, account1Transactions).stream(),
createTransactionMessages(accountId2, account2Transactions).stream()).toList();
sqsTemplate.sendMany(slowQueue, allMessages);
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId1),
isEqual(eventsToEntities(account1Transactions)));
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId2),
isEqual(eventsToEntities(account2Transactions)));
}
在本次測試中,我們發送了兩個不同賬户的兩個交易事件集。我們再次利用了 sendMany() 方法,以批量發送所有消息,並斷言消息按預期順序接收。
當運行測試時,我們應該看到類似以下的日誌:
TransactionService : Processing transaction: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:100.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: DEPOSIT:100.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: WITHDRAW:50.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Processing transaction: WITHDRAW:25.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Transaction processing completed: WITHDRAW:50.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: WITHDRAW:25.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:25.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
我們可以看到,這兩個賬户都在並行處理,同時保持每個賬户內的順序,這一點也通過測試通過驗證。
8. 按照順序重試處理
在上一場景中,我們將模擬網絡故障並確保處理順序保持一致。 當監聽器方法拋出錯誤時,框架會暫停對該消息羣組的執行,且不會確認這些消息。 SQS 在可見窗口過期後再次服務剩餘的消息。
為了説明這種行為,我們將向我們的 TransactionService 中添加一個新方法,該方法總是會在第一次處理消息時失敗。
首先,讓我們向 TransactionService 中添加一個 Set 以存儲已失敗的 ID:
private final Set<UUID> failedTransactions = ConcurrentHashMap.newKeySet();然後我們來添加 processTransactionWithFailure() 方法:
public void processTransactionWithFailure(Transaction transaction) {
if (!failedTransactions.contains(transaction.transactionId())) {
failedTransactions.add(transaction.transactionId());
throw new RuntimeException("Simulated failure for transaction " +
transaction.type() + ":" + transaction.amount());
}
processTransaction(transaction);
}此方法將在首次處理交易時拋出錯誤,但在後續重試中將正常處理。
現在,讓我們為處理消息添加監聽器。我們將將 messageVisibilitySeconds 設置為 1,以縮小可見性窗口並加快測試中的重試速度。
@SqsListener(value = "${events.queues.fifo.failure-queue}", messageVisibilitySeconds = "1")
public void retryFailedTransaction(TransactionEvent transactionEvent) {
transactionService.processTransactionWithFailure(transactionEvent.toEntity());
}
最後,讓我們創建一個測試來驗證行為是否符合預期:
@Test
void givenTransactionProcessingFailure_whenSend_shouldRetryInOrder() {
var accountId = UUID.randomUUID();
var transactions = List.of(createDeposit(accountId, 100.0),
createWithdraw(accountId, 50.0), createDeposit(accountId, 25.0));
var messages = createTransactionMessages(accountId, transactions);
sqsTemplate.sendMany(failureQueue, messages);
await().atMost(Duration.ofSeconds(10))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId),
isEqual(eventsToEntities(transactions)));
}在本次測試中,我們發送了三個事件,並斷言它們按照預期順序被處理。
當我們運行測試時,異常堆棧跟蹤中應該會看到類似以下的日誌:
Caused by: java.lang.RuntimeException: Simulated failure for transaction DEPOSIT:100.0
隨後:
TransactionService : Processing transaction: DEPOSIT:100.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
表明該事件已在第二次嘗試中成功處理。
對於下一次事件,我們應該看到 2 個類似的配對:
Caused by: java.lang.RuntimeException: Simulated failure for transaction WITHDRAW:50.0
TransactionService : Processing transaction: WITHDRAW:50.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
Caused by: java.lang.RuntimeException: Simulated failure for transaction DEPOSIT:25.0
TransactionService : Processing transaction: DEPOSIT:25.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
這表明事件已按照正確的順序處理,即使在存在故障的情況下也是如此。
9. 結論
本文介紹了 Spring Cloud AWS v3 對 FIFO 隊列的支持。我們構建了一個依賴於事件按順序處理的事務處理服務,並在三種不同場景下驗證消息順序是否得到尊重:處理單個消息組、並行處理多個消息組以及在失敗後重試消息。
我們通過設置本地測試環境並創建實時測試來驗證每個場景,以確保我們的邏輯正確。