點擊上方“程序員蝸牛g”,選擇“設為星標”
跟蝸牛哥一起,每天進步一點點
程序員蝸牛g
大廠程序員一枚 跟蝸牛一起 每天進步一點點
33篇原創內容
公眾號
(一)整體架構思路
- 任務分片:將原始任務列表按指定大小拆分,每個分片作為一個子任務,避免單個任務過大導致的併發效率低下;
- 線程池執行:用ThreadPoolExecutor創建自定義線程池,提交所有子任務併發執行;
- 進度統計:用線程安全的原子類(AtomicInteger)統計已完成、失敗任務數,避免併發計數問題;
- 結果彙總:用線程安全的集合(CopyOnWriteArrayList)存儲每個任務的執行結果,支持後續查詢;
- 分階段協同:用CountDownLatch實現 “等待所有子任務完成後彙總結果”,用CyclicBarrier實現 “多階段任務協同”(如 “讀取→處理→彙總” 三步流程);
- 容錯處理:每個子任務執行時捕獲異常,記錄失敗原因,確保單個任務失敗不中斷整體流程。
(二)核心技術選型
- 線程池:ThreadPoolExecutor(自定義配置,適配不同任務場景);
- 併發工具:CountDownLatch(等待所有子任務完成)、CyclicBarrier(多階段任務協同);
- 線程安全組件:AtomicInteger(原子計數)、CopyOnWriteArrayList(線程安全結果存儲);
- 任務抽象:定義Task接口,支持任意類型任務的擴展(數據導入、短信發送等)。
(三)核心流程圖
三、處理器實現:BatchTaskProcessor 核心代碼
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 批量任務處理器:支持任務分片、併發執行、進度監控、結果彙總
*/
public class BatchTaskProcessor<T> {
// 線程池(核心配置)
private final ExecutorService executorService;
// 任務分片大小
private final int batchSize;
// 已完成任務數(原子類,線程安全)
private final AtomicInteger completedCount = new AtomicInteger(0);
// 失敗任務數(原子類,線程安全)
private final AtomicInteger failedCount = new AtomicInteger(0);
// 任務執行結果列表(線程安全集合)
private final List<TaskResult<T>> resultList = new CopyOnWriteArrayList<>();
/**
* 構造器:自定義線程池配置
* @param corePoolSize 核心線程數
* @param maxPoolSize 最大線程數
* @param keepAliveTime 非核心線程空閒時間
* @param batchSize 任務分片大小
*/
public BatchTaskProcessor(int corePoolSize, int maxPoolSize, long keepAliveTime, int batchSize) {
this.batchSize = Math.max(batchSize, 1); // 分片大小至少為1
// 初始化線程池(自定義配置,避免OOM)
this.executorService = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 有界任務隊列,避免任務堆積
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略:提交線程自行執行
);
}
/**
* 核心方法:執行批量任務(無階段協同)
* @param taskList 原始任務列表
* @param taskHandler 任務處理器(自定義任務執行邏輯)
* @return 執行結果彙總
* @throws InterruptedException 線程中斷異常
*/
public BatchResult<T> executeBatchTasks(List<T> taskList, TaskHandler<T> taskHandler) throws InterruptedException {
if (taskList == null || taskList.isEmpty()) {
return new BatchResult<>(0, 0, resultList);
}
// 1. 任務分片
List<List<T>> taskShards = splitTask(taskList, batchSize);
int totalTaskCount = taskList.size();
CountDownLatch countDownLatch = new CountDownLatch(taskShards.size());
System.out.println("📋 批量任務開始執行:");
System.out.println(" - 總任務數:" + totalTaskCount);
System.out.println(" - 任務分片數:" + taskShards.size());
System.out.println(" - 線程池線程數:" + executorService.toString());
// 2. 提交分片任務到線程池
for (int i = 0; i < taskShards.size(); i++) {
int shardIndex = i;
List<T> shard = taskShards.get(i);
executorService.submit(() -> {
try {
// 執行當前分片的所有任務
for (T task : shard) {
TaskResult<T> result = new TaskResult<>();
result.setTask(task);
try {
// 自定義任務執行邏輯(如導入數據、發送短信)
boolean success = taskHandler.process(task);
result.setSuccess(success);
if (success) {
completedCount.incrementAndGet();
} else {
failedCount.incrementAndGet();
result.setErrorMessage("任務執行失敗(無異常)");
}
} catch (Exception e) {
// 捕獲任務執行異常,記錄失敗原因
failedCount.incrementAndGet();
result.setSuccess(false);
result.setErrorMessage("任務執行失敗:" + e.getMessage());
}
// 存儲任務結果(線程安全)
resultList.add(result);
}
System.out.printf("✅ 分片%d執行完成,處理任務數:%d\n", shardIndex, shard.size());
} finally {
// 分片任務完成,倒計時減1(必須在finally中,確保必執行)
countDownLatch.countDown();
}
});
}
// 3. 主線程阻塞等待所有分片任務完成
countDownLatch.await();
// 4. 關閉線程池(不再接收新任務,等待已提交任務完成)
executorService.shutdown();
// 5. 返回彙總結果
return new BatchResult<>(completedCount.get(), failedCount.get(), resultList);
}
/**
* 進階方法:分階段批量任務(如“讀取→處理→彙總”)
* @param taskList 原始任務列表
* @param readHandler 讀取階段處理器
* @param processHandler 處理階段處理器
* @param summaryHandler 彙總階段處理器
* @return 最終彙總結果
* @throws InterruptedException 線程中斷異常
* @throws BrokenBarrierException 屏障損壞異常
*/
public String executePhasedBatchTasks(List<T> taskList,
TaskHandler<T> readHandler,
TaskHandler<T> processHandler,
SummaryHandler<T> summaryHandler) throws InterruptedException, BrokenBarrierException {
if (taskList == null || taskList.isEmpty()) {
return "無任務可執行";
}
// 任務分片
List<List<T>> taskShards = splitTask(taskList, batchSize);
int shardCount = taskShards.size();
// CyclicBarrier:分階段協同,所有分片完成當前階段後,進入下一階段
CyclicBarrier barrier = new CyclicBarrier(shardCount, () -> {
// 屏障動作:所有分片完成當前階段後執行(如打印階段進度)
System.out.println("\n🚩 當前階段所有分片執行完成,進入下一階段");
});
// 初始化線程池
ExecutorService phasedExecutor = new ThreadPoolExecutor(
shardCount, shardCount, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
System.out.println("📋 分階段批量任務開始執行:");
System.out.println(" - 總任務數:" + taskList.size());
System.out.println(" - 分片數:" + shardCount);
// 提交分階段任務
for (int i = 0; i < shardCount; i++) {
int shardIndex = i;
List<T> shard = taskShards.get(i);
phasedExecutor.submit(() -> {
try {
// 第一階段:讀取數據(如從文件/數據庫讀取)
System.out.printf("📥 分片%d開始讀取階段\n", shardIndex);
for (T task : shard) {
readHandler.process(task);
}
System.out.printf("📥 分片%d讀取階段完成\n", shardIndex);
// 等待所有分片完成讀取階段
barrier.await();
// 第二階段:處理數據(如數據清洗、業務邏輯處理)
System.out.printf("⚙️ 分片%d開始處理階段\n", shardIndex);
for (T task : shard) {
processHandler.process(task);
}
System.out.printf("⚙️ 分片%d處理階段完成\n", shardIndex);
// 等待所有分片完成處理階段
barrier.await();
// 第三階段:彙總數據(如統計分片結果)
System.out.printf("📊 分片%d開始彙總階段\n", shardIndex);
summaryHandler.summary(shard);
System.out.printf("📊 分片%d彙總階段完成\n", shardIndex);
// 等待所有分片完成彙總階段
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
System.err.printf("❌ 分片%d執行異常:%s\n", shardIndex, e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
System.err.printf("❌ 分片%d執行異常:%s\n", shardIndex, e.getMessage());
}
});
}
// 等待所有任務完成,關閉線程池
phasedExecutor.shutdown();
while (!phasedExecutor.isTerminated()) {
Thread.sleep(100);
}
// 最終彙總結果
return summaryHandler.getFinalSummary();
}
/**
* 工具方法:任務分片(將原始任務列表按batchSize拆分)
*/
private List<List<T>> splitTask(List<T> taskList, int batchSize) {
List<List<T>> shards = new ArrayList<>();
int total = taskList.size();
int start = 0;
while (start < total) {
int end = Math.min(start + batchSize, total);
shards.add(taskList.subList(start, end));
start = end;
}
return shards;
}
/**
* 任務處理器接口(自定義任務執行邏輯)
*/
public interface TaskHandler<T> {
boolean process(T task) throws Exception;
}
/**
* 彙總處理器接口(自定義彙總邏輯)
*/
public interface SummaryHandler<T> {
void summary(List<T> shardTask);
String getFinalSummary();
}
/**
* 單個任務執行結果實體
*/
public static class TaskResult<T> {
private T task;
private boolean success;
private String errorMessage;
// getter/setter
public T getTask() { return task; }
public void setTask(T task) { this.task = task; }
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
public String getErrorMessage() { return errorMessage; }
public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
}
/**
* 批量任務彙總結果實體
*/
public static class BatchResult<T> {
private int completedCount; // 成功任務數
private int failedCount; // 失敗任務數
private List<TaskResult<T>> taskResults; // 所有任務結果
public BatchResult(int completedCount, int failedCount, List<TaskResult<T>> taskResults) {
this.completedCount = completedCount;
this.failedCount = failedCount;
this.taskResults = taskResults;
}
// getter
public int getCompletedCount() { return completedCount; }
public int getFailedCount() { return failedCount; }
public List<TaskResult<T>> getTaskResults() { return taskResults; }
@Override
public String toString() {
return "\n📊 批量任務執行彙總:" +
"\n - 總任務數:" + (completedCount + failedCount) +
"\n - 成功數:" + completedCount +
"\n - 失敗數:" + failedCount +
"\n - 成功率:" + String.format("%.2f%%", (double) completedCount / (completedCount + failedCount) * 100);
}
}
}
處理器使用:2 種常見場景示例
(一)場景 1:基礎批量任務(無階段協同)
批量發送短信任務,無需分階段,直接併發執行所有任務:
import java.util.Arrays;
import java.util.List;
public class BasicBatchDemo {
public static void main(String[] args) {
// 1. 模擬100條短信發送任務(任務數據:手機號)
List<String> phoneList = Arrays.asList(
"13800138000", "13900139000", "13700137000",
// ... 省略97條手機號
"13600136000"
);
// 2. 創建批量任務處理器(CPU密集型任務:核心線程數=CPU核心數+1;IO密集型=2*CPU核心數+1)
// 短信發送是IO密集型(網絡請求),此處配置8線程,分片大小10
BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
8, 10, 60, 10
);
// 3. 自定義任務執行邏輯(短信發送)
BatchTaskProcessor.TaskHandler<String> smsHandler = phone -> {
// 模擬短信發送邏輯(如調用第三方短信API)
System.out.printf("發送短信到:%s\n", phone);
Thread.sleep(100); // 模擬網絡請求耗時
return true; // 執行成功
};
// 4. 執行批量任務並獲取結果
try {
BatchTaskProcessor.BatchResult<String> result = processor.executeBatchTasks(phoneList, smsHandler);
System.out.println(result);
// 5. 輸出失敗任務詳情(可選)
result.getTaskResults().stream()
.filter(taskResult -> !taskResult.isSuccess())
.forEach(taskResult -> System.out.printf("❌ 失敗任務:%s,原因:%s\n",
taskResult.getTask(), taskResult.getErrorMessage()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
(二)場景 2:分階段批量任務(讀取→處理→彙總)
批量導入用户數據任務,分三階段執行:讀取 Excel 數據→清洗數據→彙總統計:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class PhasedBatchDemo {
public static void main(String[] args) {
// 1. 模擬100條原始用户數據(待導入)
List<String> userDataList = Arrays.asList(
"張三,20,13800138000", "李四,25,13900139000",
// ... 省略98條數據
"王五,30,13700137000"
);
// 2. 創建批量任務處理器(分片大小15)
BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
6, 8, 60, 15
);
// 3. 自定義分階段處理器
// 第一階段:讀取數據(模擬從Excel讀取)
BatchTaskProcessor.TaskHandler<String> readHandler = data -> {
System.out.printf("讀取數據:%s\n", data);
return true;
};
// 第二階段:處理數據(模擬數據清洗,去除無效數據)
BatchTaskProcessor.TaskHandler<String> processHandler = data -> {
String[] parts = data.split(",");
if (parts.length != 3) {
throw new RuntimeException("數據格式錯誤:" + data);
}
System.out.printf("清洗數據:%s→%s(格式校驗通過)\n", data, parts[0]);
return true;
};
// 第三階段:彙總統計(統計成功導入的用户數)
AtomicInteger totalImported = new AtomicInteger(0);
BatchTaskProcessor.SummaryHandler<String> summaryHandler = new BatchTaskProcessor.SummaryHandler<String>() {
@Override
public void summary(List<String> shardTask) {
// 統計當前分片的有效數據數
int validCount = (int) shardTask.stream()
.filter(data -> data.split(",").length == 3)
.count();
totalImported.addAndGet(validCount);
}
@Override
public String getFinalSummary() {
return String.format("\n📊 分階段任務彙總:成功導入用户數=%d,總處理數據數=%d",
totalImported.get(), userDataList.size());
}
};
// 4. 執行分階段批量任務
try {
String finalResult = processor.executePhasedBatchTasks(
userDataList, readHandler, processHandler, summaryHandler
);
System.out.println(finalResult);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
核心技術解析:線程池與併發工具的協同價值
(一)線程池:批量任務的 “高效執行引擎”
- 自定義配置適配場景:通過corePoolSize、maxPoolSize等參數,根據任務類型(CPU 密集型 / IO 密集型)調整線程數,避免資源浪費;
- 任務分片提升併發:將大量任務拆分為小分片,避免單個任務過大導致的線程阻塞,同時減少線程切換開銷;
- 拒絕策略保障穩健:使用CallerRunsPolicy拒絕策略,當任務隊列滿時,由提交線程自行執行任務,避免任務丟失。
(二)CountDownLatch:單向等待的 “同步開關”
- 用於 “主線程等待所有子任務完成”:初始化倒計時次數為分片數,每個分片任務完成後調用countDown(),主線程通過await()阻塞等待,確保彙總結果時所有任務已執行完畢;
- finally塊中調用countDown():確保即使任務執行異常,倒計時也會正常減少,避免主線程無限阻塞。
(三)CyclicBarrier:多階段協同的 “階段閘門”
- 用於 “分階段任務協同”:每個階段所有分片完成後,通過屏障動作(如打印階段進度)通知,再進入下一階段,確保階段執行的有序性;
- 可重複使用:相比 CountDownLatch,CyclicBarrier 支持多輪同步,完美適配 “讀取→處理→彙總” 等多階段流程。
(四)線程安全組件:併發環境的數據保障
- AtomicInteger:原子計數,避免多線程併發修改任務數導致的計數錯誤;
- CopyOnWriteArrayList:線程安全的集合,適用於 “讀多寫少” 的場景(批量任務中結果讀取頻率低於寫入頻率),避免ArrayList在併發環境下的ConcurrentModificationException。
避坑指南:批量任務處理器的使用注意事項
線程池配置需適配任務類型:
- CPU 密集型任務(如數據計算):核心線程數設為CPU核心數 + 1(避免 CPU 空閒);
- IO 密集型任務(如網絡請求、文件讀寫):核心線程數設為2 * CPU核心數 + 1(充分利用 CPU 在 IO 等待時的空閒時間);
- 任務隊列必須用有界隊列(如ArrayBlockingQueue),避免無界隊列導致的任務堆積和 OOM。
併發工具的正確使用:
- CountDownLatch:確保countDown()調用次數與初始化次數一致,建議在finally塊中執行;
- CyclicBarrier:處理BrokenBarrierException(如線程中斷、超時導致的屏障損壞),必要時調用reset()重置屏障;
- 避免在屏障動作中執行耗時操作:屏障動作由最後一個到達的線程執行,耗時操作會阻塞所有線程進入下一階段。
任務分片大小合理設置:
- 分片過小:導致分片數過多,線程切換頻繁,效率下降;
- 分片過大:單個分片任務執行時間過長,併發優勢不明顯;
- 建議:根據任務平均執行時間調整,分片大小控制在 10-100 之間(IO 密集型可適當增大)。
異常處理不可忽視:
- 單個任務執行異常必須捕獲:避免單個任務失敗導致整個分片任務中斷;
- 記錄失敗任務詳情:方便後續覆盤和重試,提升批量任務的可維護性。
實戰總結:併發編程的高效實踐
這個批量任務處理器的核心是 “線程池 + 併發工具” 的組合拳 —— 線程池解決 “任務高效執行” 問題,併發工具解決 “任務同步協同” 問題,兩者結合讓批量任務處理既高效又穩健。
如果這篇文章對您有所幫助,或者有所啓發的話,求一鍵三連:點贊、轉發、在看。
關注公眾號:woniuxgg,在公眾號中回覆:筆記 就可以獲得蝸牛為你精心準備的java實戰語雀筆記,回覆面試、開發手冊、有超讚的粉絲福利