線程池與併發工具

線程池概述

在第十四天的學習中,我們掌握了多線程編程的基礎知識,包括線程的創建、生命週期和線程安全等內容。今天我們將深入學習線程池技術,這是Java併發編程中提高性能和資源利用率的重要手段。

線程池概念:線程池是一種線程管理機制,它預先創建一定數量的線程,將任務提交到池中執行,而不是為每個任務單獨創建線程。當任務執行完成後,線程不會被銷燬,而是返回到線程池等待下一個任務。

線程池優勢

  • 降低資源消耗:通過重用已創建的線程,避免頻繁創建和銷燬線程帶來的性能開銷
  • 提高響應速度:任務到達時無需等待線程創建即可立即執行
  • 提高線程可管理性:可以控制最大併發線程數,防止線程過多導致系統資源耗盡
  • 提供更多功能:如定時執行、定期執行、任務隊列管理等

線程池核心參數

Java中的ThreadPoolExecutor是線程池的核心實現類,其構造方法包含以下核心參數:

複製

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

核心參數詳解

  1. corePoolSize:核心線程數,線程池長期維持的線程數量
  • 即使線程處於空閒狀態,核心線程也不會被銷燬(除非設置了allowCoreThreadTimeOut)
  • 當提交新任務時,如果當前運行的線程數小於核心線程數,會創建新線程執行任務
  1. maximumPoolSize:最大線程數,線程池允許創建的最大線程數量
  • 當隊列滿且核心線程都在忙時,會創建新線程直到達到最大線程數
  • 非核心線程空閒時間超過keepAliveTime會被銷燬
  1. keepAliveTime:非核心線程的空閒存活時間
  • 超過這個時間,非核心線程會被回收
  • 通過allowCoreThreadTimeOut(true)可以讓核心線程也超時回收
  1. unit:keepAliveTime參數的時間單位
  • 可選值:TimeUnit.NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS等
  1. workQueue:任務隊列,用於保存等待執行的任務
  • 常用實現:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue
  1. threadFactory:線程工廠,用於創建新線程
  • 可以自定義線程名稱、優先級、是否為守護線程等
  1. handler:拒絕策略,當線程池和隊列都滿時的任務處理策略
  • AbortPolicy:默認策略,直接拋出RejectedExecutionException
  • CallerRunsPolicy:由提交任務的線程執行任務
  • DiscardPolicy:直接丟棄任務,不拋出異常
  • DiscardOldestPolicy:丟棄隊列中最舊的任務,然後嘗試提交新任務

線程池工作原理

線程池的工作流程可以概括為以下步驟:

  1. 當提交一個新任務時,線程池首先檢查核心線程是否都在執行任務
  • 如果不是,則創建新的核心線程執行任務
  • 如果是,則進入下一步
  1. 檢查任務隊列是否已滿
  • 如果未滿,將任務加入隊列等待執行
  • 如果已滿,進入下一步
  1. 檢查當前線程數是否達到最大線程數
  • 如果沒有達到,創建非核心線程執行任務
  • 如果已達到,執行拒絕策略

線程池狀態

  • RUNNING:正常運行狀態,接受新任務並處理隊列中的任務
  • SHUTDOWN:關閉狀態,不接受新任務,但處理隊列中的任務
  • STOP:停止狀態,不接受新任務,不處理隊列中的任務,中斷正在執行的任務
  • TIDYING:整理狀態,所有任務已終止,準備執行terminated()方法
  • TERMINATED:終止狀態,terminated()方法已執行完成

常見線程池類型

Java的Executors類提供了幾種常用的線程池實現:

  1. FixedThreadPool:固定大小的線程池

複製

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

  • 特點:核心線程數=最大線程數,隊列容量無界
  • 適用場景:需要控制併發線程數量的場景
  1. CachedThreadPool:可緩存的線程池

複製

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

  • 特點:核心線程數=0,最大線程數=Integer.MAX_VALUE,隊列是同步隊列
  • 適用場景:執行大量短期異步任務,任務提交頻繁但執行時間短
  1. SingleThreadExecutor:單線程的線程池

複製

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

  • 特點:核心線程數=1,最大線程數=1,保證任務按順序執行
  • 適用場景:需要保證任務順序執行,且不需要併發執行的場景
  1. ScheduledThreadPool:定時任務線程池

複製

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

  • 特點:支持定時執行或週期性執行任務
  • 適用場景:需要定時執行任務的場景,如定時備份、定時檢查等

注意:在實際項目開發中,阿里巴巴Java開發手冊建議直接使用ThreadPoolExecutor構造函數創建線程池,而不是使用Executors提供的工廠方法,以明確線程池參數,避免資源耗盡風險。

使用示例

基本使用示例

複製

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 創建固定大小的線程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 提交5個任務
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is running, thread id: " +
                                   Thread.currentThread().getId());
                try {
                    Thread.sleep(1000); // 模擬任務執行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 關閉線程池
        executor.shutdown();
    }
}

Callable與Future使用示例

當需要任務返回結果時,可以使用Callable接口,配合Future獲取任務執行結果:

複製

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 創建Callable任務
        Callable<Integer> task = () -> {
            System.out.println("Task is running...");
            Thread.sleep(2000);
            return 1 + 1; // 返回計算結果
        };

        // 提交任務並獲取Future對象
        Future<Integer> future = executor.submit(task);

        System.out.println("Task submitted, waiting for result...");

        try {
            // 獲取任務結果,會阻塞直到任務完成
            Integer result = future.get();
            System.out.println("Task result: " + result);

            // 檢查任務是否完成
            System.out.println("Is task done? " + future.isDone());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

定時任務示例

複製

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);

        // 延遲3秒後執行一次
        scheduledExecutor.schedule(() -> {
            System.out.println("Task executed after 3 seconds");
        }, 3, TimeUnit.SECONDS);

        // 延遲1秒後開始,每隔2秒執行一次
        scheduledExecutor.scheduleAtFixedRate(() -> {
            System.out.println("Task executed periodically");
        }, 1, 2, TimeUnit.SECONDS);

        // 程序運行10秒後關閉線程池
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        scheduledExecutor.shutdown();
    }
}

線程池最佳實踐

  1. 合理設置線程池參數
  • 核心線程數:根據CPU核心數和任務類型確定
  • CPU密集型任務:核心線程數 = CPU核心數 + 1
  • IO密集型任務:核心線程數 = CPU核心數 * 2
  • 最大線程數:一般設置為核心線程數的2-4倍
  • 隊列容量:根據系統能承受的最大任務積壓數設置
  • 拒絕策略:根據業務需求選擇合適的拒絕策略
  1. 手動創建ThreadPoolExecutor

複製

// 推薦使用方式
   ExecutorService executorService = new ThreadPoolExecutor(
       5, // corePoolSize
       10, // maximumPoolSize
       60, // keepAliveTime
       TimeUnit.SECONDS,
       new ArrayBlockingQueue<>(100), // 有界隊列,避免資源耗盡
       new ThreadFactory() {
           private final AtomicInteger threadNumber = new AtomicInteger(1);

           @Override
           public Thread newThread(Runnable r) {
               Thread t = new Thread(r, "my-thread-pool-" + threadNumber.getAndIncrement());
               t.setDaemon(false);
               return t;
           }
       },
       new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
   );

  1. 正確關閉線程池
  • 使用shutdown():等待所有已提交任務執行完畢後關閉
  • 使用shutdownNow():立即嘗試停止所有正在執行的任務,返回等待執行的任務列表
  • 優雅關閉示例:

複製

executorService.shutdown();
   try {
       // 等待60秒讓所有任務完成
       if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
           // 如果超時,強制關閉
           executorService.shutdownNow();
           // 再次等待30秒
           if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
               System.err.println("線程池未能正常關閉");
           }
       }
   } catch (InterruptedException e) {
       executorService.shutdownNow();
   }

  1. 監控線程池狀態
  • 通過ThreadPoolExecutor的以下方法監控線程池狀態:
  • getPoolSize():當前線程池中的線程數
  • getActiveCount():當前正在執行任務的線程數
  • getCompletedTaskCount():已完成的任務總數
  • getQueue().size():等待執行的任務數
  1. 避免任務堆積
  • 使用有界隊列,避免任務無限堆積
  • 設置合理的拒絕策略,在系統負載過高時能夠優雅降級
  1. 使用CompletableFuture簡化異步編程
    Java 8引入的CompletableFuture結合線程池可以更優雅地處理異步任務:

複製

CompletableFuture.supplyAsync(() -> {
       // 異步執行任務
       return "result";
   }, executorService)
   .thenApply(result -> {
       // 處理結果
       return result.toUpperCase();
   })
   .thenAccept(finalResult -> {
       // 消費最終結果
       System.out.println(finalResult);
   })
   .exceptionally(ex -> {
       // 處理異常
       ex.printStackTrace();
       return null;
   });

併發工具類

除了線程池,Java還提供了一些實用的併發工具類,用於簡化併發編程:

  1. CountDownLatch:倒計時門閂
  • 允許一個或多個線程等待其他線程完成操作

複製

CountDownLatch latch = new CountDownLatch(3);

   // 其他線程執行完後調用countDown()
   new Thread(() -> {
       try {
           // 執行任務
           latch.countDown(); // 計數器減1
       } catch (Exception e) {
           e.printStackTrace();
       }
   }).start();

   // 等待計數器歸零
   latch.await();

  1. CyclicBarrier:循環柵欄
  • 讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,所有被阻塞的線程才會繼續執行

複製

CyclicBarrier barrier = new CyclicBarrier(3, () -> {
       System.out.println("所有線程已到達屏障,開始執行下一步");
   });

   for (int i = 0; i < 3; i++) {
       new Thread(() -> {
           try {
               // 執行任務
               System.out.println(Thread.currentThread().getName() + "到達屏障");
               barrier.await(); // 等待其他線程
               // 所有線程到達後執行後續操作
               System.out.println(Thread.currentThread().getName() + "繼續執行");
           } catch (Exception e) {
               e.printStackTrace();
           }
       }).start();
   }

  1. Semaphore:信號量
  • 控制同時訪問特定資源的線程數量

複製

Semaphore semaphore = new Semaphore(5); // 允許5個線程同時訪問

   try {
       semaphore.acquire(); // 獲取許可
       // 訪問共享資源
   } catch (InterruptedException e) {
       e.printStackTrace();
   } finally {
       semaphore.release(); // 釋放許可
   }

  1. ConcurrentHashMap:併發哈希表
  • 線程安全的HashMap實現,支持高併發操作
  1. BlockingQueue:阻塞隊列
  • 支持阻塞的插入和移除操作,常用於生產者-消費者模式

總結

今天我們學習了Java線程池和併發工具的核心內容,包括:

  • 線程池的概念和優勢
  • 線程池的核心參數和工作原理
  • 常見的線程池類型及其特點
  • 線程池的使用示例,包括基本使用、帶返回值的任務和定時任務
  • 線程池的最佳實踐,包括參數設置、創建方式和關閉方法
  • 常用的併發工具類

線程池是Java併發編程的重要內容,合理使用線程池可以顯著提高程序性能和資源利用率。在實際項目中,需要根據具體業務場景選擇合適的線程池類型和參數配置,並遵循最佳實踐,避免常見的線程池使用陷阱。

下一天我們將學習Java中的鎖機制和併發容器,進一步提升我們的併發編程能力。