1. 簡介
Spring Batch 是一款強大的框架,通過提供可重用的組件和可靠的基礎設施,讓處理大量數據變得輕鬆。在實際應用中,應用程序通常需要同時執行多個任務,按照特定的執行順序運行,以優化性能並有效地管理依賴關係。
在本教程中,我們將探索在 Spring Batch 中運行多個任務的各種方法。
2. 理解 Spring Batch 作業
在 Spring Batch 的上下文中,作業是一個包含一系列步驟的容器,代表整個流程。每個作業都有一個唯一的標識符,可以由多個步驟組成,這些步驟按順序執行或基於某些條件執行。我們可以使用 XML 或 Java 以及 <em >JobLauncher</em> 來配置作業。
運行多個作業在以下場景中具有優勢:
- 並行處理
- 數據遷移和 ETL 流程
- 報告生成等
有效地管理多個作業對於實現最佳性能、可維護性和可擴展性至關重要。讓我們一起探索如何在 Spring Batch 中實現這些目標。
3. 配置
首先,讓我們配置我們的依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
<version>2.2.224</version>
</dependency>我們已添加了 spring-boot-starter-web,這是一個基本的 Spring Boot 依賴項,以及 spring-boot-starter-batch 用於批量處理,以及 h2 用於內存數據庫。
接下來,我們啓用批量處理並配置數據源:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public DataSource dataSource() {
return DataSourceBuilder.create()
.driverClassName("org.h2.Driver")
.url("jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
.username("sa")
.password("")
.build();
}
@Bean
public DatabasePopulator databasePopulator(DataSource dataSource) {
ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
populator.setContinueOnError(false);
populator.execute(dataSource);
return populator;
}
}現在,我們以兩個不同的示例工作來演示。每個工作將執行一項簡單的任務:
@Configuration
public class JobsConfig {
private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);
@Bean
public Job jobOne(JobRepository jobRepository, Step stepOne) {
return new JobBuilder("jobOne", jobRepository).start(stepOne)
.build();
}
@Bean
public Step stepOne(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("stepOne", jobRepository).tasklet((contribution, chunkContext) -> {
log.info("Hello");
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public Job jobTwo(JobRepository jobRepository, Step stepTwo) {
return new JobBuilder("jobTwo", jobRepository).start(stepTwo)
.build();
}
@Bean
public Step stepTwo(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("stepTwo", jobRepository).tasklet((contribution, chunkContext) -> {
log.info("World");
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
}@EnableBatchProcessing註解設置了 Spring Batch 的核心組件,例如JobLauncher、JobRepository和JobExplorer。
我們定義了兩個獨立的作業,jobOne和jobTwo,作為 Spring Bean。每個作業將擁有自己的獨特配置和步驟,我們將這些步驟定義在這些方法中。這些步驟是具有事務支持的簡單任務,用於記錄消息以確認每個步驟的執行。
讓我們確認作業的定義:
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Test
void givenJobsDefinitions_whenJobsLoaded_thenJobNamesShouldMatch() {
assertNotNull(jobOne, "jobOne should be defined");
assertEquals("jobOne", jobOne.getName());
assertNotNull(jobTwo, "jobTwo should be defined");
assertEquals("jobTwo", jobTwo.getName());
}4. 順序作業執行
如果我們的作業需要按順序執行,尤其是在它們相互依賴彼此的輸出時,則順序執行是最佳選擇。下面我們通過一個示例來了解其工作原理。
@Component
public class SequentialJobsConfig {
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Autowired
private JobLauncher jobLauncher;
public void runJobsSequentially() {
JobParameters jobParameters = new JobParametersBuilder().addString("ID", "Sequential 1")
.toJobParameters();
JobParameters jobParameters2 = new JobParametersBuilder().addString("ID", "Sequential 2")
.toJobParameters();
// Run jobs one after another
try {
jobLauncher.run(jobOne, jobParameters);
jobLauncher.run(jobTwo, jobParameters2);
} catch (Exception e) {
// handle exception
e.printStackTrace();
}
}
}我們定義了一個名為 SequentialJobsConfig 的組件,並將我們之前創建的兩個任務添加到該類中。隨後,我們使用 JobLauncher 運行這些任務。我們構建了 jobParameters,以確保每個任務實例都是唯一的,通過使用 addString() 方法添加 ID。這種方法允許我們控制執行流程並檢查每個任務的結果,然後再繼續到下一個任務。
讓我們檢查這些任務是否成功運行:
@Autowired
private SequentialJobsConfig sequentialJobsConfig;
@Test
void givenSequentialJobs_whenExecuted_thenRunJobsInOrder() {
assertDoesNotThrow(() -> sequentialJobsConfig.runJobsSequentially(), "Sequential job execution should execute");
}5. 並行作業執行
在某些情況下,我們有相互獨立作業,並行執行這些作業可以縮短執行時間。我們可以利用 Spring 的 TaskExecutor 接口來實現這一點:
@Component
public class ParallelJobService {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
public void runJobsInParallel() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.execute(() -> {
try {
jobLauncher.run(jobOne, new JobParametersBuilder().addString("ID", "Parallel 1")
.toJobParameters());
} catch (Exception e) {
e.printStackTrace();
}
});
taskExecutor.execute(() -> {
try {
jobLauncher.run(jobTwo, new JobParametersBuilder().addString("ID", "Parallel 2")
.toJobParameters());
} catch (Exception e) {
e.printStackTrace();
}
});
taskExecutor.close();
}
}在這種配置中,我們使用 Spring 的 SimpleAsyncTaskExecutor 來使用 JobLauncher 啓動任務。
但是,在採用並行方式時,我們需要考慮線程安全、資源爭用和事務管理等因素,以確保穩定高效的執行。
6. 使用作業調度
有時,我們不只是想運行多個作業,而是希望在特定時間或間隔內運行這些作業。這就是作業調度發揮作用的地方。這可以通過使用 Spring 的調度支持或外部調度器輕鬆實現。
6.1. 使用 Spring 的 @Scheduling
@Scheduled 註解允許一個方法(作業)在指定的時間間隔內重複執行。 這種方法需要啓用調度功能,通過@EnableScheduling 註解。
讓我們創建一個 ScheduledJobs 類,並添加所需的註解來配置我們的作業:
@Configuration
@EnableScheduling
public class ScheduledJobs {
private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Autowired
private JobLauncher jobLauncher;
@Scheduled(cron = "0 */1 * * * *") // Run every minute
public void runJob1() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
log.info("Executing sheduled job 1");
jobLauncher.run(jobOne, jobParameters);
}
@Scheduled(fixedRate = 1000 * 60 * 3) // Run every 3 minutes
public void runJob2() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
log.info("Executing sheduled job 2");
jobLauncher.run(jobTwo, jobParameters);
}
}在本示例中,我們使用了之前章節中創建的 jobs 類。我們配置了 jobOne 每分鐘運行,而 jobTwo 則每 3 分鐘運行一次。 @Scheduled 註解允許使用固定速率或 cron 表達式定義簡單的到複雜的調度模式。
6.2. 使用 Quartz 調度器
Quartz 調度器是用於在 Java 應用程序中安排任務的強大庫。 類似於 <em @Scheduling</em>>,Quartz 允許在特定時間間隔內運行多個作業。 為了使用 Quartz,我們需要添加 <a href="https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-quartz/3.3.2"><em>spring-boot-starter-quartz</em></a> 依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>3.3.2</version>
</dependency>
接下來,我們創建兩個任務,分別是 QuartzJobOne 和 QuartzJobTwo:
@Component
public class QuartzJobOne implements Job {
private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
log.info("Job One is executing from quartz");
} catch (Exception e) {
log.error("Error executing Job One: {}", e.getMessage(), e);
throw new JobExecutionException(e);
}
}
}
@Component
public class QuartzJobTwo implements Job {
private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
log.info("Job Two is executing from quartz");
} catch (Exception e) {
log.error("Error executing Job Two: {}", e.getMessage(), e);
throw new JobExecutionException(e);
}
}
}現在,讓我們定義兩個 Bean,JobDetail 和一個 Trigger 用於每個 Job:
@Configuration
public class QuartzConfig {
@Autowired
private Job quartzJobOne;
@Autowired
private Job quartzJobTwo;
@Bean
public JobDetail job1Detail() {
return JobBuilder.newJob().ofType(quartzJobOne.getClass())
.withIdentity("quartzJobOne", "group1")
.storeDurably()
.build();
}
@Bean
public JobDetail job2Detail() {
return JobBuilder.newJob().ofType(quartzJobTwo.getClass())
.withIdentity("quartzJobTwo", "group1")
.storeDurably()
.build();
}
@Bean
public Trigger job1Trigger(JobDetail job1Detail) {
return TriggerBuilder.newTrigger()
.forJob(job1Detail)
.withIdentity("quartzJobOneTrigger", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
.build();
}
@Bean
public Trigger job2Trigger(JobDetail job2Detail) {
return TriggerBuilder.newTrigger()
.forJob(job2Detail)
.withIdentity("quartzJobTwoTrigger", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/15 * * * * ?"))
.build();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() {
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
schedulerFactory.setJobDetails(job1Detail(), job2Detail());
schedulerFactory.setTriggers(job1Trigger(job1Detail()), job2Trigger(job2Detail()));
return schedulerFactory;
}
}我們使用 JobDetail 對象,並結合 JobBuilder,為我們的 Quartz 任務指定了任務類及其身份信息。 其次,我們創建了一個觸發器,並使用 cron 表達式定義了任務的執行時間,使其分別每 10 秒和 15 秒執行一次。
我們通過在 schedulerFactoryBean bean 中自動啓動我們的任務。 運行 Quartz 任務有多種方法,包括使用參數運行任務、使用日曆進行調度以及暫停和恢復任務。
Quartz 具有高度的靈活性,並支持複雜的調度場景。 但是,它需要額外的配置,並且比使用 @Scheduling 更加複雜。
7. 動態作業執行
我們已經探討了使用 Spring Batch 運行多個作業的一些方法,這些方法要求我們預先靜態配置和定義作業。然而,在某些情況下,我們可能需要根據運行時條件按需創建作業。我們可以使用傳統的基於塊(chunk)或基於任務(tasklet)的方法來實現這一點,當使用 Spring Batch 時。對於本示例,我們將使用基於塊的方法。
在基於塊的方法中,每個作業的數據從 ItemReader 中讀取,然後由 ItemProcessor 處理。讀取和處理後的塊隨後傳遞給 ItemWriter。
讓我們創建一個類 DynamicJobService 並定義負責運行我們作業的方法:
@Service
public class DynamicJobService {
private final JobRepository jobRepository;
private final JobLauncher jobLauncher;
private final PlatformTransactionManager transactionManager;
public DynamicJobService(JobRepository jobRepository, JobLauncher jobLauncher, PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.jobLauncher = jobLauncher;
this.transactionManager = transactionManager;
}
public void createAndRunJob(Map<String, List<String>> jobsData) throws Exception {
List<Job> jobs = new ArrayList<>();
// Create chunk-oriented jobs
for (Map.Entry<String, List<String>> entry : jobsData.entrySet()) {
if (entry.getValue() instanceof List) {
jobs.add(createJob(entry.getKey(), entry.getValue()));
}
}
// Run all jobs
for (Job job : jobs) {
JobParameters jobParameters = new JobParametersBuilder().addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
private Job createJob(String jobName, List<String> data) {
return new JobBuilder(jobName, jobRepository).start(createStep(data))
.build();
}
private Step createStep(List<String> data) {
return new StepBuilder("step", jobRepository).<String, String> chunk(10, transactionManager)
.reader(new ListItemReader<>(data))
.processor(item -> item.toUpperCase())
.writer(items -> items.forEach(System.out::println))
.build();
}
}在上面的示例中,我們創建了一個名為 createAndRunJob 的方法,它根據 jobsData 生成並啓動任務。以下是執行過程:
reader() 方法逐個讀取輸入列表中的項目。每個項目都傳遞給 processor(),該方法將項目的第一個字母轉換為大寫。處理後的項目然後收集到一個塊中,塊大小定義為 10。當塊已滿或沒有更多數據時,所有塊中的項目都傳遞給 writer()。隨後,writer 將塊中的所有項目打印到控制枱,並且此過程會重複,直到所有項目都已處理完畢。
讓我們看看該服務在行動中:
@Autowired
private DynamicJobService dynamicJobService;
@Test
void givenJobData_whenJobsCreated_thenJobsRunSeccessfully() throws Exception {
Map<String, List<String>> jobsData = new HashMap<>();
jobsData.put("chunkJob1", Arrays.asList("data1", "data2", "data3"));
jobsData.put("chunkJob2", Arrays.asList("data4", "data5", "data6"));
assertDoesNotThrow(() -> dynamicJobService.createAndRunJob(jobsData), "Dynamic job creation and execution should run successfully");
}我們創建並傳遞了兩個任務到 createAndRunJob 方法,每個任務都包含任務標識和數據。
在實際應用中,我們很可能需要執行更復雜的處理邏輯。如果內置實現無法滿足我們的特定要求,則最好創建自定義的 ItemReader、ItemProcessor 和 ItemWriter 實現。
8. 結論
在本文中,我們探討了使用 Spring Batch 運行多個任務的一些方法。通過理解本文中使用的基本示例,我們可以設計出更高效、可擴展且易於維護的批處理系統。
無論我們應該使用哪種方法,都應該取決於哪種方法最適合我們的特定需求。