知識庫 / Spring / Spring Boot RSS 訂閱

如何運行 Spring Batch 中的多個作業

Spring Boot
HongKong
4
10:53 AM · Dec 06 ,2025

1. 簡介

Spring Batch 是一款強大的框架,通過提供可重用的組件和可靠的基礎設施,讓處理大量數據變得輕鬆。在實際應用中,應用程序通常需要同時執行多個任務,按照特定的執行順序運行,以優化性能並有效地管理依賴關係。

在本教程中,我們將探索在 Spring Batch 中運行多個任務的各種方法。

2. 理解 Spring Batch 作業

在 Spring Batch 的上下文中,作業是一個包含一系列步驟的容器,代表整個流程。每個作業都有一個唯一的標識符,可以由多個步驟組成,這些步驟按順序執行或基於某些條件執行。我們可以使用 XML 或 Java 以及 <em >JobLauncher</em> 來配置作業。

運行多個作業在以下場景中具有優勢:

  • 並行處理
  • 數據遷移和 ETL 流程
  • 報告生成等

有效地管理多個作業對於實現最佳性能、可維護性和可擴展性至關重要。讓我們一起探索如何在 Spring Batch 中實現這些目標。

3. 配置

首先,讓我們配置我們的依賴項:

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.3.2</version> 
</dependency> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-web</artifactId> 
    <version>3.3.2</version>
</dependency> 
<dependency> 
    <groupId>com.h2database</groupId> 
    <artifactId>h2</artifactId> 
    <scope>runtime</scope>
    <version>2.2.224</version> 
</dependency>

我們已添加了 spring-boot-starter-web,這是一個基本的 Spring Boot 依賴項,以及 spring-boot-starter-batch 用於批量處理,以及 h2 用於內存數據庫。

接下來,我們啓用批量處理並配置數據源:

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Bean
    public DataSource dataSource() {
        return DataSourceBuilder.create()
          .driverClassName("org.h2.Driver")
          .url("jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
          .username("sa")
          .password("")
          .build();
    }

    @Bean
    public DatabasePopulator databasePopulator(DataSource dataSource) {
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
        populator.setContinueOnError(false);
        populator.execute(dataSource);
        return populator;
    }
}

現在,我們以兩個不同的示例工作來演示。每個工作將執行一項簡單的任務:

@Configuration
public class JobsConfig {
    private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);

    @Bean
    public Job jobOne(JobRepository jobRepository, Step stepOne) {
        return new JobBuilder("jobOne", jobRepository).start(stepOne)
          .build();
    }

    @Bean
    public Step stepOne(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("stepOne", jobRepository).tasklet((contribution, chunkContext) -> {
              log.info("Hello");
              return RepeatStatus.FINISHED;
          }, transactionManager)
          .build();
    }

    @Bean
    public Job jobTwo(JobRepository jobRepository, Step stepTwo) {
        return new JobBuilder("jobTwo", jobRepository).start(stepTwo)
          .build();
    }

    @Bean
    public Step stepTwo(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("stepTwo", jobRepository).tasklet((contribution, chunkContext) -> {
              log.info("World");
              return RepeatStatus.FINISHED;
          }, transactionManager)
          .build();
    }
}

@EnableBatchProcessing註解設置了 Spring Batch 的核心組件,例如JobLauncherJobRepositoryJobExplorer

我們定義了兩個獨立的作業,jobOnejobTwo,作為 Spring Bean。每個作業將擁有自己的獨特配置和步驟,我們將這些步驟定義在這些方法中。這些步驟是具有事務支持的簡單任務,用於記錄消息以確認每個步驟的執行。

讓我們確認作業的定義:

@Autowired
private Job jobOne;

@Autowired
private Job jobTwo;

@Test
void givenJobsDefinitions_whenJobsLoaded_thenJobNamesShouldMatch() {
    assertNotNull(jobOne, "jobOne should be defined");
    assertEquals("jobOne", jobOne.getName());

    assertNotNull(jobTwo, "jobTwo should be defined");
    assertEquals("jobTwo", jobTwo.getName());
}

4. 順序作業執行

如果我們的作業需要按順序執行,尤其是在它們相互依賴彼此的輸出時,則順序執行是最佳選擇。下面我們通過一個示例來了解其工作原理。

@Component
public class SequentialJobsConfig {
    @Autowired
    private Job jobOne;

    @Autowired
    private Job jobTwo;

    @Autowired
    private JobLauncher jobLauncher;

    public void runJobsSequentially() {
        JobParameters jobParameters = new JobParametersBuilder().addString("ID", "Sequential 1")
          .toJobParameters();

        JobParameters jobParameters2 = new JobParametersBuilder().addString("ID", "Sequential 2")
          .toJobParameters();

        // Run jobs one after another
        try {
            jobLauncher.run(jobOne, jobParameters);
            jobLauncher.run(jobTwo, jobParameters2);
        } catch (Exception e) {
            // handle exception
            e.printStackTrace();
        }
    }
}

我們定義了一個名為 SequentialJobsConfig 的組件,並將我們之前創建的兩個任務添加到該類中。隨後,我們使用 JobLauncher 運行這些任務。我們構建了 jobParameters,以確保每個任務實例都是唯一的,通過使用 addString() 方法添加 ID。這種方法允許我們控制執行流程並檢查每個任務的結果,然後再繼續到下一個任務。

讓我們檢查這些任務是否成功運行:

@Autowired
private SequentialJobsConfig sequentialJobsConfig;

@Test
void givenSequentialJobs_whenExecuted_thenRunJobsInOrder() {
    assertDoesNotThrow(() -> sequentialJobsConfig.runJobsSequentially(), "Sequential job execution should execute");
}

5. 並行作業執行

在某些情況下,我們有相互獨立作業,並行執行這些作業可以縮短執行時間。我們可以利用 Spring 的 TaskExecutor 接口來實現這一點:

@Component
public class ParallelJobService {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job jobOne;

    @Autowired
    private Job jobTwo;

    public void runJobsInParallel() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

        taskExecutor.execute(() -> {
            try {
                jobLauncher.run(jobOne, new JobParametersBuilder().addString("ID", "Parallel 1")
                  .toJobParameters());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        taskExecutor.execute(() -> {
            try {
                jobLauncher.run(jobTwo, new JobParametersBuilder().addString("ID", "Parallel 2")
                  .toJobParameters());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        taskExecutor.close();
    }
}

在這種配置中,我們使用 Spring 的 SimpleAsyncTaskExecutor 來使用 JobLauncher 啓動任務。

但是,在採用並行方式時,我們需要考慮線程安全、資源爭用和事務管理等因素,以確保穩定高效的執行。

6. 使用作業調度

有時,我們不只是想運行多個作業,而是希望在特定時間或間隔內運行這些作業。這就是作業調度發揮作用的地方。這可以通過使用 Spring 的調度支持或外部調度器輕鬆實現。

6.1. 使用 Spring 的 @Scheduling

@Scheduled 註解允許一個方法(作業)在指定的時間間隔內重複執行。 這種方法需要啓用調度功能,通過@EnableScheduling 註解。

讓我們創建一個 ScheduledJobs 類,並添加所需的註解來配置我們的作業:

@Configuration
@EnableScheduling
public class ScheduledJobs {
    private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);

    @Autowired
    private Job jobOne;

    @Autowired
    private Job jobTwo;

    @Autowired
    private JobLauncher jobLauncher;

    @Scheduled(cron = "0 */1 * * * *")  // Run every minute
    public void runJob1() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString("jobID", String.valueOf(System.currentTimeMillis()))
          .toJobParameters();

        log.info("Executing sheduled job 1");
        jobLauncher.run(jobOne, jobParameters);
    }

    @Scheduled(fixedRate = 1000 * 60 * 3)  // Run every 3 minutes
    public void runJob2() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString("jobID", String.valueOf(System.currentTimeMillis()))
          .toJobParameters();

        log.info("Executing sheduled job 2");
        jobLauncher.run(jobTwo, jobParameters);
    }

}

在本示例中,我們使用了之前章節中創建的 jobs 類。我們配置了 jobOne 每分鐘運行,而 jobTwo 則每 3 分鐘運行一次。 @Scheduled 註解允許使用固定速率或 cron 表達式定義簡單的到複雜的調度模式。

6.2. 使用 Quartz 調度器

Quartz 調度器是用於在 Java 應用程序中安排任務的強大庫。 類似於 <em @Scheduling</em>>,Quartz 允許在特定時間間隔內運行多個作業。 為了使用 Quartz,我們需要添加 <a href="https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-quartz/3.3.2"><em>spring-boot-starter-quartz</em></a> 依賴項:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
    <version>3.3.2</version>
</dependency>

接下來,我們創建兩個任務,分別是 QuartzJobOneQuartzJobTwo

@Component
public class QuartzJobOne implements Job {
    private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            log.info("Job One is executing from quartz");
        } catch (Exception e) {
            log.error("Error executing Job One: {}", e.getMessage(), e);
            throw new JobExecutionException(e);
        }
    }
}
@Component
public class QuartzJobTwo implements Job {
    private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            log.info("Job Two is executing from quartz");
        } catch (Exception e) {
            log.error("Error executing Job Two: {}", e.getMessage(), e);
            throw new JobExecutionException(e);
        }
    }
}

現在,讓我們定義兩個 Bean,JobDetail 和一個 Trigger 用於每個 Job:

@Configuration
public class QuartzConfig {
    @Autowired
    private Job quartzJobOne;

    @Autowired
    private Job quartzJobTwo;

    @Bean
    public JobDetail job1Detail() {
        return JobBuilder.newJob().ofType(quartzJobOne.getClass())
          .withIdentity("quartzJobOne", "group1")
          .storeDurably()
          .build();
    }

    @Bean
    public JobDetail job2Detail() {
        return JobBuilder.newJob().ofType(quartzJobTwo.getClass())
          .withIdentity("quartzJobTwo", "group1")
          .storeDurably()
          .build();
    }

    @Bean
    public Trigger job1Trigger(JobDetail job1Detail) {
        return TriggerBuilder.newTrigger()
          .forJob(job1Detail)
          .withIdentity("quartzJobOneTrigger", "group1")
          .withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
          .build();
    }

    @Bean
    public Trigger job2Trigger(JobDetail job2Detail) {
        return TriggerBuilder.newTrigger()
          .forJob(job2Detail)
          .withIdentity("quartzJobTwoTrigger", "group1")
          .withSchedule(CronScheduleBuilder.cronSchedule("0/15 * * * * ?"))
          .build();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setJobDetails(job1Detail(), job2Detail());
        schedulerFactory.setTriggers(job1Trigger(job1Detail()), job2Trigger(job2Detail()));
        return schedulerFactory;
    }
}

我們使用 JobDetail 對象,並結合 JobBuilder,為我們的 Quartz 任務指定了任務類及其身份信息。 其次,我們創建了一個觸發器,並使用 cron 表達式定義了任務的執行時間,使其分別每 10 秒和 15 秒執行一次。

我們通過在 schedulerFactoryBean bean 中自動啓動我們的任務。 運行 Quartz 任務有多種方法,包括使用參數運行任務、使用日曆進行調度以及暫停和恢復任務。

Quartz 具有高度的靈活性,並支持複雜的調度場景。 但是,它需要額外的配置,並且比使用 @Scheduling 更加複雜。

7. 動態作業執行

我們已經探討了使用 Spring Batch 運行多個作業的一些方法,這些方法要求我們預先靜態配置和定義作業。然而,在某些情況下,我們可能需要根據運行時條件按需創建作業。我們可以使用傳統的基於塊(chunk)或基於任務(tasklet)的方法來實現這一點,當使用 Spring Batch 時。對於本示例,我們將使用基於塊的方法。

在基於塊的方法中,每個作業的數據從 ItemReader 中讀取,然後由 ItemProcessor 處理。讀取和處理後的塊隨後傳遞給 ItemWriter

讓我們創建一個類 DynamicJobService 並定義負責運行我們作業的方法:

@Service
public class DynamicJobService {
    private final JobRepository jobRepository;
    private final JobLauncher jobLauncher;
    private final PlatformTransactionManager transactionManager;

    public DynamicJobService(JobRepository jobRepository, JobLauncher jobLauncher, PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.jobLauncher = jobLauncher;
        this.transactionManager = transactionManager;
    }

    public void createAndRunJob(Map<String, List<String>> jobsData) throws Exception {
        List<Job> jobs = new ArrayList<>();

        // Create chunk-oriented jobs
        for (Map.Entry<String, List<String>> entry : jobsData.entrySet()) {
            if (entry.getValue() instanceof List) {
                jobs.add(createJob(entry.getKey(), entry.getValue()));
            }
        }

        // Run all jobs
        for (Job job : jobs) {
            JobParameters jobParameters = new JobParametersBuilder().addString("jobID", String.valueOf(System.currentTimeMillis()))
              .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }

    private Job createJob(String jobName, List<String> data) {
        return new JobBuilder(jobName, jobRepository).start(createStep(data))
          .build();
    }

    private Step createStep(List<String> data) {
        return new StepBuilder("step", jobRepository).<String, String> chunk(10, transactionManager)
          .reader(new ListItemReader<>(data))
          .processor(item -> item.toUpperCase())
          .writer(items -> items.forEach(System.out::println))
          .build();
    }
}

在上面的示例中,我們創建了一個名為 createAndRunJob 的方法,它根據 jobsData 生成並啓動任務。以下是執行過程:

reader() 方法逐個讀取輸入列表中的項目。每個項目都傳遞給 processor(),該方法將項目的第一個字母轉換為大寫。處理後的項目然後收集到一個塊中,塊大小定義為 10。當塊已滿或沒有更多數據時,所有塊中的項目都傳遞給 writer()。隨後,writer 將塊中的所有項目打印到控制枱,並且此過程會重複,直到所有項目都已處理完畢。

讓我們看看該服務在行動中:

@Autowired
private DynamicJobService dynamicJobService;

@Test
void givenJobData_whenJobsCreated_thenJobsRunSeccessfully() throws Exception {
    Map<String, List<String>> jobsData = new HashMap<>();
    jobsData.put("chunkJob1", Arrays.asList("data1", "data2", "data3"));
    jobsData.put("chunkJob2", Arrays.asList("data4", "data5", "data6"));
    assertDoesNotThrow(() -> dynamicJobService.createAndRunJob(jobsData), "Dynamic job creation and execution should run successfully");
}

我們創建並傳遞了兩個任務到 createAndRunJob 方法,每個任務都包含任務標識和數據。

在實際應用中,我們很可能需要執行更復雜的處理邏輯。如果內置實現無法滿足我們的特定要求,則最好創建自定義的 ItemReaderItemProcessorItemWriter 實現。

8. 結論

在本文中,我們探討了使用 Spring Batch 運行多個任務的一些方法。通過理解本文中使用的基本示例,我們可以設計出更高效、可擴展且易於維護的批處理系統。

無論我們應該使用哪種方法,都應該取決於哪種方法最適合我們的特定需求。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.