知識庫 / Spring / Spring Cloud RSS 訂閱

Spring Cloud AWS 3.0 – SQS 集成

Spring Cloud
HongKong
9
11:18 AM · Dec 06 ,2025

1. 概述

Spring Cloud AWS 是一個旨在簡化與 AWS 服務交互的項目。 簡單隊列服務 (SQS) 是 AWS 提供的,用於以可擴展的方式發送和接收異步消息的解決方案。

在本教程中,我們將重新介紹 Spring Cloud AWS SQS 集成,該集成已完全重寫為 Spring Cloud AWS 3.0

該框架提供了熟悉的 Spring 抽象,用於處理 SQS 隊列,例如 SqsTemplate@SqsListener 註解。

我們將探討一個基於事件驅動的場景,並提供發送和接收消息的示例,同時展示如何使用 Testcontainers(一個用於管理可丟棄 Docker 容器的工具)和 LocalStack(一個用於本地模擬 AWS 環境的工具)來設置集成測試,以及展示如何設置集成測試。

2. 依賴項

<a href="https://mvnrepository.com/artifact/io.awspring.cloud/spring-cloud-aws">Spring Cloud AWS Bill of Materials (BOM)</a> 確保項目間兼容的版本。它聲明瞭許多依賴項的版本,包括 Spring Boot,並且應該替代 Spring Boot 自帶的 BOM

以下是在我們的 <em >pom.xml</em> 文件中導入它的方法:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.2.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

我們需要的主要依賴是 SQS Starter,它包含了所有與SQS相關的類,用於該項目。SQS集成與Spring Boot無關,可以在任何標準的Java應用程序中獨立使用:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

對於像我們在本教程中構建的 Spring Boot 應用(例如)這樣的應用,我們應該添加項目的 Core Starter,這樣我們就可以利用 Spring Boot 的自動配置功能,用於 SQS 和 AWS 配置,例如憑證和區域:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

3. 設置本地測試環境

本節將指導您使用 Testcontainers 設置 LocalStack 環境,以便在本地環境中測試您的代碼。請注意,本教程中的示例 也可以直接針對 AWS 執行

3.1. 依賴項

要使用 JUnit 5 與 LocalStack 和 TestContainers 一起運行,我們需要兩個額外的依賴項:LocalStack 和 TestContainers

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>

讓我們也包含 awaitility 庫,以幫助我們斷言異步消息消費:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

3.2. 配置

接下來,我們將使用 TestcontainersLocalStack 配置我們的本地測試環境。 我們將創建一個 SqsLiveTestConfiguration 類:

@Configuration
public class SqsLiveTestConfiguration {
    private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";

    @Bean
    @ServiceConnection
    LocalStackContainer localStackContainer() {
        return new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
    }
}
<div>
  <h1>Introduction</h1>
  <p>This document provides an overview of the new API. It covers key features, usage examples, and troubleshooting tips.</p>
  <h2>Key Features</h2>
  <ul>
    <li><strong>Data Validation:</strong> Ensures data integrity by validating input against predefined rules.</li>
    <li><strong>Asynchronous Operations:</strong> Enables non-blocking operations for improved performance.</li>
    <li><strong>Error Handling:</strong> Provides robust error handling mechanisms for graceful failure management.</li>
  </ul>
  <pre>
    <code>
      function calculateSum(a, b) {
        return a + b;
      }
    </code>
  </pre>
  <p><em>Note:</em> This function performs basic addition.</p>
  <h2>Usage Examples</h2>
  <pre>
    <code>
      // Example 1: Validating an email address
      const email = "test@example.com";
      if (isValidEmail(email)) {
        console.log("Email is valid");
      } else {
        console.log("Email is invalid");
      }
    </code>
  </pre>
  <p><em>Important:</em> Always validate user input to prevent security vulnerabilities.</p>
</div>

4. 設置隊列名稱

我們可以通過利用 Spring Boot 的 application.yml 屬性機制來設置隊列名稱。

對於本教程,我們將創建三個隊列:

events:
  queues:
    user-created-by-name-queue: user_created_by_name_queue
    user-created-record-queue: user_created_record_queue
    user-created-event-type-queue: user_created_event_type_queue

讓我們創建一個POJO來表示這些屬性:

@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {

    private String userCreatedByNameQueue;
    private String userCreatedRecordQueue;
    private String userCreatedEventTypeQueue;

    // getters and setters
}

最後,我們需要在帶有@EnableConfigurationProperties註解的@Configuration註解的類,或者主Spring Application類中,使用該註解,讓Spring Boot知道我們希望使用它來填充我們的application.yml屬性:

@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudAwsApplication.class, args);
    }
}

現在,我們準備好注入實際值或 POJO,以便獲取隊列名稱。

默認情況下,Spring Cloud AWS SQS 會在隊列未找到時創建隊列,這有助於我們快速設置開發環境。在生產環境中,應用程序不應具有創建隊列的權限,如果未找到隊列,則啓動將失敗。框架還可以配置為在未找到隊列時明確失敗。

5. 發送和接收消息

使用 Spring Cloud AWS,您可以採用多種方式向和從 SQS 發送和接收消息。這裏我們將介紹最常用的方法,使用 SqsTemplate 發送消息,以及 @SqsListener 註解接收消息。

5.1. 場景

在我們的場景中,我們將 模擬一個事件驅動應用程序,該應用程序響應 UserCreatedEvent,並在其本地存儲庫中保存相關信息。

讓我們創建一個 User 實體:

public record User(String id, String name, String email) {
}

讓我們創建一個簡單的基於內存的UserRepository

@Repository
public class UserRepository {

    private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();

    public void save(User userToSave) {
        persistedUsers.put(userToSave.id(), userToSave);
    }

    public Optional<User> findById(String userId) {
        return Optional.ofNullable(persistedUsers.get(userId));
    }

    public Optional<User> findByName(String name) {
        return persistedUsers.values().stream()
          .filter(user -> user.name().equals(name))
          .findFirst();
    }
}

最後,讓我們創建一個 UserCreatedEvent Java Record 類:

public record UserCreatedEvent(String id, String username, String email) {
}

5.2. 部署

為了測試我們的場景,我們將創建一個名為 SpringCloudAwsSQSLiveTest 的類,該類繼承自我們之前創建的 BaseSqsIntegrationTest 文件。我們將自動注入三個依賴項:由框架自動配置的 SqsTemplateUserRepository,以便我們驗證消息處理是否成功,以及包含隊列名稱的我們的 EventQueuesProperties POJO。

public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private EventQueuesProperties eventQueuesProperties;

   // ...
}

為了容納我們的聽眾,我們創建一個 UserEventListeners 類並將其聲明為 Spring 的 @Component

@Component
public class UserEventListeners {

    private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);

    public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";

    private final UserRepository userRepository;

    public UserEventListeners(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    // Our listeners will be added here 
}

5.3. 字符串 負載

在第一個示例中,我們將發送一個帶有 字符串 負載的消息,在我們的監聽器中接收它,並將其持久化到我們的存儲庫。然後,我們將輪詢存儲庫以確保我們的應用程序正確地持久化數據。

首先,讓我們在測試類中創建一個用於發送消息的測試

@Test
void givenAStringPayload_whenSend_shouldReceive() {
    // given
    var userName = "Albert";

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
      .payload(userName));
    logger.info("Message sent with payload {}", userName);

    // then
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findByName(userName)
        .isPresent());
}

我們應該看到類似以下的日誌:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert

然後,請注意,測試失敗是因為我們尚未為此隊列設置監聽器

讓我們設置監聽器,以便在監聽器類中消費來自此隊列的消息,並使測試通過:

@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
    logger.info("Received message: {}", username);
    userRepository.save(new User(UUID.randomUUID()
      .toString(), username, null));
}

現在,當我們運行測試時,我們應該在日誌中看到結果:

INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert

測試通過。

請注意,我們正在使用 Spring 的屬性解析功能從我們之前創建的 application.yml 中獲取隊列名稱。

5.4. POJO 和 Record 負載

現在我們已經成功地發送和接收了 String 負載,讓我們設置一個使用 Java Record 的場景,即我們之前創建的 UserCreatedEvent

首先,讓我們編寫我們的失敗測試:

@Test
void givenARecordPayload_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "[email protected]");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
      .payload(payload));

    // then
    logger.info("Message sent with payload: {}", payload);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

我們應該在測試失敗之前看到類似以下的日誌:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, [email protected]]

現在,讓我們創建一個相應的監聽器,以使測試通過:

@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
    logger.info("Received message: {}", event);
    userRepository.save(new User(event.id(), event.username(), event.email()));
}

我們將會看到輸出,確認消息已收到,測試通過:

INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, [email protected]]

該框架將自動配置 Spring Context 中任何可用的 ObjectMapper Bean,以處理消息的序列化和反序列化。 我們可以配置自己的 ObjectMapper 並以多種方式自定義序列化,但這超出了本教程的範圍。

5.5. Spring 消息與頭部

在最後一個場景中,我們將發送一個帶有自定義頭部和接收消息作為 Spring <em >Message</em> 實例,以及我們添加的自定義頭部和標準 SQS 頭部。<strong >框架會自動將所有 SQS消息屬性轉換為消息頭部,包括用户提供的任何頭部。

讓我們首先創建失敗的測試:

@Test
void givenCustomHeaders_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "[email protected]");
    var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
      .payload(payload)
      .headers(headers));

    // then
    logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

測試應該生成與以下內容類似的日誌,在失敗之前:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest  : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]] and custom headers: {eventType=UserCreatedEvent}

現在,讓我們添加相應的監聽器以使測試通過:

@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
    @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
    logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
    UserCreatedEvent payload = message.getPayload();
    userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}

我們將會看到在重新運行測試後產生的輸出結果,表明測試成功。

INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]], headers=...

在本示例中,我們接收到一個包含已反序列化的 UserCreatedEvent 記錄作為 payload,以及兩個 headers 的 Message。為了確保項目整體的一致性,我們應該 使用 SqsHeader 類常量來獲取 SQS 標準 headers 。

6. 結論

本文通過一個基於事件驅動的場景,詳細介紹了使用 Spring Cloud AWS SQS 3.0 發送和接收消息的不同示例。

我們搭建了本地環境,使用了 LocalStack 和 TestContainers,並配置了框架使用適當的本地配置,以用於我們的集成測試。

發佈 評論

Some HTML is okay.