知識庫 / Spring / Spring Cloud RSS 訂閱

使用 Spring Cloud Data Flow 進行流處理入門

Spring Cloud
HongKong
9
02:47 PM · Dec 06 ,2025

1. 簡介

Spring Cloud Data Flow 是一種雲原生編程和運維模型,用於構建可組合的數據微服務。

通過Spring Cloud Data Flow,開發者可以創建和編排數據管道,用於常見的用例,例如數據攝取、實時分析和數據導入/導出。

這些數據管道分為兩種類型:流式數據管道和批量數據管道。

在第一種情況下,無限量的數據通過消息中間件進行消費或生產。在第二種情況下,短壽命的任務處理一個有限的數據集,然後終止。

本文將重點介紹流式處理。

2. 架構概述

該架構的關鍵組件包括 應用程序數據流服務器跳過服務器以及目標運行時。

此外,我們通常還會在這架構中包含 數據流殼消息代理

讓我們更詳細地瞭解這些組件。

2.1 應用

通常,流式數據管道包括從外部系統消費事件、數據處理和多語言持久化。這些階段通常在 Spring Cloud 術語中被稱為 (Source)、處理器 (Processor) 和 匯流器 (Sink)。

  1. 源: 是消費 事件 的應用程序。
  2. 處理器: 消費數據,對其進行處理,並將處理後的數據發送到管道中的下一個應用程序。
  3. 匯流器: 既可以從 處理器 消費數據,也可以將數據寫入所需的持久化層。

這些應用程序可以以兩種方式打包:

  • 託管在 Maven 倉庫、文件、 HTTP 或其他 Spring 實現中的 Spring Boot uber-jar (本文將使用此方法)。
  • Docker

Spring Cloud Data Flow 團隊已經提供了許多用於常見用例的源、處理器和匯流器應用程序(例如 JDBC、 HDFS、 HTTP、路由器)。

2.2 運行時環境

為了使這些應用程序能夠執行,需要配置運行時環境。支持的運行時環境包括:

  • Cloud Foundry
  • Kubernetes
  • 本地服務器(用於本文章的開發,將在本文章中進行使用)

2.3. 數據流服務器

負責將應用程序部署到運行時環境的組件是 數據流服務器。針對每個目標運行時環境,提供相應的 數據流服務器 可執行 JAR 包。

數據流服務器 負責解釋:

  • 一個流式 DSL,它描述了數據通過多個應用程序的邏輯流。
  • 一個部署清單,它描述了應用程序在運行時環境中的映射關係。

2.4. 跳過服務器 (Skipper Server)

跳過服務器負責:

  • 將流部署到一種或多種平台。
  • 通過基於狀態機(blue/green)的更新策略,在一種或多種平台上升級和回滾流。
  • 存儲每個流的 manifest 文件歷史記錄。

2.5. 數據流殼

數據流殼是數據流服務器的客户端。該殼允許我們執行必要的 DSL 命令,以便與服務器進行交互。

例如,描述從 HTTP 源到 JDBC 目標的數據流的 DSL 將被編寫為“http | jdbc”。這些名稱在 DSL 中已與 數據流服務器 註冊,並映射到可以在 Maven 或 Docker 倉庫中託管的應用程序工件。

Spring 還提供了一個圖形化界面,名為 Flo,用於創建和監控流式數據管道。但是,它的使用不屬於本文討論的內容。

2.6. 消息代理

正如我們在上一節示例中看到的,管道符號用於數據流的定義。管道符號代表通過消息中間件,兩個應用程序之間的通信。

這意味着我們需要在目標環境中啓動消息代理。

支持的消息中間件包括:

  • Apache Kafka
  • RabbitMQ

現在我們對架構組件有了大致的瞭解——是時候構建我們的第一個流式處理管道。

3. 安裝消息代理

正如我們所見,流水中的應用程序需要消息中間件進行通信。對於本文,我們將使用 RabbitMQ

有關完整的安裝説明,您可以按照 官方網站上的指示進行操作。

4. 本地數據流服務器和 Skipper 服務器

使用以下命令下載 Spring Cloud Data Flow Server 和 Spring Cloud Skipper Server:

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.2/spring-cloud-dataflow-server-2.11.2.jar 

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.11.2/spring-cloud-skipper-server-2.11.2.jar

現在您需要啓動構成服務器的應用程序:

 

Skipper

在您下載 Skipper 的目錄下,使用 java -jar 命令啓動服務器,如下所示:

java -jar spring-cloud-skipper-server-2.11.2.jar

應用程序將在 7577 端口啓動。

 

數據流:

在另一個終端窗口中,並在您下載數據流的目錄中,使用 java -jar 命令運行服務器,如下所示:

java -jar spring-cloud-dataflow-server-2.11.2.jar

應用程序將在 9393 端口啓動。

5. The Data Flow Shell

下載 Spring Cloud Data Shell,請使用以下命令:

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.11.2/spring-cloud-dataflow-shell-2.11.2.jar

現在啓動 Spring Cloud Data Flow Shell,請使用以下命令:

java -jar spring-cloud-dataflow-shell-2.11.2.jar

在shell運行後,我們可以輸入help命令到提示符中,以查看可以執行的所有完整命令列表。

6. 源代碼應用

在 On Initializr 上,我們將創建一個簡單的應用程序並添加一個 Stream Rabbit 依賴項,具體為 spring-cloud-starter-stream-rabbit:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

我們隨後將在 Spring Boot 主類中添加 @EnableBinding(Source.class) 註解:

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

現在我們需要定義需要處理的數據源。該數據源可以是任何潛在的無限工作負載,例如物聯網傳感器數據、7x24小時事件處理或在線交易數據導入。

在我們的示例應用程序中,我們使用一個Poller,每 10 秒產生一個事件(為了簡化,新時間戳)。

@InboundChannelAdapter 註解將消息發送到源的輸出通道,並將返回值作為消息的負載:

@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT, 
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

我們的數據源已準備就緒。

7. 處理器應用程序

接下來,我們將創建一個應用程序並添加 Stream Rabbit 依賴項。

然後,我們將向 Spring Boot 主類添加 @EnableBinding(Processor.class) 註解:

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

接下來,我們需要定義一個方法來處理來自源應用程序的數據。

為了定義一個轉換器,我們需要使用 @Transformer 註解對該方法進行標註:

@Transformer(inputChannel = Processor.INPUT, 
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

它將來自“input”通道的時間戳轉換為格式化的日期,並將該日期發送到“output”通道。

8. Sink 應用程序

最後創建的應用程序是 Sink 應用程序。

再次,前往 Spring Initializr 並選擇一個 Group 和一個 Artifact 名稱。 下載項目後,添加一個 Stream Rabbit 依賴項。

然後,將 @EnableBinding(Sink.class) 註解添加到 Spring Boot 主類中:

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
	SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

現在我們需要一種方法來攔截來自處理器的消息。

要做到這一點,我們需要將 @StreamListener(Sink.INPUT) 註解添加到我們的方法中:

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

該方法僅將格式化後的時間戳打印到日誌文件中。

9. 註冊流應用

Spring Cloud Data Flow Shell 允許我們使用 app register 命令與應用註冊表註冊流應用。

我們必須提供一個唯一的名稱、應用程序類型和可以解析為應用程序 artifact 的 URI。對於類型,請指定 "source“、"processor“ 或 "sink“。

當提供使用 maven 方案的 URI 時,格式應符合以下要求:

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

要註冊之前創建的 Source, Processor, 和 Sink 應用程序,請轉到 Spring Cloud Data Flow Shell,並在提示符下執行以下命令:

app register --name time-source --type source --uri file://local machine path to/time-source-0.0.1-SNAPSHOT.jar

app register --name time-processor --type processor --uri file://local machine path to/time-processor-0.0.1-SNAPSHOT.jar

app register --name log-sink --type sink --uri file://local machine path to/log-sink-0.0.1-SNAPSHOT.jar

10. 創建並部署流

要創建新的流定義,請轉到 Spring Cloud Data Flow Shell,並執行以下 shell 命令:

stream create --name time-to-log --definition 'time-source | time-processor | log-sink'

這定義了一個名為 time-to-log 的流,基於 DSL 表達式 ‘time-source | time-processor | log-sink’

要部署該流,請執行以下 shell 命令:

stream deploy --name time-to-log

Data Flow Server 會解析時間源時間處理器日誌接收器到 Maven 座標,並利用這些座標啓動流中的時間源時間處理器日誌接收器應用。

如果流已正確部署,您將在Data Flow Server 的日誌中看到模塊已啓動並連接在一起:

2024-04-15 15:15:27.153  INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager        : Getting status for org.springframework.cloud.skipper.domain.Release@4de8f9c3 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:32.156  INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager        : Getting status for org.springframework.cloud.skipper.domain.Release@4d72121e using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:37.157  INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager        : Getting status for org.springframework.cloud.skipper.domain.Release@9a35173 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:42.160  INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager        : Getting status for org.springframework.cloud.skipper.domain.Release@1ec5d911 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1

11. 結果審查

在本示例中,源端每秒將當前時間戳發送為消息,處理器對其進行格式化,日誌接收器則使用日誌框架輸出格式化的時間戳。

日誌文件位於 的日誌輸出中,如上所示。

要查看結果,我們可以使用 tail 命令查看日誌:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. 結論

本文介紹瞭如何使用 Spring Cloud Data Flow 構建流式數據管道。

此外,我們瞭解了 SourceProcessorSink 應用在流式處理中的作用,以及如何使用 Data Flow Shell 將這些模塊連接到 Data Flow Server

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.