1. 簡介
Spring Cloud Data Flow 是一種雲原生編程和運維模型,用於構建可組合的數據微服務。
通過Spring Cloud Data Flow,開發者可以創建和編排數據管道,用於常見的用例,例如數據攝取、實時分析和數據導入/導出。
這些數據管道分為兩種類型:流式數據管道和批量數據管道。
在第一種情況下,無限量的數據通過消息中間件進行消費或生產。在第二種情況下,短壽命的任務處理一個有限的數據集,然後終止。
本文將重點介紹流式處理。
2. 架構概述
該架構的關鍵組件包括 應用程序、數據流服務器、跳過服務器以及目標運行時。
此外,我們通常還會在這架構中包含 數據流殼和消息代理。
讓我們更詳細地瞭解這些組件。
2.1 應用
通常,流式數據管道包括從外部系統消費事件、數據處理和多語言持久化。這些階段通常在 Spring Cloud 術語中被稱為 源 (Source)、處理器 (Processor) 和 匯流器 (Sink)。
- 源: 是消費 事件 的應用程序。
- 處理器: 從 源 消費數據,對其進行處理,並將處理後的數據發送到管道中的下一個應用程序。
- 匯流器: 既可以從 源 或 處理器 消費數據,也可以將數據寫入所需的持久化層。
這些應用程序可以以兩種方式打包:
- 託管在 Maven 倉庫、文件、 HTTP 或其他 Spring 實現中的 Spring Boot uber-jar (本文將使用此方法)。
- Docker
Spring Cloud Data Flow 團隊已經提供了許多用於常見用例的源、處理器和匯流器應用程序(例如 JDBC、 HDFS、 HTTP、路由器)。
2.2 運行時環境
為了使這些應用程序能夠執行,需要配置運行時環境。支持的運行時環境包括:
- Cloud Foundry
- Kubernetes
- 本地服務器(用於本文章的開發,將在本文章中進行使用)
2.3. 數據流服務器
負責將應用程序部署到運行時環境的組件是 數據流服務器。針對每個目標運行時環境,提供相應的 數據流服務器 可執行 JAR 包。
數據流服務器 負責解釋:
- 一個流式 DSL,它描述了數據通過多個應用程序的邏輯流。
- 一個部署清單,它描述了應用程序在運行時環境中的映射關係。
2.4. 跳過服務器 (Skipper Server)
跳過服務器負責:
- 將流部署到一種或多種平台。
- 通過基於狀態機(blue/green)的更新策略,在一種或多種平台上升級和回滾流。
- 存儲每個流的 manifest 文件歷史記錄。
2.5. 數據流殼
數據流殼是數據流服務器的客户端。該殼允許我們執行必要的 DSL 命令,以便與服務器進行交互。
例如,描述從 HTTP 源到 JDBC 目標的數據流的 DSL 將被編寫為“http | jdbc”。這些名稱在 DSL 中已與 數據流服務器 註冊,並映射到可以在 Maven 或 Docker 倉庫中託管的應用程序工件。
Spring 還提供了一個圖形化界面,名為 Flo,用於創建和監控流式數據管道。但是,它的使用不屬於本文討論的內容。
2.6. 消息代理
正如我們在上一節示例中看到的,管道符號用於數據流的定義。管道符號代表通過消息中間件,兩個應用程序之間的通信。
這意味着我們需要在目標環境中啓動消息代理。
支持的消息中間件包括:
- Apache Kafka
- RabbitMQ
現在我們對架構組件有了大致的瞭解——是時候構建我們的第一個流式處理管道。
3. 安裝消息代理
正如我們所見,流水中的應用程序需要消息中間件進行通信。對於本文,我們將使用 RabbitMQ。
有關完整的安裝説明,您可以按照 官方網站上的指示進行操作。
4. 本地數據流服務器和 Skipper 服務器
使用以下命令下載 Spring Cloud Data Flow Server 和 Spring Cloud Skipper Server:
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.2/spring-cloud-dataflow-server-2.11.2.jar
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.11.2/spring-cloud-skipper-server-2.11.2.jar
現在您需要啓動構成服務器的應用程序:
Skipper:
在您下載 Skipper 的目錄下,使用 java -jar 命令啓動服務器,如下所示:
java -jar spring-cloud-skipper-server-2.11.2.jar應用程序將在 7577 端口啓動。
數據流:
在另一個終端窗口中,並在您下載數據流的目錄中,使用 java -jar 命令運行服務器,如下所示:
java -jar spring-cloud-dataflow-server-2.11.2.jar應用程序將在 9393 端口啓動。
5. The Data Flow Shell
下載 Spring Cloud Data Shell,請使用以下命令:
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.11.2/spring-cloud-dataflow-shell-2.11.2.jar現在啓動 Spring Cloud Data Flow Shell,請使用以下命令:
java -jar spring-cloud-dataflow-shell-2.11.2.jar在shell運行後,我們可以輸入help命令到提示符中,以查看可以執行的所有完整命令列表。
6. 源代碼應用
在 On Initializr 上,我們將創建一個簡單的應用程序並添加一個 Stream Rabbit 依賴項,具體為 spring-cloud-starter-stream-rabbit:。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>我們隨後將在 Spring Boot 主類中添加 @EnableBinding(Source.class) 註解:
@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeSourceApplication.class, args);
}
}現在我們需要定義需要處理的數據源。該數據源可以是任何潛在的無限工作負載,例如物聯網傳感器數據、7x24小時事件處理或在線交易數據導入。
在我們的示例應用程序中,我們使用一個Poller,每 10 秒產生一個事件(為了簡化,新時間戳)。
@InboundChannelAdapter 註解將消息發送到源的輸出通道,並將返回值作為消息的負載:
@Bean
@InboundChannelAdapter(
value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}
我們的數據源已準備就緒。
7. 處理器應用程序
接下來,我們將創建一個應用程序並添加 Stream Rabbit 依賴項。
然後,我們將向 Spring Boot 主類添加 @EnableBinding(Processor.class) 註解:
@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeProcessorApplication.class, args);
}
}接下來,我們需要定義一個方法來處理來自源應用程序的數據。
為了定義一個轉換器,我們需要使用 @Transformer 註解對該方法進行標註:
@Transformer(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
String date = dateFormat.format(timestamp);
return date;
}它將來自“input”通道的時間戳轉換為格式化的日期,並將該日期發送到“output”通道。
8. Sink 應用程序
最後創建的應用程序是 Sink 應用程序。
再次,前往 Spring Initializr 並選擇一個 Group 和一個 Artifact 名稱。 下載項目後,添加一個 Stream Rabbit 依賴項。
然後,將 @EnableBinding(Sink.class) 註解添加到 Spring Boot 主類中:
@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowLoggingSinkApplication.class, args);
}
}現在我們需要一種方法來攔截來自處理器的消息。
要做到這一點,我們需要將 @StreamListener(Sink.INPUT) 註解添加到我們的方法中:
@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
logger.info("Received: " + date);
}該方法僅將格式化後的時間戳打印到日誌文件中。
9. 註冊流應用
Spring Cloud Data Flow Shell 允許我們使用 app register 命令與應用註冊表註冊流應用。
我們必須提供一個唯一的名稱、應用程序類型和可以解析為應用程序 artifact 的 URI。對於類型,請指定 "source“、"processor“ 或 "sink“。
當提供使用 maven 方案的 URI 時,格式應符合以下要求:
maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>要註冊之前創建的 Source, Processor, 和 Sink 應用程序,請轉到 Spring Cloud Data Flow Shell,並在提示符下執行以下命令:
app register --name time-source --type source --uri file://local machine path to/time-source-0.0.1-SNAPSHOT.jar
app register --name time-processor --type processor --uri file://local machine path to/time-processor-0.0.1-SNAPSHOT.jar
app register --name log-sink --type sink --uri file://local machine path to/log-sink-0.0.1-SNAPSHOT.jar10. 創建並部署流
要創建新的流定義,請轉到 Spring Cloud Data Flow Shell,並執行以下 shell 命令:
stream create --name time-to-log --definition 'time-source | time-processor | log-sink'這定義了一個名為 time-to-log 的流,基於 DSL 表達式 ‘time-source | time-processor | log-sink’。
要部署該流,請執行以下 shell 命令:
stream deploy --name time-to-logData Flow Server 會解析時間源、時間處理器和日誌接收器到 Maven 座標,並利用這些座標啓動流中的時間源、時間處理器和日誌接收器應用。
如果流已正確部署,您將在Data Flow Server 的日誌中看到模塊已啓動並連接在一起:
2024-04-15 15:15:27.153 INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@4de8f9c3 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:32.156 INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@4d72121e using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:37.157 INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@9a35173 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:42.160 INFO 23568 — [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@1ec5d911 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v111. 結果審查
在本示例中,源端每秒將當前時間戳發送為消息,處理器對其進行格式化,日誌接收器則使用日誌框架輸出格式化的時間戳。
日誌文件位於 的日誌輸出中,如上所示。
要查看結果,我們可以使用 tail 命令查看日誌:
tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:2112. 結論
本文介紹瞭如何使用 Spring Cloud Data Flow 構建流式數據管道。
此外,我們瞭解了 Source、Processor 和 Sink 應用在流式處理中的作用,以及如何使用 Data Flow Shell 將這些模塊連接到 Data Flow Server。