1. 概述
在本教程中,我們將探討 Spring Batch 的一個實用、代碼導向的入門介紹。Spring Batch 是一種為任務的穩健執行而設計的處理框架。
其當前版本 5.2.0 支持 Spring 6.2.0 和 Java 17+。
這裏列舉了一些 該框架的有趣且實用的用例。
2. 工作流程基礎
Spring Batch 遵循傳統的批處理架構,其中一個作業倉庫負責調度和與作業進行交互。
一個作業可以包含多個步驟。並且每個步驟通常遵循讀取數據、處理數據和寫入數據的序列。
當然,框架會在這裏完成大部分繁重的工作,尤其是在處理作業的低級持久化工作方面,使用 h2 作為作業倉庫。
2.1. 示例使用場景
我們將要解決的簡單使用場景是,將財務交易數據從 CSV 格式遷移到 XML 格式。
輸入文件具有非常簡單的結構。
它包含每行一條交易記錄,由用户名、用户 ID、交易日期和金額組成:
username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 234113. Maven POM
本項目所需依賴項包括:Spring Batch Core,Spring Object/XML Marshalling (OXM),以及 H2 數據庫:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>6.2.0</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.3.232</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.2.0</version>
</dependency>4. 自動創建 Spring Batch 模式
當使用 Spring Batch 時,我們可以使用預打包的 SQL 初始化腳本來在啓動時自動創建模式。 此外,當我們使用嵌入式 H2 數據庫時,Spring Boot 會自動運行相應的 SQL 初始化腳本以初始化數據庫。 但是,當我們使用其他受支持的數據庫時,需要配置 Spring Boot 屬性,以便它自動檢測數據庫並在啓動時運行相應的 SQL 初始化腳本。
我們可以配置 Spring Boot 的 <em >application.properties</em > 文件以啓用自動數據庫初始化:
spring.batch.jdbc.initialize-schema=always當然,以下是翻譯後的內容:
或者,我們還可以配置 Spring Boot 的 application.yml 文件以啓用自動數據庫初始化:
spring:
batch:
jdbc:
initialize-schema: "always"此外,我們不應為 BatchConfig 類添加 @EnableBatchProcessing 註解。 這樣做是為了讓 Spring Boot 負責配置 Spring Batch,包括為自動配置的數據源創建 Batch 模式。
相反,我們可以通過在 Spring Boot 的 application.properties 文件中將同一屬性設置為 never 來關閉嵌入式 H2 數據庫中 Spring Batch 模式的自動創建:
spring.batch.jdbc.initialize-schema=never再次,我們也可以在 Spring Boot 的 application.yml文件中禁用自動數據庫初始化:
spring:
batch:
jdbc:
initialize-schema: "never"5. Spring Batch 和 Job 配置
以下展示了基本的 Spring Batch 配置,以及針對 CSV 到 XML 功能的 Job 描述。
基於 Java 的 Job 配置:
@Profile("spring")
public class SpringBatchConfig {
@Value("input/record.csv")
private Resource inputCsv;
@Value("file:xml/output.xml")
private Resource outputXml;
@Bean
public ItemReader<Transaction> itemReader()
throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = { "username", "userid", "transactiondate", "amount" };
tokenizer.setNames(tokens);
reader.setResource(inputCsv);
DefaultLineMapper<Transaction> lineMapper =
new DefaultLineMapper<Transaction>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public ItemProcessor<Transaction, Transaction> itemProcessor() {
return new CustomItemProcessor();
}
@Bean
public ItemWriter<Transaction> itemWriter(Marshaller marshaller)
throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter =
new StaxEventItemWriter<Transaction>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(outputXml);
return itemWriter;
}
@Bean
public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(new Class[] { Transaction.class });
return marshaller;
}
@Bean
protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) {
return new StepBuilder("step1", jobRepository)
.<Transaction, Transaction> chunk(10, transactionManager)
.reader(reader).processor(processor).writer(writer).build();
}
@Bean(name = "firstBatchJob")
public Job job(JobRepository jobRepository, @Qualifier("step1") Step step1) {
return new JobBuilder("firstBatchJob", jobRepository).preventRestart().start(step1).build();
}
public DataSource dataSource() {
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
return builder.setType(EmbeddedDatabaseType.H2)
.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
.addScript("classpath:org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean(name = "transactionManager")
public PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
@Bean(name = "jobRepository")
public JobRepository getJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(getTransactionManager());
factory.afterPropertiesSet();
return factory.getObject();
}
@Bean(name = "jobLauncher")
public JobLauncher getJobLauncher() throws Exception {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}以及基於XML的配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="input/record.csv" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="username,userid,transactiondate,amount" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.baeldung.batch.service.RecordFieldSetMapper" />
</property>
</bean>
</property>
<property name="linesToSkip" value="1" />
</bean>
<bean id="itemProcessor" class="com.baeldung.batch.service.CustomItemProcessor" />
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" />
<property name="marshaller" ref="marshaller" />
<property name="rootTagName" value="transactionRecord" />
</bean>
<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound">
<list>
<value>com.baeldung.batch.model.Transaction</value>
</list>
</property>
</bean>
<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<!-- connect to H2 database -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="org.h2.Driver" />
<property name="url" value="jdbc:h2:file:~/repository" />
<property name="username" value="" />
<property name="password" value="" />
</bean>
<!-- stored job-meta in database -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="transactionManager" ref="transactionManager" />
<property name="databaseType" value="h2" />
</bean>
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>現在我們已經有了整個配置,讓我們將其分解並開始討論。
5.1. 讀取數據並使用 ItemReader 創建對象
首先,我們配置了 cvsFileItemReader,它將從 record.csv 讀取數據並將其轉換為 Transaction 對象:
@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
private String username;
private int userId;
private LocalDateTime transactionDate;
private double amount;
/* getters and setters for the attributes */
@Override
public String toString() {
return "Transaction [username=" + username + ", userId=" + userId
+ ", transactionDate=" + transactionDate + ", amount=" + amount
+ "]";
}
}為了實現這一點,它使用了一個自定義映射器:
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/M/yyy");
Transaction transaction = new Transaction();
transaction.setUsername(fieldSet.readString("username"));
transaction.setUserId(fieldSet.readInt(1));
transaction.setAmount(fieldSet.readDouble(3));
String dateString = fieldSet.readString(2);
transaction.setTransactionDate(LocalDate.parse(dateString, formatter).atStartOfDay());
return transaction;
}
}5.2. 使用 ItemProcessor 處理數據
我們創建了自己的 自定義ItemProcessor,CustomItemProcessor,它不處理與交易對象相關的任何內容。
它僅負責將來自讀取器(reader)的原始對象傳遞給寫入器(writer):
public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {
public Transaction process(Transaction item) {
return item;
}
}5.3. 使用 ItemWriter 將對象寫入文件系統
最後,我們將這個 事務 寫入到位於 xml/output.xml 的 XML 文件中:
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" />
<property name="marshaller" ref="marshaller" />
<property name="rootTagName" value="transactionRecord" />
</bean>5.4. 配置批量作業
將所有內容連接起來,使用 <em >batch:job</em> 語法即可完成作業配置。
請注意 <em >commit-interval</em>。它指在提交批量作業到 <em >itemWriter</em> 之前,需要保存在內存中的事務數量。
它會在該點(或直到遇到輸入數據的末尾)保持事務在內存中的狀態。
以下是 Java Bean 和對應的 XML 配置:
@Bean
protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
@Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) {
return new StepBuilder("step1", jobRepository)
.<Transaction, Transaction> chunk(10, transactionManager)
.reader(itemReader(inputCsv))
.processor(processor)
.writer(writer)
.build();
}<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job><div>
</div>
5.5. 運行批處理作業
現在,讓我們設置並運行所有內容:
@Profile("spring")
public class App {
public static void main(String[] args) {
// Spring Java config
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(SpringConfig.class);
context.register(SpringBatchConfig.class);
context.refresh();
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("firstBatchJob");
System.out.println("Starting the batch job");
try {
JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Job Status : " + execution.getStatus());
System.out.println("Job completed");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Job failed");
}
}
}我們使用 -Dspring.profiles.active=spring 屬性運行 Spring 應用程序。
在下一部分,我們將配置示例,使其在 Spring Boot 應用程序中運行。
6. 以特定順序運行批次作業
在 Spring Batch 中,可以通過創建父作業來指定多個子作業的執行順序。 這在以下情況下特別有用:一個作業的輸出作為另一個作業的輸入,或者由於業務邏輯需要,作業需要按順序執行。
為了演示目的,我們將創建兩個步驟,它們只是簡單地記錄消息:
@Bean
public Step firstStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("firstStep", jobRepository)
.<String, String>chunk(1, transactionManager)
.reader(new IteratorItemReader<>(Stream.of("Data from Step 1").iterator()))
.processor(item -> {
System.out.println("Processing: " + item);
return item;
})
.writer(items -> items.forEach(System.out::println))
.build();
}
@Bean
public Step secondStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("secondStep", jobRepository)
.<String, String>chunk(1, transactionManager)
.reader(new IteratorItemReader<>(Stream.of("Data from Step 2").iterator()))
.processor(item -> {
System.out.println("Processing: " + item);
return item;
})
.writer(items -> items.forEach(System.out::println))
.build();
}<div>
<p>接下來,我們創建一個父作業,以預定義的順序執行這些步驟。父作業從 <em >firstStep</em> 開始,然後依次執行 <em >secondStep</em>。</p>
<p>我們可以使用 <em >JobBuilder</em> 來定義步驟的順序:</p>
</div>
@Bean(name = "parentJob")
public Job parentJob(JobRepository jobRepository,
@Qualifier("firstStep") Step firstStep,
@Qualifier("secondStep") Step secondStep) {
return new JobBuilder("parentJob", jobRepository)
.start(firstStep)
.next(secondStep)
.build();
}現在,讓我們使用應用程序上下文中 JobLauncher 運行該任務。當這些步驟執行時,我們將會看到類似於以下內容的日誌:
Processing: Data from Step 1
Data from Step 1
Processing: Data from Step 2
Data from Step 2通過遵循這些步驟,我們可以有效地在指定順序中運行多個 Spring Batch 任務,而無需依賴複雜的項處理或讀取機制。
7. Spring Boot 配置
在本節中,我們將創建一個 Spring Boot 應用程序,並將之前的 Spring Batch Config 轉換為在 Spring Boot 環境中運行。實際上,這與之前的 Spring Batch 示例大致相當。
7.1. Maven 依賴
讓我們首先在 Spring Boot 應用程序的 <pom.xml> 中聲明 <a href="https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-batch"><em>spring-boot-starter-batch</em></a> 依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.4.0</version>
</dependency>我們需要一個數據庫來存儲 Spring Batch 任務的信息。
在本教程中,我們使用內存中的 H2 數據庫,如之前配置的。
7.2. Spring Boot 配置
我們使用 <em >@Profile</em> 註解來區分 Spring 和 Spring Boot 配置。我們在應用程序中設置 <em >spring-boot</em> 屬性,以啓用 Spring Boot 配置。
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
SpringApplication springApp = new SpringApplication(SpringBatchApplication.class);
springApp.setAdditionalProfiles("spring-boot");
springApp.run(args);
}
}7.3. Spring Batch Job 配置
我們使用與 SpringBatchConfig 類相同的批處理作業配置:
@Configuration
public class SpringBootBatchConfig {
@Value("input/record.csv")
private Resource inputCsv;
@Value("input/recordWithInvalidData.csv")
private Resource invalidInputCsv;
@Value("file:xml/output.xml")
private Resource outputXml;
// ...
}從 Spring Boot 3.0 開始,@EnableBatchProcessing 註解已不再推薦使用。我們手動聲明 JobRepository、JobLauncher 和 TransactionManager Bean。
此外,JobBuilderFactory 和 StepBuilderFactory 已被廢棄,建議使用 JobBuilder 和 StepBuilder 類來定義 Job/Step 命名器。
8. 結論
在本文中,我們學習瞭如何使用 Spring Batch 以及如何在簡單用例中使用它。
我們看到了如何輕鬆開發我們的批量處理流水線,以及如何自定義讀取、處理和寫入等不同階段。