知識庫 / Spring / Spring Cloud RSS 訂閱

Spring Cloud Data Flow 中的 ETL

Spring Cloud
HongKong
5
01:38 PM · Dec 06 ,2025

1. 概述

Spring Cloud Data Flow 是一款雲原生工具包,用於構建實時數據管道和批量處理流程。 Spring Cloud Data Flow 適用於各種數據處理用例,例如簡單的導入/導出、ETL 處理、事件流和預測分析。

在本教程中,我們將學習一個實時提取、轉換和加載 (ETL) 的示例,該示例使用流式管道從 JDBC 數據庫提取數據,將其轉換為簡單的 POJO,並將其加載到 MongoDB 中。

2. ETL 與事件流處理

ETL(提取、轉換和加載)通常被認為是批量加載數據從多個數據庫和系統到通用數據倉庫的過程。在這些數據倉庫中,可以進行大量的分析處理,而不會影響系統的整體性能。

然而,新的趨勢正在改變這種做法。ETL 在將數據傳輸到數據倉庫和數據湖中仍然發揮着作用。

如今,這可以通過 事件流架構中的流來實現,藉助 Spring Cloud Data Flow

3. Spring Cloud Data Flow

使用 Spring Cloud Data Flow (SCDF),開發者可以創建數據管道,分為兩種模式:

  1. 使用 Spring Cloud Stream 創建持久運行的實時流式應用
  2. 使用 Spring Cloud Task 創建短生命週期的批量任務應用

在本文中,我們將介紹第一種模式,即基於 Spring Cloud Stream 的持久運行的流式應用。

3.1. Spring Cloud Stream 應用

SCDF 流式管道由步驟組成,其中每個步驟都是使用 Spring Boot 風格構建的 Spring Cloud Stream 微框架應用程序。 這些應用程序通過消息中間件(如 Apache Kafka 或 RabbitMQ)集成。

這些應用程序被分為源、處理器和匯(sink)三種類型。與 ETL 流程相比,我們可以説源是“提取”,處理器是“轉換”,匯是“加載”部分。

在某些情況下,我們可以在一個或多個步驟的管道中使用應用程序啓動器。這意味着我們不需要為步驟實現新的應用程序,而是配置現有的應用程序啓動器。

應用程序啓動器的列表可以在這裏找到這裏

3.2. Spring Cloud Data Flow 服務器

Spring Cloud Data Flow 服務器是架構的最後一塊。SCDF 服務器使用 Spring Cloud Deployer Specification 進行應用程序和流水線的部署。該規範支持 SCDF 通過部署到各種現代運行時(如 Kubernetes、Apache Mesos、Yarn 和 Cloud Foundry)來實現雲原生版本。

此外,我們還可以將流水線作為本地部署運行。

關於 SCDF 架構的更多信息,請參見此處。

4. 環境搭建

在開始之前,我們需要選擇這個複雜部署的各個部分。首先需要定義的組件是 SCDF Server。

對於測試,我們將使用 SCDF Server Local 進行本地開發。對於生產部署,我們可以稍後選擇雲原生運行時,例如 SCDF Server Kubernetes。 完整的服務器運行時列表可以在這裏找到 這裏

現在,讓我們檢查運行此服務器的系統要求。

4.1. 系統要求

為了運行 SCDF 服務器,我們需要定義和設置兩個依賴項:

  • 消息中間件,
  • 關係型數據庫管理系統 (RDBMS)。

對於消息中間件,我們將使用 RabbitMQ,並將 PostgreSQL 作為 RDBMS 用於存儲我們的管道流定義。

為了運行 RabbitMQ,請從 這裏 下載最新版本,並使用默認配置或運行以下 Docker 命令:

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

作為最後設置步驟,請安裝並運行 PostgreSQL RDBMS 在默認端口 5432 上。之後,創建一個數據庫,讓 SCDF 可以使用以下腳本存儲其流定義:

CREATE DATABASE dataflow;

4.2. 本地 SCDF 數據流服務器

為了運行本地 SCDF 服務器,可以選擇使用 <a href="https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#getting-started-deploying-spring-cloud-dataflow-docker">docker-compose</a> 啓動服務器,或者將其作為 Java 應用程序運行。

以下我們以 Java 應用程序的方式運行本地 SCDF 服務器。 為了配置應用程序,需要將配置定義為 Java 應用程序參數。 系統路徑需要 Java 8。

為了託管 jar 文件和依賴項,需要為本地 SCDF 服務器創建一個 home 文件夾,並將本地 SCDF 服務器的最新版本下載到該文件夾中。 您可以在這裏下載本地 SCDF 服務器的最新版本 這裏

另外,需要創建一個 lib 文件夾並將其中的 JDBC 驅動程序放入其中。 PostgreSQL 驅動程序的最新版本可以在這裏下載 這裏

最後,讓我們運行本地 SCDF 服務器:

$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
    --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
    --spring.datasource.username=postgres_username \
    --spring.datasource.password=postgres_password \
    --spring.datasource.driver-class-name=org.postgresql.Driver \
    --spring.rabbitmq.host=127.0.0.1 \
    --spring.rabbitmq.port=5672 \
    --spring.rabbitmq.username=guest \
    --spring.rabbitmq.password=guest

我們可以通過查看以下 URL 來檢查其運行狀態:

http://localhost:9393/dashboard

4.3. Spring Cloud Data Flow Shell

Spring Cloud Data Flow Shell 是一個 命令行工具,可簡化應用程序和流程的組合和部署。這些 Shell 命令在 Spring Cloud Data Flow Server 的 REST API 上運行。

將最新版本的 jar 下載到您的 SCDF 目錄中,該目錄位於 這裏。 下載完成後,請運行以下命令(請根據需要更新版本):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>

如果而不是“dataflow:>”,而是得到“server-unknown:>”在最後一行,則説明您沒有在localhost上運行SCDF Server。在這種情況下,請運行以下命令以連接到另一個主機:

server-unknown:>dataflow config server http://{host}

現在,Shell 已連接到 SCDF 服務器,我們可以運行我們的命令。

在 Shell 中首先需要執行的操作是導入應用程序啓動器。請查閲 RabbitMQ+Maven 在 Spring Boot 2.0.x 中的最新版本 這裏,並運行以下命令(請再次更新版本,例如“Darwin-SR1”,如果需要):

$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

要檢查已安裝的應用程序,請運行以下 Shell 命令:

$ dataflow:> app list

因此,我們應該看到一個包含所有已安裝應用程序的表格。

此外,SCDF 提供了一個圖形界面,名為Flo,可以通過以下地址訪問:http://localhost:9393/dashboard。然而,該工具的使用不屬於本文檔的範圍。

5. 構建 ETL 管道

現在,讓我們創建我們的流式管道。為此,我們將使用 JDBC 源應用程序啓動器從我們的關係數據庫中提取信息。

此外,我們還將創建一個自定義處理器來轉換信息結構,以及一個自定義目標以將數據加載到 MongoDB 中。

5.1. 從關係數據庫中提取數據 – 準備關係數據庫

讓我們創建一個名為 crm 的數據庫,以及一個名為 customer 的表。

CREATE DATABASE crm;
CREATE TABLE customer (
    id bigint NOT NULL,
    imported boolean DEFAULT false,
    customer_name character varying(50),
    PRIMARY KEY(id)
)

請注意,我們使用了一個標誌 imported,用於存儲哪些記錄已導入。如果需要,我們也可以將此信息存儲在另一個表中。

現在,讓我們插入一些數據:

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2. 轉換 – 映射 JDBC 字段到 MongoDB 字段結構

對於轉換步驟,我們將從源表中將字段 customer_name 翻譯為新的字段 name。 可以在這裏進行其他轉換,但為了使示例簡短,我們保持不變。

要做到這一點,我們將創建一個名為 customer-transform 的新項目。 最簡單的方法是使用 Spring Initializr 網站創建項目。 在訪問網站後,選擇一個 Group 和 Artifact 名稱。 我們將分別使用 com.customercustomer-transform

完成這些步驟後,單擊“生成項目”按鈕下載項目。 然後,解壓縮項目並將其導入您最喜歡的 IDE 中,並將以下依賴項添加到 pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

現在,我們將開始編寫字段名稱轉換的代碼。為此,我們將創建 Customer 類作為適配器。該類將通過 setName() 方法接收 customer_name,並通過 getName 方法輸出其值。

@JsonProperty 註解將在從 JSON 轉換為 Java 時進行轉換:

public class Customer {

    private Long id;

    private String name;

    @JsonProperty("customer_name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonProperty("name")
    public String getName() {
        return name;
    }

    // Getters and Setters
}

處理器需要接收來自輸入的數據,進行轉換,並將結果綁定到輸出通道。我們來創建一個類來實現這個功能:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Customer convertToPojo(Customer payload) {

        return payload;
    }
}

在上述代碼中,我們可以觀察到轉換是自動發生的。輸入接收數據並將其作為JSON進行解析,然後使用Customer對象中的set方法進行解析。

相反,輸出則使用get方法將數據序列化為JSON。

5.3. 加載 – Sink 在 MongoDB

類似於轉換步驟,我們將創建一個新的 Maven 項目,現在項目名稱為 customer-mongodb-sink。 再次訪問 Spring Initializr,選擇 Group 為 com.customer,選擇 Artifact 為 customer-mongodb-sink。 然後,在依賴項搜索框中輸入 “MongoDB” 並下載項目。

接下來,解壓縮並將其導入到您最喜歡的 IDE 中。

然後,添加與 customer-transform 項目相同的額外依賴項。

現在,我們將創建一個新的 Customer 類,用於在這一步接收輸入:

import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="customer")
public class Customer {

    private Long id;
    private String name;

    // Getters and Setters
}

為了將客户(Customer)保存到數據庫,我們將創建一個 Listener 類,該類將使用 CustomerRepository 保存客户實體:

@EnableBinding(Sink.class)
public class CustomerListener {

    @Autowired
    private CustomerRepository repository;

    @StreamListener(Sink.INPUT)
    public void save(Customer customer) {
        repository.save(customer);
    }
}

並且,CustomerRepository 在這裏是一個 MongoRepository,來自 Spring Data:

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {

}

5.4. 流定義

現在,兩個自定義應用程序已準備好在 SCDF 服務器上進行註冊。 要完成此操作,請使用 Maven 命令 mvn install 編譯這兩個項目。

然後,我們使用 Spring Cloud Data Flow Shell 註冊它們:

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

最後,讓我們檢查應用程序是否存儲在 SCDF 中,在 shell 中運行應用程序列表命令:

app list

因此,我們應該在結果表中看到這兩個應用程序。

5.4.1. 流式管道領域特定語言 – DSL

一個 DSL 定義了應用程序之間的配置和數據流。 SCDF DSL 簡潔易懂。 在第一個詞中,我們定義應用程序的名稱,然後是配置。

此外,語法受到 Unix 的啓發,採用管道語法(也稱為“管道”)來連接多個應用程序:管道語法

http --port=8181 | log

這會創建一個在 8181 端口上運行的 HTTP 應用,將接收到的任何 body 負載發送到日誌中。

現在,讓我們看看如何創建 JDBC Source 的 DSL 流定義。

5.4.2. JDBC 源流定義

JDBC 源的關鍵配置包括 查詢語句更新語句查詢語句 將選擇未讀記錄,而 更新語句 將更改標誌以防止當前記錄被重新讀取。

此外,我們將定義 JDBC 源以 30 秒的固定延遲進行輪詢,並最多輪詢 1000 行記錄。 最後,我們將定義連接配置,例如驅動程序、用户名、密碼和連接 URL:

jdbc 
    --query='SELECT id, customer_name FROM public.customer WHERE imported = false'
    --update='UPDATE public.customer SET imported = true WHERE id in (:id)'
    --max-rows-per-poll=1000
    --fixed-delay=30 --time-unit=SECONDS
    --driver-class-name=org.postgresql.Driver
    --url=jdbc:postgresql://localhost:5432/crm
    --username=postgres
    --password=postgres

更多 JDBC Source 配置屬性可以在這裏找到:這裏

5.4.3. 客户 MongoDB 接收流定義

由於我們未在 application.properties 中定義 customer-mongodb-sink 的連接配置,我們將通過 DSL 參數進行配置。

我們的應用程序完全基於 MongoDataAutoConfiguration。 您可以查看其他可能的配置 這裏。 基本上,我們將定義 spring.data.mongodb.uri

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4. 創建並部署流

首先,要創建最終流定義,請返回 Shell 並執行以下命令(沒有換行符,它們僅用於提高可讀性):

stream create --name jdbc-to-mongodb 
  --definition "jdbc 
  --query='SELECT id, customer_name FROM public.customer WHERE imported=false' 
  --fixed-delay=30 
  --max-rows-per-poll=1000 
  --update='UPDATE customer SET imported=true WHERE id in (:id)' 
  --time-unit=SECONDS 
  --password=postgres 
  --driver-class-name=org.postgresql.Driver 
  --username=postgres 
  --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink 
  --spring.data.mongodb.uri=mongodb://localhost/main"

此流DSL定義了一個名為 jdbc-to-mongodb 的流。接下來,我們將通過其名稱部署該流:

stream deploy --name jdbc-to-mongodb

最後,我們應該在日誌輸出中看到所有可用的日誌的位置:

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. 結論

在本文中,我們詳細演示了使用 Spring Cloud Data Flow 構建 ETL 數據管道的一個完整示例。

值得注意的是,我們配置了應用程序啓動器,使用 Spring Cloud Data Flow Shell 構建 ETL 流式管道,並實現了自定義應用程序,用於讀取、轉換和寫入數據。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.