知識庫 / Spring / Spring Boot RSS 訂閱

一個支持多處理器和多寫入器的一體化讀取器(Spring Batch)

Spring Boot
HongKong
4
10:45 AM · Dec 06 ,2025

1. 引言

在本教程中,我們將探討如何實現一個 Spring Batch 作業,該作業包含一個讀取器、多個處理器和多個寫入器。這種方法在我們需要一次讀取數據,以不同的方式對其進行處理,然後將結果寫入多個目標時非常有用。

2. 設置 Spring Batch 項目

在開始之前,我們需要在我們的 pom.xml 文件中包含 Spring Boot Starter BatchSpring Boot Starter Data JPA 依賴項:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.5.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
    <version>3.4.2</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

這些依賴項引入了 Spring Batch 用於我們的作業處理,Spring Data JPA 用於數據庫操作,以及 H2 作為內存數據庫,用於開發目的。

2.1. 準備輸入 CSV 文件

在實施批量組件之前,我們需要一些樣本數據進行處理。讓我們創建一個名為 customers.csv 的簡單 CSV 文件,內容如下:

id,name,email,type
1,John,[email protected],A
2,Alice,[email protected],B
3,Bob,[email protected],A
4,Eve,[email protected],B

此文件包含客户記錄,其中包含四個字段:唯一標識符、姓名、電子郵件地址和一種指定類型,這將決定我們的處理路徑。我們將存儲此文件在項目中的 目錄下。

2.2. 創建數據模型

我們的批處理作業需要一個 Java 類來表示 CSV 文件中的客户數據。讓我們創建一個 Customer 實體類,該類映射到我們的數據庫表:

@Entity
public class Customer {
    @Id
    private Long id;
    private String name;
    private String email;
    private String type;
    
    // Constructors, getters, and setters
}

3. 實現 CSV 讀取器

我們可以現在創建用於從我們的 CSV 文件中讀取記錄的組件。 Spring Batch 通過 FlatFileItemReader 類,為平面文件讀取提供了極佳的支持:

@Bean
public FlatFileItemReader<Customer> customerReader() {
    return new FlatFileItemReaderBuilder<Customer>()
      .name("customerItemReader")
      .resource(new ClassPathResource("customers.csv"))
      .delimited()
      .names("id", "name", "email", "type")
      .fieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {{
        setTargetType(Customer.class);
      }})
      .build();
}

此配置創建一個讀取器,它逐行解析我們的 CSV 文件,並將每一條記錄映射到 客户對象。 該讀取器自動處理文件打開和關閉,並以塊的方式處理數據以提高內存效率。

names() 方法中指定的字段名稱必須與我們的 CSV 標題和 客户類屬性相匹配。

4. 創建條件處理器

我們將創建兩個獨立的處理器和一個路由機制,以選擇它們之間的差異。 這些處理器均實現了 Spring Batch 的 ItemProcessor 接口,該接口定義了一個單一的方法 process(),用於在將輸入數據寫入之前對其進行轉換:

public class TypeAProcessor implements ItemProcessor<Customer, Customer> {
    @Override
    public Customer process(Customer customer) {
        customer.setName(customer.getName().toUpperCase());
        customer.setEmail("A_" + customer.getEmail());
        return customer;
    }
}

public class TypeBProcessor implements ItemProcessor<Customer, Customer> {
    @Override
    public Customer process(Customer customer) {
        customer.setName(customer.getName().toLowerCase());
        customer.setEmail("B_" + customer.getEmail());
        return customer;
    }
}

TypeAProcessor 處理類型為 A 類型的客户,通過將他們的姓名轉換為大寫併為其電子郵件地址添加前綴。process() 方法接受一個 Customer 對象,對其進行轉換,並返回修改後的版本。

對於類型為 B 類型的客户,TypeBProcessor 應用不同的轉換,將姓名轉換為小寫並使用不同的 email 前綴。兩個處理器都實現了相同的 ItemProcessor 接口,因此它們可以在我們的處理管道中互換使用。

5. 實現處理器路由器

為了將我們的處理器連接到適當的記錄中,我們需要一個路由機制,該機制會檢查每個客户的類型字段:

public class CustomerProcessorRouter implements ItemProcessor<Customer, Customer> {
    private final TypeAProcessor typeAProcessor;
    private final TypeBProcessor typeBProcessor;

    public CustomerProcessorRouter(TypeAProcessor typeAProcessor, 
      TypeBProcessor typeBProcessor) {
        this.typeAProcessor = typeAProcessor;
        this.typeBProcessor = typeBProcessor;
    }

    @Override
    public Customer process(Customer customer) throws Exception {
        if ("A".equals(customer.getType())) {
            return typeAProcessor.process(customer);
        } else if ("B".equals(customer.getType())) {
            return typeBProcessor.process(customer);
        }
        return customer;
    }
}

我們的路由器類會檢查每個傳入的 Customer 對象,並根據 type 字段將其委派到適當的處理器。這種設計使我們的處理邏輯與 job 定義中的單一處理步驟保持分離,同時保持了邏輯的清晰性。

6. 配置多個寫入器

在根據類型對我們的數據進行不同的處理後,我們希望將結果寫入多個目標位置。我們將實現數據庫寫入器和平面文件寫入器。

6.1. 數據庫寫入器配置

我們首先創建數據庫寫入器組件,該組件將處理所有 JPA 操作:

@Bean
public JpaItemWriter<Customer> dbWriter(EntityManagerFactory entityManagerFactory) {
    JpaItemWriter<Customer> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return writer;
}

這位作者使用 JPA 將我們處理過的 Customer 對象持久化到數據庫。在作業執行期間,這個 JpaItemWriter 將持久化我們處理過的 Customer 對象到配置的數據庫,處理所有必要的 JPA 操作,包括插入和更新。

6.2. 平文件寫入器配置

對於我們的次要輸出目的地,我們實現了平文件寫入器,該寫入器生成 CSV 文件:

@Bean
public FlatFileItemWriter<Customer> fileWriter() {
    return new FlatFileItemWriterBuilder<Customer>()
      .name("customerItemWriter")
      .resource(new FileSystemResource("output/processed_customers.txt"))
      .delimited()
      .delimiter(",")
      .names("id", "name", "email", "type")
      .build();
}

FlatFileItemWriter 配置為使用逗號作為分隔符,幷包含與我們的 Customer 實體屬性相匹配的顯式字段命名。 在作業執行期間,該寫入器將創建一個結構化的 CSV 文件,其中包含所有已處理的客户記錄,格式符合指定要求。

6.3. 組合寫入器(Composite Writer)

為了同時將數據寫入多個目標,我們將使用 Spring Batch 的 CompositeItemWriter

@Bean
public CompositeItemWriter<Customer> compositeWriter(
  JpaItemWriter<Customer> dbWriter,
  FlatFileItemWriter<Customer> fileWriter) {
    CompositeItemWriter<Customer> writer = new CompositeItemWriter<>();
    writer.setDelegates(List.of(dbWriter, fileWriter));
    return writer;
}

這個複合寫入器作為寫入器的委託,確保每個處理過的項目都寫入所有目標位置。委託的順序決定了寫入的順序。

7. 配置步驟和作業

現在,讓我們將所有內容整合在一起,通過創建步驟和作業配置來實現:

@Bean
public Job processCustomersJob(JobBuilderFactory jobs,
  StepBuilderFactory steps,
  FlatFileItemReader<Customer> reader,
  CustomerProcessorRouter processor,
  CompositeItemWriter<Customer> writer) {

    Step step = steps.get("processCustomersStep")
      .<Customer, Customer>chunk(10)
      .reader(reader)
      .processor(processor)
      .writer(writer)
      .build();

    return jobs.get("customerProcessingJob")
      .start(step)
      .build();
}

此作業配置定義了一個單步操作,它以 10 個客户的塊讀取客户數據,通過我們的路由器處理每個客户,並將結果同時寫入數據庫和平面文件。

8. 運行和測試作業

為了驗證我們的批處理作業按預期運行,我們編寫一個集成測試,該測試啓動作業並斷言不同客户類型的數據庫和輸出文件結果:

List<Customer> dbCustomers = jdbcTemplate.query(
    "SELECT id, name, email, type FROM customer WHERE type = 'A'",
    (rs, rowNum) -> new Customer(
        rs.getLong("id"),
        rs.getString("name"),
        rs.getString("email"),
        rs.getString("type"))
);

assertFalse(dbCustomers.isEmpty());

dbCustomers.forEach(c -> {
    assertEquals(c.getName(), c.getName().toUpperCase());
    assertTrue(c.getEmail().startsWith("A_"));
});

Path outputFile = Paths.get("output/processed_customers.txt");
assertTrue(Files.exists(outputFile));

List<String> lines = Files.readAllLines(outputFile);

boolean hasTypeB = lines.stream().anyMatch(line -> line.endsWith(",B"));
assertTrue(hasTypeB);

lines.forEach(line -> {
    String[] parts = line.split(",");
    if ("B".equals(parts[3])) {
        assertEquals(parts[1], parts[1].toLowerCase());
        assertTrue(parts[2].startsWith("B_"));
    }
});

從測試用例來看,首先我們查詢數據庫以驗證類型為 A 的客户已成功保存,且他們的姓名已轉換為大寫,電子郵件前綴為“A_”。接下來,我們還讀取輸出文件以確認類型為 B 的客户,他們的姓名已轉換為小寫,電子郵件前綴為“B_”。

9. 結論

在本文中,我們學習瞭如何使用單個讀取器但多個處理器和寫入器配置一個 Spring Batch 任務。我們從 CSV 文件中讀取數據,根據其內容將每個記錄路由到特定的處理器,最後將寫入任務委派給多個寫入器。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.