知識庫 / Spring / Spring Cloud RSS 訂閱

Spring Cloud AWS v3 中消息轉換

Spring Cloud
HongKong
6
11:06 AM · Dec 06 ,2025

1. 概述

消息轉換是指在應用程序傳輸和接收消息時,將消息轉換成不同格式和表示形式的過程。

AWS SQS 允許使用文本負載,Spring Cloud AWS SQS 集成則提供熟悉的 Spring 抽象,用於管理文本負載的序列化和反序列化到和從 POJO 和記錄,默認使用 JSON。

在本教程中,我們將使用事件驅動的場景,探討消息轉換的三種常見用例:POJO/記錄的序列化和反序列化、設置自定義 ObjectMapper 以及反序列化到子類/接口實現。

為了測試這些用例,我們將利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設置。

2. 依賴項

首先,導入 Spring Cloud AWS Bill of Materials,該組件管理我們依賴項的版本,確保它們之間版本兼容性:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>${spring-cloud-aws.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

現在,我們可以添加核心和 SQS 啓動依賴項:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

對於本教程,我們將使用 Spring Boot Web starter。特別地,我們未指定版本,因為我們正在導入 Spring Cloud AWS 的 BOM

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

最後,我們添加測試依賴項——LocalStack 和 TestContainers,以及與 JUnit 5 配合使用,該庫用於驗證異步消息消費(awaitility 庫),以及 AssertJ 用於使用流暢 API 處理斷言,除此之外,還包括 Spring Boot 的測試依賴。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <scope>test</scope>
</dependency>

3. 設置本地測試環境

現在我們已經添加了依賴項,接下來我們將通過創建 BaseSqsLiveTest 類來設置我們的測試環境,該類應由我們的測試套件進行擴展:

@Testcontainers
public class BaseSqsLiveTest {

    private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";

    @Container
    static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
        registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
        registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
        registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
          .toString());
    }
}

4. 設置隊列名稱

為了利用 Spring Boot 的配置外部化功能,我們將隊列名稱添加到我們的 <em >application.yml</em> 文件中:

events:
  queues:
    shipping:
      simple-pojo-conversion-queue: shipping_pojo_conversion_queue
      custom-object-mapper-queue: shipping_custom_object_mapper_queue
      deserializes-subclass: deserializes_subclass_queue

我們現在創建一個帶有@ConfigurationProperties註解的類,我們將注入到我們的測試中以檢索隊列名稱:

@ConfigurationProperties(prefix = "events.queues.shipping")
public class ShipmentEventsQueuesProperties {

    private String simplePojoConversionQueue;

    private String customObjectMapperQueue;

    private String subclassDeserializationQueue;

    // ...getters and setters

}

最後,我們向一個 @Configuration 類添加了 @EnableConfigurationProperties 註解:

@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
}

5. 設置應用程序

我們將創建一個名為 Shipment 的微服務,該微服務會響應 ShipmentRequestedEvent,以説明我們的用例。

首先,讓我們創建一個名為 Shipment 的實體,該實體將存儲有關貨運的信息:

public class Shipment {

    private UUID orderId;
    private String customerAddress;
    private LocalDate shipBy;
    private ShipmentStatus status;

    public Shipment(){}

    public Shipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status) {
        this.orderId = orderId;
        this.customerAddress = customerAddress;
        this.shipBy = shipBy;
        this.status = status;
    }
    
    // ...getters and setters
}

接下來,讓我們添加一個 ShipmentStatus 枚舉:

public enum ShipmentStatus {
    REQUESTED,
    PROCESSED,
    CUSTOMS_CHECK,
    READY_FOR_DISPATCH,
    SENT,
    DELIVERED
}

我們還需要 ShipmentRequestedEvent

public class ShipmentRequestedEvent {

    private UUID orderId;
    private String customerAddress;
    private LocalDate shipBy;

    public ShipmentRequestedEvent() {
    }

    public ShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy) {
        this.orderId = orderId;
        this.customerAddress = customerAddress;
        this.shipBy = shipBy;
    }

    public Shipment toDomain() {
        return new Shipment(orderId, customerAddress, shipBy, ShipmentStatus.REQUESTED);
    }

    // ...getters and setters

}

為了處理我們的貨運,我們將創建一個簡單的 ShipmentService 類,其中包含一個模擬的倉庫,我們將用於驗證我們的測試:

@Service
public class ShipmentService {

    private static final Logger logger = LoggerFactory.getLogger(ShipmentService.class);

    private final Map<UUID, Shipment> shippingRepository = new ConcurrentHashMap<>();

    public void processShippingRequest(Shipment shipment) {
        logger.info("Processing shipping for order: {}", shipment.getOrderId());
        shipment.setStatus(ShipmentStatus.PROCESSED);
        shippingRepository.put(shipment.getOrderId(), shipment);
        logger.info("Shipping request processed: {}", shipment.getOrderId());
    }

    public Shipment getShipment(UUID requestId) {
        return shippingRepository.get(requestId);
    }
}

6. 處理默認配置下的 POJOs 和記錄

Spring Cloud AWS SQS 預配置了一個 SqsMessagingMessageConverter,用於將 POJOs 和記錄序列化和反序列化到和從 JSON 中。 當使用 SqsTemplate、<@em>SqsListener 標註或手動實例化 SqsMessageListenerContainer 時,它會完成此操作。

我們的第一個用例是發送和接收一個簡單的 POJO 以説明這種默認配置。 我們將使用 @SqsListener 標註來接收消息,並利用 Spring Boot 的自動配置來在必要時配置反序列化。

首先,我們將創建用於發送消息的測試。

@SpringBootTest
public class ShipmentServiceApplicationLiveTest extends BaseSqsLiveTest {

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private ShipmentService shipmentService;

    @Autowired
    private ShipmentEventsQueuesProperties queuesProperties;

    @Test
    void givenPojoPayload_whenMessageReceived_thenDeserializesCorrectly() {
        UUID orderId = UUID.randomUUID();
        ShipmentRequestedEvent shipmentRequestedEvent = new ShipmentRequestedEvent(orderId, "123 Main St", LocalDate.parse("2024-05-12"));

        sqsTemplate.send(queuesProperties.getSimplePojoConversionQueue(), shipmentRequestedEvent);

        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> {
                Shipment shipment = shipmentService.getShipment(orderId);
                assertThat(shipment).isNotNull();
                assertThat(shipment).usingRecursiveComparison()
                  .ignoringFields("status")
                  .isEqualTo(shipmentRequestedEvent);
                assertThat(shipment
                  .getStatus()).isEqualTo(ShipmentStatus.PROCESSED);
            });
    }
}

在這裏,我們創建事件,並使用自動配置的 SqsTemplate 將其發送到隊列,然後等待狀態變為 PROCESSED。這表明消息已成功接收並處理。

當測試觸發時,由於我們還沒有監聽器,所以在 10 秒後就會失敗。

為了解決這個問題,我們創建第一個 @SqsListener

@Component
public class ShipmentRequestListener {

    private final ShipmentService shippingService;

    public ShipmentRequestListener(ShipmentService shippingService) {
        this.shippingService = shippingService;
    }

    @SqsListener("${events.queues.shipping.simple-pojo-conversion-queue}")
    public void receiveShipmentRequest(ShipmentRequestedEvent shipmentRequestedEvent) {
        shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
    }

}

當我們再次運行測試時,經過一段時間後通過了。

值得注意的是,監聽器具有 註解,並且我們正在引用在 文件中設置的隊列名稱。

此示例展示了 Spring Cloud AWS 如何能夠直接處理 POJO 轉換,這與 Java 記錄同樣有效。

7. 配置自定義對象映射器

使用消息轉換的常見用例是在應用程序特定配置下設置自定義 <em>ObjectMapper</em>

在下一個場景中,我們將使用一個 ObjectMapper,並配置一個 LocalDateDeserializer 以讀取日期,日期格式為 "dd-MM-yyyy"

再次,我們首先會創建測試場景。在本例中,我們將通過框架自動配置的 <em>SqsAsyncClient</em> 直接發送原始 JSON 負載:

    @Autowired
    private SqsAsyncClient sqsAsyncClient;

    @Test
    void givenShipmentRequestWithCustomDateFormat_whenMessageReceived_thenDeserializesDateCorrectly() {
        UUID orderId = UUID.randomUUID();
        String shipBy = LocalDate.parse("2024-05-12")
          .format(DateTimeFormatter.ofPattern("dd-MM-yyyy"));

        var jsonMessage = """
            {
                "orderId": "%s",
                "customerAddress": "123 Main St",
                "shipBy": "%s"
            }
            """.formatted(orderId, shipBy);

        sendRawMessage(queuesProperties.getCustomObjectMapperQueue(), jsonMessage);

        await().atMost(Duration.ofSeconds(10))
          .untilAsserted(() -> {
              var shipment = shipmentService.getShipment(orderId);
              assertThat(shipment).isNotNull();
              assertThat(shipment.getShipBy()).isEqualTo(LocalDate.parse(shipBy, DateTimeFormatter.ofPattern("dd-MM-yyyy")));
          });
    }

    private void sendRawMessage(String queueName, String jsonMessage) {
        sqsAsyncClient.getQueueUrl(req -> req.queueName(queueName))
          .thenCompose(resp -> sqsAsyncClient.sendMessage(req -> req.messageBody(jsonMessage)
              .queueUrl(resp.queueUrl())))
          .join();
    }

讓我們也添加這個隊列的監聽器:

@SqsListener("${events.queues.shipping.custom-object-mapper-queue}")
public void receiveShipmentRequestWithCustomObjectMapper(ShipmentRequestedEvent shipmentRequestedEvent) {
    shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}

當我們現在運行測試時,它會失敗,並且在堆棧跟蹤中我們會看到類似的消息:

Cannot deserialize value of type `java.time.LocalDate` from String "12-05-2024"

之所以這樣,是因為我們沒有使用標準的 “yyyy-MM-dd”日期格式。

為了解決這個問題,我們需要配置一個 ObjectMapper,它可以解析這種日期格式。 我們可以簡單地將其聲明為一個 Bean,在一個帶有 @Configuration註解的類中,自動配置將正確地將其設置到自動配置的 SqsTemplate@SqsListener方法中:

@Bean
public ObjectMapper objectMapper() {
    ObjectMapper mapper = new ObjectMapper();
    JavaTimeModule module = new JavaTimeModule();
    LocalDateDeserializer customDeserializer = new LocalDateDeserializer(DateTimeFormatter.ofPattern("dd-MM-yyyy", Locale.getDefault()));
    module.addDeserializer(LocalDate.class, customDeserializer);
    mapper.registerModule(module);
    return mapper;
}

當我們再次運行測試時,結果符合預期。

8. 配置繼承和接口反序列化

另一個常見場景是在有多種子類或實現的情況下,擁有父類或接口。需要告知框架應將消息反序列化到哪個特定類,基於諸如 MessageHeader 或消息的一部分之類的標準。

為了説明這個用例,讓我們為我們的場景增加一些複雜性,幷包含兩種類型的運輸:InternationalShipmentDomesticShipment,它們都是 Shipment 的子類,具有特定的屬性。

8.1. 創建實體和事件

public class InternationalShipment extends Shipment {

    private String destinationCountry;
    private String customsInfo;

    public InternationalShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
        String destinationCountry, String customsInfo) {
        super(orderId, customerAddress, shipBy, status);
        this.destinationCountry = destinationCountry;
        this.customsInfo = customsInfo;
    }

    // ...getters and setters

}

public class DomesticShipment extends Shipment {

    private String deliveryRouteCode;

    public DomesticShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
        String deliveryRouteCode) {
        super(orderId, customerAddress, shipBy, status);
        this.deliveryRouteCode = deliveryRouteCode;
    }

    public String getDeliveryRouteCode() {
        return deliveryRouteCode;
    }

    public void setDeliveryRouteCode(String deliveryRouteCode) {
        this.deliveryRouteCode = deliveryRouteCode;
    }

}

讓我們添加他們的各自事件:

public class DomesticShipmentRequestedEvent extends ShipmentRequestedEvent {

    private String deliveryRouteCode;

    public DomesticShipmentRequestedEvent(){}

    public DomesticShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String deliveryRouteCode) {
        super(orderId, customerAddress, shipBy);
        this.deliveryRouteCode = deliveryRouteCode;
    }

    public DomesticShipment toDomain() {
        return new DomesticShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, deliveryRouteCode);
    }

    // ...getters and setters

}

public class InternationalShipmentRequestedEvent extends ShipmentRequestedEvent {

    private String destinationCountry;
    private String customsInfo;

    public InternationalShipmentRequestedEvent(){}

    public InternationalShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String destinationCountry,
        String customsInfo) {
        super(orderId, customerAddress, shipBy);
        this.destinationCountry = destinationCountry;
        this.customsInfo = customsInfo;
    }

    public InternationalShipment toDomain() {
        return new InternationalShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, destinationCountry,
            customsInfo);
    }

    // ...getters and setters

}

8.2. 添加服務和監聽器邏輯

我們將為我們的 Service 添加兩個方法,每個方法用於處理不同類型的貨運:

@Service
public class ShipmentService {

    // ...previous code stays the same

    public void processDomesticShipping(DomesticShipment shipment) {
        logger.info("Processing domestic shipping for order: {}", shipment.getOrderId());
        shipment.setStatus(ShipmentStatus.READY_FOR_DISPATCH);
        shippingRepository.put(shipment.getOrderId(), shipment);
        logger.info("Domestic shipping processed: {}", shipment.getOrderId());
    }

    public void processInternationalShipping(InternationalShipment shipment) {
        logger.info("Processing international shipping for order: {}", shipment.getOrderId());
        shipment.setStatus(ShipmentStatus.CUSTOMS_CHECK);
        shippingRepository.put(shipment.getOrderId(), shipment);
        logger.info("International shipping processed: {}", shipment.getOrderId());
    }

}

現在,讓我們添加處理消息的監聽器。需要注意的是,監聽器方法使用了基類的類型,因為該方法接收來自所有子類型的消息:

@SqsListener(queueNames = "${events.queues.shipping.subclass-deserialization-queue}")
public void receiveShippingRequestWithType(ShipmentRequestedEvent shipmentRequestedEvent) {
    if (shipmentRequestedEvent instanceof InternationalShipmentRequestedEvent event) {
        shippingService.processInternationalShipping(event.toDomain());
    } else if (shipmentRequestedEvent instanceof DomesticShipmentRequestedEvent event) {
        shippingService.processDomesticShipping(event.toDomain());
    } else {
        throw new RuntimeException("Event type not supported " + shipmentRequestedEvent.getClass()
            .getSimpleName());
    }
}

8.3. 使用默認類型頭映射進行反序列化

在場景設置完成後,我們可以創建測試。首先,讓我們為每種類型創建事件:

@Test
void givenPayloadWithSubclasses_whenMessageReceived_thenDeserializesCorrectType() {
    var domesticOrderId = UUID.randomUUID();
    var domesticEvent = new DomesticShipmentRequestedEvent(domesticOrderId, "123 Main St", LocalDate.parse("2024-05-12"), "XPTO1234");
    var internationalOrderId = UUID.randomUUID();
    InternationalShipmentRequestedEvent internationalEvent = new InternationalShipmentRequestedEvent(internationalOrderId, "123 Main St", LocalDate.parse("2024-05-24"), "Canada", "HS Code: 8471.30, Origin: China, Value: $500");

}

繼續在相同的測試方法上,我們現在將發送事件。默認情況下,SqsTemplate 會附帶特定類型的元數據,用於反序列化。 通過利用這一點,我們可以簡單地使用自動配置的 SqsTemplate 發送消息,並且它能夠正確地反序列化消息:

sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), internationalEvent);
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), domesticEvent);

最後,我們斷言每個運輸訂單的狀態與它的類型相對應:

await().atMost(Duration.ofSeconds(10))
    .untilAsserted(() -> {
        var domesticShipment = (DomesticShipment) shipmentService.getShipment(domesticOrderId);
        assertThat(domesticShipment).isNotNull();
        assertThat(domesticShipment).usingRecursiveComparison()
          .ignoringFields("status")
          .isEqualTo(domesticEvent);
        assertThat(domesticShipment.getStatus()).isEqualTo(ShipmentStatus.READY_FOR_DISPATCH);

        var internationalShipment = (InternationalShipment) shipmentService.getShipment(internationalOrderId);
        assertThat(internationalShipment).isNotNull();
        assertThat(internationalShipment).usingRecursiveComparison()
          .ignoringFields("status")
          .isEqualTo(internationalEvent);
        assertThat(internationalShipment.getStatus()).isEqualTo(ShipmentStatus.CUSTOMS_CHECK);
    });

當我們現在運行測試,它通過了,這表明每個子類都已正確地反序列化,並具有正確的類型和信息。

8.4. 使用自定義類型頭部映射反序列化

在許多情況下,您可能會接收來自不使用 SqsTemplate 發送消息的服務的消息,或者表示事件的 POJO 或記錄位於不同的包中。

為了模擬這種情況,讓我們在測試方法中創建一個自定義 SqsTemplate,並配置它在沒有在標題中包含類型信息的情況下發送消息。 對於這種情況,我們還需要注入一個能夠序列化 LocalDate 實例的 ObjectMapper,例如我們之前配置的或由 Spring Boot 自動配置的:

@Autowired
private ObjectMapper objectMapper;

var customTemplate = SqsTemplate.builder()
  .sqsAsyncClient(sqsAsyncClient)
  .configureDefaultConverter(converter -> {
        converter.doNotSendPayloadTypeHeader();
        converter.setObjectMapper(objectMapper);
    })
  .build();

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(internationalEvent);

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(domesticEvent);

現在,我們的測試失敗時,錯誤信息類似於堆棧跟蹤中的這些內容,因為框架無法知道應該將其反序列化到哪個具體的類:

Could not read JSON: Unrecognized field "destinationCountry"
Could not read JSON: Unrecognized field "deliveryRouteCode"

為了解決該用例,SqsMessagingMessageConverter 類具有 setPayloadTypeMapper 方法,該方法可用於讓框架根據消息的任意屬性瞭解目標類。對於本次測試,我們將使用自定義標題作為依據。

首先,讓我們將自定義標題配置添加到 application.yml 中:

headers:
  types:
    shipping:
      header-name: SHIPPING_TYPE
      international: INTERNATIONAL
      domestic: DOMESTIC

我們還將創建一個屬性類來存儲這些值:

@ConfigurationProperties(prefix = "headers.types.shipping")
public class ShippingHeaderTypesProperties {

    private String headerName;
    private String international;
    private String domestic;

    // ...getters and setters

}

接下來,讓我們在我們的 Configuration 類中啓用 properties 類:

@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class, ShippingHeaderTypesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
   // ...rest of code remains the same
}

我們現在將配置一個自定義的 SqsMessagingMessageConverter,以使用這些頭部信息,並將其設置為 defaultSqsListenerContainerFactory 豆子:

@Bean
public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(ObjectMapper objectMapper) {
    SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
    converter.setPayloadTypeMapper(message -> {
        if (!message.getHeaders()
          .containsKey(typesProperties.getHeaderName())) {
            return Object.class;
        }
        String eventTypeHeader = MessageHeaderUtils.getHeaderAsString(message, typesProperties.getHeaderName());
        if (eventTypeHeader.equals(typesProperties.getDomestic())) {
            return DomesticShipmentRequestedEvent.class;
        } else if (eventTypeHeader.equals(typesProperties.getInternational())) {
            return InternationalShipmentRequestedEvent.class;
        }
        throw new RuntimeException("Invalid shipping type");
    });
    converter.setObjectMapper(objectMapper);

    return SqsMessageListenerContainerFactory.builder()
      .sqsAsyncClient(sqsAsyncClient)
      .configure(configure -> configure.messageConverter(converter))
      .build();
}

之後,我們在測試方法中將標題添加到自定義模板中:

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(internationalEvent)
  .header(headerTypesProperties.getHeaderName(), headerTypesProperties.getInternational()));

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(domesticEvent)
  .header(headerTypesProperties.getHeaderName(), headerTypesProperties.getDomestic()));

當我們再次運行測試時,測試通過,驗證了每個事件都正確地反序列化了適當的子類類型。

9. 結論

本文介紹了 Message Conversion 的三種常見使用場景:利用內置設置對 POJO/record 進行序列化和反序列化、使用自定義 ObjectMapper 處理不同日期格式和其他特定配置,以及將消息反序列化到子類/接口實現的不同方式。

我們通過設置本地測試環境並創建實時測試來測試每個場景,以驗證我們的邏輯。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.