知識庫 / Spring / Spring Cloud RSS 訂閱

Spring Cloud Data Flow 批處理

Spring Cloud
HongKong
3
02:47 PM · Dec 06 ,2025

1. 概述

在系列文章的第一篇中,我們介紹了 Spring Cloud Data Flow 的架構組件及其如何用於創建流式數據管道。

與流式管道不同,其中處理無限量的數據,批量處理則易於創建短壽命的服務,任務在按需執行

2. 本地數據流服務器和 Shell

本地數據流服務器是一個負責部署應用程序的組件,而數據流 Shell則允許我們使用 DSL 命令與服務器進行交互。

在上一篇文章中,我們使用 Spring Initilizr 將它們都設置為 Spring Boot 應用程序。

通過在服務器的主類中添加 註解,以及在 Shell 的主類中添加 註解,它們就準備好被啓動,只需執行以下操作:

mvn spring-boot:run

服務器將在9393端口啓動,並提供一個 shell,您可以通過提示符與之交互。

您可以在之前的文章中找到關於如何獲取和使用 Local Data Flow Server 以及其 shell 客户端的詳細信息。

3. 批處理應用程序

與服務器和 shell 類似,我們可以使用 Spring Initializr 來設置一個 根 Spring Boot 批處理應用程序。

訪問網站後,只需選擇一個 、一個 藝術品 名稱並從依賴項搜索框中選擇 雲任務

完成這些步驟後,單擊 生成項目 按鈕以開始下載 Maven 藝術品。

該藝術品預先配置幷包含基本代碼。讓我們看看如何對其進行編輯以構建我們的批處理應用程序。

3.1. Maven 依賴

首先,我們添加一些 Maven 依賴。由於這是一個批處理應用程序,我們需要導入來自 Spring Batch 項目 的庫:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

此外,由於 Spring Cloud Task 使用關係型數據庫存儲執行任務的結果,因此我們需要添加對 RDBMS 驅動器的依賴:

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>

我們選擇使用 Spring 提供的 H2 內存數據庫。這為我們提供了一種簡單的方法來啓動開發。但是,在生產環境中,您需要配置自己的 DataSource

請注意,工件的版本將繼承自 Spring Boot 的父級 pom.xml 文件。

3.2. 主類

要啓用所需功能,需要將 <em @EnableTask</em><em @EnableBatchProcessing</em> 註解添加到 <em Spring Boot’s</em> 主類中。該類級別的註解告訴 Spring Cloud Task 進行一切啓動:

@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchJobApplication.class, args);
    }
}

3.3. 任務配置

最後,讓我們配置一個任務——在本例中,將一個 String 打印到日誌文件:

@Configuration
public class JobConfiguration {

    private static Log logger
      = LogFactory.getLog(JobConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
          .start(stepBuilderFactory.get("jobStep1")
          .tasklet(new Tasklet() {
            
              @Override
              public RepeatStatus execute(StepContribution contribution, 
                ChunkContext chunkContext) throws Exception {
                
                logger.info("Job was run");
                return RepeatStatus.FINISHED;
              }
        }).build()).build();
    }
}

關於如何配置和定義作業的詳細信息超出了本文的範圍。欲瞭解更多信息,請參閲我們的《Spring Batch 簡介》文章。

最後,我們的應用程序已準備就緒。讓我們將其安裝到我們的本地 Maven 倉庫中。為此,請進入項目的根目錄並執行以下命令:

mvn clean install

現在是時候將應用程序放入 Data Flow Server 中。

4. 註冊應用程序

要將應用程序註冊到 App 註冊表中,我們需要提供一個唯一的名稱、應用程序類型和可以解析到應用程序工件的 URI。

轉到 Spring Cloud Data Flow Shell,並在提示符下發出命令:

app register --name batch-job --type task 
  --uri maven://com.baeldung.spring.cloud:batch-job:jar:0.0.1-SNAPSHOT

5. 創建任務

可以使用以下命令創建任務定義:

task create myjob --definition batch-job

這會創建一個名為 myjob 的任務,指向之前註冊的 batch-job 應用程序。

可以使用以下命令獲取當前任務定義的列表:

task list

6. 啓動任務

要啓動任務,可以使用以下命令:

task launch myjob

一旦任務啓動,任務的狀態將存儲在關係型數據庫中。我們可以使用以下命令檢查任務執行的狀態:

task execution list

7. 結果審查

在本示例中,任務僅打印字符串到日誌文件中。日誌文件位於 <em Data Flow Server 的日誌輸出中顯示的目錄下。

要查看結果,可以使用“tail”命令查看日誌:

tail -f PATH_TO_LOG\spring-cloud-dataflow-2385233467298102321\myjob-1472827120414\myjob
[...] --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [jobStep1]
[...] --- [main] o.b.spring.cloud.JobConfiguration: Job was run
[...] --- [main] o.s.b.c.l.support.SimpleJobLauncher:
  Job: [SimpleJob: [name=job]] completed with the following parameters: 
    [{}] and the following status: [COMPLETED]

8. 結論

在本文中,我們展示瞭如何通過使用 Spring Cloud Data Flow 來處理批量處理。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.