1. 概述
Spring Cloud Stream 是建立在 Spring Boot 和 Spring Integration 之上的一個框架,它 幫助您創建基於事件或消息的微服務。
在本文中,我們將介紹 Spring Cloud Stream 的概念和構造,並提供一些簡單的示例。
2. Maven 依賴項
為了開始,我們需要將 Spring Cloud Starter Stream 與 RabbitMQ 消息代理集成 Maven 依賴項添加到我們的 pom.xml 中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>我們還將添加 spring-cloud-stream-test-binder 依賴項,以啓用 JUnit 支持:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>最後,我們將使用 spring-cloud-dependencies 進行依賴管理,並根據 兼容性矩陣 選擇合適的版本:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2024.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>3. 主要概念
微服務架構遵循“智能端點和啞管道”原則。端點之間的通信由如 RabbitMQ 或 Apache Kafka 這樣的消息中間件驅動。 服務通過這些端點或通道發佈領域事件。
讓我們探討構成 Spring Cloud Stream 框架的主要概念,以及我們必須瞭解的關鍵範式,以便構建基於消息的應用程序。
3.1. 構建塊 (Constructs)
通過將 <em >spring-cloud-stream</em> 依賴項添加到類路徑中,我們自動連接到消息代理,並通過 Spring Cloud Stream “綁定器” 進行連接。 此外,我們還可以將 Java 函數定義為 Spring Bean,以處理傳入的消息:
@SpringBootApplication
public class LogEnricherApplication {
public static void main(String[] args) {
SpringApplication.run(LogEnricherApplication.class, args);
}
@Bean
public Function<String, String> enrichLogMessage() {
return value -> "[%s] - %s".formatted("Baeldung", value);
}
}讓我們來查看一下這些概念的定義:
- Bindings — 是一系列 Java 函數,用於處理、轉換或發送消息
- Binder — 消息中間件實現,例如 Kafka 或 RabbitMQ
- Channel — 代表消息中間件與應用程序之間的通信管道
- Message Schemas — 用於消息的序列化和反序列化,這些模式可以靜態地從位置讀取或動態加載,支持領域對象類型的演化
- StreamListeners — 是 Bean 中的消息處理方法,將在從通道接收到消息後自動調用,在 MessageConverter 完成中間件特定事件與領域對象類型或 POJO 之間的序列化/反序列化之後
3.2. 通信模式
指定目的地消息通過 發佈-訂閲 消息模式進行傳遞。發佈者將消息分類到主題中,每個主題都通過名稱進行標識。訂閲者表達對一個或多個主題的興趣。中間件過濾消息,並將感興趣的主題傳遞給訂閲者。
現在,訂閲者可以被分組。一個 消費者組 是由一個或多個訂閲者或消費者組成的,這些訂閲者或消費者通過 組 ID 進行標識,在其中,來自主題或主題分區的消息以負載均衡的方式進行傳遞。
4. 編程模型
本節介紹了構建 Spring Cloud Stream 應用程序的基礎知識。
4.1. 功能測試
該測試支持允許我們利用 binder 實現與 Spring Cloud Stream 渠道的交互:
@EnableTestBinder
@SpringBootTest
class LogEnricherApplicationUnitTest {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
@Test
void whenSendingLogMessage_thenMessageIsEnrichedWithPrefix() {
// ...
}
}讓我們向上述 enrichLogMessage 服務發送一條消息,並檢查響應是否包含文本前綴 “Baeldung”:
@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
input.send(MessageBuilder.withPayload("hello world").build(), "enrichLogMessage-in-0");
Message<byte[]> message = output.receive(1000L, "enrichLogMessage-out-0");
assertThat(message.getPayload())
.asString()
.isEqualTo("[Baeldung] - hello world");
}如我們所見,輸入綁定使用函數名後跟 “-in-“ 後綴和索引命名。 類似地,輸出綁定使用 “-out-“ 後綴和索引。 “in” 和 “out” 表示綁定的類型,索引通常為單個輸入和輸出函數中為 0,僅適用於具有多個輸入或輸出的函數。
4.2. 綁定目的地
我們可以使用 Spring Cloud Stream 配置將綁定映射到自定義目的地——例如主題或隊列名稱。例如,讓我們更新 application.yml 並使用隊列 “queue.log.messages” 和 “queue.pretty.log.messages” 作為我們服務的輸入和輸出:
spring:
cloud:
stream:
bindings:
enrichLogMessage-in-0:
destination: queue.log.messages
enrichLogMessage-out-0:
destination: queue.pretty.log.messages現在,我們可以更新我們的測試,並使用目標名稱而不是綁定名稱:
@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
input.send(MessageBuilder.withPayload("hello world").build(), "queue.log.messages");
Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");
assertThat(message.getPayload())
.asString()
.isEqualTo("[Baeldung] - hello world");
}因此,我們的應用程序將監聽 “queue.log.messages”,處理這些消息,並將結果發佈到 “queue.pretty.log.messages”。
4.3. 事件路由
在 Spring Cloud Stream 中,事件路由涉及管理事件在源頭和目標之間的流向。其主要目的是根據事件生產者,將事件路由到特定的訂閲者或指定的目標。
例如,假設我們只想對超過十個字符的日誌消息進行增強,而將較短的消息保持不變。要實現這一點,讓我們定義一個返回 <em >Message<String></em > 的函數,從而可以自定義消息元數據:
@Bean
public Function<String, Message<String>> processLogs() {
return log -> {
boolean shouldBeEnriched = log.length() > 10;
// ...
};
}之後,我們將使用 spring.cloud.stream.sendto.destination 鍵在消息頭中指定新的目標目的地。 在我們的例子中,如果需要對日誌進行增強,我們將路由它到 enrichLogMessage-in-0 綁定。 否則,我們將直接將消息發送到輸出隊列:
@Bean
public Function<String, Message<String>> processLogs() {
return log -> {
boolean shouldBeEnriched = log.length() > 10;
String destination = shouldBeEnriched ? "enrichLogMessage-in-0" : "queue.pretty.log.messages";
return MessageBuilder.withPayload(log)
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
};
}現在,我們需要更新配置以啓用事件路由。由於我們聲明瞭多個函數作為 Bean,我們還應該通過 spring.cloud.function.definition 屬性包含它們:
spring:
cloud:
function:
definition: enrichLogMessage;processLogs
stream:
function.routing.enabled: true
bindings:
# ...好的,以下是翻譯後的內容:
這就完成了!讓我們測試我們的代碼——如果我們將 “hello world” 字符串發送到 processLog 綁定,我們應該期望它會被更新併發布到 “queue.pretty.log.message”。
@Test
void whenProcessingLongLogMessage_thenItsEnrichedWithPrefix() {
input.send(MessageBuilder.withPayload("hello world").build(), "processLogs-in-0");
Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");
assertThat(message.getPayload())
.asString()
.isEqualTo("[Baeldung] - hello world");
}另一方面,如果我們將一個少於十個字符的字符串發送到 log 中,我們期望它將被髮布到 “queue.pretty.log.message”,而不會進行任何修改:
@Test
void whenProcessingShortLogMessage_thenItsNotEnrichedWithPrefix() {
input.send(MessageBuilder.withPayload("hello").build(), "processLogs-in-0");
Message<byte[]> messgae= output.receive(1000L, "queue.pretty.log.messages");
assertThat(messgae.getPayload())
.asString()
.isEqualTo("hello");
}5. 設置
讓我們設置應用程序,用於處理來自 RabbitMQ 消息代理的消息。
5.1. Binder 配置
如前所述,可以通過將 RabbitMQ 綁定庫添加到類路徑中,包括 此依賴項 來添加綁定庫。但是,如果未提供任何綁定實現,Spring 將使用通道之間的直接消息通信。
5.2. RabbitMQ 配置
為了將示例(參見第 3.1 節)配置為使用 RabbitMQ 綁定,我們需要更新位於 src/main/resources 目錄下的 application.yml 文件。
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
output:
destination: queue.pretty.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: <host>
port: 5672
username: <username>
password: <password>
virtual-host: /input 綁定將使用名為 queue.log.messages 的交換,output 綁定將使用交換 “queue.pretty.log.messages”。 兩個綁定都將使用名為 local_rabbit 的 binder。
請注意,我們不需要在運行應用程序之前創建 RabbitMQ 交換和隊列。 當運行應用程序時,兩個交換都會自動創建。
要測試應用程序,我們可以使用 RabbitMQ 管理站點發布消息。 在交換 “queue.log.messages” 的 發佈消息 面板中,我們需要以 JSON 格式輸入請求。
5.3. 自定義消息轉換
對於這個代碼示例,我們添加另一個綁定——我們可以將其稱為 highlightLogs:
@Bean
Function<LogMessage, String> highlightLogs() {
return logMsg -> logMsg.message().toUpperCase();
}本次,我們的函數接收一個名為 LogMessage 的 Java 對象作為輸入:
record LogMessage(String message) {
}對於此類用例,Spring Cloud Stream 允許我們為特定內容類型應用自定義消息轉換。 讓我們定義一個自定義消息轉換器,用於在 contentType 標頭設置為 text/plain 時反序列化 LogMessage 對象:
@Component
class TextPlainMessageConverter extends AbstractMessageConverter {
public TextPlainMessageConverter() {
super(new MimeType("text", "plain"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (LogMessage.class == clazz);
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
String text = payload instanceof String ? (String) payload : new String((byte[]) payload);
return new LogMessage(text);
}
}最後,我們可以通過發送包含 text/plain 內容類型的消息來測試我們的轉換器。當我們運行測試時,我們期望消息將被處理,並且大寫字符串作為綁定輸出返回:
@Test
void whenHighlightingLogMessage_thenItsTransformedToUppercase() {
Message<String> msgIn = MessageBuilder.withPayload("hello")
.setHeader("contentType", "text/plain")
.build();
input.send(msgIn, "highlightLogs-in-0");
Message<byte[]> msgOut = output.receive(1000L, "highlightLogs-out-0");
assertThat(msgOut.getPayload())
.asString()
.isEqualTo("HELLO");
}5.4. 消費者組
當運行我們的應用程序的多個實例時,每次在輸入通道中收到一條新消息時,所有訂閲者都會收到通知。
通常情況下,我們需要對消息進行處理一次。Spring Cloud Stream 通過消費者組實現這一行為。
為了啓用此行為,每個消費者綁定可以使用 spring.cloud.stream.bindings.<CHANNEL>.group 屬性指定一個組名:
spring:
cloud:
stream:
bindings:
enrichLogMessage-in-0:
destination: queue.log.messages
group: test-group
# ...6. 基於消息的微服務
在本節中,我們將介紹在微服務環境中運行我們的 Spring Cloud Stream 應用程序所需的全部功能。
6.1. 擴展規模
當運行多個應用程序時,確保數據正確地分佈到各個消費者中至關重要。為此,Spring Cloud Stream 提供兩個屬性:
- spring.cloud.stream.instanceCount — 運行的應用程序數量
- spring.cloud.stream.instanceIndex — 當前應用程序的索引
例如,如果我們在上述 MyLoggerServiceApplication 應用程序中部署了兩個實例,則屬性 spring.cloud.stream.instanceCount 對於這兩個應用程序都應為 2,而屬性 spring.cloud.stream.instanceIndex 應該分別設置為 0 和 1。
這些屬性在通過 Spring Data Flow 部署 Spring Cloud Stream 應用程序時會自動設置,如本文所述。
6.2. 分區 (Partitioning)
領域事件可以被視為分區的消息。這在擴展存儲和提高應用程序性能時非常有用。
領域事件通常包含分區鍵,以便它們最終與相關消息位於同一分區中。
例如,如果我們希望將日誌消息按消息的第一個字母進行分區,這將產生兩個分區。
一個分區用於以“A-M”開頭的日誌消息,另一個分區用於“N-Z”開頭的日誌消息。 這可以使用以下兩個屬性進行配置:
- spring.cloud.stream.bindings.output.producer.partitionKeyExpression— 用於分區有效負載的表達式
- spring.cloud.stream.bindings.output.producer.partitionCount— 分組數量
有時,用於分區的表達式過於複雜,無法在一行中編寫。 對於這些情況,可以使用屬性 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 來編寫自定義分區策略。
6.3. 健康指示器
在微服務環境中,我們也需要檢測服務是否宕機或開始出現故障。Spring Cloud Stream 提供 management.health.binders.enabled 屬性,用於啓用健康指示器綁定。
當運行應用程序時,我們可以通過 http://<host>:<port>/health 查詢健康狀態。
7. 結論
在本教程中,我們介紹了 Spring Cloud Stream 的主要概念,並通過一些簡單的 RabbitMQ 示例展示瞭如何使用它。有關 Spring Cloud Stream 的更多信息,請參閲 這裏。