1. 概述
Spring Cloud AWS 是一個旨在簡化與 AWS 服務交互的項目。 簡單隊列服務 (SQS) 是 AWS 提供的,用於以可擴展的方式發送和接收異步消息的解決方案。
在本教程中,我們將重新介紹 Spring Cloud AWS SQS 集成,該集成已完全重寫為 Spring Cloud AWS 3.0。
該框架提供了熟悉的 Spring 抽象,用於處理 SQS 隊列,例如 SqsTemplate 和 @SqsListener 註解。
我們將探討一個基於事件驅動的場景,並提供發送和接收消息的示例,同時展示如何使用 Testcontainers(一個用於管理可丟棄 Docker 容器的工具)和 LocalStack(一個用於本地模擬 AWS 環境的工具)來設置集成測試,以及展示如何設置集成測試。
2. 依賴項
<a href="https://mvnrepository.com/artifact/io.awspring.cloud/spring-cloud-aws">Spring Cloud AWS Bill of Materials (BOM)</a> 確保項目間兼容的版本。它聲明瞭許多依賴項的版本,包括 Spring Boot,並且應該替代 Spring Boot 自帶的 BOM。
以下是在我們的 <em >pom.xml</em> 文件中導入它的方法:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
我們需要的主要依賴是 SQS Starter,它包含了所有與SQS相關的類,用於該項目。SQS集成與Spring Boot無關,可以在任何標準的Java應用程序中獨立使用:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>對於像我們在本教程中構建的 Spring Boot 應用(例如)這樣的應用,我們應該添加項目的 Core Starter,這樣我們就可以利用 Spring Boot 的自動配置功能,用於 SQS 和 AWS 配置,例如憑證和區域:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>3. 設置本地測試環境
本節將指導您使用 Testcontainers 設置 LocalStack 環境,以便在本地環境中測試您的代碼。請注意,本教程中的示例 也可以直接針對 AWS 執行。
3.1. 依賴項
要使用 JUnit 5 與 LocalStack 和 TestContainers 一起運行,我們需要兩個額外的依賴項:LocalStack 和 TestContainers。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
讓我們也包含 awaitility 庫,以幫助我們斷言異步消息消費:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3.2. 配置
接下來,我們將使用 Testcontainers 和 LocalStack 配置我們的本地測試環境。 我們將創建一個 SqsLiveTestConfiguration 類:
@Configuration
public class SqsLiveTestConfiguration {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";
@Bean
@ServiceConnection
LocalStackContainer localStackContainer() {
return new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
}
}<div>
<h1>Introduction</h1>
<p>This document provides an overview of the new API. It covers key features, usage examples, and troubleshooting tips.</p>
<h2>Key Features</h2>
<ul>
<li><strong>Data Validation:</strong> Ensures data integrity by validating input against predefined rules.</li>
<li><strong>Asynchronous Operations:</strong> Enables non-blocking operations for improved performance.</li>
<li><strong>Error Handling:</strong> Provides robust error handling mechanisms for graceful failure management.</li>
</ul>
<pre>
<code>
function calculateSum(a, b) {
return a + b;
}
</code>
</pre>
<p><em>Note:</em> This function performs basic addition.</p>
<h2>Usage Examples</h2>
<pre>
<code>
// Example 1: Validating an email address
const email = "test@example.com";
if (isValidEmail(email)) {
console.log("Email is valid");
} else {
console.log("Email is invalid");
}
</code>
</pre>
<p><em>Important:</em> Always validate user input to prevent security vulnerabilities.</p>
</div>
4. 設置隊列名稱
我們可以通過利用 Spring Boot 的 application.yml 屬性機制來設置隊列名稱。
對於本教程,我們將創建三個隊列:
events:
queues:
user-created-by-name-queue: user_created_by_name_queue
user-created-record-queue: user_created_record_queue
user-created-event-type-queue: user_created_event_type_queue
讓我們創建一個POJO來表示這些屬性:
@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {
private String userCreatedByNameQueue;
private String userCreatedRecordQueue;
private String userCreatedEventTypeQueue;
// getters and setters
}最後,我們需要在帶有@EnableConfigurationProperties註解的@Configuration註解的類,或者主Spring Application類中,使用該註解,讓Spring Boot知道我們希望使用它來填充我們的application.yml屬性:
@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudAwsApplication.class, args);
}
}
現在,我們準備好注入實際值或 POJO,以便獲取隊列名稱。
默認情況下,Spring Cloud AWS SQS 會在隊列未找到時創建隊列,這有助於我們快速設置開發環境。在生產環境中,應用程序不應具有創建隊列的權限,如果未找到隊列,則啓動將失敗。框架還可以配置為在未找到隊列時明確失敗。
5. 發送和接收消息
使用 Spring Cloud AWS,您可以採用多種方式向和從 SQS 發送和接收消息。這裏我們將介紹最常用的方法,使用 SqsTemplate 發送消息,以及 @SqsListener 註解接收消息。
5.1. 場景
在我們的場景中,我們將 模擬一個事件驅動應用程序,該應用程序響應 UserCreatedEvent,並在其本地存儲庫中保存相關信息。
讓我們創建一個 User 實體:
public record User(String id, String name, String email) {
}
讓我們創建一個簡單的基於內存的UserRepository:
@Repository
public class UserRepository {
private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();
public void save(User userToSave) {
persistedUsers.put(userToSave.id(), userToSave);
}
public Optional<User> findById(String userId) {
return Optional.ofNullable(persistedUsers.get(userId));
}
public Optional<User> findByName(String name) {
return persistedUsers.values().stream()
.filter(user -> user.name().equals(name))
.findFirst();
}
}
最後,讓我們創建一個 UserCreatedEvent Java Record 類:
public record UserCreatedEvent(String id, String username, String email) {
}
5.2. 部署
為了測試我們的場景,我們將創建一個名為 SpringCloudAwsSQSLiveTest 的類,該類繼承自我們之前創建的 BaseSqsIntegrationTest 文件。我們將自動注入三個依賴項:由框架自動配置的 SqsTemplate,UserRepository,以便我們驗證消息處理是否成功,以及包含隊列名稱的我們的 EventQueuesProperties POJO。
public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {
private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private UserRepository userRepository;
@Autowired
private EventQueuesProperties eventQueuesProperties;
// ...
}為了容納我們的聽眾,我們創建一個 UserEventListeners 類並將其聲明為 Spring 的 @Component:
@Component
public class UserEventListeners {
private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);
public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";
private final UserRepository userRepository;
public UserEventListeners(UserRepository userRepository) {
this.userRepository = userRepository;
}
// Our listeners will be added here
}
5.3. 字符串 負載
在第一個示例中,我們將發送一個帶有 字符串 負載的消息,在我們的監聽器中接收它,並將其持久化到我們的存儲庫。然後,我們將輪詢存儲庫以確保我們的應用程序正確地持久化數據。
首先,讓我們在測試類中創建一個用於發送消息的測試:
@Test
void givenAStringPayload_whenSend_shouldReceive() {
// given
var userName = "Albert";
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
.payload(userName));
logger.info("Message sent with payload {}", userName);
// then
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findByName(userName)
.isPresent());
}我們應該看到類似以下的日誌:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert
然後,請注意,測試失敗是因為我們尚未為此隊列設置監聽器。
讓我們設置監聽器,以便在監聽器類中消費來自此隊列的消息,並使測試通過:
@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
logger.info("Received message: {}", username);
userRepository.save(new User(UUID.randomUUID()
.toString(), username, null));
}
現在,當我們運行測試時,我們應該在日誌中看到結果:
INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert測試通過。
請注意,我們正在使用 Spring 的屬性解析功能從我們之前創建的 application.yml 中獲取隊列名稱。
5.4. POJO 和 Record 負載
現在我們已經成功地發送和接收了 String 負載,讓我們設置一個使用 Java Record 的場景,即我們之前創建的 UserCreatedEvent。
首先,讓我們編寫我們的失敗測試:
@Test
void givenARecordPayload_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "[email protected]");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
.payload(payload));
// then
logger.info("Message sent with payload: {}", payload);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}我們應該在測試失敗之前看到類似以下的日誌:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, [email protected]]
現在,讓我們創建一個相應的監聽器,以使測試通過:
@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
logger.info("Received message: {}", event);
userRepository.save(new User(event.id(), event.username(), event.email()));
}
我們將會看到輸出,確認消息已收到,測試通過:
INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, [email protected]]
該框架將自動配置 Spring Context 中任何可用的 ObjectMapper Bean,以處理消息的序列化和反序列化。 我們可以配置自己的 ObjectMapper 並以多種方式自定義序列化,但這超出了本教程的範圍。
5.5. Spring 消息與頭部
在最後一個場景中,我們將發送一個帶有自定義頭部和接收消息作為 Spring <em >Message</em> 實例,以及我們添加的自定義頭部和標準 SQS 頭部。<strong >框架會自動將所有 SQS消息屬性轉換為消息頭部,包括用户提供的任何頭部。
讓我們首先創建失敗的測試:
@Test
void givenCustomHeaders_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "[email protected]");
var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
.payload(payload)
.headers(headers));
// then
logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}測試應該生成與以下內容類似的日誌,在失敗之前:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]] and custom headers: {eventType=UserCreatedEvent}
現在,讓我們添加相應的監聽器以使測試通過:
@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
@Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
UserCreatedEvent payload = message.getPayload();
userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}
我們將會看到在重新運行測試後產生的輸出結果,表明測試成功。
INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]], headers=...在本示例中,我們接收到一個包含已反序列化的 UserCreatedEvent 記錄作為 payload,以及兩個 headers 的 Message。為了確保項目整體的一致性,我們應該 使用 SqsHeader 類常量來獲取 SQS 標準 headers 。
6. 結論
本文通過一個基於事件驅動的場景,詳細介紹了使用 Spring Cloud AWS SQS 3.0 發送和接收消息的不同示例。
我們搭建了本地環境,使用了 LocalStack 和 TestContainers,並配置了框架使用適當的本地配置,以用於我們的集成測試。