點擊上方“程序員蝸牛g”,選擇“設為星標”

跟蝸牛哥一起,每天進步一點點

手寫一個高性能批量任務處理器!_List

程序員蝸牛g

大廠程序員一枚 跟蝸牛一起 每天進步一點點

33篇原創內容


公眾號

 


(一)整體架構思路

  • 任務分片:將原始任務列表按指定大小拆分,每個分片作為一個子任務,避免單個任務過大導致的併發效率低下;
  • 線程池執行:用ThreadPoolExecutor創建自定義線程池,提交所有子任務併發執行;
  • 進度統計:用線程安全的原子類(AtomicInteger)統計已完成、失敗任務數,避免併發計數問題;
  • 結果彙總:用線程安全的集合(CopyOnWriteArrayList)存儲每個任務的執行結果,支持後續查詢;
  • 分階段協同:用CountDownLatch實現 “等待所有子任務完成後彙總結果”,用CyclicBarrier實現 “多階段任務協同”(如 “讀取→處理→彙總” 三步流程);
  • 容錯處理:每個子任務執行時捕獲異常,記錄失敗原因,確保單個任務失敗不中斷整體流程。

(二)核心技術選型

  1. 線程池:ThreadPoolExecutor(自定義配置,適配不同任務場景);
  2. 併發工具:CountDownLatch(等待所有子任務完成)、CyclicBarrier(多階段任務協同);
  3. 線程安全組件:AtomicInteger(原子計數)、CopyOnWriteArrayList(線程安全結果存儲);
  4. 任務抽象:定義Task接口,支持任意類型任務的擴展(數據導入、短信發送等)。

(三)核心流程圖

手寫一個高性能批量任務處理器!_List_02

三、處理器實現:BatchTaskProcessor 核心代碼

 

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 批量任務處理器:支持任務分片、併發執行、進度監控、結果彙總
 */
public class BatchTaskProcessor<T> {
    // 線程池(核心配置)
    private final ExecutorService executorService;
    // 任務分片大小
    private final int batchSize;
    // 已完成任務數(原子類,線程安全)
    private final AtomicInteger completedCount = new AtomicInteger(0);
    // 失敗任務數(原子類,線程安全)
    private final AtomicInteger failedCount = new AtomicInteger(0);
    // 任務執行結果列表(線程安全集合)
    private final List<TaskResult<T>> resultList = new CopyOnWriteArrayList<>();

    /**
     * 構造器:自定義線程池配置
     * @param corePoolSize 核心線程數
     * @param maxPoolSize 最大線程數
     * @param keepAliveTime 非核心線程空閒時間
     * @param batchSize 任務分片大小
     */
    public BatchTaskProcessor(int corePoolSize, int maxPoolSize, long keepAliveTime, int batchSize) {
        this.batchSize = Math.max(batchSize, 1); // 分片大小至少為1
        // 初始化線程池(自定義配置,避免OOM)
        this.executorService = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), // 有界任務隊列,避免任務堆積
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略:提交線程自行執行
        );
    }

    /**
     * 核心方法:執行批量任務(無階段協同)
     * @param taskList 原始任務列表
     * @param taskHandler 任務處理器(自定義任務執行邏輯)
     * @return 執行結果彙總
     * @throws InterruptedException 線程中斷異常
     */
    public BatchResult<T> executeBatchTasks(List<T> taskList, TaskHandler<T> taskHandler) throws InterruptedException {
        if (taskList == null || taskList.isEmpty()) {
            return new BatchResult<>(0, 0, resultList);
        }

        // 1. 任務分片
        List<List<T>> taskShards = splitTask(taskList, batchSize);
        int totalTaskCount = taskList.size();
        CountDownLatch countDownLatch = new CountDownLatch(taskShards.size());

        System.out.println("📋 批量任務開始執行:");
        System.out.println(" - 總任務數:" + totalTaskCount);
        System.out.println(" - 任務分片數:" + taskShards.size());
        System.out.println(" - 線程池線程數:" + executorService.toString());

        // 2. 提交分片任務到線程池
        for (int i = 0; i < taskShards.size(); i++) {
            int shardIndex = i;
            List<T> shard = taskShards.get(i);
            executorService.submit(() -> {
                try {
                    // 執行當前分片的所有任務
                    for (T task : shard) {
                        TaskResult<T> result = new TaskResult<>();
                        result.setTask(task);
                        try {
                            // 自定義任務執行邏輯(如導入數據、發送短信)
                            boolean success = taskHandler.process(task);
                            result.setSuccess(success);
                            if (success) {
                                completedCount.incrementAndGet();
                            } else {
                                failedCount.incrementAndGet();
                                result.setErrorMessage("任務執行失敗(無異常)");
                            }
                        } catch (Exception e) {
                            // 捕獲任務執行異常,記錄失敗原因
                            failedCount.incrementAndGet();
                            result.setSuccess(false);
                            result.setErrorMessage("任務執行失敗:" + e.getMessage());
                        }
                        // 存儲任務結果(線程安全)
                        resultList.add(result);
                    }
                    System.out.printf("✅ 分片%d執行完成,處理任務數:%d\n", shardIndex, shard.size());
                } finally {
                    // 分片任務完成,倒計時減1(必須在finally中,確保必執行)
                    countDownLatch.countDown();
                }
            });
        }

        // 3. 主線程阻塞等待所有分片任務完成
        countDownLatch.await();
        // 4. 關閉線程池(不再接收新任務,等待已提交任務完成)
        executorService.shutdown();
        // 5. 返回彙總結果
        return new BatchResult<>(completedCount.get(), failedCount.get(), resultList);
    }

    /**
     * 進階方法:分階段批量任務(如“讀取→處理→彙總”)
     * @param taskList 原始任務列表
     * @param readHandler 讀取階段處理器
     * @param processHandler 處理階段處理器
     * @param summaryHandler 彙總階段處理器
     * @return 最終彙總結果
     * @throws InterruptedException 線程中斷異常
     * @throws BrokenBarrierException 屏障損壞異常
     */
    public String executePhasedBatchTasks(List<T> taskList,
                                          TaskHandler<T> readHandler,
                                          TaskHandler<T> processHandler,
                                          SummaryHandler<T> summaryHandler) throws InterruptedException, BrokenBarrierException {
        if (taskList == null || taskList.isEmpty()) {
            return "無任務可執行";
        }

        // 任務分片
        List<List<T>> taskShards = splitTask(taskList, batchSize);
        int shardCount = taskShards.size();
        // CyclicBarrier:分階段協同,所有分片完成當前階段後,進入下一階段
        CyclicBarrier barrier = new CyclicBarrier(shardCount, () -> {
            // 屏障動作:所有分片完成當前階段後執行(如打印階段進度)
            System.out.println("\n🚩 當前階段所有分片執行完成,進入下一階段");
        });

        // 初始化線程池
        ExecutorService phasedExecutor = new ThreadPoolExecutor(
                shardCount, shardCount, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        System.out.println("📋 分階段批量任務開始執行:");
        System.out.println(" - 總任務數:" + taskList.size());
        System.out.println(" - 分片數:" + shardCount);

        // 提交分階段任務
        for (int i = 0; i < shardCount; i++) {
            int shardIndex = i;
            List<T> shard = taskShards.get(i);
            phasedExecutor.submit(() -> {
                try {
                    // 第一階段:讀取數據(如從文件/數據庫讀取)
                    System.out.printf("📥 分片%d開始讀取階段\n", shardIndex);
                    for (T task : shard) {
                        readHandler.process(task);
                    }
                    System.out.printf("📥 分片%d讀取階段完成\n", shardIndex);
                    // 等待所有分片完成讀取階段
                    barrier.await();

                    // 第二階段:處理數據(如數據清洗、業務邏輯處理)
                    System.out.printf("⚙️  分片%d開始處理階段\n", shardIndex);
                    for (T task : shard) {
                        processHandler.process(task);
                    }
                    System.out.printf("⚙️  分片%d處理階段完成\n", shardIndex);
                    // 等待所有分片完成處理階段
                    barrier.await();

                    // 第三階段:彙總數據(如統計分片結果)
                    System.out.printf("📊 分片%d開始彙總階段\n", shardIndex);
                    summaryHandler.summary(shard);
                    System.out.printf("📊 分片%d彙總階段完成\n", shardIndex);
                    // 等待所有分片完成彙總階段
                    barrier.await();

                } catch (InterruptedException | BrokenBarrierException e) {
                    System.err.printf("❌ 分片%d執行異常:%s\n", shardIndex, e.getMessage());
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    System.err.printf("❌ 分片%d執行異常:%s\n", shardIndex, e.getMessage());
                }
            });
        }

        // 等待所有任務完成,關閉線程池
        phasedExecutor.shutdown();
        while (!phasedExecutor.isTerminated()) {
            Thread.sleep(100);
        }

        // 最終彙總結果
        return summaryHandler.getFinalSummary();
    }

    /**
     * 工具方法:任務分片(將原始任務列表按batchSize拆分)
     */
    private List<List<T>> splitTask(List<T> taskList, int batchSize) {
        List<List<T>> shards = new ArrayList<>();
        int total = taskList.size();
        int start = 0;
        while (start < total) {
            int end = Math.min(start + batchSize, total);
            shards.add(taskList.subList(start, end));
            start = end;
        }
        return shards;
    }

    /**
     * 任務處理器接口(自定義任務執行邏輯)
     */
    public interface TaskHandler<T> {
        boolean process(T task) throws Exception;
    }

    /**
     * 彙總處理器接口(自定義彙總邏輯)
     */
    public interface SummaryHandler<T> {
        void summary(List<T> shardTask);
        String getFinalSummary();
    }

    /**
     * 單個任務執行結果實體
     */
    public static class TaskResult<T> {
        private T task;
        private boolean success;
        private String errorMessage;

        // getter/setter
        public T getTask() { return task; }
        public void setTask(T task) { this.task = task; }
        public boolean isSuccess() { return success; }
        public void setSuccess(boolean success) { this.success = success; }
        public String getErrorMessage() { return errorMessage; }
        public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
    }

    /**
     * 批量任務彙總結果實體
     */
    public static class BatchResult<T> {
        private int completedCount; // 成功任務數
        private int failedCount;    // 失敗任務數
        private List<TaskResult<T>> taskResults; // 所有任務結果

        public BatchResult(int completedCount, int failedCount, List<TaskResult<T>> taskResults) {
            this.completedCount = completedCount;
            this.failedCount = failedCount;
            this.taskResults = taskResults;
        }

        // getter
        public int getCompletedCount() { return completedCount; }
        public int getFailedCount() { return failedCount; }
        public List<TaskResult<T>> getTaskResults() { return taskResults; }

        @Override
        public String toString() {
            return "\n📊 批量任務執行彙總:" +
                    "\n - 總任務數:" + (completedCount + failedCount) +
                    "\n - 成功數:" + completedCount +
                    "\n - 失敗數:" + failedCount +
                    "\n - 成功率:" + String.format("%.2f%%", (double) completedCount / (completedCount + failedCount) * 100);
        }
    }
}

 

處理器使用:2 種常見場景示例

(一)場景 1:基礎批量任務(無階段協同)

批量發送短信任務,無需分階段,直接併發執行所有任務:

 

import java.util.Arrays;
import java.util.List;

public class BasicBatchDemo {
    public static void main(String[] args) {
        // 1. 模擬100條短信發送任務(任務數據:手機號)
        List<String> phoneList = Arrays.asList(
                "13800138000", "13900139000", "13700137000",
                // ... 省略97條手機號
                "13600136000"
        );

        // 2. 創建批量任務處理器(CPU密集型任務:核心線程數=CPU核心數+1;IO密集型=2*CPU核心數+1)
        // 短信發送是IO密集型(網絡請求),此處配置8線程,分片大小10
        BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
                8, 10, 60, 10
        );

        // 3. 自定義任務執行邏輯(短信發送)
        BatchTaskProcessor.TaskHandler<String> smsHandler = phone -> {
            // 模擬短信發送邏輯(如調用第三方短信API)
            System.out.printf("發送短信到:%s\n", phone);
            Thread.sleep(100); // 模擬網絡請求耗時
            return true; // 執行成功
        };

        // 4. 執行批量任務並獲取結果
        try {
            BatchTaskProcessor.BatchResult<String> result = processor.executeBatchTasks(phoneList, smsHandler);
            System.out.println(result);

            // 5. 輸出失敗任務詳情(可選)
            result.getTaskResults().stream()
                    .filter(taskResult -> !taskResult.isSuccess())
                    .forEach(taskResult -> System.out.printf("❌ 失敗任務:%s,原因:%s\n",
                            taskResult.getTask(), taskResult.getErrorMessage()));

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

(二)場景 2:分階段批量任務(讀取→處理→彙總)

批量導入用户數據任務,分三階段執行:讀取 Excel 數據→清洗數據→彙總統計:

 

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PhasedBatchDemo {
    public static void main(String[] args) {
        // 1. 模擬100條原始用户數據(待導入)
        List<String> userDataList = Arrays.asList(
                "張三,20,13800138000", "李四,25,13900139000",
                // ... 省略98條數據
                "王五,30,13700137000"
        );

        // 2. 創建批量任務處理器(分片大小15)
        BatchTaskProcessor<String> processor = new BatchTaskProcessor<>(
                6, 8, 60, 15
        );

        // 3. 自定義分階段處理器
        // 第一階段:讀取數據(模擬從Excel讀取)
        BatchTaskProcessor.TaskHandler<String> readHandler = data -> {
            System.out.printf("讀取數據:%s\n", data);
            return true;
        };

        // 第二階段:處理數據(模擬數據清洗,去除無效數據)
        BatchTaskProcessor.TaskHandler<String> processHandler = data -> {
            String[] parts = data.split(",");
            if (parts.length != 3) {
                throw new RuntimeException("數據格式錯誤:" + data);
            }
            System.out.printf("清洗數據:%s→%s(格式校驗通過)\n", data, parts[0]);
            return true;
        };

        // 第三階段:彙總統計(統計成功導入的用户數)
        AtomicInteger totalImported = new AtomicInteger(0);
        BatchTaskProcessor.SummaryHandler<String> summaryHandler = new BatchTaskProcessor.SummaryHandler<String>() {
            @Override
            public void summary(List<String> shardTask) {
                // 統計當前分片的有效數據數
                int validCount = (int) shardTask.stream()
                        .filter(data -> data.split(",").length == 3)
                        .count();
                totalImported.addAndGet(validCount);
            }

            @Override
            public String getFinalSummary() {
                return String.format("\n📊 分階段任務彙總:成功導入用户數=%d,總處理數據數=%d",
                        totalImported.get(), userDataList.size());
            }
        };

        // 4. 執行分階段批量任務
        try {
            String finalResult = processor.executePhasedBatchTasks(
                    userDataList, readHandler, processHandler, summaryHandler
            );
            System.out.println(finalResult);
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

 

核心技術解析:線程池與併發工具的協同價值

(一)線程池:批量任務的 “高效執行引擎”

  • 自定義配置適配場景:通過corePoolSize、maxPoolSize等參數,根據任務類型(CPU 密集型 / IO 密集型)調整線程數,避免資源浪費;
  • 任務分片提升併發:將大量任務拆分為小分片,避免單個任務過大導致的線程阻塞,同時減少線程切換開銷;
  • 拒絕策略保障穩健:使用CallerRunsPolicy拒絕策略,當任務隊列滿時,由提交線程自行執行任務,避免任務丟失。

(二)CountDownLatch:單向等待的 “同步開關”

  • 用於 “主線程等待所有子任務完成”:初始化倒計時次數為分片數,每個分片任務完成後調用countDown(),主線程通過await()阻塞等待,確保彙總結果時所有任務已執行完畢;
  • finally塊中調用countDown():確保即使任務執行異常,倒計時也會正常減少,避免主線程無限阻塞。

(三)CyclicBarrier:多階段協同的 “階段閘門”

  • 用於 “分階段任務協同”:每個階段所有分片完成後,通過屏障動作(如打印階段進度)通知,再進入下一階段,確保階段執行的有序性;
  • 可重複使用:相比 CountDownLatch,CyclicBarrier 支持多輪同步,完美適配 “讀取→處理→彙總” 等多階段流程。

(四)線程安全組件:併發環境的數據保障

  • AtomicInteger:原子計數,避免多線程併發修改任務數導致的計數錯誤;
  • CopyOnWriteArrayList:線程安全的集合,適用於 “讀多寫少” 的場景(批量任務中結果讀取頻率低於寫入頻率),避免ArrayList在併發環境下的ConcurrentModificationException。

避坑指南:批量任務處理器的使用注意事項

線程池配置需適配任務類型:

  • CPU 密集型任務(如數據計算):核心線程數設為CPU核心數 + 1(避免 CPU 空閒);
  • IO 密集型任務(如網絡請求、文件讀寫):核心線程數設為2 * CPU核心數 + 1(充分利用 CPU 在 IO 等待時的空閒時間);
  • 任務隊列必須用有界隊列(如ArrayBlockingQueue),避免無界隊列導致的任務堆積和 OOM

併發工具的正確使用:

  • CountDownLatch:確保countDown()調用次數與初始化次數一致,建議在finally塊中執行;
  • CyclicBarrier:處理BrokenBarrierException(如線程中斷、超時導致的屏障損壞),必要時調用reset()重置屏障;
  • 避免在屏障動作中執行耗時操作:屏障動作由最後一個到達的線程執行,耗時操作會阻塞所有線程進入下一階段。

任務分片大小合理設置:

  • 分片過小:導致分片數過多,線程切換頻繁,效率下降;
  • 分片過大:單個分片任務執行時間過長,併發優勢不明顯;
  • 建議:根據任務平均執行時間調整,分片大小控制在 10-100 之間(IO 密集型可適當增大)。

異常處理不可忽視:

  • 單個任務執行異常必須捕獲:避免單個任務失敗導致整個分片任務中斷;
  • 記錄失敗任務詳情:方便後續覆盤和重試,提升批量任務的可維護性。

實戰總結:併發編程的高效實踐

這個批量任務處理器的核心是 “線程池 + 併發工具” 的組合拳 —— 線程池解決 “任務高效執行” 問題,併發工具解決 “任務同步協同” 問題,兩者結合讓批量任務處理既高效又穩健。

如果這篇文章對您有所幫助,或者有所啓發的話,求一鍵三連:點贊、轉發、在看。

關注公眾號:woniuxgg,在公眾號中回覆:筆記  就可以獲得蝸牛為你精心準備的java實戰語雀筆記,回覆面試、開發手冊、有超讚的粉絲福利