線程池與併發工具
線程池概述
在第十四天的學習中,我們掌握了多線程編程的基礎知識,包括線程的創建、生命週期和線程安全等內容。今天我們將深入學習線程池技術,這是Java併發編程中提高性能和資源利用率的重要手段。
線程池概念:線程池是一種線程管理機制,它預先創建一定數量的線程,將任務提交到池中執行,而不是為每個任務單獨創建線程。當任務執行完成後,線程不會被銷燬,而是返回到線程池等待下一個任務。
線程池優勢:
- 降低資源消耗:通過重用已創建的線程,避免頻繁創建和銷燬線程帶來的性能開銷
- 提高響應速度:任務到達時無需等待線程創建即可立即執行
- 提高線程可管理性:可以控制最大併發線程數,防止線程過多導致系統資源耗盡
- 提供更多功能:如定時執行、定期執行、任務隊列管理等
線程池核心參數
Java中的ThreadPoolExecutor是線程池的核心實現類,其構造方法包含以下核心參數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
核心參數詳解:
- corePoolSize:核心線程數,線程池長期維持的線程數量
- 即使線程處於空閒狀態,核心線程也不會被銷燬(除非設置了allowCoreThreadTimeOut)
- 當提交新任務時,如果當前運行的線程數小於核心線程數,會創建新線程執行任務
- maximumPoolSize:最大線程數,線程池允許創建的最大線程數量
- 當隊列滿且核心線程都在忙時,會創建新線程直到達到最大線程數
- 非核心線程空閒時間超過keepAliveTime會被銷燬
- keepAliveTime:非核心線程的空閒存活時間
- 超過這個時間,非核心線程會被回收
- 通過allowCoreThreadTimeOut(true)可以讓核心線程也超時回收
- unit:keepAliveTime參數的時間單位
- 可選值:TimeUnit.NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS等
- workQueue:任務隊列,用於保存等待執行的任務
- 常用實現:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue
- threadFactory:線程工廠,用於創建新線程
- 可以自定義線程名稱、優先級、是否為守護線程等
- handler:拒絕策略,當線程池和隊列都滿時的任務處理策略
- AbortPolicy:默認策略,直接拋出RejectedExecutionException
- CallerRunsPolicy:由提交任務的線程執行任務
- DiscardPolicy:直接丟棄任務,不拋出異常
- DiscardOldestPolicy:丟棄隊列中最舊的任務,然後嘗試提交新任務
線程池工作原理
線程池的工作流程可以概括為以下步驟:
- 當提交一個新任務時,線程池首先檢查核心線程是否都在執行任務
- 如果不是,則創建新的核心線程執行任務
- 如果是,則進入下一步
- 檢查任務隊列是否已滿
- 如果未滿,將任務加入隊列等待執行
- 如果已滿,進入下一步
- 檢查當前線程數是否達到最大線程數
- 如果沒有達到,創建非核心線程執行任務
- 如果已達到,執行拒絕策略
線程池狀態:
- RUNNING:正常運行狀態,接受新任務並處理隊列中的任務
- SHUTDOWN:關閉狀態,不接受新任務,但處理隊列中的任務
- STOP:停止狀態,不接受新任務,不處理隊列中的任務,中斷正在執行的任務
- TIDYING:整理狀態,所有任務已終止,準備執行terminated()方法
- TERMINATED:終止狀態,terminated()方法已執行完成
常見線程池類型
Java的Executors類提供了幾種常用的線程池實現:
- FixedThreadPool:固定大小的線程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
- 特點:核心線程數=最大線程數,隊列容量無界
- 適用場景:需要控制併發線程數量的場景
- CachedThreadPool:可緩存的線程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
- 特點:核心線程數=0,最大線程數=Integer.MAX_VALUE,隊列是同步隊列
- 適用場景:執行大量短期異步任務,任務提交頻繁但執行時間短
- SingleThreadExecutor:單線程的線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
- 特點:核心線程數=1,最大線程數=1,保證任務按順序執行
- 適用場景:需要保證任務順序執行,且不需要併發執行的場景
- ScheduledThreadPool:定時任務線程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
- 特點:支持定時執行或週期性執行任務
- 適用場景:需要定時執行任務的場景,如定時備份、定時檢查等
注意:在實際項目開發中,阿里巴巴Java開發手冊建議直接使用ThreadPoolExecutor構造函數創建線程池,而不是使用Executors提供的工廠方法,以明確線程池參數,避免資源耗盡風險。
使用示例
基本使用示例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
// 創建固定大小的線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交5個任務
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running, thread id: " +
Thread.currentThread().getId());
try {
Thread.sleep(1000); // 模擬任務執行
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 關閉線程池
executor.shutdown();
}
}
Callable與Future使用示例
當需要任務返回結果時,可以使用Callable接口,配合Future獲取任務執行結果:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
// 創建Callable任務
Callable<Integer> task = () -> {
System.out.println("Task is running...");
Thread.sleep(2000);
return 1 + 1; // 返回計算結果
};
// 提交任務並獲取Future對象
Future<Integer> future = executor.submit(task);
System.out.println("Task submitted, waiting for result...");
try {
// 獲取任務結果,會阻塞直到任務完成
Integer result = future.get();
System.out.println("Task result: " + result);
// 檢查任務是否完成
System.out.println("Is task done? " + future.isDone());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
定時任務示例
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
// 延遲3秒後執行一次
scheduledExecutor.schedule(() -> {
System.out.println("Task executed after 3 seconds");
}, 3, TimeUnit.SECONDS);
// 延遲1秒後開始,每隔2秒執行一次
scheduledExecutor.scheduleAtFixedRate(() -> {
System.out.println("Task executed periodically");
}, 1, 2, TimeUnit.SECONDS);
// 程序運行10秒後關閉線程池
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledExecutor.shutdown();
}
}
線程池最佳實踐
- 合理設置線程池參數
- 核心線程數:根據CPU核心數和任務類型確定
- CPU密集型任務:核心線程數 = CPU核心數 + 1
- IO密集型任務:核心線程數 = CPU核心數 * 2
- 最大線程數:一般設置為核心線程數的2-4倍
- 隊列容量:根據系統能承受的最大任務積壓數設置
- 拒絕策略:根據業務需求選擇合適的拒絕策略
- 手動創建ThreadPoolExecutor
// 推薦使用方式
ExecutorService executorService = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 有界隊列,避免資源耗盡
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-pool-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
- 正確關閉線程池
- 使用shutdown():等待所有已提交任務執行完畢後關閉
- 使用shutdownNow():立即嘗試停止所有正在執行的任務,返回等待執行的任務列表
- 優雅關閉示例:
executorService.shutdown();
try {
// 等待60秒讓所有任務完成
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
// 如果超時,強制關閉
executorService.shutdownNow();
// 再次等待30秒
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
System.err.println("線程池未能正常關閉");
}
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
- 監控線程池狀態
- 通過ThreadPoolExecutor的以下方法監控線程池狀態:
- getPoolSize():當前線程池中的線程數
- getActiveCount():當前正在執行任務的線程數
- getCompletedTaskCount():已完成的任務總數
- getQueue().size():等待執行的任務數
- 避免任務堆積
- 使用有界隊列,避免任務無限堆積
- 設置合理的拒絕策略,在系統負載過高時能夠優雅降級
- 使用CompletableFuture簡化異步編程
Java 8引入的CompletableFuture結合線程池可以更優雅地處理異步任務:
CompletableFuture.supplyAsync(() -> {
// 異步執行任務
return "result";
}, executorService)
.thenApply(result -> {
// 處理結果
return result.toUpperCase();
})
.thenAccept(finalResult -> {
// 消費最終結果
System.out.println(finalResult);
})
.exceptionally(ex -> {
// 處理異常
ex.printStackTrace();
return null;
});
併發工具類
除了線程池,Java還提供了一些實用的併發工具類,用於簡化併發編程:
- CountDownLatch:倒計時門閂
- 允許一個或多個線程等待其他線程完成操作
CountDownLatch latch = new CountDownLatch(3);
// 其他線程執行完後調用countDown()
new Thread(() -> {
try {
// 執行任務
latch.countDown(); // 計數器減1
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 等待計數器歸零
latch.await();
- CyclicBarrier:循環柵欄
- 讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,所有被阻塞的線程才會繼續執行
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有線程已到達屏障,開始執行下一步");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
// 執行任務
System.out.println(Thread.currentThread().getName() + "到達屏障");
barrier.await(); // 等待其他線程
// 所有線程到達後執行後續操作
System.out.println(Thread.currentThread().getName() + "繼續執行");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
- Semaphore:信號量
- 控制同時訪問特定資源的線程數量
Semaphore semaphore = new Semaphore(5); // 允許5個線程同時訪問
try {
semaphore.acquire(); // 獲取許可
// 訪問共享資源
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 釋放許可
}
- ConcurrentHashMap:併發哈希表
- 線程安全的HashMap實現,支持高併發操作
- BlockingQueue:阻塞隊列
- 支持阻塞的插入和移除操作,常用於生產者-消費者模式
總結
今天我們學習了Java線程池和併發工具的核心內容,包括:
- 線程池的概念和優勢
- 線程池的核心參數和工作原理
- 常見的線程池類型及其特點
- 線程池的使用示例,包括基本使用、帶返回值的任務和定時任務
- 線程池的最佳實踐,包括參數設置、創建方式和關閉方法
- 常用的併發工具類
線程池是Java併發編程的重要內容,合理使用線程池可以顯著提高程序性能和資源利用率。在實際項目中,需要根據具體業務場景選擇合適的線程池類型和參數配置,並遵循最佳實踐,避免常見的線程池使用陷阱。
下一天我們將學習Java中的鎖機制和併發容器,進一步提升我們的併發編程能力。