1. 概述
Spring Batch 是一種強大的框架,用於開發健壯的批處理應用程序。 在我們之前的教程中,我們介紹了 Spring Batch。
在本教程中,我們將在此基礎上,學習如何設置並創建一個基本的基於 Spring Boot 的批處理應用程序。
2. Maven 依賴
首先,我們將 <a href="https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-batch">spring-boot-starter-batch添加到我們的pom.xml` 中:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.0.0</version>
</dependency>我們還會添加 h2 依賴,它可從 Maven Central 獲取:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
<scope>runtime</scope>
</dependency>3. 定義一個簡單的 Spring Batch 任務
我們將構建一個從 CSV 文件導入咖啡列表、使用自定義處理器進行轉換,並將最終結果存儲在內存數據庫中的任務.
3.1. 快速入門
讓我們從定義應用程序的入口點開始:
@SpringBootApplication
public class SpringBootBatchProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
}
}如我們所見,這是一個標準的 Spring Boot 應用。為了儘可能使用默認配置值,我們將使用一套非常輕量級的應用程序配置屬性。
我們將定義這些屬性在我們的 文件中:
file.input=coffee-list.csv該屬性包含我們咖啡列表的位置信息。每一行包含咖啡的品牌、產地和一些特點。
Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey如你所見,這是一個扁平的 CSV 文件,這意味着 Spring 可以無需任何特殊定製即可處理它。
接下來,我們將添加一個 SQL 腳本 schema-all.sql,用於創建我們的 coffee 表以存儲數據:
DROP TABLE coffee IF EXISTS;
CREATE TABLE coffee (
coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
brand VARCHAR(20),
origin VARCHAR(20),
characteristics VARCHAR(30)
);Spring Boot 將在啓動時自動運行此腳本。
3.2. 咖啡 領域類
隨後,我們需要一個簡單的領域類來存儲我們的咖啡項目:
public class Coffee {
private String brand;
private String origin;
private String characteristics;
public Coffee(String brand, String origin, String characteristics) {
this.brand = brand;
this.origin = origin;
this.characteristics = characteristics;
}
// getters and setters
}如前所述,我們的 Coffee 對象包含三個屬性:品牌、產地以及其他 特性。
4. 任務配置
現在我們將深入瞭解關鍵組件——任務配置。我們將逐步構建配置,並沿途解釋每個部分:
@Configuration
public class BatchConfiguration {
@Value("${file.input}")
private String fileInput;
// ...
}首先,我們將從標準的 Spring @Configuration 類開始。請注意,Spring Boot 3.0 中,@EnableBatchProcessing 已被不推薦使用。JobBuilderFactory 和 StepBuilderFactory 已被棄用,建議使用 JobBuilder 和 StepBuilder 類,並使用 job 或 step builder 的名稱。
在初始配置的最後一部分,我們將包含對先前聲明的 file.input 屬性的引用。
4.1. 為我們的任務定義讀取器和寫入器
現在我們可以繼續在我們的配置中定義一個讀取器 Bean:
@Bean
public FlatFileItemReader reader() {
return new FlatFileItemReaderBuilder().name("coffeeItemReader")
.resource(new ClassPathResource(fileInput))
.delimited()
.names(new String[] { "brand", "origin", "characteristics" })
.fieldSetMapper(new BeanWrapperFieldSetMapper() {{
setTargetType(Coffee.class);
}})
.build();
}簡而言之,上述定義的 查找名為 的文件,並將每一行數據解析為 對象。
同樣,我們將定義一個 writer bean:
@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
.dataSource(dataSource)
.build();
}本次,我們將包含用於將單個咖啡項目插入到數據庫中的 SQL 語句,該語句由我們的 Coffee 對象的 Java Bean 屬性驅動。
4.2. 將工作流程整合到一起
最後,我們需要添加實際的工作步驟和配置:
@Bean
public Job importUserJob(JobRepository jobRepository, JobCompletionNotificationListener listener, Step step1) {
return new JobBuilder("importUserJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcBatchItemWriter writer) {
return new StepBuilder("step1", jobRepository)
.<Coffee, Coffee> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
@Bean
public CoffeeItemProcessor processor() {
return new CoffeeItemProcessor();
}正如我們所見,我們的任務相對簡單,只包含一個步驟,定義在 step1 方法中。
讓我們來查看這個步驟在做什麼:
- 首先,我們配置這個步驟,使其一次寫入最多十條記錄,使用 chunk(10) 聲明。
- 然後我們使用我們的 reader bean 讀取咖啡數據,該 bean 通過 reader 方法設置。
- 接下來,我們將每個咖啡項目傳遞給一個自定義處理器,並在其中應用自定義業務邏輯。
- 最後,我們使用之前看到的 writer 將每個咖啡項目寫入數據庫。
另一方面,我們的 importUserJob 包含我們的工作定義,其中包含一個 id,使用內置的 RunIdIncrementer 類。 我們還設置了一個 JobCompletionNotificationListener,我們將使用它在作業完成時收到通知
為了完成我們的工作配置,我們將列出每個步驟(儘管這個作業只有一個步驟)。我們現在擁有一個完全配置好的作業。
5. 自定義咖啡處理器
現在,讓我們詳細瞭解一下我們在工作配置中先前定義的自定義處理器:
public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {
private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);
@Override
public Coffee process(final Coffee coffee) throws Exception {
String brand = coffee.getBrand().toUpperCase();
String origin = coffee.getOrigin().toUpperCase();
String chracteristics = coffee.getCharacteristics().toUpperCase();
Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);
return transformedCoffee;
}
}特別值得注意的是,ItemProcessor 接口為我們提供了在作業執行過程中應用特定業務邏輯的機制。
為了保持簡潔,我們將定義CoffeeItemProcessor,它接受一個輸入Coffee 對象並將其所有屬性轉換為大寫。
6. 任務完成
我們還將編寫一個 JobCompletionNotificationListener,以便在我們的任務完成後提供反饋:
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOGGER.info("!!! JOB FINISHED! Time to verify the results");
String query = "SELECT brand, origin, characteristics FROM coffee";
jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
.forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
}
}在上述示例中,我們覆蓋了 afterJob 方法,並檢查了作業是否成功完成。 此外,我們執行了一個簡單的查詢以確認每種咖啡項目是否已成功存儲在數據庫中。
7. 運行我們的任務
現在我們已經準備好運行我們的任務,接下來就是最有趣的部分。讓我們開始運行我們的任務:
...
17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener -
!!! JOB FINISHED! Time to verify the results
17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...
正如我們所見,我們的任務已成功運行,並且每個咖啡項目都已按照預期存儲在數據庫中。
8. 虛擬線程集成
隨着 Spring Batch 5.1 的發佈以及 JDK 21 中 Project Loom 虛擬線程的引入,併發處理方式得到了顯著增強。虛擬線程提供了一種輕量級、高性能的替代方案,用於傳統線程,能夠實現並行任務的可擴展性和高效執行。
我們可以利用虛擬線程來處理各種並行處理場景,例如運行併發步驟或並行化單個步驟的執行。這得益於 Spring Framework 6.1 中的 <em>VirtualThreadTaskExecutor</em> 實現,它使用虛擬線程來實現 <em>TaskExecutor</em>。
首先,在 <em>pom.xml</em> 文件中添加 <em>spring-context</em> 和 <em><a href="https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core/">spring-batch-core</a></em>:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>6.1.0</version>
</dependency>一旦我們完成了依賴項的配置,就必須在 Spring Boot 上下文中創建一個 VirtualThreadExecutor Bean。這個執行器創建和管理虛擬線程:
@Bean
public VirtualThreadTaskExecutor taskExecutor() {
return new VirtualThreadTaskExecutor("virtual-thread-executor");
}為了啓用虛擬線程的並行處理,只需在批處理步驟中配置 VirtualThreadExecutor 即可:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcBatchItemWriter<Coffee> writer, VirtualThreadTaskExecutor taskExecutor) {
return new StepBuilder("step1", jobRepository)
.<Coffee, Coffee> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.taskExecutor(taskExecutor)
.build();
}讓我們使用虛擬線程配置執行任務:
20:41:32.134 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [step1]
20:41:32.242 [virtual-thread-executor2] INFO c.baeldung.batch.CoffeeItemProcessor - Converting ( Coffee [brand=Blue Mountain, origin=Jamaica, characteristics=Fruity] ) into ( Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] )
20:41:32.242 [virtual-thread-executor1] INFO c.baeldung.batch.CoffeeItemProcessor - Converting ( Coffee [brand=Folgers, origin=America, characteristics=Smokey] ) into ( Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] )
20:41:32.242 [virtual-thread-executor0] INFO c.baeldung.batch.CoffeeItemProcessor - Converting ( Coffee [brand=Lavazza, origin=Colombia, characteristics=Strong] ) into ( Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] )
20:41:32.263 [main] INFO o.s.batch.core.step.AbstractStep - Step: [step1] executed in 128ms
如日誌所示,它正在使用虛擬線程來處理處理器邏輯。
9. 結論
在本文中,我們學習瞭如何使用 Spring Boot 創建一個簡單的 Spring Batch 任務。
我們首先定義了一些基本配置。然後,我們解釋瞭如何添加文件讀取器和數據庫寫入器。最後,我們演示瞭如何應用自定義處理邏輯,並驗證我們的任務是否成功執行。