知識庫 / Spring RSS 訂閱

從 ItemReader 中訪問 Job 參數(Spring Batch)

Spring
HongKong
2
11:18 AM · Dec 06 ,2025

1. 概述

Spring Batch 是一個強大的 Java 批處理框架,因此成為數據處理活動和計劃任務運行的流行選擇。根據業務邏輯的複雜性,一個任務可以依賴於不同的配置值和動態參數。

在本文中,我們將探討如何使用 JobParameters 以及如何從關鍵批處理組件中訪問它們。

2. 演示環境搭建

我們將開發一個用於藥店服務的 Spring Batch。主要業務任務是查找即將到期的藥品、根據銷售情況計算新價格,並通知消費者關於即將到期藥品的提醒。此外,我們還將從內存中的 H2 數據庫中讀取數據,並將所有處理詳情寫入日誌,以簡化實現。

2.1. 依賴項

要啓動演示應用程序,我們需要添加 Spring Batch 和 H2 依賴項:

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.2.224</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.2.0</version>
</dependency>

我們可以在 Maven Central 倉庫中找到最新的 H2Spring Batch 版本。

2.2. 準備測試數據

首先,定義模式在 schema-all.sql 中:

DROP TABLE medicine IF EXISTS;

CREATE TABLE medicine  (
    med_id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(30),
    type VARCHAR(30),
    expiration_date TIMESTAMP,
    original_price DECIMAL,
    sale_price DECIMAL
);

初始測試數據已提供在 data.sql 中:

INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);

Spring Boot 將這些文件作為應用程序啓動的一部分運行,並且我們將使用這些測試數據在我們的測試執行中。

2.3. 醫藥領域類 (Medicine)

為了我們的服務,我們需要一個簡單的 Medicine實體類

@AllArgsConstructor
@Data
public class Medicine {
    private UUID id;
    private String name;
    private MedicineCategory type;
    private Timestamp expirationDate;
    private Double originalPrice;
    private Double salePrice;
}

ItemReader 使用 expirationDate 字段來計算藥物是否即將過期。salePrice 字段將在藥物接近過期日期時由 ItemProcessor 更新。

2.4. 應用屬性

應用程序需要 src/main/resources/application.properties 文件中多個屬性。

spring.batch.job.enabled=false
batch.medicine.cron=0 */1 * * * *
batch.medicine.alert_type=LOGS
batch.medicine.expiration.default.days=60
batch.medicine.start.sale.default.days=45
batch.medicine.sale=0.1

由於我們只配置一個任務,spring.batch.job.enabled 應設置為 false 以禁用初始任務的執行。默認情況下,Spring 在上下文中啓動後會以空參數運行該任務:

[main] INFO  o.s.b.a.b.JobLauncherApplicationRunner - Running default command line with: []
<div>
 <p><em>批次.medicine.cron</em> 屬性定義了計劃任務的 cron 表達式。根據定義的場景,我們應該每天運行該任務。但是,在我們的情況下,該任務每分鐘運行一次,以便能夠輕鬆地檢查處理行為。</p>
</div>
<p>其他屬性對於<em>InputReader</em>、<em>InputProcessor</em> 和<em>InputWriter</em> 來説是必需的,以便執行業務邏輯。</p>

3. 任務參數

Spring Batch 包含一個 JobParameters 類,用於存儲特定任務運行的運行時參數。該功能在各種情況下都非常有用。例如,它允許傳遞在特定運行期間生成的動態變量。此外,它還使創建可以根據客户端提供的參數啓動任務的控制器成為可能。

在我們的場景中,我們將使用此類來存儲應用程序參數和動態運行時參數。

3.1. StepScopeJobScope

除了常規 Spring 中已知的 Bean 作用域之外,Spring Batch 還引入了兩個額外的作用域:StepScopeJobScope。 通過這些作用域,可以為工作流中的每個步驟或作業創建唯一的 Bean。 Spring 確保與特定步驟/作業關聯的資源在整個生命週期中被隔離和獨立管理。

擁有此功能,我們可以輕鬆地控制上下文並共享特定運行所需的屬性,覆蓋讀取、處理和寫入部分。 為了能夠注入作業參數,我們需要使用 @StepScope@JobScope 註解來標註依賴的 Bean。

3.2. 在計劃執行中填充作業參數

讓我們定義 MedExpirationBatchRunner 類,該類將使用 cron 表達式啓動我們的作業(在本例中每 1 分鐘一次)。我們應該使用 @EnableScheduling 註解並定義適當的 @Scheduled 方法入口:

@Component
@EnableScheduling
public class MedExpirationBatchRunner {
    ...
    @Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
    public void runJob() {
        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
        launchJob(now);
    }
}

為了手動啓動任務,我們應該使用 <em >JobLaucher</em> 類並提供 <em >JobParameter</em><em >JobLauncher#</em><em >run()</em> 方法中。 在我們的示例中,我們提供了從 <em >application.properties</em> 中提取的值,以及兩個運行特定的參數(任務觸發日期和跟蹤 ID):

public void launchJob(ZonedDateTime triggerZonedDateTime) {
    try {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
          .addString(BatchConstants.ALERT_TYPE, alertType)
          .addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
          .addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
          .addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
          .addString(BatchConstants.TRACE_ID, UUID.randomUUID().toString())
          .toJobParameters();

        jobLauncher.run(medExpirationJob, jobParameters);
    } catch (Exception e) {
        log.error("Failed to run", e);
    }
}

配置參數後,我們有幾種方法可以在代碼中使用這些值。

3.3. 從 Bean 定義讀取作業參數

使用 SpEL,我們可以從配置類中的 Bean 定義訪問作業參數。Spring 將所有參數組合成一個 StringObject 映射:

@Bean
@StepScope
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
    ...
}

在方法內部,我們將使用 jobParameters 來初始化 MedicineProcessor 的正確字段。

3.4. 在服務中讀取作業參數

另一種選擇是在 ItemReader 本身中使用 setter 注入。 我們可以像從任何其他映射中一樣,通過 SpEL 表達式獲取確切的參數值:

@Setter
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> {

    @Value("#{jobParameters['DEFAULT_EXPIRATION']}")
    private long defaultExpiration;
}

我們需要確保 SpEL 中使用的鍵與參數初始化期間使用的鍵相同。

3.5. 通過前置步驟讀取作業參數

Spring Batch 提供了一個 StepExecutionListener 接口,允許我們監聽步驟執行階段:在步驟開始之前以及步驟完成後。我們可以利用此功能,訪問步驟開始之前的所有屬性,並執行任何自定義邏輯。最簡單的方法是使用 @BeforeStep 註解,它對應於 beforeStep() 方法,該方法來自 StepExecutionListener

@BeforeStep
public void beforeStep(StepExecution stepExecution) {
    JobParameters parameters = stepExecution.getJobExecution()
      .getJobParameters();
    ...
    log.info("Before step params: {}", parameters);
}

4. 任務配置

讓我們將所有部分組合起來,以獲得整體視圖。

讀者、處理器和寫入器需要兩個屬性:BatchConstants.TRIGGERED_DATE_TIMEBatchConstants.TRACE_ID

我們將使用相同的提取邏輯,用於所有步驟 Bean 定義中的常見參數:

private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
    if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
        container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
          .toString()));
    }
    if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
        container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID).toString());
    }
}

總而言之,其他參數是特定於組件的,並且沒有通用的邏輯。

4.1. 配置 ItemReader

首先,我們需要配置 ItemReader 並豐富常用參數:

@Bean
@StepScope
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {

    ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
    enrichWithJobParameters(jobParameters, medicineReader);
    return medicineReader;
}

讓我們更詳細地瞭解一下精確讀取器的實現。<em>TriggeredDateTime</em><em>traceId</em> 參數在 Bean 構造過程中直接注入,<em>defaultExpiration</em> 參數通過 setter 注入由 Spring 注入。為了演示,我們已經在 <em>doOpen()</em> 方法中使用了一切:

public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {

    private ZonedDateTime triggeredDateTime;
    private String traceId;
    @Value("#{jobParameters['DEFAULT_EXPIRATION']}")
    private long defaultExpiration;

    private List<Medicine> expiringMedicineList;

    ...

    @Override
    protected void doOpen() {
        expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));

        log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size());
        if (!expiringMedicineList.isEmpty()) {
            setMaxItemCount(expiringMedicineList.size());
        }
    }

    @PostConstruct
    public void init() {
        setName(ClassUtils.getShortName(getClass()));
    }

}

ItemReader 不應標記為 @Component。 此外,我們需要調用 setName() 方法來設置所需的讀取器名稱。

4.2. 配置 ItemProcessorItemWriter

ItemProcessorItemWriter 遵循與 ItemReader 相同的策略,因此它們不需要進行任何特定的配置即可訪問參數。 Bean 定義邏輯通過 enrichWithJobParameters() 方法初始化常用的參數。 其他用於單個類且不需要在所有組件中填充的參數,則由 Spring 通過 setter 注入在相應類中進行豐富。

我們應該標記所有依賴於屬性的 Bean 使用 @StepScope 註解。 否則,Spring 只會在上下文啓動時創建 Bean 一次,並且不會將參數的值注入。

4.3. 配置完整流程

我們無需採取任何特定操作來配置任務參數。因此,我們只需要將所有 Bean 組合起來:

@Bean
public Job medExpirationJob(JobRepository jobRepository,
    PlatformTransactionManager transactionManager,
    MedicineWriter medicineWriter,
    MedicineProcessor medicineProcessor,
    ExpiresSoonMedicineReader expiresSoonMedicineReader) {
    Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
      .reader(expiresSoonMedicineReader)
      .processor(medicineProcessor)
      .writer(medicineWriter)
      .faultTolerant()
      .transactionManager(transactionManager)
      .build();

    return new JobBuilder("medExpirationJob", jobRepository)
      .incrementer(new RunIdIncrementer())
      .start(notifyAboutExpiringMedicine)
      .build();
}

5. 運行應用程序

讓我們運行一個完整的示例,並查看應用程序如何使用所有參數。我們需要從 SpringBatchExpireMedicationApplication 類啓動 Spring Boot 應用程序。

當計劃任務執行後,Spring 會記錄所有參數:

INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=medExpirationJob]] launched with the following parameters: [{'SALE_STARTS_DAYS':'{value=45, type=class java.lang.Long, identifying=true}','MEDICINE_SALE':'{value=0.1, type=class java.lang.Double, identifying=true}','TRACE_ID':'{value=e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, type=class java.lang.String, identifying=true}','ALERT_TYPE':'{value=LOGS, type=class java.lang.String, identifying=true}','TRIGGERED_DATE_TIME':'{value=2023-12-06T22:36:00.011436600Z, type=class java.lang.String, identifying=true}','DEFAULT_EXPIRATION':'{value=60, type=class java.lang.Long, identifying=true}'}]

首先,ItemReader 根據 DEFAULT_EXPIRATION 參數,記錄已發現的藥品信息:

INFO  c.b.b.job.ExpiresSoonMedicineReader - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. Found 2 meds that expires soon

第二,ItemProcessor 使用 SALE_STARTS_DAYSMEDICINE_SALE 參數來計算新的價格:

INFO  c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 18.0 for medicine 9d39321d-34f3-4eb7-bb9a-a69734e0e372
INFO  c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 36.0 for medicine acd99d6a-27be-4c89-babe-0edf4dca22cb

最後,ItemWriter 會將更新後的藥物記錄寫入同一追蹤的日誌:

INFO  c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=9d39321d-34f3-4eb7-bb9a-a69734e0e372, name=Flucloxacillin, type=ANTIBACTERIALS, expirationDate=2024-01-16 00:00:00.0, originalPrice=20.0, salePrice=18.0)
INFO  c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=acd99d6a-27be-4c89-babe-0edf4dca22cb, name=Prozac, type=ANTIDEPRESSANTS, expirationDate=2024-01-06 00:00:00.0, originalPrice=40.0, salePrice=36.0)
INFO  c.b.b.job.MedicineWriter - Finishing job started at 2023-12-07T11:58:00.014430400Z

6. 結論

在本文中,我們學習瞭如何在 Spring Batch 中使用 Job 參數。<em ItemReader</em>>, <em ItemProcessor</em>>, 和 <em ItemWriter</em>> 可以通過 Bean 初始化時手動添加參數,也可以通過 Spring 注入的<em @BeforeStep>` 或 setter 注入進行增強。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.