知識庫 / Spring / Spring Cloud RSS 訂閱

Spring Cloud Data Flow 與 Apache Spark

Spring Cloud
HongKong
5
01:24 PM · Dec 06 ,2025

1. 簡介

Spring Cloud Data Flow 是構建數據集成和實時數據處理管道的工具包。

在此,管道指的是使用 Spring Cloud StreamSpring Cloud Task 框架構建的 Spring Boot 應用程序。

在本教程中,我們將演示如何使用 Spring Cloud Data Flow 與 Apache Spark 結合使用。

2. 本地數據流服務器

首先,我們需要運行數據流服務器,以便部署我們的任務。

要本地運行數據流服務器,我們需要創建一個新的項目,並添加 spring-cloud-starter-dataflow-server-local 依賴項

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

之後,我們需要在服務器的主類中添加 @EnableDataFlowServer</em/> 註解:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

運行此應用程序後,我們將擁有一個本地的數據流服務器,監聽端口 9393。

3. 創建項目

我們將創建一個 Spark 作業,作為獨立本地應用程序運行,這樣就不需要任何集羣來運行它。

3.1. 依賴項

首先,我們將添加 Spark 依賴項

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.4.0</version>
</dependency>

3.2. 創建工作項

對於我們的工作項,我們來近似計算圓周率:

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        List<Integer> xs = IntStream.rangeClosed(0, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y ) < 1 ? 1: 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        System.out.println("The pi was estimated as:" + count / n);

        context.stop();
    }
}

4. 數據流殼 (Data Flow Shell)

數據流殼 (Data Flow Shell) 是一款應用程序,將 使我們能夠與服務器進行交互。殼 (Shell) 使用 DSL 命令來描述數據流。

要使用數據流殼,我們需要創建一個項目,以便運行它。首先,我們需要添加 spring-cloud-dataflow-shell 依賴項

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

在添加依賴後,我們可以創建運行我們的數據流殼的類:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
     
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

5. 項目部署

為了部署我們的項目,我們將使用 Apache Spark 中提供的“任務運行器”,該工具有三個版本:clusteryarnclient。 我們將使用本地 client 版本。

任務運行器是運行我們的 Spark 作業的工具。

要執行此操作,首先需要使用 Data Flow Shell 註冊我們的任務

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

該任務允許我們指定多個不同的參數,其中一些是可選的,但另一些參數對於正確部署 Spark 作業是必需的:

  • spark.app-class, 提交作業的主類
  • spark.app-jar, 包含作業的胖 JAR 的路徑
  • spark.app-name, 作業使用的名稱
  • spark.app-args, 將傳遞給作業的參數

我們可以使用註冊的任務spark-client 提交我們的作業,請務必提供所需的參數:

task create spark1 --definition "spark-client \
  --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
  --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

請注意,spark.app-jar 是包含我們作業的胖 JAR 的路徑。

在任務成功創建後,我們可以使用以下命令運行它:

task launch spark1

這將觸發我們的任務執行。

6. 概述

在本教程中,我們演示瞭如何使用 Spring Cloud Data Flow 框架處理數據,並結合 Apache Spark。有關 Spring Cloud Data Flow 框架的更多信息,請參閲 文檔

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.