知識庫 / Spring RSS 訂閱

Spring Batch 入門指南

Spring
HongKong
2
02:51 PM · Dec 06 ,2025

1. 概述

在本教程中,我們將探討 Spring Batch 的一個實用、代碼導向的入門介紹。Spring Batch 是一種為任務的穩健執行而設計的處理框架。

其當前版本 5.2.0 支持 Spring 6.2.0 和 Java 17+。

這裏列舉了一些 該框架的有趣且實用的用例。

2. 工作流程基礎

Spring Batch 遵循傳統的批處理架構,其中一個作業倉庫負責調度和與作業進行交互。

一個作業可以包含多個步驟。並且每個步驟通常遵循讀取數據、處理數據和寫入數據的序列。

當然,框架會在這裏完成大部分繁重的工作,尤其是在處理作業的低級持久化工作方面,使用 h2 作為作業倉庫。

2.1. 示例使用場景

我們將要解決的簡單使用場景是,將財務交易數據從 CSV 格式遷移到 XML 格式。

輸入文件具有非常簡單的結構。

它包含每行一條交易記錄,由用户名、用户 ID、交易日期和金額組成:

username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

3. Maven POM

本項目所需依賴項包括:Spring Batch CoreSpring Object/XML Marshalling (OXM),以及 H2 數據庫

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
    <version>6.2.0</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.3.232</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>5.2.0</version>
</dependency>

4. 自動創建 Spring Batch 模式

當使用 Spring Batch 時,我們可以使用預打包的 SQL 初始化腳本來在啓動時自動創建模式。 此外,當我們使用嵌入式 H2 數據庫時,Spring Boot 會自動運行相應的 SQL 初始化腳本以初始化數據庫。 但是,當我們使用其他受支持的數據庫時,需要配置 Spring Boot 屬性,以便它自動檢測數據庫並在啓動時運行相應的 SQL 初始化腳本。

我們可以配置 Spring Boot 的 <em >application.properties</em > 文件以啓用自動數據庫初始化:

spring.batch.jdbc.initialize-schema=always

當然,以下是翻譯後的內容:

或者,我們還可以配置 Spring Boot 的 application.yml 文件以啓用自動數據庫初始化:

spring:
  batch:
    jdbc:
      initialize-schema: "always"

此外,我們不應為 BatchConfig 類添加 @EnableBatchProcessing 註解。 這樣做是為了讓 Spring Boot 負責配置 Spring Batch,包括為自動配置的數據源創建 Batch 模式。

相反,我們可以通過在 Spring Boot 的 application.properties 文件中將同一屬性設置為 never 來關閉嵌入式 H2 數據庫中 Spring Batch 模式的自動創建:

spring.batch.jdbc.initialize-schema=never

再次,我們也可以在 Spring Boot 的 application.yml文件中禁用自動數據庫初始化:

spring:
  batch:
    jdbc:
      initialize-schema: "never"

5. Spring Batch 和 Job 配置

以下展示了基本的 Spring Batch 配置,以及針對 CSV 到 XML 功能的 Job 描述。

基於 Java 的 Job 配置:

@Profile("spring")
public class SpringBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    @Bean
    public ItemReader<Transaction> itemReader()
      throws UnexpectedInputException, ParseException {
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);
        reader.setResource(inputCsv);
        DefaultLineMapper<Transaction> lineMapper = 
          new DefaultLineMapper<Transaction>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> itemProcessor() {
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller)
      throws MalformedURLException {
        StaxEventItemWriter<Transaction> itemWriter = 
          new StaxEventItemWriter<Transaction>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(new Class[] { Transaction.class });
        return marshaller;
    }

    @Bean
    protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, 
      ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor, 
      ItemWriter<Transaction> writer) {
        return new StepBuilder("step1", jobRepository)
          .<Transaction, Transaction> chunk(10, transactionManager)
          .reader(reader).processor(processor).writer(writer).build();
    }

    @Bean(name = "firstBatchJob")
    public Job job(JobRepository jobRepository, @Qualifier("step1") Step step1) {
        return new JobBuilder("firstBatchJob", jobRepository).preventRestart().start(step1).build();
    }
    
    public DataSource dataSource() {
     EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
     return builder.setType(EmbeddedDatabaseType.H2)
       .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
       .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
       .build();
    }
    
    @Bean(name = "transactionManager")
    public PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }
    
    @Bean(name = "jobRepository")
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTransactionManager(getTransactionManager());
        factory.afterPropertiesSet();
        return factory.getObject();
    }
    
    @Bean(name = "jobLauncher")
    public JobLauncher getJobLauncher() throws Exception {
       TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
       jobLauncher.setJobRepository(getJobRepository());
       jobLauncher.afterPropertiesSet();
       return jobLauncher;
    }
}

以及基於XML的配置:

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="input/record.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="username,userid,transactiondate,amount" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.baeldung.batch.service.RecordFieldSetMapper" />
            </property>
        </bean>
    </property>
    <property name="linesToSkip" value="1" />
</bean>

<bean id="itemProcessor" class="com.baeldung.batch.service.CustomItemProcessor" />

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.baeldung.batch.model.Transaction</value>
        </list>
    </property>
</bean>

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>


<!-- connect to H2 database -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.h2.Driver" />
    <property name="url" value="jdbc:h2:file:~/repository" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>

<!-- stored job-meta in database -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="transactionManager" />
    <property name="databaseType" value="h2" />
</bean>

<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

現在我們已經有了整個配置,讓我們將其分解並開始討論。

5.1. 讀取數據並使用 ItemReader 創建對象

首先,我們配置了 cvsFileItemReader,它將從 record.csv 讀取數據並將其轉換為 Transaction 對象:

@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private LocalDateTime transactionDate;
    private double amount;

    /* getters and setters for the attributes */

    @Override
    public String toString() {
        return "Transaction [username=" + username + ", userId=" + userId
          + ", transactionDate=" + transactionDate + ", amount=" + amount
          + "]";
    }
}

為了實現這一點,它使用了一個自定義映射器:

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
 
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/M/yyy");
        Transaction transaction = new Transaction();
 
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt(1));
        transaction.setAmount(fieldSet.readDouble(3));
        String dateString = fieldSet.readString(2);
        transaction.setTransactionDate(LocalDate.parse(dateString, formatter).atStartOfDay());
        return transaction;
    }
}

5.2. 使用 ItemProcessor 處理數據

我們創建了自己的 自定義ItemProcessorCustomItemProcessor,它不處理與交易對象相關的任何內容。

它僅負責將來自讀取器(reader)的原始對象傳遞給寫入器(writer):

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    public Transaction process(Transaction item) {
        return item;
    }
}

5.3. 使用 ItemWriter 將對象寫入文件系統

最後,我們將這個 事務 寫入到位於 xml/output.xml 的 XML 文件中:

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

5.4. 配置批量作業

將所有內容連接起來,使用 <em >batch:job</em> 語法即可完成作業配置。

請注意 <em >commit-interval</em>。它指在提交批量作業到 <em >itemWriter</em> 之前,需要保存在內存中的事務數量。

它會在該點(或直到遇到輸入數據的末尾)保持事務在內存中的狀態。

以下是 Java Bean 和對應的 XML 配置:

@Bean
protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
  @Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) {
    return new StepBuilder("step1", jobRepository)
      .<Transaction, Transaction> chunk(10, transactionManager)
      .reader(itemReader(inputCsv))
      .processor(processor)
      .writer(writer)
      .build();
}
<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
<div>
</div>

5.5. 運行批處理作業

現在,讓我們設置並運行所有內容:

@Profile("spring")
public class App {
    public static void main(String[] args) {
        // Spring Java config
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(SpringConfig.class);
        context.register(SpringBatchConfig.class);
        context.refresh();
        
        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("firstBatchJob");
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        }
    }
}

我們使用 -Dspring.profiles.active=spring 屬性運行 Spring 應用程序。

在下一部分,我們將配置示例,使其在 Spring Boot 應用程序中運行。

6. 以特定順序運行批次作業

在 Spring Batch 中,可以通過創建父作業來指定多個子作業的執行順序。 這在以下情況下特別有用:一個作業的輸出作為另一個作業的輸入,或者由於業務邏輯需要,作業需要按順序執行。

為了演示目的,我們將創建兩個步驟,它們只是簡單地記錄消息:

@Bean
public Step firstStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("firstStep", jobRepository)
      .<String, String>chunk(1, transactionManager)
      .reader(new IteratorItemReader<>(Stream.of("Data from Step 1").iterator()))
      .processor(item -> {
          System.out.println("Processing: " + item);
          return item;
      })
      .writer(items -> items.forEach(System.out::println))
      .build();
}

@Bean
public Step secondStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("secondStep", jobRepository)
      .<String, String>chunk(1, transactionManager)
      .reader(new IteratorItemReader<>(Stream.of("Data from Step 2").iterator()))
      .processor(item -> {
          System.out.println("Processing: " + item);
          return item;
      })
      .writer(items -> items.forEach(System.out::println))
      .build();
}
<div>
<p>接下來,我們創建一個父作業,以預定義的順序執行這些步驟。父作業從 <em >firstStep</em> 開始,然後依次執行 <em >secondStep</em>。</p>
<p>我們可以使用 <em >JobBuilder</em> 來定義步驟的順序:</p>
</div>
@Bean(name = "parentJob")
public Job parentJob(JobRepository jobRepository,
    @Qualifier("firstStep") Step firstStep,
    @Qualifier("secondStep") Step secondStep) {
    return new JobBuilder("parentJob", jobRepository)
      .start(firstStep)
      .next(secondStep)
      .build();
}

現在,讓我們使用應用程序上下文中 JobLauncher 運行該任務。當這些步驟執行時,我們將會看到類似於以下內容的日誌:

Processing: Data from Step 1
Data from Step 1

Processing: Data from Step 2
Data from Step 2

通過遵循這些步驟,我們可以有效地在指定順序中運行多個 Spring Batch 任務,而無需依賴複雜的項處理或讀取機制。

7. Spring Boot 配置

在本節中,我們將創建一個 Spring Boot 應用程序,並將之前的 Spring Batch Config 轉換為在 Spring Boot 環境中運行。實際上,這與之前的 Spring Batch 示例大致相當。

7.1. Maven 依賴

讓我們首先在 Spring Boot 應用程序的 <pom.xml> 中聲明 <a href="https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-batch"><em>spring-boot-starter-batch</em></a> 依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.4.0</version>
</dependency>

我們需要一個數據庫來存儲 Spring Batch 任務的信息

在本教程中,我們使用內存中的 H2 數據庫,如之前配置的。

7.2. Spring Boot 配置

我們使用 <em >@Profile</em> 註解來區分 Spring 和 Spring Boot 配置。我們在應用程序中設置 <em >spring-boot</em> 屬性,以啓用 Spring Boot 配置。

@SpringBootApplication
public class SpringBatchApplication {

    public static void main(String[] args) {
        SpringApplication springApp = new SpringApplication(SpringBatchApplication.class);
        springApp.setAdditionalProfiles("spring-boot");
        springApp.run(args);
    }

}

7.3. Spring Batch Job 配置

我們使用與 SpringBatchConfig 類相同的批處理作業配置:

@Configuration
public class SpringBootBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("input/recordWithInvalidData.csv")
    private Resource invalidInputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    // ...
}

從 Spring Boot 3.0 開始,@EnableBatchProcessing 註解已不再推薦使用。我們手動聲明 JobRepositoryJobLauncherTransactionManager Bean。

此外,JobBuilderFactoryStepBuilderFactory 已被廢棄,建議使用 JobBuilderStepBuilder 類來定義 Job/Step 命名器。

8. 結論

在本文中,我們學習瞭如何使用 Spring Batch 以及如何在簡單用例中使用它。

我們看到了如何輕鬆開發我們的批量處理流水線,以及如何自定義讀取、處理和寫入等不同階段。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.