1. 概述
消息轉換是指在應用程序傳輸和接收消息時,將消息轉換成不同格式和表示形式的過程。
AWS SQS 允許使用文本負載,Spring Cloud AWS SQS 集成則提供熟悉的 Spring 抽象,用於管理文本負載的序列化和反序列化到和從 POJO 和記錄,默認使用 JSON。
在本教程中,我們將使用事件驅動的場景,探討消息轉換的三種常見用例:POJO/記錄的序列化和反序列化、設置自定義 ObjectMapper 以及反序列化到子類/接口實現。
為了測試這些用例,我們將利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設置。
2. 依賴項
首先,導入 Spring Cloud AWS Bill of Materials,該組件管理我們依賴項的版本,確保它們之間版本兼容性:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
現在,我們可以添加核心和 SQS 啓動依賴項:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
對於本教程,我們將使用 Spring Boot Web starter。特別地,我們未指定版本,因為我們正在導入 Spring Cloud AWS 的 BOM:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
最後,我們添加測試依賴項——LocalStack 和 TestContainers,以及與 JUnit 5 配合使用,該庫用於驗證異步消息消費(awaitility 庫),以及 AssertJ 用於使用流暢 API 處理斷言,除此之外,還包括 Spring Boot 的測試依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>3. 設置本地測試環境
現在我們已經添加了依賴項,接下來我們將通過創建 BaseSqsLiveTest 類來設置我們的測試環境,該類應由我們的測試套件進行擴展:
@Testcontainers
public class BaseSqsLiveTest {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";
@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
}
}
4. 設置隊列名稱
為了利用 Spring Boot 的配置外部化功能,我們將隊列名稱添加到我們的 <em >application.yml</em> 文件中:
events:
queues:
shipping:
simple-pojo-conversion-queue: shipping_pojo_conversion_queue
custom-object-mapper-queue: shipping_custom_object_mapper_queue
deserializes-subclass: deserializes_subclass_queue
我們現在創建一個帶有@ConfigurationProperties註解的類,我們將注入到我們的測試中以檢索隊列名稱:
@ConfigurationProperties(prefix = "events.queues.shipping")
public class ShipmentEventsQueuesProperties {
private String simplePojoConversionQueue;
private String customObjectMapperQueue;
private String subclassDeserializationQueue;
// ...getters and setters
}
最後,我們向一個 @Configuration 類添加了 @EnableConfigurationProperties 註解:
@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
}
5. 設置應用程序
我們將創建一個名為 Shipment 的微服務,該微服務會響應 ShipmentRequestedEvent,以説明我們的用例。
首先,讓我們創建一個名為 Shipment 的實體,該實體將存儲有關貨運的信息:
public class Shipment {
private UUID orderId;
private String customerAddress;
private LocalDate shipBy;
private ShipmentStatus status;
public Shipment(){}
public Shipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status) {
this.orderId = orderId;
this.customerAddress = customerAddress;
this.shipBy = shipBy;
this.status = status;
}
// ...getters and setters
}
接下來,讓我們添加一個 ShipmentStatus 枚舉:
public enum ShipmentStatus {
REQUESTED,
PROCESSED,
CUSTOMS_CHECK,
READY_FOR_DISPATCH,
SENT,
DELIVERED
}我們還需要 ShipmentRequestedEvent:
public class ShipmentRequestedEvent {
private UUID orderId;
private String customerAddress;
private LocalDate shipBy;
public ShipmentRequestedEvent() {
}
public ShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy) {
this.orderId = orderId;
this.customerAddress = customerAddress;
this.shipBy = shipBy;
}
public Shipment toDomain() {
return new Shipment(orderId, customerAddress, shipBy, ShipmentStatus.REQUESTED);
}
// ...getters and setters
}
為了處理我們的貨運,我們將創建一個簡單的 ShipmentService 類,其中包含一個模擬的倉庫,我們將用於驗證我們的測試:
@Service
public class ShipmentService {
private static final Logger logger = LoggerFactory.getLogger(ShipmentService.class);
private final Map<UUID, Shipment> shippingRepository = new ConcurrentHashMap<>();
public void processShippingRequest(Shipment shipment) {
logger.info("Processing shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.PROCESSED);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("Shipping request processed: {}", shipment.getOrderId());
}
public Shipment getShipment(UUID requestId) {
return shippingRepository.get(requestId);
}
}
6. 處理默認配置下的 POJOs 和記錄
Spring Cloud AWS SQS 預配置了一個 SqsMessagingMessageConverter,用於將 POJOs 和記錄序列化和反序列化到和從 JSON 中。 當使用 SqsTemplate、<@em>SqsListener 標註或手動實例化 SqsMessageListenerContainer 時,它會完成此操作。
我們的第一個用例是發送和接收一個簡單的 POJO 以説明這種默認配置。 我們將使用 @SqsListener 標註來接收消息,並利用 Spring Boot 的自動配置來在必要時配置反序列化。
首先,我們將創建用於發送消息的測試。
@SpringBootTest
public class ShipmentServiceApplicationLiveTest extends BaseSqsLiveTest {
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private ShipmentService shipmentService;
@Autowired
private ShipmentEventsQueuesProperties queuesProperties;
@Test
void givenPojoPayload_whenMessageReceived_thenDeserializesCorrectly() {
UUID orderId = UUID.randomUUID();
ShipmentRequestedEvent shipmentRequestedEvent = new ShipmentRequestedEvent(orderId, "123 Main St", LocalDate.parse("2024-05-12"));
sqsTemplate.send(queuesProperties.getSimplePojoConversionQueue(), shipmentRequestedEvent);
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
Shipment shipment = shipmentService.getShipment(orderId);
assertThat(shipment).isNotNull();
assertThat(shipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(shipmentRequestedEvent);
assertThat(shipment
.getStatus()).isEqualTo(ShipmentStatus.PROCESSED);
});
}
}
在這裏,我們創建事件,並使用自動配置的 SqsTemplate 將其發送到隊列,然後等待狀態變為 PROCESSED。這表明消息已成功接收並處理。
當測試觸發時,由於我們還沒有監聽器,所以在 10 秒後就會失敗。
為了解決這個問題,我們創建第一個 @SqsListener:
@Component
public class ShipmentRequestListener {
private final ShipmentService shippingService;
public ShipmentRequestListener(ShipmentService shippingService) {
this.shippingService = shippingService;
}
@SqsListener("${events.queues.shipping.simple-pojo-conversion-queue}")
public void receiveShipmentRequest(ShipmentRequestedEvent shipmentRequestedEvent) {
shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}
}
當我們再次運行測試時,經過一段時間後通過了。
值得注意的是,監聽器具有 註解,並且我們正在引用在 文件中設置的隊列名稱。
此示例展示了 Spring Cloud AWS 如何能夠直接處理 POJO 轉換,這與 Java 記錄同樣有效。
7. 配置自定義對象映射器
使用消息轉換的常見用例是在應用程序特定配置下設置自定義 <em>ObjectMapper</em>。
在下一個場景中,我們將使用一個 ObjectMapper,並配置一個 LocalDateDeserializer 以讀取日期,日期格式為 "dd-MM-yyyy"。
再次,我們首先會創建測試場景。在本例中,我們將通過框架自動配置的 <em>SqsAsyncClient</em> 直接發送原始 JSON 負載:
@Autowired
private SqsAsyncClient sqsAsyncClient;
@Test
void givenShipmentRequestWithCustomDateFormat_whenMessageReceived_thenDeserializesDateCorrectly() {
UUID orderId = UUID.randomUUID();
String shipBy = LocalDate.parse("2024-05-12")
.format(DateTimeFormatter.ofPattern("dd-MM-yyyy"));
var jsonMessage = """
{
"orderId": "%s",
"customerAddress": "123 Main St",
"shipBy": "%s"
}
""".formatted(orderId, shipBy);
sendRawMessage(queuesProperties.getCustomObjectMapperQueue(), jsonMessage);
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
var shipment = shipmentService.getShipment(orderId);
assertThat(shipment).isNotNull();
assertThat(shipment.getShipBy()).isEqualTo(LocalDate.parse(shipBy, DateTimeFormatter.ofPattern("dd-MM-yyyy")));
});
}
private void sendRawMessage(String queueName, String jsonMessage) {
sqsAsyncClient.getQueueUrl(req -> req.queueName(queueName))
.thenCompose(resp -> sqsAsyncClient.sendMessage(req -> req.messageBody(jsonMessage)
.queueUrl(resp.queueUrl())))
.join();
}
讓我們也添加這個隊列的監聽器:
@SqsListener("${events.queues.shipping.custom-object-mapper-queue}")
public void receiveShipmentRequestWithCustomObjectMapper(ShipmentRequestedEvent shipmentRequestedEvent) {
shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}
當我們現在運行測試時,它會失敗,並且在堆棧跟蹤中我們會看到類似的消息:
Cannot deserialize value of type `java.time.LocalDate` from String "12-05-2024"
之所以這樣,是因為我們沒有使用標準的 “yyyy-MM-dd”日期格式。
為了解決這個問題,我們需要配置一個 ObjectMapper,它可以解析這種日期格式。 我們可以簡單地將其聲明為一個 Bean,在一個帶有 @Configuration註解的類中,自動配置將正確地將其設置到自動配置的 SqsTemplate和@SqsListener方法中:
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
JavaTimeModule module = new JavaTimeModule();
LocalDateDeserializer customDeserializer = new LocalDateDeserializer(DateTimeFormatter.ofPattern("dd-MM-yyyy", Locale.getDefault()));
module.addDeserializer(LocalDate.class, customDeserializer);
mapper.registerModule(module);
return mapper;
}
當我們再次運行測試時,結果符合預期。
8. 配置繼承和接口反序列化
另一個常見場景是在有多種子類或實現的情況下,擁有父類或接口。需要告知框架應將消息反序列化到哪個特定類,基於諸如 MessageHeader 或消息的一部分之類的標準。
為了説明這個用例,讓我們為我們的場景增加一些複雜性,幷包含兩種類型的運輸:InternationalShipment 和 DomesticShipment,它們都是 Shipment 的子類,具有特定的屬性。
8.1. 創建實體和事件
public class InternationalShipment extends Shipment {
private String destinationCountry;
private String customsInfo;
public InternationalShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
String destinationCountry, String customsInfo) {
super(orderId, customerAddress, shipBy, status);
this.destinationCountry = destinationCountry;
this.customsInfo = customsInfo;
}
// ...getters and setters
}
public class DomesticShipment extends Shipment {
private String deliveryRouteCode;
public DomesticShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
String deliveryRouteCode) {
super(orderId, customerAddress, shipBy, status);
this.deliveryRouteCode = deliveryRouteCode;
}
public String getDeliveryRouteCode() {
return deliveryRouteCode;
}
public void setDeliveryRouteCode(String deliveryRouteCode) {
this.deliveryRouteCode = deliveryRouteCode;
}
}
讓我們添加他們的各自事件:
public class DomesticShipmentRequestedEvent extends ShipmentRequestedEvent {
private String deliveryRouteCode;
public DomesticShipmentRequestedEvent(){}
public DomesticShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String deliveryRouteCode) {
super(orderId, customerAddress, shipBy);
this.deliveryRouteCode = deliveryRouteCode;
}
public DomesticShipment toDomain() {
return new DomesticShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, deliveryRouteCode);
}
// ...getters and setters
}
public class InternationalShipmentRequestedEvent extends ShipmentRequestedEvent {
private String destinationCountry;
private String customsInfo;
public InternationalShipmentRequestedEvent(){}
public InternationalShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String destinationCountry,
String customsInfo) {
super(orderId, customerAddress, shipBy);
this.destinationCountry = destinationCountry;
this.customsInfo = customsInfo;
}
public InternationalShipment toDomain() {
return new InternationalShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, destinationCountry,
customsInfo);
}
// ...getters and setters
}
8.2. 添加服務和監聽器邏輯
我們將為我們的 Service 添加兩個方法,每個方法用於處理不同類型的貨運:
@Service
public class ShipmentService {
// ...previous code stays the same
public void processDomesticShipping(DomesticShipment shipment) {
logger.info("Processing domestic shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.READY_FOR_DISPATCH);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("Domestic shipping processed: {}", shipment.getOrderId());
}
public void processInternationalShipping(InternationalShipment shipment) {
logger.info("Processing international shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.CUSTOMS_CHECK);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("International shipping processed: {}", shipment.getOrderId());
}
}
現在,讓我們添加處理消息的監聽器。需要注意的是,監聽器方法使用了基類的類型,因為該方法接收來自所有子類型的消息:
@SqsListener(queueNames = "${events.queues.shipping.subclass-deserialization-queue}")
public void receiveShippingRequestWithType(ShipmentRequestedEvent shipmentRequestedEvent) {
if (shipmentRequestedEvent instanceof InternationalShipmentRequestedEvent event) {
shippingService.processInternationalShipping(event.toDomain());
} else if (shipmentRequestedEvent instanceof DomesticShipmentRequestedEvent event) {
shippingService.processDomesticShipping(event.toDomain());
} else {
throw new RuntimeException("Event type not supported " + shipmentRequestedEvent.getClass()
.getSimpleName());
}
}
8.3. 使用默認類型頭映射進行反序列化
在場景設置完成後,我們可以創建測試。首先,讓我們為每種類型創建事件:
@Test
void givenPayloadWithSubclasses_whenMessageReceived_thenDeserializesCorrectType() {
var domesticOrderId = UUID.randomUUID();
var domesticEvent = new DomesticShipmentRequestedEvent(domesticOrderId, "123 Main St", LocalDate.parse("2024-05-12"), "XPTO1234");
var internationalOrderId = UUID.randomUUID();
InternationalShipmentRequestedEvent internationalEvent = new InternationalShipmentRequestedEvent(internationalOrderId, "123 Main St", LocalDate.parse("2024-05-24"), "Canada", "HS Code: 8471.30, Origin: China, Value: $500");
}
繼續在相同的測試方法上,我們現在將發送事件。默認情況下,SqsTemplate 會附帶特定類型的元數據,用於反序列化。 通過利用這一點,我們可以簡單地使用自動配置的 SqsTemplate 發送消息,並且它能夠正確地反序列化消息:
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), internationalEvent);
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), domesticEvent);
最後,我們斷言每個運輸訂單的狀態與它的類型相對應:
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
var domesticShipment = (DomesticShipment) shipmentService.getShipment(domesticOrderId);
assertThat(domesticShipment).isNotNull();
assertThat(domesticShipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(domesticEvent);
assertThat(domesticShipment.getStatus()).isEqualTo(ShipmentStatus.READY_FOR_DISPATCH);
var internationalShipment = (InternationalShipment) shipmentService.getShipment(internationalOrderId);
assertThat(internationalShipment).isNotNull();
assertThat(internationalShipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(internationalEvent);
assertThat(internationalShipment.getStatus()).isEqualTo(ShipmentStatus.CUSTOMS_CHECK);
});
當我們現在運行測試,它通過了,這表明每個子類都已正確地反序列化,並具有正確的類型和信息。
8.4. 使用自定義類型頭部映射反序列化
在許多情況下,您可能會接收來自不使用 SqsTemplate 發送消息的服務的消息,或者表示事件的 POJO 或記錄位於不同的包中。
為了模擬這種情況,讓我們在測試方法中創建一個自定義 SqsTemplate,並配置它在沒有在標題中包含類型信息的情況下發送消息。 對於這種情況,我們還需要注入一個能夠序列化 LocalDate 實例的 ObjectMapper,例如我們之前配置的或由 Spring Boot 自動配置的:
@Autowired
private ObjectMapper objectMapper;
var customTemplate = SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient)
.configureDefaultConverter(converter -> {
converter.doNotSendPayloadTypeHeader();
converter.setObjectMapper(objectMapper);
})
.build();
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(internationalEvent);
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(domesticEvent);
現在,我們的測試失敗時,錯誤信息類似於堆棧跟蹤中的這些內容,因為框架無法知道應該將其反序列化到哪個具體的類:
Could not read JSON: Unrecognized field "destinationCountry"
Could not read JSON: Unrecognized field "deliveryRouteCode"
為了解決該用例,SqsMessagingMessageConverter 類具有 setPayloadTypeMapper 方法,該方法可用於讓框架根據消息的任意屬性瞭解目標類。對於本次測試,我們將使用自定義標題作為依據。
首先,讓我們將自定義標題配置添加到 application.yml 中:
headers:
types:
shipping:
header-name: SHIPPING_TYPE
international: INTERNATIONAL
domestic: DOMESTIC
我們還將創建一個屬性類來存儲這些值:
@ConfigurationProperties(prefix = "headers.types.shipping")
public class ShippingHeaderTypesProperties {
private String headerName;
private String international;
private String domestic;
// ...getters and setters
}
接下來,讓我們在我們的 Configuration 類中啓用 properties 類:
@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class, ShippingHeaderTypesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
// ...rest of code remains the same
}
我們現在將配置一個自定義的 SqsMessagingMessageConverter,以使用這些頭部信息,並將其設置為 defaultSqsListenerContainerFactory 豆子:
@Bean
public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(ObjectMapper objectMapper) {
SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
converter.setPayloadTypeMapper(message -> {
if (!message.getHeaders()
.containsKey(typesProperties.getHeaderName())) {
return Object.class;
}
String eventTypeHeader = MessageHeaderUtils.getHeaderAsString(message, typesProperties.getHeaderName());
if (eventTypeHeader.equals(typesProperties.getDomestic())) {
return DomesticShipmentRequestedEvent.class;
} else if (eventTypeHeader.equals(typesProperties.getInternational())) {
return InternationalShipmentRequestedEvent.class;
}
throw new RuntimeException("Invalid shipping type");
});
converter.setObjectMapper(objectMapper);
return SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(configure -> configure.messageConverter(converter))
.build();
}
之後,我們在測試方法中將標題添加到自定義模板中:
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(internationalEvent)
.header(headerTypesProperties.getHeaderName(), headerTypesProperties.getInternational()));
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(domesticEvent)
.header(headerTypesProperties.getHeaderName(), headerTypesProperties.getDomestic()));
當我們再次運行測試時,測試通過,驗證了每個事件都正確地反序列化了適當的子類類型。
9. 結論
本文介紹了 Message Conversion 的三種常見使用場景:利用內置設置對 POJO/record 進行序列化和反序列化、使用自定義 ObjectMapper 處理不同日期格式和其他特定配置,以及將消息反序列化到子類/接口實現的不同方式。
我們通過設置本地測試環境並創建實時測試來測試每個場景,以驗證我們的邏輯。