1. 概述
Spring Batch 是一個強大的 Java 批處理框架,因此成為數據處理活動和計劃任務運行的流行選擇。根據業務邏輯的複雜性,一個任務可以依賴於不同的配置值和動態參數。
在本文中,我們將探討如何使用 JobParameters 以及如何從關鍵批處理組件中訪問它們。
2. 演示環境搭建
我們將開發一個用於藥店服務的 Spring Batch。主要業務任務是查找即將到期的藥品、根據銷售情況計算新價格,並通知消費者關於即將到期藥品的提醒。此外,我們還將從內存中的 H2 數據庫中讀取數據,並將所有處理詳情寫入日誌,以簡化實現。
2.1. 依賴項
要啓動演示應用程序,我們需要添加 Spring Batch 和 H2 依賴項:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.2.0</version>
</dependency>我們可以在 Maven Central 倉庫中找到最新的 H2 和 Spring Batch 版本。
2.2. 準備測試數據
首先,定義模式在 schema-all.sql 中:
DROP TABLE medicine IF EXISTS;
CREATE TABLE medicine (
med_id VARCHAR(36) PRIMARY KEY,
name VARCHAR(30),
type VARCHAR(30),
expiration_date TIMESTAMP,
original_price DECIMAL,
sale_price DECIMAL
);初始測試數據已提供在 data.sql 中:
INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);Spring Boot 將這些文件作為應用程序啓動的一部分運行,並且我們將使用這些測試數據在我們的測試執行中。
2.3. 醫藥領域類 (Medicine)
為了我們的服務,我們需要一個簡單的 Medicine實體類:
@AllArgsConstructor
@Data
public class Medicine {
private UUID id;
private String name;
private MedicineCategory type;
private Timestamp expirationDate;
private Double originalPrice;
private Double salePrice;
}ItemReader 使用 expirationDate 字段來計算藥物是否即將過期。salePrice 字段將在藥物接近過期日期時由 ItemProcessor 更新。
2.4. 應用屬性
應用程序需要 src/main/resources/application.properties 文件中多個屬性。
spring.batch.job.enabled=false
batch.medicine.cron=0 */1 * * * *
batch.medicine.alert_type=LOGS
batch.medicine.expiration.default.days=60
batch.medicine.start.sale.default.days=45
batch.medicine.sale=0.1
由於我們只配置一個任務,spring.batch.job.enabled 應設置為 false 以禁用初始任務的執行。默認情況下,Spring 在上下文中啓動後會以空參數運行該任務:
[main] INFO o.s.b.a.b.JobLauncherApplicationRunner - Running default command line with: []<div>
<p><em>批次.medicine.cron</em> 屬性定義了計劃任務的 cron 表達式。根據定義的場景,我們應該每天運行該任務。但是,在我們的情況下,該任務每分鐘運行一次,以便能夠輕鬆地檢查處理行為。</p>
</div>
<p>其他屬性對於<em>InputReader</em>、<em>InputProcessor</em> 和<em>InputWriter</em> 來説是必需的,以便執行業務邏輯。</p>
3. 任務參數
Spring Batch 包含一個 JobParameters 類,用於存儲特定任務運行的運行時參數。該功能在各種情況下都非常有用。例如,它允許傳遞在特定運行期間生成的動態變量。此外,它還使創建可以根據客户端提供的參數啓動任務的控制器成為可能。
在我們的場景中,我們將使用此類來存儲應用程序參數和動態運行時參數。
3.1. StepScope 和 JobScope
除了常規 Spring 中已知的 Bean 作用域之外,Spring Batch 還引入了兩個額外的作用域:StepScope 和 JobScope。 通過這些作用域,可以為工作流中的每個步驟或作業創建唯一的 Bean。 Spring 確保與特定步驟/作業關聯的資源在整個生命週期中被隔離和獨立管理。
擁有此功能,我們可以輕鬆地控制上下文並共享特定運行所需的屬性,覆蓋讀取、處理和寫入部分。 為了能夠注入作業參數,我們需要使用 @StepScope 或 @JobScope 註解來標註依賴的 Bean。
3.2. 在計劃執行中填充作業參數
讓我們定義 MedExpirationBatchRunner 類,該類將使用 cron 表達式啓動我們的作業(在本例中每 1 分鐘一次)。我們應該使用 @EnableScheduling 註解並定義適當的 @Scheduled 方法入口:
@Component
@EnableScheduling
public class MedExpirationBatchRunner {
...
@Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
public void runJob() {
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
launchJob(now);
}
}為了手動啓動任務,我們應該使用 <em >JobLaucher</em> 類並提供 <em >JobParameter</em> 在 <em >JobLauncher#</em> 的 <em >run()</em> 方法中。 在我們的示例中,我們提供了從 <em >application.properties</em> 中提取的值,以及兩個運行特定的參數(任務觸發日期和跟蹤 ID):
public void launchJob(ZonedDateTime triggerZonedDateTime) {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
.addString(BatchConstants.ALERT_TYPE, alertType)
.addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
.addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
.addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
.addString(BatchConstants.TRACE_ID, UUID.randomUUID().toString())
.toJobParameters();
jobLauncher.run(medExpirationJob, jobParameters);
} catch (Exception e) {
log.error("Failed to run", e);
}
}配置參數後,我們有幾種方法可以在代碼中使用這些值。
3.3. 從 Bean 定義讀取作業參數
使用 SpEL,我們可以從配置類中的 Bean 定義訪問作業參數。Spring 將所有參數組合成一個 String 到 Object 映射:
@Bean
@StepScope
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
...
}在方法內部,我們將使用 jobParameters 來初始化 MedicineProcessor 的正確字段。
3.4. 在服務中讀取作業參數
另一種選擇是在 ItemReader 本身中使用 setter 注入。 我們可以像從任何其他映射中一樣,通過 SpEL 表達式獲取確切的參數值:
@Setter
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> {
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
}我們需要確保 SpEL 中使用的鍵與參數初始化期間使用的鍵相同。
3.5. 通過前置步驟讀取作業參數
Spring Batch 提供了一個 StepExecutionListener 接口,允許我們監聽步驟執行階段:在步驟開始之前以及步驟完成後。我們可以利用此功能,訪問步驟開始之前的所有屬性,並執行任何自定義邏輯。最簡單的方法是使用 @BeforeStep 註解,它對應於 beforeStep() 方法,該方法來自 StepExecutionListener。
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
JobParameters parameters = stepExecution.getJobExecution()
.getJobParameters();
...
log.info("Before step params: {}", parameters);
}4. 任務配置
讓我們將所有部分組合起來,以獲得整體視圖。
讀者、處理器和寫入器需要兩個屬性:BatchConstants.TRIGGERED_DATE_TIME 和 BatchConstants.TRACE_ID。
我們將使用相同的提取邏輯,用於所有步驟 Bean 定義中的常見參數:
private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
.toString()));
}
if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID).toString());
}
}總而言之,其他參數是特定於組件的,並且沒有通用的邏輯。
4.1. 配置 ItemReader
首先,我們需要配置 ItemReader 並豐富常用參數:
@Bean
@StepScope
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {
ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
enrichWithJobParameters(jobParameters, medicineReader);
return medicineReader;
}讓我們更詳細地瞭解一下精確讀取器的實現。<em>TriggeredDateTime</em> 和 <em>traceId</em> 參數在 Bean 構造過程中直接注入,<em>defaultExpiration</em> 參數通過 setter 注入由 Spring 注入。為了演示,我們已經在 <em>doOpen()</em> 方法中使用了一切:
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {
private ZonedDateTime triggeredDateTime;
private String traceId;
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
private List<Medicine> expiringMedicineList;
...
@Override
protected void doOpen() {
expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));
log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size());
if (!expiringMedicineList.isEmpty()) {
setMaxItemCount(expiringMedicineList.size());
}
}
@PostConstruct
public void init() {
setName(ClassUtils.getShortName(getClass()));
}
}ItemReader 不應標記為 @Component。 此外,我們需要調用 setName() 方法來設置所需的讀取器名稱。
4.2. 配置 ItemProcessor 和 ItemWriter
ItemProcessor 和 ItemWriter 遵循與 ItemReader 相同的策略,因此它們不需要進行任何特定的配置即可訪問參數。 Bean 定義邏輯通過 enrichWithJobParameters() 方法初始化常用的參數。 其他用於單個類且不需要在所有組件中填充的參數,則由 Spring 通過 setter 注入在相應類中進行豐富。
我們應該標記所有依賴於屬性的 Bean 使用 @StepScope 註解。 否則,Spring 只會在上下文啓動時創建 Bean 一次,並且不會將參數的值注入。
4.3. 配置完整流程
我們無需採取任何特定操作來配置任務參數。因此,我們只需要將所有 Bean 組合起來:
@Bean
public Job medExpirationJob(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
MedicineWriter medicineWriter,
MedicineProcessor medicineProcessor,
ExpiresSoonMedicineReader expiresSoonMedicineReader) {
Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
.reader(expiresSoonMedicineReader)
.processor(medicineProcessor)
.writer(medicineWriter)
.faultTolerant()
.transactionManager(transactionManager)
.build();
return new JobBuilder("medExpirationJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(notifyAboutExpiringMedicine)
.build();
}5. 運行應用程序
讓我們運行一個完整的示例,並查看應用程序如何使用所有參數。我們需要從 SpringBatchExpireMedicationApplication 類啓動 Spring Boot 應用程序。
當計劃任務執行後,Spring 會記錄所有參數:
INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=medExpirationJob]] launched with the following parameters: [{'SALE_STARTS_DAYS':'{value=45, type=class java.lang.Long, identifying=true}','MEDICINE_SALE':'{value=0.1, type=class java.lang.Double, identifying=true}','TRACE_ID':'{value=e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, type=class java.lang.String, identifying=true}','ALERT_TYPE':'{value=LOGS, type=class java.lang.String, identifying=true}','TRIGGERED_DATE_TIME':'{value=2023-12-06T22:36:00.011436600Z, type=class java.lang.String, identifying=true}','DEFAULT_EXPIRATION':'{value=60, type=class java.lang.Long, identifying=true}'}]
首先,ItemReader 根據 DEFAULT_EXPIRATION 參數,記錄已發現的藥品信息:
INFO c.b.b.job.ExpiresSoonMedicineReader - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. Found 2 meds that expires soon第二,ItemProcessor 使用 SALE_STARTS_DAYS 和 MEDICINE_SALE 參數來計算新的價格:
INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 18.0 for medicine 9d39321d-34f3-4eb7-bb9a-a69734e0e372
INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 36.0 for medicine acd99d6a-27be-4c89-babe-0edf4dca22cb
最後,ItemWriter 會將更新後的藥物記錄寫入同一追蹤的日誌:
INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=9d39321d-34f3-4eb7-bb9a-a69734e0e372, name=Flucloxacillin, type=ANTIBACTERIALS, expirationDate=2024-01-16 00:00:00.0, originalPrice=20.0, salePrice=18.0)
INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=acd99d6a-27be-4c89-babe-0edf4dca22cb, name=Prozac, type=ANTIDEPRESSANTS, expirationDate=2024-01-06 00:00:00.0, originalPrice=40.0, salePrice=36.0)
INFO c.b.b.job.MedicineWriter - Finishing job started at 2023-12-07T11:58:00.014430400Z6. 結論
在本文中,我們學習瞭如何在 Spring Batch 中使用 Job 參數。<em ItemReader</em>>, <em ItemProcessor</em>>, 和 <em ItemWriter</em>> 可以通過 Bean 初始化時手動添加參數,也可以通過 Spring 注入的<em @BeforeStep>` 或 setter 注入進行增強。