知識庫 / Spring / Spring Cloud RSS 訂閱

Spring Cloud Stream 簡介

Spring Cloud
HongKong
5
02:13 PM · Dec 06 ,2025

1. 概述

Spring Cloud Stream 是建立在 Spring Boot 和 Spring Integration 之上的一個框架,它 幫助您創建基於事件或消息的微服務

在本文中,我們將介紹 Spring Cloud Stream 的概念和構造,並提供一些簡單的示例。

2. Maven 依賴項

為了開始,我們需要將 Spring Cloud Starter Stream 與 RabbitMQ 消息代理集成 Maven 依賴項添加到我們的 pom.xml 中:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

我們還將添加 spring-cloud-stream-test-binder 依賴項,以啓用 JUnit 支持:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-binder</artifactId>
    <scope>test</scope>
</dependency>

最後,我們將使用 spring-cloud-dependencies 進行依賴管理,並根據 兼容性矩陣 選擇合適的版本:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2024.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

3. 主要概念

微服務架構遵循“智能端點和啞管道”原則。端點之間的通信由如 RabbitMQ 或 Apache Kafka 這樣的消息中間件驅動。 服務通過這些端點或通道發佈領域事件

讓我們探討構成 Spring Cloud Stream 框架的主要概念,以及我們必須瞭解的關鍵範式,以便構建基於消息的應用程序。

3.1. 構建塊 (Constructs)

通過將 <em >spring-cloud-stream</em> 依賴項添加到類路徑中,我們自動連接到消息代理,並通過 Spring Cloud Stream “綁定器” 進行連接。 此外,我們還可以將 Java 函數定義為 Spring Bean,以處理傳入的消息:

@SpringBootApplication
public class LogEnricherApplication {

    public static void main(String[] args) {
        SpringApplication.run(LogEnricherApplication.class, args);
    }

    @Bean
    public Function<String, String> enrichLogMessage() {
        return value -> "[%s] - %s".formatted("Baeldung", value);
    }
}

讓我們來查看一下這些概念的定義:

  • Bindings — 是一系列 Java 函數,用於處理、轉換或發送消息
  • Binder — 消息中間件實現,例如 Kafka 或 RabbitMQ
  • Channel — 代表消息中間件與應用程序之間的通信管道
  • Message Schemas — 用於消息的序列化和反序列化,這些模式可以靜態地從位置讀取或動態加載,支持領域對象類型的演化
  • StreamListeners — 是 Bean 中的消息處理方法,將在從通道接收到消息後自動調用,在 MessageConverter 完成中間件特定事件與領域對象類型或 POJO 之間的序列化/反序列化之後

3.2. 通信模式

指定目的地消息通過 發佈-訂閲 消息模式進行傳遞。發佈者將消息分類到主題中,每個主題都通過名稱進行標識。訂閲者表達對一個或多個主題的興趣。中間件過濾消息,並將感興趣的主題傳遞給訂閲者。

現在,訂閲者可以被分組。一個 消費者組 是由一個或多個訂閲者或消費者組成的,這些訂閲者或消費者通過 組 ID 進行標識,在其中,來自主題或主題分區的消息以負載均衡的方式進行傳遞。

4. 編程模型

本節介紹了構建 Spring Cloud Stream 應用程序的基礎知識。

4.1. 功能測試

該測試支持允許我們利用 binder 實現與 Spring Cloud Stream 渠道的交互:

@EnableTestBinder
@SpringBootTest
class LogEnricherApplicationUnitTest {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    @Test
    void whenSendingLogMessage_thenMessageIsEnrichedWithPrefix() {
        // ...
    }
}

讓我們向上述 enrichLogMessage 服務發送一條消息,並檢查響應是否包含文本前綴 “Baeldung”

@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "enrichLogMessage-in-0");

    Message<byte[]> message = output.receive(1000L, "enrichLogMessage-out-0");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}

如我們所見,輸入綁定使用函數名後跟 “-in-“ 後綴和索引命名。 類似地,輸出綁定使用 “-out-“ 後綴和索引。 “in”“out” 表示綁定的類型,索引通常為單個輸入和輸出函數中為 0,僅適用於具有多個輸入或輸出的函數。

4.2. 綁定目的地

我們可以使用 Spring Cloud Stream 配置將綁定映射到自定義目的地——例如主題或隊列名稱。例如,讓我們更新 application.yml 並使用隊列 “queue.log.messages”“queue.pretty.log.messages” 作為我們服務的輸入和輸出:

spring:
  cloud:
    stream:
      bindings:
        enrichLogMessage-in-0:
          destination: queue.log.messages
        enrichLogMessage-out-0:
          destination: queue.pretty.log.messages

現在,我們可以更新我們的測試,並使用目標名稱而不是綁定名稱:

@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "queue.log.messages");

    Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}

因此,我們的應用程序將監聽 “queue.log.messages”,處理這些消息,並將結果發佈到 “queue.pretty.log.messages”

4.3. 事件路由

在 Spring Cloud Stream 中,事件路由涉及管理事件在源頭和目標之間的流向。其主要目的是根據事件生產者,將事件路由到特定的訂閲者或指定的目標。

例如,假設我們只想對超過十個字符的日誌消息進行增強,而將較短的消息保持不變。要實現這一點,讓我們定義一個返回 <em >Message&lt;String&gt;</em > 的函數,從而可以自定義消息元數據:

@Bean
public Function<String, Message<String>> processLogs() {
    return log -> {
        boolean shouldBeEnriched = log.length() > 10;
        // ...
    };
}

之後,我們將使用 spring.cloud.stream.sendto.destination 鍵在消息頭中指定新的目標目的地。 在我們的例子中,如果需要對日誌進行增強,我們將路由它到 enrichLogMessage-in-0 綁定。 否則,我們將直接將消息發送到輸出隊列:

@Bean
public Function<String, Message<String>> processLogs() {
    return log -> {
        boolean shouldBeEnriched = log.length() > 10;
        String destination = shouldBeEnriched ? "enrichLogMessage-in-0" : "queue.pretty.log.messages";

        return MessageBuilder.withPayload(log)
          .setHeader("spring.cloud.stream.sendto.destination", destination)
          .build();
    };
}

現在,我們需要更新配置以啓用事件路由。由於我們聲明瞭多個函數作為 Bean,我們還應該通過 spring.cloud.function.definition 屬性包含它們:

spring:
  cloud:
    function:
     definition: enrichLogMessage;processLogs
    stream:
      function.routing.enabled: true
      bindings: 
        # ...

好的,以下是翻譯後的內容:

這就完成了!讓我們測試我們的代碼——如果我們將 “hello world” 字符串發送到 processLog 綁定,我們應該期望它會被更新併發布到 “queue.pretty.log.message”

@Test
void whenProcessingLongLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "processLogs-in-0");

    Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}

另一方面,如果我們將一個少於十個字符的字符串發送到 log 中,我們期望它將被髮布到 “queue.pretty.log.message”,而不會進行任何修改:

@Test
void whenProcessingShortLogMessage_thenItsNotEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello").build(), "processLogs-in-0");

    Message<byte[]> messgae= output.receive(1000L, "queue.pretty.log.messages");

    assertThat(messgae.getPayload())
      .asString()
      .isEqualTo("hello");
}

5. 設置

讓我們設置應用程序,用於處理來自 RabbitMQ 消息代理的消息。

5.1. Binder 配置

如前所述,可以通過將 RabbitMQ 綁定庫添加到類路徑中,包括 此依賴項 來添加綁定庫。但是,如果未提供任何綁定實現,Spring 將使用通道之間的直接消息通信。

5.2. RabbitMQ 配置

為了將示例(參見第 3.1 節)配置為使用 RabbitMQ 綁定,我們需要更新位於 src/main/resources 目錄下的 application.yml 文件。

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host: /

input 綁定將使用名為 queue.log.messages 的交換,output 綁定將使用交換 “queue.pretty.log.messages”。 兩個綁定都將使用名為 local_rabbit 的 binder。

請注意,我們不需要在運行應用程序之前創建 RabbitMQ 交換和隊列。 當運行應用程序時,兩個交換都會自動創建

要測試應用程序,我們可以使用 RabbitMQ 管理站點發布消息。 在交換 “queue.log.messages”發佈消息 面板中,我們需要以 JSON 格式輸入請求。

5.3. 自定義消息轉換

對於這個代碼示例,我們添加另一個綁定——我們可以將其稱為 highlightLogs

@Bean
Function<LogMessage, String> highlightLogs() {
    return logMsg -> logMsg.message().toUpperCase();
}

本次,我們的函數接收一個名為 LogMessage 的 Java 對象作為輸入:

record LogMessage(String message) {
}

對於此類用例,Spring Cloud Stream 允許我們為特定內容類型應用自定義消息轉換。 讓我們定義一個自定義消息轉換器,用於在 contentType 標頭設置為 text/plain 時反序列化 LogMessage 對象:

@Component
class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String ? (String) payload : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

最後,我們可以通過發送包含 text/plain 內容類型的消息來測試我們的轉換器。當我們運行測試時,我們期望消息將被處理,並且大寫字符串作為綁定輸出返回:

@Test
void whenHighlightingLogMessage_thenItsTransformedToUppercase() {
    Message<String> msgIn = MessageBuilder.withPayload("hello")
            .setHeader("contentType", "text/plain")
            .build();
    input.send(msgIn, "highlightLogs-in-0");

    Message<byte[]> msgOut = output.receive(1000L, "highlightLogs-out-0");
    assertThat(msgOut.getPayload())
            .asString()
            .isEqualTo("HELLO");
}

5.4. 消費者組

當運行我們的應用程序的多個實例時,每次在輸入通道中收到一條新消息時,所有訂閲者都會收到通知

通常情況下,我們需要對消息進行處理一次。Spring Cloud Stream 通過消費者組實現這一行為。

為了啓用此行為,每個消費者綁定可以使用 spring.cloud.stream.bindings.<CHANNEL>.group 屬性指定一個組名:

spring:
  cloud:
    stream:
      bindings:
        enrichLogMessage-in-0:
          destination: queue.log.messages
          group: test-group
        # ...

6. 基於消息的微服務

在本節中,我們將介紹在微服務環境中運行我們的 Spring Cloud Stream 應用程序所需的全部功能。

6.1. 擴展規模

當運行多個應用程序時,確保數據正確地分佈到各個消費者中至關重要。為此,Spring Cloud Stream 提供兩個屬性:

  • spring.cloud.stream.instanceCount — 運行的應用程序數量
  • spring.cloud.stream.instanceIndex — 當前應用程序的索引

例如,如果我們在上述 MyLoggerServiceApplication 應用程序中部署了兩個實例,則屬性 spring.cloud.stream.instanceCount 對於這兩個應用程序都應為 2,而屬性 spring.cloud.stream.instanceIndex 應該分別設置為 0 和 1。

這些屬性在通過 Spring Data Flow 部署 Spring Cloud Stream 應用程序時會自動設置,如本文所述。

6.2. 分區 (Partitioning)

領域事件可以被視為分區的消息。這在擴展存儲和提高應用程序性能時非常有用。

領域事件通常包含分區鍵,以便它們最終與相關消息位於同一分區中。

例如,如果我們希望將日誌消息按消息的第一個字母進行分區,這將產生兩個分區。

一個分區用於以“A-M”開頭的日誌消息,另一個分區用於“N-Z”開頭的日誌消息。 這可以使用以下兩個屬性進行配置:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression— 用於分區有效負載的表達式
  • spring.cloud.stream.bindings.output.producer.partitionCount— 分組數量

有時,用於分區的表達式過於複雜,無法在一行中編寫。 對於這些情況,可以使用屬性 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 來編寫自定義分區策略。

6.3. 健康指示器

在微服務環境中,我們也需要檢測服務是否宕機或開始出現故障。Spring Cloud Stream 提供 management.health.binders.enabled 屬性,用於啓用健康指示器綁定。

當運行應用程序時,我們可以通過 http://<host>:<port>/health 查詢健康狀態。

7. 結論

在本教程中,我們介紹了 Spring Cloud Stream 的主要概念,並通過一些簡單的 RabbitMQ 示例展示瞭如何使用它。有關 Spring Cloud Stream 的更多信息,請參閲 這裏

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.