Java併發編程--基礎進階高級(完結)_System

文章目錄

  • 1. 併發工具類概述
  • 1.1 主要併發工具類分類
  • 2. 鎖機制工具類
  • 2.1 ReentrantLock 可重入鎖
  • 2.2 ReentrantReadWriteLock 讀寫鎖
  • 3. 同步器工具類
  • 3.1 CountDownLatch 倒計時門閂
  • 3.2 CyclicBarrier 循環屏障
  • 3.3 Semaphore 信號量
  • 4. 併發集合類
  • 4.1 ConcurrentHashMap 併發哈希表
  • 5. 原子變量類
  • 5.1 AtomicInteger 和 LongAdder
  • 5.2 AtomicReference 原子引用
  • 6. 線程池工具類
  • 6.1 ThreadPoolExecutor 自定義線程池
  • 6.2 Executors 工具類創建線程池
  • 7. 併發工具類原理分析
  • 7.1 AQS (AbstractQueuedSynchronizer) 原理
  • 7.2 併發工具類流程圖
  • CountDownLatch 工作流程
  • ReentrantLock 加鎖流程
  • 8. 最佳實踐和注意事項
  • 8.1 避免死鎖
  • 8.2 性能考慮
  • 9. 總結

在現代多核CPU架構下,併發編程已成為開發高性能應用的核心技能。Java提供了豐富的併發工具類,讓我們能夠編寫出線程安全且高效的多線程程序。

1. 併發工具類概述

Java併發工具類主要位於java.util.concurrent包中,這些工具類基於AQS(AbstractQueuedSynchronizer)框架構建,提供了比傳統的synchronized關鍵字更強大、更靈活的線程同步機制。

1.1 主要併發工具類分類

類別

主要工具類

用途

鎖機制

ReentrantLock, ReentrantReadWriteLock

提供更靈活的鎖操作

同步器

CountDownLatch, CyclicBarrier, Semaphore

控制線程間的協調

線程池

ThreadPoolExecutor, Executors

管理線程生命週期

併發集合

ConcurrentHashMap, CopyOnWriteArrayList

線程安全的集合類

原子變量

AtomicInteger, AtomicReference

無鎖的線程安全編程

2. 鎖機制工具類

2.1 ReentrantLock 可重入鎖

ReentrantLock提供了與synchronized相似的功能,但更加靈活。

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock();  // 獲取鎖
        try {
            count++;
            System.out.println(Thread.currentThread().getName() + " count: " + count);
        } finally {
            lock.unlock();  // 確保鎖被釋放
        }
    }

    public static void main(String[] args) {
        ReentrantLockDemo demo = new ReentrantLockDemo();
        
        // 創建多個線程同時執行increment方法
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    demo.increment();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Thread-" + i).start();
        }
    }
}

高級特性:嘗試獲取鎖和可中斷鎖

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AdvancedReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    
    public boolean tryPerformTask() {
        // 嘗試在1秒內獲取鎖
        boolean acquired = false;
        try {
            acquired = lock.tryLock(1, TimeUnit.SECONDS);
            if (acquired) {
                // 執行關鍵代碼
                System.out.println(Thread.currentThread().getName() + " 成功獲取鎖並執行任務");
                Thread.sleep(500); // 模擬任務執行
                return true;
            } else {
                System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");
                return false;
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " 被中斷");
            return false;
        } finally {
            if (acquired) {
                lock.unlock();
            }
        }
    }
    
    public void interruptibleTask() throws InterruptedException {
        lock.lockInterruptibly();  // 可中斷的獲取鎖
        try {
            // 執行長時間任務
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println(Thread.currentThread().getName() + " 正在執行任務...");
                Thread.sleep(1000);
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

2.2 ReentrantReadWriteLock 讀寫鎖

讀寫鎖允許多個讀操作同時進行,但寫操作是獨佔的。

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private String data = "初始數據";
    
    public String readData() {
        rwLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 讀取數據: " + data);
            Thread.sleep(100); // 模擬讀取耗時
            return data;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            rwLock.readLock().unlock();
        }
    }
    
    public void writeData(String newData) {
        rwLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 寫入數據: " + newData);
            Thread.sleep(200); // 模擬寫入耗時
            this.data = newData;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            rwLock.writeLock().unlock();
        }
    }
    
    public static void main(String[] args) {
        ReadWriteLockDemo demo = new ReadWriteLockDemo();
        
        // 創建多個讀線程
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    demo.readData();
                }
            }, "Reader-" + i).start();
        }
        
        // 創建寫線程
        for (int i = 0; i < 2; i++) {
            final int index = i;
            new Thread(() -> {
                demo.writeData("更新數據-" + index);
            }, "Writer-" + i).start();
        }
    }
}

3. 同步器工具類

3.1 CountDownLatch 倒計時門閂

CountDownLatch允許一個或多個線程等待其他線程完成操作。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

public class CountDownLatchDemo {
    
    public static void main(String[] args) throws InterruptedException {
        int workerCount = 5;
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(workerCount);
        
        for (int i = 0; i < workerCount; i++) {
            final int workerId = i;
            new Thread(() -> {
                try {
                    System.out.println("工人 " + workerId + " 等待開工信號...");
                    startSignal.await();  // 等待開工信號
                    
                    // 模擬工作
                    int workTime = ThreadLocalRandom.current().nextInt(1000, 3000);
                    System.out.println("工人 " + workerId + " 開始工作,預計需要 " + workTime + " 毫秒");
                    Thread.sleep(workTime);
                    
                    System.out.println("工人 " + workerId + " 完成工作");
                    doneSignal.countDown();  // 完成工作,計數減1
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        
        // 主線程準備開工信號
        Thread.sleep(2000);
        System.out.println("=== 經理髮出開工信號 ===");
        startSignal.countDown();  // 發出開工信號
        
        // 等待所有工人完成工作
        doneSignal.await();
        System.out.println("=== 所有工人都已完成工作,項目結束 ===");
    }
}

複雜應用場景:多階段任務處理

public class MultiStageTaskDemo {
    
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 3;
        CountDownLatch phase1Latch = new CountDownLatch(taskCount);
        CountDownLatch phase2Latch = new CountDownLatch(taskCount);
        CountDownLatch finalLatch = new CountDownLatch(taskCount);
        
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    // 第一階段
                    System.out.println("任務 " + taskId + " 第一階段開始");
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
                    System.out.println("任務 " + taskId + " 第一階段完成");
                    phase1Latch.countDown();
                    
                    // 等待其他任務完成第一階段
                    phase1Latch.await();
                    
                    // 第二階段
                    System.out.println("任務 " + taskId + " 第二階段開始");
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
                    System.out.println("任務 " + taskId + " 第二階段完成");
                    phase2Latch.countDown();
                    
                    // 等待其他任務完成第二階段
                    phase2Latch.await();
                    
                    // 最終階段
                    System.out.println("任務 " + taskId + " 最終階段開始");
                    Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
                    System.out.println("任務 " + taskId + " 最終階段完成");
                    finalLatch.countDown();
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        
        finalLatch.await();
        System.out.println("所有任務全部完成!");
    }
}

3.2 CyclicBarrier 循環屏障

CyclicBarrier讓一組線程互相等待,直到所有線程都到達某個屏障點。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;

public class CyclicBarrierDemo {
    
    public static void main(String[] args) {
        int runnerCount = 4;
        CyclicBarrier barrier = new CyclicBarrier(runnerCount, 
            () -> System.out.println("所有選手都到達屏障,繼續下一輪比賽"));
        
        for (int i = 0; i < runnerCount; i++) {
            final int runnerId = i;
            new Thread(() -> {
                try {
                    for (int round = 1; round <= 3; round++) {
                        // 模擬跑步時間
                        int runTime = ThreadLocalRandom.current().nextInt(1000, 3000);
                        Thread.sleep(runTime);
                        System.out.println("選手 " + runnerId + " 完成第 " + round + " 輪比賽,用時 " + runTime + "ms");
                        
                        // 等待其他選手
                        barrier.await();
                    }
                    System.out.println("選手 " + runnerId + " 完成所有比賽!");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

複雜場景:多線程數據計算

public class MatrixCalculationDemo {
    
    static class CalculationTask implements Runnable {
        private final int[][] matrix;
        private final int startRow;
        private final int endRow;
        private final CyclicBarrier barrier;
        private int partialSum;
        
        public CalculationTask(int[][] matrix, int startRow, int endRow, CyclicBarrier barrier) {
            this.matrix = matrix;
            this.startRow = startRow;
            this.endRow = endRow;
            this.barrier = barrier;
        }
        
        @Override
        public void run() {
            try {
                // 第一階段:計算部分和
                partialSum = 0;
                for (int i = startRow; i < endRow; i++) {
                    for (int j = 0; j < matrix[i].length; j++) {
                        partialSum += matrix[i][j];
                    }
                }
                System.out.println(Thread.currentThread().getName() + " 計算部分和: " + partialSum);
                barrier.await();
                
                // 第二階段:其他處理(這裏簡單模擬)
                Thread.sleep(100);
                barrier.await();
                
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        public int getPartialSum() {
            return partialSum;
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        int size = 100;
        int[][] matrix = new int[size][size];
        
        // 初始化矩陣
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < size; j++) {
                matrix[i][j] = ThreadLocalRandom.current().nextInt(1, 10);
            }
        }
        
        int threadCount = 4;
        int rowsPerThread = size / threadCount;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, 
            () -> System.out.println("所有線程完成當前階段"));
        
        CalculationTask[] tasks = new CalculationTask[threadCount];
        Thread[] threads = new Thread[threadCount];
        
        // 創建並啓動線程
        for (int i = 0; i < threadCount; i++) {
            int startRow = i * rowsPerThread;
            int endRow = (i == threadCount - 1) ? size : startRow + rowsPerThread;
            tasks[i] = new CalculationTask(matrix, startRow, endRow, barrier);
            threads[i] = new Thread(tasks[i], "Calculator-" + i);
            threads[i].start();
        }
        
        // 等待所有線程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        // 彙總結果
        int totalSum = 0;
        for (CalculationTask task : tasks) {
            totalSum += task.getPartialSum();
        }
        
        System.out.println("矩陣總和: " + totalSum);
    }
}

3.3 Semaphore 信號量

Semaphore用於控制同時訪問特定資源的線程數量。

import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

public class SemaphoreDemo {
    
    static class ResourcePool {
        private final Semaphore semaphore;
        private final boolean[] resources;
        
        public ResourcePool(int poolSize) {
            this.semaphore = new Semaphore(poolSize, true); // 公平信號量
            this.resources = new boolean[poolSize];
            for (int i = 0; i < poolSize; i++) {
                resources[i] = true;
            }
        }
        
        public int acquireResource() throws InterruptedException {
            semaphore.acquire();
            return getNextAvailableResource();
        }
        
        public void releaseResource(int resourceId) {
            if (markResourceAsAvailable(resourceId)) {
                semaphore.release();
            }
        }
        
        private synchronized int getNextAvailableResource() {
            for (int i = 0; i < resources.length; i++) {
                if (resources[i]) {
                    resources[i] = false;
                    return i;
                }
            }
            return -1; // 不會執行到這裏
        }
        
        private synchronized boolean markResourceAsAvailable(int resourceId) {
            if (!resources[resourceId]) {
                resources[resourceId] = true;
                return true;
            }
            return false;
        }
    }
    
    public static void main(String[] args) {
        ResourcePool pool = new ResourcePool(3); // 3個資源
        
        // 創建10個線程競爭資源
        for (int i = 0; i < 10; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("線程 " + threadId + " 等待獲取資源...");
                    int resource = pool.acquireResource();
                    System.out.println("線程 " + threadId + " 獲取資源 " + resource);
                    
                    // 模擬使用資源
                    Thread.sleep(ThreadLocalRandom.current().nextInt(2000, 5000));
                    
                    // 釋放資源
                    pool.releaseResource(resource);
                    System.out.println("線程 " + threadId + " 釋放資源 " + resource);
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

高級用法:數據庫連接池模擬

public class ConnectionPoolDemo {
    
    static class DatabaseConnection {
        private final int id;
        private boolean inUse = false;
        
        public DatabaseConnection(int id) {
            this.id = id;
        }
        
        public void executeQuery(String query) throws InterruptedException {
            System.out.println("連接 " + id + " 執行查詢: " + query);
            Thread.sleep(ThreadLocalRandom.current().nextInt(500, 2000));
        }
        
        public void close() {
            System.out.println("關閉連接 " + id);
        }
    }
    
    static class ConnectionPool {
        private final Semaphore semaphore;
        private final DatabaseConnection[] connections;
        private final boolean[] used;
        
        public ConnectionPool(int poolSize) {
            this.semaphore = new Semaphore(poolSize, true);
            this.connections = new DatabaseConnection[poolSize];
            this.used = new boolean[poolSize];
            
            for (int i = 0; i < poolSize; i++) {
                connections[i] = new DatabaseConnection(i);
                used[i] = false;
            }
        }
        
        public DatabaseConnection getConnection() throws InterruptedException {
            semaphore.acquire();
            return getAvailableConnection();
        }
        
        public void releaseConnection(DatabaseConnection connection) {
            if (markConnectionAsAvailable(connection)) {
                semaphore.release();
            }
        }
        
        private synchronized DatabaseConnection getAvailableConnection() {
            for (int i = 0; i < used.length; i++) {
                if (!used[i]) {
                    used[i] = true;
                    System.out.println("獲取連接 " + i);
                    return connections[i];
                }
            }
            return null; // 不會執行到這裏
        }
        
        private synchronized boolean markConnectionAsAvailable(DatabaseConnection connection) {
            for (int i = 0; i < connections.length; i++) {
                if (connections[i] == connection) {
                    if (used[i]) {
                        used[i] = false;
                        System.out.println("釋放連接 " + i);
                        return true;
                    }
                    return false;
                }
            }
            return false;
        }
    }
    
    public static void main(String[] args) {
        ConnectionPool pool = new ConnectionPool(5);
        
        // 模擬多個客户端請求
        for (int i = 0; i < 15; i++) {
            final int clientId = i;
            new Thread(() -> {
                try {
                    DatabaseConnection conn = pool.getConnection();
                    conn.executeQuery("SELECT * FROM table WHERE client=" + clientId);
                    pool.releaseConnection(conn);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

4. 併發集合類

4.1 ConcurrentHashMap 併發哈希表

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class ConcurrentHashMapDemo {
    
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
        
        // 添加初始數據
        scores.put("Alice", 0);
        scores.put("Bob", 0);
        scores.put("Charlie", 0);
        
        // 創建多個線程更新分數
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    // 隨機選擇一個玩家增加分數
                    String[] players = {"Alice", "Bob", "Charlie"};
                    String player = players[ThreadLocalRandom.current().nextInt(players.length)];
                    int points = ThreadLocalRandom.current().nextInt(1, 11);
                    
                    // 使用compute方法原子性更新
                    scores.compute(player, (key, value) -> value + points);
                    
                    System.out.println(Thread.currentThread().getName() + 
                                     " 給 " + player + " 增加 " + points + " 分");
                    
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
            threads[i].start();
        }
        
        // 等待所有線程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        // 輸出最終結果
        System.out.println("\n最終分數:");
        scores.forEach((player, score) -> 
            System.out.println(player + ": " + score + " 分"));
    }
}

高級特性:併發統計

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class ConcurrentStatisticsDemo {
    
    public static void main(String[] args) throws InterruptedException {
        // 使用LongAdder進行高性能統計
        ConcurrentHashMap<String, LongAdder> wordCount = new ConcurrentHashMap<>();
        
        String[] documents = {
            "java concurrent programming is important",
            "java multithreading helps performance",
            "concurrent hashmap is thread safe"
        };
        
        Thread[] processors = new Thread[documents.length];
        for (int i = 0; i < documents.length; i++) {
            final String document = documents[i];
            processors[i] = new Thread(() -> {
                String[] words = document.split(" ");
                for (String word : words) {
                    // 使用computeIfAbsent確保線程安全
                    wordCount.computeIfAbsent(word, k -> new LongAdder()).increment();
                }
            });
            processors[i].start();
        }
        
        // 等待所有處理完成
        for (Thread processor : processors) {
            processor.join();
        }
        
        // 輸出詞頻統計結果
        System.out.println("詞頻統計結果:");
        wordCount.forEach((word, count) -> 
            System.out.println(word + ": " + count));
    }
}

5. 原子變量類

5.1 AtomicInteger 和 LongAdder

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.ThreadLocalRandom;

public class AtomicDemo {
    
    static class AtomicCounter {
        private final AtomicInteger count = new AtomicInteger(0);
        
        public void increment() {
            count.incrementAndGet();
        }
        
        public void decrement() {
            count.decrementAndGet();
        }
        
        public int get() {
            return count.get();
        }
        
        // 複雜原子操作
        public boolean compareAndSet(int expect, int update) {
            return count.compareAndSet(expect, update);
        }
    }
    
    static class LongAdderCounter {
        private final LongAdder count = new LongAdder();
        
        public void increment() {
            count.increment();
        }
        
        public long get() {
            return count.sum();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 測試AtomicInteger
        AtomicCounter atomicCounter = new AtomicCounter();
        Thread[] atomicThreads = new Thread[10];
        
        for (int i = 0; i < atomicThreads.length; i++) {
            atomicThreads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    atomicCounter.increment();
                    // 模擬一些處理
                    if (j % 100 == 0) {
                        atomicCounter.compareAndSet(atomicCounter.get(), atomicCounter.get());
                    }
                }
            });
            atomicThreads[i].start();
        }
        
        for (Thread thread : atomicThreads) {
            thread.join();
        }
        System.out.println("AtomicInteger 最終計數: " + atomicCounter.get());
        
        // 測試LongAdder(高併發場景性能更好)
        LongAdderCounter adderCounter = new LongAdderCounter();
        Thread[] adderThreads = new Thread[10];
        
        for (int i = 0; i < adderThreads.length; i++) {
            adderThreads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    adderCounter.increment();
                }
            });
            adderThreads[i].start();
        }
        
        for (Thread thread : adderThreads) {
            thread.join();
        }
        System.out.println("LongAdder 最終計數: " + adderCounter.get());
    }
}

5.2 AtomicReference 原子引用

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceDemo {
    
    static class User {
        private final String name;
        private final int age;
        
        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
        
        @Override
        public String toString() {
            return "User{name='" + name + "', age=" + age + "}";
        }
    }
    
    public static void main(String[] args) {
        AtomicReference<User> atomicUser = new AtomicReference<>(
            new User("Alice", 25)
        );
        
        // 創建多個線程嘗試更新用户信息
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                User currentUser, newUser;
                do {
                    currentUser = atomicUser.get();
                    newUser = new User(
                        "User-" + threadId, 
                        currentUser.age + 1
                    );
                    System.out.println(Thread.currentThread().getName() + 
                                     " 嘗試更新: " + currentUser + " -> " + newUser);
                } while (!atomicUser.compareAndSet(currentUser, newUser));
                
                System.out.println(Thread.currentThread().getName() + " 更新成功");
            }).start();
        }
        
        // 等待一會兒讓所有線程完成
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("最終用户信息: " + atomicUser.get());
    }
}

6. 線程池工具類

6.1 ThreadPoolExecutor 自定義線程池

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolDemo {
    
    static class CustomThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        
        CustomThreadFactory(String poolName) {
            namePrefix = poolName + "-thread-";
        }
        
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            t.setDaemon(false);
            t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
    static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("任務被拒絕: " + r.toString());
            // 可以在這裏實現自定義的拒絕策略,比如記錄日誌、持久化任務等
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 創建自定義線程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, // 核心線程數
            4, // 最大線程數
            60, // 空閒線程存活時間
            TimeUnit.SECONDS, // 時間單位
            new LinkedBlockingQueue<>(10), // 工作隊列
            new CustomThreadFactory("CustomPool"), // 線程工廠
            new CustomRejectedExecutionHandler() // 拒絕策略
        );
        
        // 提交任務
        AtomicInteger completedTasks = new AtomicInteger(0);
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            try {
                executor.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + 
                                         " 執行任務 " + taskId);
                        Thread.sleep(ThreadLocalRandom.current().nextInt(500, 2000));
                        completedTasks.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                System.out.println("任務 " + taskId + " 被拒絕");
            }
        }
        
        // 關閉線程池
        executor.shutdown();
        boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
        if (terminated) {
            System.out.println("所有任務完成,共完成: " + completedTasks.get() + " 個任務");
        } else {
            System.out.println("等待超時,已完成: " + completedTasks.get() + " 個任務");
        }
    }
}

6.2 Executors 工具類創建線程池

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorsDemo {
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1. 固定大小線程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(3);
        
        // 2. 單線程線程池
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        
        // 3. 緩存線程池
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        
        // 4. 定時任務線程池
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
        
        System.out.println("=== 固定大小線程池演示 ===");
        AtomicInteger counter = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(10);
        
        for (int i = 0; i < 10; i++) {
            fixedPool.execute(() -> {
                try {
                    int value = counter.incrementAndGet();
                    System.out.println(Thread.currentThread().getName() + 
                                     " 執行任務,當前計數: " + value);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        System.out.println("所有任務完成,最終計數: " + counter.get());
        
        System.out.println("\n=== 定時任務演示 ===");
        CountDownLatch scheduledLatch = new CountDownLatch(3);
        
        // 延遲執行
        scheduledPool.schedule(() -> {
            System.out.println("延遲5秒執行的任務");
            scheduledLatch.countDown();
        }, 5, TimeUnit.SECONDS);
        
        // 固定速率執行
        ScheduledFuture<?> future = scheduledPool.scheduleAtFixedRate(() -> {
            System.out.println("固定速率任務執行時間: " + System.currentTimeMillis());
            scheduledLatch.countDown();
        }, 1, 2, TimeUnit.SECONDS);
        
        // 等待任務完成
        scheduledLatch.await();
        future.cancel(false); // 取消定時任務
        
        System.out.println("\n=== Callable和Future演示 ===");
        ExecutorService callablePool = Executors.newFixedThreadPool(3);
        
        // 提交Callable任務
        Future<Integer> future1 = callablePool.submit(() -> {
            Thread.sleep(1000);
            return 42;
        });
        
        Future<String> future2 = callablePool.submit(() -> {
            Thread.sleep(1500);
            return "Hello, World!";
        });
        
        Future<Double> future3 = callablePool.submit(() -> {
            Thread.sleep(800);
            return 3.14159;
        });
        
        // 獲取結果
        System.out.println("結果1: " + future1.get());
        System.out.println("結果2: " + future2.get());
        System.out.println("結果3: " + future3.get());
        
        // 關閉所有線程池
        fixedPool.shutdown();
        singleThreadPool.shutdown();
        cachedPool.shutdown();
        scheduledPool.shutdown();
        callablePool.shutdown();
    }
}

7. 併發工具類原理分析

7.1 AQS (AbstractQueuedSynchronizer) 原理

AQS是Java併發包的核心基礎組件,它提供了一個框架,用於實現依賴於先進先出(FIFO)等待隊列的阻塞鎖和相關同步器。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// 自定義的簡單互斥鎖,基於AQS實現
public class SimpleMutex {
    private final Sync sync = new Sync();
    
    // 內部同步器類
    private static class Sync extends AbstractQueuedSynchronizer {
        // 嘗試獲取鎖
        @Override
        protected boolean tryAcquire(int acquires) {
            // 使用CAS操作將狀態從0改為1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        // 嘗試釋放鎖
        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        
        // 判斷是否被獨佔
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }
    
    public void lock() {
        sync.acquire(1);
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

7.2 併發工具類流程圖

CountDownLatch 工作流程










主線程創建CountDownLatch

設置初始計數N

主線程調用await

計數是否為0?

主線程繼續執行

主線程阻塞等待

工作線程完成任務

調用countDown

計數減1

計數是否為0?

喚醒所有等待線程

繼續等待其他線程


ReentrantLock 加鎖流程








線程嘗試獲取鎖

鎖是否空閒?

CAS設置鎖狀態

獲取鎖成功

當前線程是否持有鎖?

重入計數+1

加入等待隊列

線程進入等待狀態

被喚醒後嘗試獲取鎖


8. 最佳實踐和注意事項

8.1 避免死鎖

public class DeadlockPrevention {
    
    // 按固定順序獲取鎖來避免死鎖
    public void transferMoney(Account from, Account to, int amount) {
        Object firstLock = from.getId() < to.getId() ? from : to;
        Object secondLock = from.getId() < to.getId() ? to : from;
        
        synchronized (firstLock) {
            synchronized (secondLock) {
                if (from.getBalance() >= amount) {
                    from.debit(amount);
                    to.credit(amount);
                }
            }
        }
    }
    
    // 使用tryLock避免死鎖
    public boolean tryTransferMoney(Account from, Account to, int amount, 
                                   long timeout, TimeUnit unit) throws InterruptedException {
        long stopTime = System.nanoTime() + unit.toNanos(timeout);
        
        while (true) {
            if (from.getLock().tryLock()) {
                try {
                    if (to.getLock().tryLock()) {
                        try {
                            if (from.getBalance() >= amount) {
                                from.debit(amount);
                                to.credit(amount);
                                return true;
                            } else {
                                return false;
                            }
                        } finally {
                            to.getLock().unlock();
                        }
                    }
                } finally {
                    from.getLock().unlock();
                }
            }
            
            if (System.nanoTime() > stopTime) {
                return false;
            }
            
            // 隨機休眠避免活鎖
            Thread.sleep(ThreadLocalRandom.current().nextInt(100));
        }
    }
    
    static class Account {
        private final String id;
        private int balance;
        private final ReentrantLock lock = new ReentrantLock();
        
        public Account(String id, int balance) {
            this.id = id;
            this.balance = balance;
        }
        
        public String getId() { return id; }
        public int getBalance() { return balance; }
        public ReentrantLock getLock() { return lock; }
        
        public void debit(int amount) {
            balance -= amount;
        }
        
        public void credit(int amount) {
            balance += amount;
        }
    }
}

8.2 性能考慮

public class PerformanceConsiderations {
    
    // 使用LongAdder替代AtomicLong在高併發寫場景
    public static void testCounterPerformance() throws InterruptedException {
        int threadCount = 10;
        int iterations = 100000;
        
        // 測試AtomicLong
        AtomicLong atomicLong = new AtomicLong();
        CountDownLatch latch1 = new CountDownLatch(threadCount);
        
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    atomicLong.incrementAndGet();
                }
                latch1.countDown();
            }).start();
        }
        latch1.await();
        long atomicTime = System.currentTimeMillis() - startTime;
        
        // 測試LongAdder
        LongAdder longAdder = new LongAdder();
        CountDownLatch latch2 = new CountDownLatch(threadCount);
        
        startTime = System.currentTimeMillis();
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    longAdder.increment();
                }
                latch2.countDown();
            }).start();
        }
        latch2.await();
        long adderTime = System.currentTimeMillis() - startTime;
        
        System.out.println("AtomicLong 耗時: " + atomicTime + "ms");
        System.out.println("LongAdder 耗時: " + adderTime + "ms");
        System.out.println("AtomicLong 結果: " + atomicLong.get());
        System.out.println("LongAdder 結果: " + longAdder.sum());
    }
    
    // 選擇合適的併發集合
    public static void testCollectionPerformance() {
        int elementCount = 100000;
        
        // ConcurrentHashMap vs synchronized HashMap
        Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        Map<String, Integer> synchronizedMap = Collections.synchronizedMap(new HashMap<>());
        
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < elementCount; i++) {
            concurrentMap.put("key" + i, i);
        }
        long concurrentTime = System.currentTimeMillis() - startTime;
        
        startTime = System.currentTimeMillis();
        for (int i = 0; i < elementCount; i++) {
            synchronizedMap.put("key" + i, i);
        }
        long synchronizedTime = System.currentTimeMillis() - startTime;
        
        System.out.println("ConcurrentHashMap 寫入耗時: " + concurrentTime + "ms");
        System.out.println("SynchronizedHashMap 寫入耗時: " + synchronizedTime + "ms");
    }
}

9. 總結

Java併發工具類提供了強大而靈活的線程同步機制,相比傳統的synchronized關鍵字,它們提供了更細粒度的控制和更好的性能。在實際開發中,我們應該:

  1. 根據場景選擇合適的工具類:讀多寫少用讀寫鎖,線程協調用同步器,高併發計數用LongAdder
  2. 注意資源管理:及時釋放鎖和關閉線程池
  3. 避免常見陷阱:死鎖、活鎖、資源競爭
  4. 性能優化:在合適的場景使用合適的併發工具