博客 / 詳情

返回

Java深入研究ThreadPoolExecutor線程池

在Java代碼中我們常常會開啓異步線程去執行一些網絡請求,或是開啓子線程去讀寫文件,這些線程的開啓與執行在併發量較小的場景下可以正常運行,如果涉及併發量比較大、線程數量有限、響應速度要快的業務場景下,此時就不允許單獨創建線程去執行任務,而是基於線程池管理、分發線程機制去執行線程任務,從而降低資源消耗、提高響應速度,統一管理線程資源

線程池的創建與分類

Exectors類是concurrent包下的用於快速創建線程的工具類,該類中定義了一系列創建不同線程類型的靜態方法,實際還是調用ThreadPoolExecutor類的有參函數,下面看下對應的方法源碼

--- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
--- 調用有參函數
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), defaultHandler);
}
  1. newFixedThreadPool : 固定數量的線程池,可用於限制特定線程啓用數量的場景,調用ThreadPoolExecutor構造函數中的參數定義如下

    • corePoolSize : 核心線程數量
    • maximumPoolSize : 最大線程數量
    • keepAliveTime : 當線程的數量大於核心線程時,空閒線程在終止之前等待新任務的最大時間
    • unit : 參數keepAliveTime的時間單位
    • workQueue : 存放等待執行任務的阻塞隊列,常用的組賽隊列如下

      1. ArrayBlockingQueue : 基於數組的有界阻塞隊列,遵循FIFO(先進先出)原則,構造函數提供設置隊列大小參數,採用ReentrantLock(基於AQS實現)獲取重入鎖,如果向已滿的隊列插入則當前線程阻塞
      2. LinkedBlockingQueue : 基於鏈表的無界阻塞隊列,默認大小為Integer.MAX_VALUE,向該隊列插入數據時會封裝到Node<>節點所對應的鏈表中,隊列內部使用了putLock和takeLock標識添加、刪除鎖,二者可併發執行
      3. SynchronousQueue : 單向鏈表同步隊列,具體需查看源碼(知識盲區,未研究到該隊列)
      4. PriorityBlockingQueue : 具有優先級排序的無界阻塞隊列,默認以自然排序方式或者通過傳入可比較的Comparator比較器進行排序
    • threadFactory : 默認線程創建工廠
    • defaultHandler : 拒絕策略,默認使用ThreadPoolExecutor.AbortPolicy,表示當隊列滿了並且工作線程大於線程池最大線程數量,此時直接拋出異常,

      1. CallerRunsPolicy : 用於被拒絕任務的處理程序,它直接在 execute 方法的調用線程中運行被拒絕的任務;如果執行程序已關閉,則會丟棄該任務
      2. DiscardOldestPolicy : 丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務
      3. DiscardPolicy : 默認丟棄被拒絕的任務

      一般場景下默認使用ThreadPoolExecutor.AbortPolicy拒絕策略

<br/>

  1. newSingleThreadExecutor : 單線程的線程池,只有一個核心線程在執行,可用於需要按照特定順序執行的場景

    public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
             (new ThreadPoolExecutor(1, 1,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>()));
    }

    通過入參可以看到只使用一個線程,採用LinkedBlockingQueue無界隊列,keepAliveTime是0s,説明線程創建了不會超時終止,該線程順序執行所有任務

  2. newCachedThreadPool : 核心線程為0,非核心線程數為int的最大值

    public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
    }

    keepAliveTime是60s,採用SynchronousQueue同步阻塞隊列,當有新的任務進來此時如果有空閒的線程則重複使用,否則就重新創建一個新的線程,線程空閒60s後會被回收,關於同步阻塞隊列可以看這篇文章SynchronousQueue

  3. newScheduledThreadPool : 核心線程為傳入的固定數值,非核心線程數為int的最大值,可用於延時或定時任務的執行

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
     return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE,
           DEFAULT_KEEPALIVE_MILLIS, 
           MILLISECONDS,
           new DelayedWorkQueue());
    }

    方法參數中的DelayedWorkQueue底層也是基於數組實現的最小堆的具有優先級的隊列,隊列中的任務按照執行時間升序排列,執行時間越靠近當前時間的任務排在最前面,此時也會最先執行該任務

<br/>

線程池內部執行機制

內部執行機制基本按照ThreadPoolExecutor構造函數傳入的參數來處理提交進來的任務

線程池ThreadPoolExecutor中的execute執行方法

先看下execute方法源碼

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

方法中的ctl是ThreadPoolExecutor中申明的提供原子操作的Integer對象,用於獲取當前線程池狀態

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  1. 判斷當前線程數量是否小於核心線程數量corePoolSize,如果滿足條件調用addWorker()方法創建一個新的核心線程
  2. 如果大於corePoolSize,接着判斷當前線程池是否是運行狀態並且通過workQueue.offer()寫入阻塞隊列
  3. 此時再次檢查線程池狀態是否正在運行,否則從隊列中移除任務並執行拒絕策略;如果是運行狀態,調用workerCountOf()判斷當前線程池線程數,數量為0就新創建一個新的線程
  4. 如果首次判斷線程池狀態非運行狀態,調用addWorker()創建心線程如果失敗,執行拒絕策略

<br/>
ThreadPoolExecutor中提供了shutdown()、shutdownNow()方法用於關閉線程池,調用shutdown時不再接受新的任務,之前提交的任務等執行結束再關閉線程池,調用shutdownNow時會嘗試停止線程池中的任務然後再關閉,並且返回未處理完的List<> tasks任務列表

ForkJoinPool初探

ThreadPoolExecutor阻塞隊列中的任務都是單個線程去執行,如果此時需要進行密集型計算任務(比如大數組排序、遍歷系統文件夾並計算文件數量),就可能出現線程池中一個線程繁忙而其他線程空閒,導致CPU負載不均衡系統資源浪費,ForkJoinPool就是用於將單個密集型計算任務拆分成多個小任務,通過fork讓線程池其它線程來執行小任務,通過join合併線程執行任務結果,採用並行執行機制,提高CPU的使用率

ForkJoinPool構造函數

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
        defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
    
public ForkJoinPool(int parallelism,
    ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {
        this(checkParallelism(parallelism),checkFactory(factory),handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

構造函數參數定義如下

  • parallelism : 並行線程數,默認為Runtime.getRuntime().availableProcessors(),最小為1
  • factory : 線程創建工廠,對象類型為ForkJoinWorkerThread
  • handler : 線程執行時異常處理
  • asyncMode : 為true表示處理任務的線程以隊列形式按照先進先出(FIFO)順序,此時不支持任務合併,false則是按照棧的形式後進先出(LIFO)順序,默認是false支持任務合併(join)

獲取ForkJoinPool對象可以直接使用commonPool()方法,

val pool = ForkJoinPool.commonPool()

public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}

而common對應的初始化放在靜態代碼塊中,且最終調用了ForkJoinPool的構造函數

static{
    //...
    common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
    //...
}

private static ForkJoinPool makeCommonPool() {
    //...
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

ForkJoinTask任務

ForkJoinTask抽象類實現了Future接口,同時提供了RecursiveAction和RecursiveTask兩個實現類,該實現類提供泛型類型申明,源碼如下

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    V result;                                //返回結果
    protected abstract V compute();         //執行任務

    public final V getRawResult() {         //獲取result
        return result;
    }

    protected final boolean exec() {        //ForkJoinTask中的抽象方法
        result = compute();
        return true;
    }
}

RecursiveAction相對應RecursiveTask返回值為void,下面看下ForkJoinTask的三個核心方法

  • fork : 在任務執行過程中將大任務拆分為多個小的子任務
  • join : 調用子任務的join()方法等待任務返回結果,如果子任務執行異常,join()會拋出異常,quietlyJoin()方法不會拋出異常,需要調用getException()或getRawResult()手動處理異常和結果
  • invoke : 在當前線程同步執行該任務,同join一樣,如果子任務執行異常,invoke()會拋出異常,quietlyInvoke()方法不會拋出異常,需要調用getException()或getRawResult()手動處理異常和結果

執行ForkJoinTask任務

使用ForkJoinPool時,可以通過以下三個方法執行ForkJoinTask任務,

public <T> T invoke(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public void execute(ForkJoinTask<?> task)
  • invoke : 執行有返回值的任務,同步阻塞方法直到任務執行完畢
  • submit : 執行沒有返回值的任務
  • execute : 執行帶有ForkJoinTask對象返回的任務,非阻塞方法,調用後ForkJoinPool會立即執行並返回當前執行的task對象

invoke()、submit()是對ExecutorService接口的方法實現,同時ForkJoinPool 也定義了用來執行ForkJoinTask的execute方法

work-stealing模式

關於work-stealing模式描述可參見下面這篇博文,核心就是每個工作線程都有自己的任務隊列,當某個線程完成任務後會去"拿"別的隊列裏的任務去執行,work-stealing模式

ForkJoinPool 與 ThreadPoolExecutor 比較

ThreadPoolExecutor 與 ForkJoinPool 都實現了ExecutorService,不同之處在於前者只能執行 Runnable 和 Callable 任務,執行順序是按照其在阻塞隊列中的順序來執行;後者除了能執行前者的任務類型外還擴展處ForkJoinTask類型任務,從而滿足work-stealing這種算法模式,ForkJoinPool涉及的技術點還有很多,需要繼續深入探索,例如ForkJoinPool中的線程狀態、fork()、compute()、join()的調用順序、子任務拆分粒度等細節內容...

user avatar codingdgsun 頭像 maenj_ba_lah 頭像 lion_jv 頭像
3 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.