1. 簡介
在 Spring Batch 中,<em CompositeItemReader</em> 是一個將多個 <em ItemReader</em> 實例組合成單個讀取器的工具。這在我們需要從多個來源讀取數據或以特定順序處理數據時尤其有用。例如,我們可能同時想要從數據庫和文件讀取記錄,或者以特定順序處理來自兩個不同表的記錄。
<em CompositeItemReader</em> 簡化了在批處理作業中處理多個讀取器的任務,確保了高效和靈活的數據處理。在本教程中,我們將學習如何實現一個 <em CompositeItemReader</em>>,並查看示例和測試用例以驗證其行為。
2. 理解
通過委託給 實例列表來完成讀取過程。 它按照定義的順序讀取每個讀者的項目,確保數據以序列化的方式處理。
這在以下場景中尤其有用:
- 從多個數據庫或表讀取數據
- 組合文件和數據庫中的數據
- 以特定順序處理來自不同來源的數據
此外, 屬於 包,並在 Spring Batch 5.2.0 版本中引入。
3. 實現 CompositeItemReader
下面我們通過一個示例,演示如何從兩個不同的數據源讀取數據:一個平面文件和一個數據庫。目標是將產品數據從這兩個源組合成一個單一的流,以便進行批量處理。一些產品存儲在平面文件中,而另一些產品存儲在數據庫中,以確保所有可用的記錄一起處理。
3.1. 創建 Product 類</h3
在設置讀取器之前,我們需要一個 Product 類,用於表示正在處理的數據結構。該類封裝了產品的詳細信息,例如其 ID、名稱、庫存可用性和價格。 我們將在從 CSV 文件和數據庫中讀取數據時使用該模型,以確保數據處理的一致性。
Product 類作為讀取器和批處理作業之間的傳輸對象 (DTO):
public class Product {
private Long productId;
private String productName;
private Integer stock;
private BigDecimal price;
public Product(Long productId, String productName, Integer stock, BigDecimal price) {
this.productId = productId;
this.productName = productName;
this.stock = stock;
this.price = price;
}
// Getters and Setters
}產品類代表將在我們的批處理作業中處理的每個記錄。 鑑於我們的數據模型已準備就緒,我們將為 CSV 文件和數據庫創建單獨的 ItemReader 組件。
3.2. 使用產品數據平文件讀取器
該讀取器使用 <em >FlatFileItemReader</em > 從 CSV 文件中獲取數據。我們配置它讀取分隔文件(<em >products.csv</em >>)並將其字段映射到 <em >Product</em >> 類:
@Bean
public FlatFileItemReader<Product> fileReader() {
return new FlatFileItemReaderBuilder<Product>()
.name("fileReader")
.resource(new ClassPathResource("products.csv"))
.delimited()
.names("productId", "productName", "stock", "price")
.linesToSkip(1)
.targetType(Product.class)
.build();
}在這裏,delimited() 方法確保數據字段使用分隔符(默認情況下為逗號)進行分隔。 names() 方法定義與 Product 類屬性相匹配的列名,而 targetType(Product.class) 方法將字段映射到類屬性。
3.3. 產品數據讀取器
接下來,我們定義了一個 JdbcCursorItemReader 來從名為 products 的數據庫表中檢索產品數據。 該讀取器執行一個 SQL 查詢以檢索產品詳情並將它們映射到我們的 Product 類。
以下是數據庫讀取器的實現:
@Bean
public JdbcCursorItemReader<Product> dbReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Product>()
.name("dbReader")
.dataSource(dataSource())
.sql("SELECT productid, productname, stock, price FROM products")
.rowMapper((rs, rowNum) -> new Product(
rs.getLong("productid"),
rs.getString("productname"),
rs.getInt("stock"),
rs.getBigDecimal("price")))
.build();
}JdbcCursorItemReader 通過使用遊標,一次讀取數據庫中的一行記錄,從而使其在批量處理中高效。rowMapper() 函數將結果集中的每一列映射到 Product 類中的相應字段。
4. 使用 CompositeItemReader 合併讀取器
現在,我們已經配置了 CSV 和數據庫讀取器以讀取產品數據,我們可以使用 CompositeItemReader 將它們集成:
@Bean
public CompositeItemReader<Product> compositeReader() {
return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader()));
}通過配置我們的 CompositeItemReader,我們可以按順序處理來自多個來源的數據。
最初,FlatFileItemReader 從 CSV 文件中讀取產品記錄,將每一行解析為 Product 對象。 在所有文件中的行都已處理完畢後,JdbcCursorItemReader 啓動並直接從數據庫中檢索產品數據。
5. 配置批量作業
一旦我們定義了 CSV 文件和數據庫的讀者,下一步是配置批量作業本身。在 Spring Batch 中,一個作業由多個步驟組成,其中每個步驟處理數據處理管道中的特定部分:
@Bean
public Job productJob(JobRepository jobRepository, Step step) {
return new JobBuilder("productJob", jobRepository)
.start(step)
.build();
}
@Bean
public Step step(ItemReader compositeReader, ItemWriter productWriter) {
return new StepBuilder("productStep", jobRepository)
.<Product, Product>chunk(10, transactionManager)
.reader(compositeReader)
.writer(productWriter)
.build();
}在這種情況下,我們的任務包含一個單一步驟,它讀取產品數據,以10個塊進行處理,並將數據寫入目標輸出。
productJob Bean 負責定義批處理作業。它使用 productStep 開始執行,該步驟配置為處理產品數據。
通過這種配置,我們的批處理作業首先使用 CompositeItemReader 從多個來源讀取產品數據,以10個塊進行處理,並使用 productWriter() 將轉換或過濾後的數據寫入。這確保了平穩高效的批處理管道。
6. 運行批處理作業
現在我們已經配置了讀取器和作業,下一步是運行批處理作業並觀察 CompositeItemReader 的行為。我們將在一個 Spring Boot 應用程序中運行該作業,以查看它如何處理來自 CSV 文件和數據庫的數據。
為了程序化觸發批處理作業,我們需要使用 JobLauncher。這允許我們啓動作業並監控其進度:
@Bean
public CommandLineRunner runJob() {
return args -> {
try {
jobLauncher.run(productJob, new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters());
} catch (Exception e) {
// handle exception
}
};
}在此示例中,我們創建一個 CommandLineRunner Bean,用於在應用程序啓動時運行任務。這通過使用 JobLauncher 調用 productJob。我們還添加了具有時間戳的唯一 JobParameters,以確保任務每次運行都是唯一的。
7. 測試複合項目讀取器
為了確保CompositeItemReader 正常工作,我們將測試其功能,以確保它能夠正確地從 CSV 和數據庫源中讀取產品信息。
7.1. 準備測試數據
我們將首先準備一個包含產品數據的 CSV 文件,該文件將作為 CompositeItemReader 的輸入。
productId,productName,stock,price
101,Apple,50,1.99然後,我們還插入一條記錄到 products 表中:
@BeforeEach
public void setUp() {
jdbcTemplate.update("INSERT INTO products (productid, productname, stock, price) VALUES (?, ?, ?, ?)",
102, "Banana", 30, 1.49);
}7.2. 測試順序讀取順序
現在,我們將測試 CompositeItemReader 以驗證它以正確的順序處理產品,從 CSV 文件和數據庫源讀取。 在此測試中,我們從 CSV 文件中讀取一個產品,然後是數據庫,並斷言值與我們的預期相符:
@Test
public void givenTwoReaders_whenRead_thenProcessProductsInOrder() throws Exception {
StepExecution stepExecution = new StepExecution(
"testStep",
new JobExecution(1L, new JobParameters()),
1L);
ExecutionContext executionContext = stepExecution.getExecutionContext();
compositeReader.open(executionContext);
Product product1 = compositeReader.read();
assertNotNull(product1);
assertEquals(101, product1.getProductId());
assertEquals("Apple", product1.getProductName());
Product product2 = compositeReader.read();
assertNotNull(product2);
assertEquals(102, product2.getProductId());
assertEquals("Banana", product2.getProductName());
}7.3. 測試單個讀取器返回空結果的情況
本節測試當使用多個讀取器時,CompositeItemReader 中一個讀取器返回 null 的行為。 這對於確保 CompositeItemReader 跳過任何不返回數據的讀取器,並繼續到下一個讀取器,直到找到有效數據至關重要。
@Test
public void givenMultipleReader_whenOneReaderReturnNull_thenProcessDataFromNextReader() throws Exception {
ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class);
when(emptyReader.read()).thenReturn(null);
ItemStreamReader<Product> validReader = mock(ItemStreamReader.class);
when(validReader.read()).thenReturn(new Product(103L, "Cherry", 20, BigDecimal.valueOf(2.99)), null);
CompositeItemReader<Product> compositeReader = new CompositeItemReader<>(
Arrays.asList(emptyReader, validReader));
Product product = compositeReader.read();
assertNotNull(product);
assertEquals(103, product.getProductId());
assertEquals("Cherry", product.getProductName());
}8. 結論
在本文中,我們學習瞭如何實現和測試一個 <em >CompositeItemReader</em >>,它允許我們以特定順序處理來自多個數據源的數據。通過將讀取器鏈接在一起,我們可以以特定順序處理來自文件、數據庫或其他數據源的數據。