文章目錄
- 1. 併發工具類概述
- 1.1 主要併發工具類分類
- 2. 鎖機制工具類
- 2.1 ReentrantLock 可重入鎖
- 2.2 ReentrantReadWriteLock 讀寫鎖
- 3. 同步器工具類
- 3.1 CountDownLatch 倒計時門閂
- 3.2 CyclicBarrier 循環屏障
- 3.3 Semaphore 信號量
- 4. 併發集合類
- 4.1 ConcurrentHashMap 併發哈希表
- 5. 原子變量類
- 5.1 AtomicInteger 和 LongAdder
- 5.2 AtomicReference 原子引用
- 6. 線程池工具類
- 6.1 ThreadPoolExecutor 自定義線程池
- 6.2 Executors 工具類創建線程池
- 7. 併發工具類原理分析
- 7.1 AQS (AbstractQueuedSynchronizer) 原理
- 7.2 併發工具類流程圖
- CountDownLatch 工作流程
- ReentrantLock 加鎖流程
- 8. 最佳實踐和注意事項
- 8.1 避免死鎖
- 8.2 性能考慮
- 9. 總結
在現代多核CPU架構下,併發編程已成為開發高性能應用的核心技能。Java提供了豐富的併發工具類,讓我們能夠編寫出線程安全且高效的多線程程序。
1. 併發工具類概述
Java併發工具類主要位於java.util.concurrent包中,這些工具類基於AQS(AbstractQueuedSynchronizer)框架構建,提供了比傳統的synchronized關鍵字更強大、更靈活的線程同步機制。
1.1 主要併發工具類分類
|
類別
|
主要工具類
|
用途
|
|
鎖機制
|
ReentrantLock, ReentrantReadWriteLock
|
提供更靈活的鎖操作
|
|
同步器
|
CountDownLatch, CyclicBarrier, Semaphore
|
控制線程間的協調
|
|
線程池
|
ThreadPoolExecutor, Executors
|
管理線程生命週期
|
|
併發集合
|
ConcurrentHashMap, CopyOnWriteArrayList
|
線程安全的集合類
|
|
原子變量
|
AtomicInteger, AtomicReference
|
無鎖的線程安全編程
|
2. 鎖機制工具類
2.1 ReentrantLock 可重入鎖
ReentrantLock提供了與synchronized相似的功能,但更加靈活。
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 獲取鎖
try {
count++;
System.out.println(Thread.currentThread().getName() + " count: " + count);
} finally {
lock.unlock(); // 確保鎖被釋放
}
}
public static void main(String[] args) {
ReentrantLockDemo demo = new ReentrantLockDemo();
// 創建多個線程同時執行increment方法
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (int j = 0; j < 3; j++) {
demo.increment();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Thread-" + i).start();
}
}
}
高級特性:嘗試獲取鎖和可中斷鎖
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class AdvancedReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock();
public boolean tryPerformTask() {
// 嘗試在1秒內獲取鎖
boolean acquired = false;
try {
acquired = lock.tryLock(1, TimeUnit.SECONDS);
if (acquired) {
// 執行關鍵代碼
System.out.println(Thread.currentThread().getName() + " 成功獲取鎖並執行任務");
Thread.sleep(500); // 模擬任務執行
return true;
} else {
System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");
return false;
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 被中斷");
return false;
} finally {
if (acquired) {
lock.unlock();
}
}
}
public void interruptibleTask() throws InterruptedException {
lock.lockInterruptibly(); // 可中斷的獲取鎖
try {
// 執行長時間任務
while (!Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + " 正在執行任務...");
Thread.sleep(1000);
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
2.2 ReentrantReadWriteLock 讀寫鎖
讀寫鎖允許多個讀操作同時進行,但寫操作是獨佔的。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private String data = "初始數據";
public String readData() {
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 讀取數據: " + data);
Thread.sleep(100); // 模擬讀取耗時
return data;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
rwLock.readLock().unlock();
}
}
public void writeData(String newData) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 寫入數據: " + newData);
Thread.sleep(200); // 模擬寫入耗時
this.data = newData;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
rwLock.writeLock().unlock();
}
}
public static void main(String[] args) {
ReadWriteLockDemo demo = new ReadWriteLockDemo();
// 創建多個讀線程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
for (int j = 0; j < 3; j++) {
demo.readData();
}
}, "Reader-" + i).start();
}
// 創建寫線程
for (int i = 0; i < 2; i++) {
final int index = i;
new Thread(() -> {
demo.writeData("更新數據-" + index);
}, "Writer-" + i).start();
}
}
}
3. 同步器工具類
3.1 CountDownLatch 倒計時門閂
CountDownLatch允許一個或多個線程等待其他線程完成操作。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
try {
System.out.println("工人 " + workerId + " 等待開工信號...");
startSignal.await(); // 等待開工信號
// 模擬工作
int workTime = ThreadLocalRandom.current().nextInt(1000, 3000);
System.out.println("工人 " + workerId + " 開始工作,預計需要 " + workTime + " 毫秒");
Thread.sleep(workTime);
System.out.println("工人 " + workerId + " 完成工作");
doneSignal.countDown(); // 完成工作,計數減1
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
// 主線程準備開工信號
Thread.sleep(2000);
System.out.println("=== 經理髮出開工信號 ===");
startSignal.countDown(); // 發出開工信號
// 等待所有工人完成工作
doneSignal.await();
System.out.println("=== 所有工人都已完成工作,項目結束 ===");
}
}
複雜應用場景:多階段任務處理
public class MultiStageTaskDemo {
public static void main(String[] args) throws InterruptedException {
int taskCount = 3;
CountDownLatch phase1Latch = new CountDownLatch(taskCount);
CountDownLatch phase2Latch = new CountDownLatch(taskCount);
CountDownLatch finalLatch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
// 第一階段
System.out.println("任務 " + taskId + " 第一階段開始");
Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
System.out.println("任務 " + taskId + " 第一階段完成");
phase1Latch.countDown();
// 等待其他任務完成第一階段
phase1Latch.await();
// 第二階段
System.out.println("任務 " + taskId + " 第二階段開始");
Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
System.out.println("任務 " + taskId + " 第二階段完成");
phase2Latch.countDown();
// 等待其他任務完成第二階段
phase2Latch.await();
// 最終階段
System.out.println("任務 " + taskId + " 最終階段開始");
Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
System.out.println("任務 " + taskId + " 最終階段完成");
finalLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
finalLatch.await();
System.out.println("所有任務全部完成!");
}
}
3.2 CyclicBarrier 循環屏障
CyclicBarrier讓一組線程互相等待,直到所有線程都到達某個屏障點。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int runnerCount = 4;
CyclicBarrier barrier = new CyclicBarrier(runnerCount,
() -> System.out.println("所有選手都到達屏障,繼續下一輪比賽"));
for (int i = 0; i < runnerCount; i++) {
final int runnerId = i;
new Thread(() -> {
try {
for (int round = 1; round <= 3; round++) {
// 模擬跑步時間
int runTime = ThreadLocalRandom.current().nextInt(1000, 3000);
Thread.sleep(runTime);
System.out.println("選手 " + runnerId + " 完成第 " + round + " 輪比賽,用時 " + runTime + "ms");
// 等待其他選手
barrier.await();
}
System.out.println("選手 " + runnerId + " 完成所有比賽!");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
複雜場景:多線程數據計算
public class MatrixCalculationDemo {
static class CalculationTask implements Runnable {
private final int[][] matrix;
private final int startRow;
private final int endRow;
private final CyclicBarrier barrier;
private int partialSum;
public CalculationTask(int[][] matrix, int startRow, int endRow, CyclicBarrier barrier) {
this.matrix = matrix;
this.startRow = startRow;
this.endRow = endRow;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 第一階段:計算部分和
partialSum = 0;
for (int i = startRow; i < endRow; i++) {
for (int j = 0; j < matrix[i].length; j++) {
partialSum += matrix[i][j];
}
}
System.out.println(Thread.currentThread().getName() + " 計算部分和: " + partialSum);
barrier.await();
// 第二階段:其他處理(這裏簡單模擬)
Thread.sleep(100);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
public int getPartialSum() {
return partialSum;
}
}
public static void main(String[] args) throws InterruptedException {
int size = 100;
int[][] matrix = new int[size][size];
// 初始化矩陣
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
matrix[i][j] = ThreadLocalRandom.current().nextInt(1, 10);
}
}
int threadCount = 4;
int rowsPerThread = size / threadCount;
CyclicBarrier barrier = new CyclicBarrier(threadCount,
() -> System.out.println("所有線程完成當前階段"));
CalculationTask[] tasks = new CalculationTask[threadCount];
Thread[] threads = new Thread[threadCount];
// 創建並啓動線程
for (int i = 0; i < threadCount; i++) {
int startRow = i * rowsPerThread;
int endRow = (i == threadCount - 1) ? size : startRow + rowsPerThread;
tasks[i] = new CalculationTask(matrix, startRow, endRow, barrier);
threads[i] = new Thread(tasks[i], "Calculator-" + i);
threads[i].start();
}
// 等待所有線程完成
for (Thread thread : threads) {
thread.join();
}
// 彙總結果
int totalSum = 0;
for (CalculationTask task : tasks) {
totalSum += task.getPartialSum();
}
System.out.println("矩陣總和: " + totalSum);
}
}
3.3 Semaphore 信號量
Semaphore用於控制同時訪問特定資源的線程數量。
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
public class SemaphoreDemo {
static class ResourcePool {
private final Semaphore semaphore;
private final boolean[] resources;
public ResourcePool(int poolSize) {
this.semaphore = new Semaphore(poolSize, true); // 公平信號量
this.resources = new boolean[poolSize];
for (int i = 0; i < poolSize; i++) {
resources[i] = true;
}
}
public int acquireResource() throws InterruptedException {
semaphore.acquire();
return getNextAvailableResource();
}
public void releaseResource(int resourceId) {
if (markResourceAsAvailable(resourceId)) {
semaphore.release();
}
}
private synchronized int getNextAvailableResource() {
for (int i = 0; i < resources.length; i++) {
if (resources[i]) {
resources[i] = false;
return i;
}
}
return -1; // 不會執行到這裏
}
private synchronized boolean markResourceAsAvailable(int resourceId) {
if (!resources[resourceId]) {
resources[resourceId] = true;
return true;
}
return false;
}
}
public static void main(String[] args) {
ResourcePool pool = new ResourcePool(3); // 3個資源
// 創建10個線程競爭資源
for (int i = 0; i < 10; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("線程 " + threadId + " 等待獲取資源...");
int resource = pool.acquireResource();
System.out.println("線程 " + threadId + " 獲取資源 " + resource);
// 模擬使用資源
Thread.sleep(ThreadLocalRandom.current().nextInt(2000, 5000));
// 釋放資源
pool.releaseResource(resource);
System.out.println("線程 " + threadId + " 釋放資源 " + resource);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
高級用法:數據庫連接池模擬
public class ConnectionPoolDemo {
static class DatabaseConnection {
private final int id;
private boolean inUse = false;
public DatabaseConnection(int id) {
this.id = id;
}
public void executeQuery(String query) throws InterruptedException {
System.out.println("連接 " + id + " 執行查詢: " + query);
Thread.sleep(ThreadLocalRandom.current().nextInt(500, 2000));
}
public void close() {
System.out.println("關閉連接 " + id);
}
}
static class ConnectionPool {
private final Semaphore semaphore;
private final DatabaseConnection[] connections;
private final boolean[] used;
public ConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize, true);
this.connections = new DatabaseConnection[poolSize];
this.used = new boolean[poolSize];
for (int i = 0; i < poolSize; i++) {
connections[i] = new DatabaseConnection(i);
used[i] = false;
}
}
public DatabaseConnection getConnection() throws InterruptedException {
semaphore.acquire();
return getAvailableConnection();
}
public void releaseConnection(DatabaseConnection connection) {
if (markConnectionAsAvailable(connection)) {
semaphore.release();
}
}
private synchronized DatabaseConnection getAvailableConnection() {
for (int i = 0; i < used.length; i++) {
if (!used[i]) {
used[i] = true;
System.out.println("獲取連接 " + i);
return connections[i];
}
}
return null; // 不會執行到這裏
}
private synchronized boolean markConnectionAsAvailable(DatabaseConnection connection) {
for (int i = 0; i < connections.length; i++) {
if (connections[i] == connection) {
if (used[i]) {
used[i] = false;
System.out.println("釋放連接 " + i);
return true;
}
return false;
}
}
return false;
}
}
public static void main(String[] args) {
ConnectionPool pool = new ConnectionPool(5);
// 模擬多個客户端請求
for (int i = 0; i < 15; i++) {
final int clientId = i;
new Thread(() -> {
try {
DatabaseConnection conn = pool.getConnection();
conn.executeQuery("SELECT * FROM table WHERE client=" + clientId);
pool.releaseConnection(conn);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
4. 併發集合類
4.1 ConcurrentHashMap 併發哈希表
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
// 添加初始數據
scores.put("Alice", 0);
scores.put("Bob", 0);
scores.put("Charlie", 0);
// 創建多個線程更新分數
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10; j++) {
// 隨機選擇一個玩家增加分數
String[] players = {"Alice", "Bob", "Charlie"};
String player = players[ThreadLocalRandom.current().nextInt(players.length)];
int points = ThreadLocalRandom.current().nextInt(1, 11);
// 使用compute方法原子性更新
scores.compute(player, (key, value) -> value + points);
System.out.println(Thread.currentThread().getName() +
" 給 " + player + " 增加 " + points + " 分");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
threads[i].start();
}
// 等待所有線程完成
for (Thread thread : threads) {
thread.join();
}
// 輸出最終結果
System.out.println("\n最終分數:");
scores.forEach((player, score) ->
System.out.println(player + ": " + score + " 分"));
}
}
高級特性:併發統計
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
public class ConcurrentStatisticsDemo {
public static void main(String[] args) throws InterruptedException {
// 使用LongAdder進行高性能統計
ConcurrentHashMap<String, LongAdder> wordCount = new ConcurrentHashMap<>();
String[] documents = {
"java concurrent programming is important",
"java multithreading helps performance",
"concurrent hashmap is thread safe"
};
Thread[] processors = new Thread[documents.length];
for (int i = 0; i < documents.length; i++) {
final String document = documents[i];
processors[i] = new Thread(() -> {
String[] words = document.split(" ");
for (String word : words) {
// 使用computeIfAbsent確保線程安全
wordCount.computeIfAbsent(word, k -> new LongAdder()).increment();
}
});
processors[i].start();
}
// 等待所有處理完成
for (Thread processor : processors) {
processor.join();
}
// 輸出詞頻統計結果
System.out.println("詞頻統計結果:");
wordCount.forEach((word, count) ->
System.out.println(word + ": " + count));
}
}
5. 原子變量類
5.1 AtomicInteger 和 LongAdder
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.ThreadLocalRandom;
public class AtomicDemo {
static class AtomicCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public void decrement() {
count.decrementAndGet();
}
public int get() {
return count.get();
}
// 複雜原子操作
public boolean compareAndSet(int expect, int update) {
return count.compareAndSet(expect, update);
}
}
static class LongAdderCounter {
private final LongAdder count = new LongAdder();
public void increment() {
count.increment();
}
public long get() {
return count.sum();
}
}
public static void main(String[] args) throws InterruptedException {
// 測試AtomicInteger
AtomicCounter atomicCounter = new AtomicCounter();
Thread[] atomicThreads = new Thread[10];
for (int i = 0; i < atomicThreads.length; i++) {
atomicThreads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicCounter.increment();
// 模擬一些處理
if (j % 100 == 0) {
atomicCounter.compareAndSet(atomicCounter.get(), atomicCounter.get());
}
}
});
atomicThreads[i].start();
}
for (Thread thread : atomicThreads) {
thread.join();
}
System.out.println("AtomicInteger 最終計數: " + atomicCounter.get());
// 測試LongAdder(高併發場景性能更好)
LongAdderCounter adderCounter = new LongAdderCounter();
Thread[] adderThreads = new Thread[10];
for (int i = 0; i < adderThreads.length; i++) {
adderThreads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
adderCounter.increment();
}
});
adderThreads[i].start();
}
for (Thread thread : adderThreads) {
thread.join();
}
System.out.println("LongAdder 最終計數: " + adderCounter.get());
}
}
5.2 AtomicReference 原子引用
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceDemo {
static class User {
private final String name;
private final int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{name='" + name + "', age=" + age + "}";
}
}
public static void main(String[] args) {
AtomicReference<User> atomicUser = new AtomicReference<>(
new User("Alice", 25)
);
// 創建多個線程嘗試更新用户信息
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
User currentUser, newUser;
do {
currentUser = atomicUser.get();
newUser = new User(
"User-" + threadId,
currentUser.age + 1
);
System.out.println(Thread.currentThread().getName() +
" 嘗試更新: " + currentUser + " -> " + newUser);
} while (!atomicUser.compareAndSet(currentUser, newUser));
System.out.println(Thread.currentThread().getName() + " 更新成功");
}).start();
}
// 等待一會兒讓所有線程完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最終用户信息: " + atomicUser.get());
}
}
6. 線程池工具類
6.1 ThreadPoolExecutor 自定義線程池
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolDemo {
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory(String poolName) {
namePrefix = poolName + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("任務被拒絕: " + r.toString());
// 可以在這裏實現自定義的拒絕策略,比如記錄日誌、持久化任務等
}
}
public static void main(String[] args) throws InterruptedException {
// 創建自定義線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心線程數
4, // 最大線程數
60, // 空閒線程存活時間
TimeUnit.SECONDS, // 時間單位
new LinkedBlockingQueue<>(10), // 工作隊列
new CustomThreadFactory("CustomPool"), // 線程工廠
new CustomRejectedExecutionHandler() // 拒絕策略
);
// 提交任務
AtomicInteger completedTasks = new AtomicInteger(0);
for (int i = 0; i < 20; i++) {
final int taskId = i;
try {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() +
" 執行任務 " + taskId);
Thread.sleep(ThreadLocalRandom.current().nextInt(500, 2000));
completedTasks.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} catch (RejectedExecutionException e) {
System.out.println("任務 " + taskId + " 被拒絕");
}
}
// 關閉線程池
executor.shutdown();
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
if (terminated) {
System.out.println("所有任務完成,共完成: " + completedTasks.get() + " 個任務");
} else {
System.out.println("等待超時,已完成: " + completedTasks.get() + " 個任務");
}
}
}
6.2 Executors 工具類創建線程池
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ExecutorsDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 1. 固定大小線程池
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
// 2. 單線程線程池
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
// 3. 緩存線程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 4. 定時任務線程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
System.out.println("=== 固定大小線程池演示 ===");
AtomicInteger counter = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
fixedPool.execute(() -> {
try {
int value = counter.incrementAndGet();
System.out.println(Thread.currentThread().getName() +
" 執行任務,當前計數: " + value);
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
latch.await();
System.out.println("所有任務完成,最終計數: " + counter.get());
System.out.println("\n=== 定時任務演示 ===");
CountDownLatch scheduledLatch = new CountDownLatch(3);
// 延遲執行
scheduledPool.schedule(() -> {
System.out.println("延遲5秒執行的任務");
scheduledLatch.countDown();
}, 5, TimeUnit.SECONDS);
// 固定速率執行
ScheduledFuture<?> future = scheduledPool.scheduleAtFixedRate(() -> {
System.out.println("固定速率任務執行時間: " + System.currentTimeMillis());
scheduledLatch.countDown();
}, 1, 2, TimeUnit.SECONDS);
// 等待任務完成
scheduledLatch.await();
future.cancel(false); // 取消定時任務
System.out.println("\n=== Callable和Future演示 ===");
ExecutorService callablePool = Executors.newFixedThreadPool(3);
// 提交Callable任務
Future<Integer> future1 = callablePool.submit(() -> {
Thread.sleep(1000);
return 42;
});
Future<String> future2 = callablePool.submit(() -> {
Thread.sleep(1500);
return "Hello, World!";
});
Future<Double> future3 = callablePool.submit(() -> {
Thread.sleep(800);
return 3.14159;
});
// 獲取結果
System.out.println("結果1: " + future1.get());
System.out.println("結果2: " + future2.get());
System.out.println("結果3: " + future3.get());
// 關閉所有線程池
fixedPool.shutdown();
singleThreadPool.shutdown();
cachedPool.shutdown();
scheduledPool.shutdown();
callablePool.shutdown();
}
}
7. 併發工具類原理分析
7.1 AQS (AbstractQueuedSynchronizer) 原理
AQS是Java併發包的核心基礎組件,它提供了一個框架,用於實現依賴於先進先出(FIFO)等待隊列的阻塞鎖和相關同步器。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
// 自定義的簡單互斥鎖,基於AQS實現
public class SimpleMutex {
private final Sync sync = new Sync();
// 內部同步器類
private static class Sync extends AbstractQueuedSynchronizer {
// 嘗試獲取鎖
@Override
protected boolean tryAcquire(int acquires) {
// 使用CAS操作將狀態從0改為1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 嘗試釋放鎖
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 判斷是否被獨佔
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
7.2 併發工具類流程圖
CountDownLatch 工作流程
是
否
是
否
主線程創建CountDownLatch
設置初始計數N
主線程調用await
計數是否為0?
主線程繼續執行
主線程阻塞等待
工作線程完成任務
調用countDown
計數減1
計數是否為0?
喚醒所有等待線程
繼續等待其他線程
ReentrantLock 加鎖流程
是
否
是
否
線程嘗試獲取鎖
鎖是否空閒?
CAS設置鎖狀態
獲取鎖成功
當前線程是否持有鎖?
重入計數+1
加入等待隊列
線程進入等待狀態
被喚醒後嘗試獲取鎖
8. 最佳實踐和注意事項
8.1 避免死鎖
public class DeadlockPrevention {
// 按固定順序獲取鎖來避免死鎖
public void transferMoney(Account from, Account to, int amount) {
Object firstLock = from.getId() < to.getId() ? from : to;
Object secondLock = from.getId() < to.getId() ? to : from;
synchronized (firstLock) {
synchronized (secondLock) {
if (from.getBalance() >= amount) {
from.debit(amount);
to.credit(amount);
}
}
}
}
// 使用tryLock避免死鎖
public boolean tryTransferMoney(Account from, Account to, int amount,
long timeout, TimeUnit unit) throws InterruptedException {
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (true) {
if (from.getLock().tryLock()) {
try {
if (to.getLock().tryLock()) {
try {
if (from.getBalance() >= amount) {
from.debit(amount);
to.credit(amount);
return true;
} else {
return false;
}
} finally {
to.getLock().unlock();
}
}
} finally {
from.getLock().unlock();
}
}
if (System.nanoTime() > stopTime) {
return false;
}
// 隨機休眠避免活鎖
Thread.sleep(ThreadLocalRandom.current().nextInt(100));
}
}
static class Account {
private final String id;
private int balance;
private final ReentrantLock lock = new ReentrantLock();
public Account(String id, int balance) {
this.id = id;
this.balance = balance;
}
public String getId() { return id; }
public int getBalance() { return balance; }
public ReentrantLock getLock() { return lock; }
public void debit(int amount) {
balance -= amount;
}
public void credit(int amount) {
balance += amount;
}
}
}
8.2 性能考慮
public class PerformanceConsiderations {
// 使用LongAdder替代AtomicLong在高併發寫場景
public static void testCounterPerformance() throws InterruptedException {
int threadCount = 10;
int iterations = 100000;
// 測試AtomicLong
AtomicLong atomicLong = new AtomicLong();
CountDownLatch latch1 = new CountDownLatch(threadCount);
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
for (int j = 0; j < iterations; j++) {
atomicLong.incrementAndGet();
}
latch1.countDown();
}).start();
}
latch1.await();
long atomicTime = System.currentTimeMillis() - startTime;
// 測試LongAdder
LongAdder longAdder = new LongAdder();
CountDownLatch latch2 = new CountDownLatch(threadCount);
startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
for (int j = 0; j < iterations; j++) {
longAdder.increment();
}
latch2.countDown();
}).start();
}
latch2.await();
long adderTime = System.currentTimeMillis() - startTime;
System.out.println("AtomicLong 耗時: " + atomicTime + "ms");
System.out.println("LongAdder 耗時: " + adderTime + "ms");
System.out.println("AtomicLong 結果: " + atomicLong.get());
System.out.println("LongAdder 結果: " + longAdder.sum());
}
// 選擇合適的併發集合
public static void testCollectionPerformance() {
int elementCount = 100000;
// ConcurrentHashMap vs synchronized HashMap
Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
Map<String, Integer> synchronizedMap = Collections.synchronizedMap(new HashMap<>());
long startTime = System.currentTimeMillis();
for (int i = 0; i < elementCount; i++) {
concurrentMap.put("key" + i, i);
}
long concurrentTime = System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
for (int i = 0; i < elementCount; i++) {
synchronizedMap.put("key" + i, i);
}
long synchronizedTime = System.currentTimeMillis() - startTime;
System.out.println("ConcurrentHashMap 寫入耗時: " + concurrentTime + "ms");
System.out.println("SynchronizedHashMap 寫入耗時: " + synchronizedTime + "ms");
}
}
9. 總結
Java併發工具類提供了強大而靈活的線程同步機制,相比傳統的synchronized關鍵字,它們提供了更細粒度的控制和更好的性能。在實際開發中,我們應該:
- 根據場景選擇合適的工具類:讀多寫少用讀寫鎖,線程協調用同步器,高併發計數用LongAdder
- 注意資源管理:及時釋放鎖和關閉線程池
- 避免常見陷阱:死鎖、活鎖、資源競爭
- 性能優化:在合適的場景使用合適的併發工具