1. 簡介
Spring Batch 提供兩種不同的方式來實施作業:使用任務(Tasklets)和塊(Chunks)。
在本文中,我們將學習如何使用一個簡單的實際示例配置和實現這兩種方法。
2. 依賴項
讓我們從添加所需的依賴項開始:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>5.0.0</version>
<scope>test</scope>
</dependency>要獲取最新版本的 spring-batch-core 和 spring-batch-test,請參閲 Maven Central。
3. Our Use Case
Let’s consider a CSV file with the following content:
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992每一行的第一個位置代表個人的姓名,第二個位置代表其出生日期。
我們的用例是生成另一個包含每個人姓名和年齡的CSV文件:
Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25現在我們對領域有了清晰的認識,接下來我們將使用兩種方法構建解決方案。我們首先將從任務片段開始。
4. Tasklets Approach
4.1. 引言與設計
任務塊(Tasklets)旨在在一個步驟中執行單個任務。我們的工作將包含多個步驟,這些步驟會依次執行。 每個步驟都應僅執行一個明確定義的任務。
我們的工作將包含三個步驟:
- 讀取輸入 CSV 文件中的行。
- 為輸入 CSV 文件中的每個人計算年齡。
- 將每個人的姓名和年齡寫入一個新的輸出 CSV 文件。
現在,我們已經對整體情況有了清晰的認識,讓我們為每個步驟創建一個類。
LinesReader 將負責從輸入文件讀取數據:
public class LinesReader implements Tasklet {
// ...
}LinesProcessor 將計算文件中每位人員的年齡:
public class LinesProcessor implements Tasklet {
// ...
}最後,<em >LinesWriter </em > 將負責將姓名和年齡寫入輸出文件。
public class LinesWriter implements Tasklet {
// ...
}此時,我們所有的步驟都實現了 Tasklet 接口。這將會迫使我們實現其 execute 方法:
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
// ...
}此方法是我們將添加每個步驟的邏輯的地方。在開始編寫代碼之前,我們先配置我們的工作。
4.2. 配置
我們需要添加配置到 Spring 的應用程序上下文。 在添加了先前章節中創建類標準的 Bean 聲明後,我們準備好創建我們的作業定義:
@Configuration
public class TaskletsConfig {
@Bean
protected Step readLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("readLines", jobRepository)
.tasklet(linesReader(), transactionManager)
.build();
}
@Bean
protected Step processLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("processLines", jobRepository)
.tasklet(linesProcessor(), transactionManager)
.build();
}
@Bean
protected Step writeLines(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("writeLines", jobRepository)
.tasklet(linesWriter(), transactionManager)
.build();
}
@Bean
public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("taskletsJob", jobRepository)
.start(readLines(jobRepository, transactionManager))
.next(processLines(jobRepository, transactionManager))
.next(writeLines(jobRepository, transactionManager))
.build();
}
// ...
}這意味着我們的 “taskletsJob” 將包含三個步驟。第一個步驟 (readLines) 將執行 bean 中定義的 tasklet,並進入下一個步驟:ProcessLines 將執行 bean 中定義的 tasklet,並進入最終步驟:writeLines。
我們的工作流程已定義,我們已準備好添加一些邏輯!
4.3. 模型和工具
我們將要處理 CSV 文件中的行,因此我們將創建一個名為 Line 的類。
public class Line implements Serializable {
private String name;
private LocalDate dob;
private Long age;
// standard constructor, getters, setters and toString implementation
}請注意,Line實現了Serializable。這是因為Line將作為DTO用於在步驟之間傳輸數據。根據Spring Batch的要求,在步驟之間傳輸的數據對象必須是可序列化的。
另一方面,我們可以開始考慮讀取和寫入行。
為此,我們將使用OpenCSV:
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.8</version>
</dependency>在 Maven Central 查找最新版本的 OpenCSV。
一旦 OpenCSV 包含進來,我們還將創建一個 FileUtils 類。它將提供讀取和寫入 CSV 行的方法:
public class FileUtils {
public Line readLine() throws Exception {
if (CSVReader == null)
initReader();
String[] line = CSVReader.readNext();
if (line == null)
return null;
return new Line(
line[0],
LocalDate.parse(
line[1],
DateTimeFormatter.ofPattern("MM/dd/yyyy")));
}
public void writeLine(Line line) throws Exception {
if (CSVWriter == null)
initWriter();
String[] lineStr = new String[2];
lineStr[0] = line.getName();
lineStr[1] = line
.getAge()
.toString();
CSVWriter.writeNext(lineStr);
}
// ...
}請注意,readLine 作為 OpenCSV 的 readNext 方法的封裝,並返回一個 Line 對象。
同樣,writeLine 封裝了 OpenCSV 的 writeNext,接收一個 Line 對象。該類的完整實現可以在 GitHub 項目中找到。
此時,我們就可以開始逐步實現每個步驟。
4.4. LinesReader
Let’s go ahead and complete our LinesReader class:
public class LinesReader implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesReader.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<>();
fu = new FileUtils(
"taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}LinesReader 的 execute 方法會創建一個 FileUtils 實例,該實例基於輸入文件路徑。然後,它會添加行到一個列表,直到沒有更多行可讀取。
我們的類也實現了 StepExecutionListener,它提供了兩個額外的成員方法:beforeStep 和 afterStep。我們將使用這些方法在execute 運行之前和之後初始化和關閉相關資源。
如果我們查看afterStep 代碼,會注意到將結果列表(lines)放入 job 的上下文中,以便在下一步中使用:
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);此時,我們的第一步已經完成了其職責:將 CSV 行加載到內存中的 List 中。現在,我們繼續進行第二步,並對其進行處理。
4.5. LinesProcessor
LinesProcessor 也會實現 StepExecutionListener 接口,並且當然,它還會實現 Tasklet。 這意味着它將實現 beforeStep、execute 和 afterStep 方法。
public class LinesProcessor implements Tasklet, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(
LinesProcessor.class);
private List<Line> lines;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(
line.getDob(),
LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}很容易理解,它從工作上下文加載 行列表並計算每人的年齡。
無需將另一個結果列表添加到上下文中,因為修改發生在來自前一步驟的同一對象上。
我們已準備好進行最後一步。
4.6. LinesWriter
LinesWriter 的任務是遍歷 行 列表,並將姓名和年齡寫入輸出文件。
public class LinesWriter implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}我們已經完成了這項工作的實施! 讓我們創建一個測試來運行它並查看結果。
4.7. 運行作業
要運行作業,我們將創建一個測試:
@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}ContextConfiguration 註解指向包含我們工作定義的 Spring 上下文配置類。
在運行測試之前,我們需要添加幾個額外的 Bean:
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
@Bean
public JobRepository jobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(transactionManager());
return factory.getObject();
}
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}一切準備就緒!請立即運行測試。
任務完成後,output.csv 包含預期內容,日誌顯示執行流程:
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.現在,Tasklets 就完成了。接下來我們就可以轉向 Chunk 方式。
5. 塊式方法
5.1. 引言與設計
正如其名稱所示,這種方法通過處理數據塊來執行操作。也就是説,它不會一次性讀取、處理和寫入所有行,而是會一次讀取、處理和寫入固定數量的記錄(塊)。
然後,它將重複這個循環,直到文件中沒有更多數據。
因此,流程會略有不同:
- 當存在行時:
- 執行 X 數量的行:
- 讀取一行
- 處理一行
- 寫入 X 數量的行。
- 執行 X 數量的行:
因此,我們還需要創建 三個用於分塊處理的方法:
public class LineReader {
// ...
}public class LineProcessor {
// ...
}public class LinesWriter {
// ...
}在開始實施之前,讓我們配置我們的工作項。
5.2. 配置
工作定義也會有所不同:
@Configuration
public class ChunksConfig {
@Bean
public ItemReader<Line> itemReader() {
return new LineReader();
}
@Bean
public ItemProcessor<Line, Line> itemProcessor() {
return new LineProcessor();
}
@Bean
public ItemWriter<Line> itemWriter() {
return new LinesWriter();
}
@Bean
protected Step processLines(JobRepository jobRepository, PlatformTransactionManager transactionManager, ItemReader<Line> reader,
ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
return new StepBuilder("processLines", jobRepository).<Line, Line> chunk(2, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("chunksJob", jobRepository)
.start(processLines(jobRepository, transactionManager, itemReader(), itemProcessor(), itemWriter()))
.build();
}
}在這種情況下,只有一個步驟,僅執行一個任務片段。
然而,這個任務片段 定義了一個讀取器、一個寫入器和一個處理器,它們將在數據塊上執行操作。
請注意,提交間隔指示在每個數據塊中要處理的數據量。我們的任務將以兩行數據為單位進行讀取、處理和寫入。
現在,我們準備好添加我們的數據塊邏輯了!
5.3. LineReader
LineReader 將負責讀取一條記錄並返回一個 Line 實例,其中包含其內容。
為了成為一個讀取器,我們的類必須實現 ItemReader 接口:
public class LineReader implements ItemReader<Line> {
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null)
logger.debug("Read line: " + line.toString());
return line;
}
}這段代碼非常簡單,它只讀取一行並返回它。我們還將實現 StepExecutionListener ,用於此類的最終版本:
public class LineReader implements
ItemReader<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LineReader.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}需要注意的是,beforeStep 和 afterStep 分別在整個步驟的開始和結束之前執行。
5.4. LineProcessor
LineProcessor 邏輯與 LineReader 幾乎相同。
然而,在此情況下,我們將實現 ItemProcessor 及其 process() 方法。
public class LineProcessor implements ItemProcessor<Line, Line> {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
}Process() 方法接受一個輸入行,對其進行處理並返回一個輸出行。 此外,我們還將實現 StepExecutionListener:
public class LineProcessor implements
ItemProcessor<Line, Line>, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug(
"Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}5.5. LinesWriter
與 reader 和 processor 類似,LinesWriter 會寫入一個完整的行塊,以便接收一個包含行的 List,即 Lines 的列表。
public class LinesWriter implements
ItemWriter<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}
@Override
public void write(List<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
}LinesWriter 代碼本身就能説明一切。同時,我們已經準備好測試我們的工作。
5.6. 運行作業
我們將創建一個新的測試,與我們為任務方法(tasklets)方法創建的測試相同:
@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenChunksJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}在完成上述 ChunksConfig 和 TaskletsConfig 的配置,我們就可以開始運行測試了!
任務完成後,我們可以看到 output.csv 再次包含預期的結果,日誌則描述了流程:
[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.我們得到了相同的結果,並且流程有所不同。日誌清晰地展示了這種方法下任務的執行過程。
6. 結論
不同的上下文會顯示一種方法或另一種方法的必要性。 雖然 Tasklets 在“一次完成一項任務”的場景下感覺更自然,但 chunks 則提供了一種簡單的方法來處理分頁讀取或我們不想將大量數據保存在內存中的情況。