一、為什麼需要線程間通信?
大家好!今天我們來聊聊多線程編程中的一個核心問題:線程間通信。
想象一下這個場景:你開發了一個電商系統,一個線程負責接收用户下單請求,另一個線程負責庫存扣減,還有一個線程負責發送通知。這些線程之間如果無法協作,就像各自為戰的士兵,無法完成統一的任務。
線程間通信解決的核心問題是:
- 線程協作:多個線程按照預定的順序執行任務
- 數據共享:一個線程產生的數據,需要被另一個線程使用
- 狀態同步:一個線程的狀態變化需要通知其他線程
Java 提供了多種線程間通信機制,今天我們重點介紹三種經典方式:
二、第一種:wait/notify 機制
2.1 核心原理
wait/notify 是 Java 最基礎的線程間通信機制,它們是 Object 類的方法,而不是 Thread 類的方法。這意味着任何對象都可以作為線程間通信的媒介。
基本工作原理如下:
2.2 核心方法説明
- wait(): 讓當前線程進入等待狀態,並釋放對象鎖
- wait(long timeout): 帶超時時間的等待
- wait(long timeout, int nanos): 更精細的超時控制
- notify(): 隨機喚醒一個在該對象上等待的線程
- notifyAll(): 喚醒所有在該對象上等待的線程
2.3 使用規則
使用 wait/notify 有一些必須遵守的規則,否則會拋出 IllegalMonitorStateException 異常:
- 必須在 synchronized 同步塊或方法中調用
- 必須是同一個監視器對象
- wait 後必須使用循環檢查等待條件(避免虛假喚醒)
關於虛假喚醒,Java 官方文檔明確指出:
"線程可能在沒有被通知、中斷或超時的情況下被喚醒,這被稱為虛假喚醒。雖然這在實際中很少發生,但應用程序必須通過測試應該導致線程被喚醒的條件來防範它,並且如果條件不滿足則繼續等待。換句話説,等待應該總是發生在循環中。"
這是由操作系統線程調度機制決定的,不是 Java 的 bug。
2.4 生產者-消費者示例
下面是一個典型的生產者-消費者模式示例,通過 wait/notify 實現線程間協作:
public class WaitNotifyExample {
private final Queue<String> queue = new LinkedList<>();
private final int MAX_SIZE = 5;
public synchronized void produce(String data) throws InterruptedException {
// 使用while循環檢查條件,防止虛假喚醒
while (queue.size() == MAX_SIZE) {
System.out.println("隊列已滿,生產者等待...");
this.wait(); // 隊列滿了,生產者線程等待
}
queue.add(data);
System.out.println("生產數據: " + data + ", 當前隊列大小: " + queue.size());
// 只通知消費者線程,避免不必要的喚醒
this.notify(); // 在單生產者單消費者的情況下可以用notify提高效率
}
public synchronized String consume() throws InterruptedException {
// 使用while循環檢查條件,防止虛假喚醒
while (queue.isEmpty()) {
System.out.println("隊列為空,消費者等待...");
this.wait(); // 隊列空了,消費者線程等待
}
String data = queue.poll();
System.out.println("消費數據: " + data + ", 當前隊列大小: " + queue.size());
// 只通知生產者線程,避免不必要的喚醒
this.notify(); // 在單生產者單消費者的情況下可以用notify提高效率
return data;
}
// 對於多生產者多消費者的場景,應改用notifyAll避免線程飢餓
public synchronized void produceMulti(String data) throws InterruptedException {
while (queue.size() == MAX_SIZE) {
System.out.println(Thread.currentThread().getName() + ": 隊列已滿,生產者等待...");
this.wait();
}
queue.add(data);
System.out.println(Thread.currentThread().getName() + ": 生產數據: " + data + ", 當前隊列大小: " + queue.size());
// 當有多個生產者和消費者時,必須用notifyAll確保正確喚醒
this.notifyAll();
}
public static void main(String[] args) {
WaitNotifyExample example = new WaitNotifyExample();
// 創建生產者線程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
example.produce("數據-" + i);
Thread.sleep(new Random().nextInt(1000));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 創建消費者線程
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
example.consume();
Thread.sleep(new Random().nextInt(1000));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
2.5 notify()與 notifyAll()的選擇策略
何時使用 notify(),何時使用 notifyAll()?這是線程間通信中的重要決策:
-
使用 notify()的情況:
- 所有等待線程都是同質的(做相同任務)
- 單一消費者/生產者模式
- 性能敏感,且能確保不會導致線程飢餓
-
使用 notifyAll()的情況:
- 有多種不同類型的等待線程
- 多生產者/多消費者模式
- 安全性要求高於性能要求
- 當不確定使用哪個更合適時(默認選擇)
2.6 常見問題與解決方案
-
虛假喚醒問題
問題:線程可能在沒有 notify/notifyAll 調用的情況下被喚醒
解決:始終使用 while 循環檢查等待條件,而不是 if 語句
-
死鎖風險
如果生產者和消費者互相等待對方的通知,且都沒有收到通知,就會發生死鎖。可以考慮使用帶超時參數的 wait(timeout)方法,例如:
// 超時等待,避免永久死鎖 if (!condition) { this.wait(1000); // 最多等待1秒 } -
異常處理
wait()方法會拋出 InterruptedException,需要適當處理:
try { while (!condition) { object.wait(); } } catch (InterruptedException e) { // 恢復中斷狀態,不吞掉中斷 Thread.currentThread().interrupt(); // 或者進行資源清理並提前返回 return; }
三、第二種:Condition 條件變量
3.1 基本概念
Condition 是在 Java 5 引入的,它提供了比 wait/notify 更加靈活和精確的線程間控制機制。Condition 對象總是與 Lock 對象一起使用。
3.2 Condition 接口的核心方法
- await(): 類似於 wait(),釋放鎖並等待
- await(long time, TimeUnit unit): 帶超時的等待
- awaitUninterruptibly(): 不可中斷的等待
- awaitUntil(Date deadline): 等待到指定的時間點
- signal(): 類似於 notify(),喚醒一個等待線程
- signalAll(): 類似於 notifyAll(),喚醒所有等待線程
3.3 分組喚醒原理
Condition 的核心優勢在於實現"精確通知"。與 wait/notify 使用同一個等待隊列不同,每個 Condition 對象管理着各自獨立的等待隊列。
wait/notify: 所有線程在同一個等待隊列
┌─────────────────────┐
│ 對象監視器等待隊列 │
├─────────┬───────────┤
│ 線程A │ 線程B │
└─────────┴───────────┘
Condition: 每個Condition維護獨立的等待隊列
┌─────────────────────┐ ┌─────────────────────┐
│ Condition1等待隊列 │ │ Condition2等待隊列 │
├─────────┬───────────┤ ├─────────┬───────────┤
│ 線程A │ 線程C │ │ 線程B │ 線程D │
└─────────┴───────────┘ └─────────┴───────────┘
這種機制使得:
- 生產者可以只喚醒消費者(而不是所有等待線程)
- 清空操作可以只喚醒生產者(而不是消費者)
- 不同類型的等待可以使用不同的 Condition
3.4 相比 wait/notify 的優勢
- 可以精確喚醒指定線程組:一個 Lock 可以創建多個 Condition 對象,實現分組喚醒
- 有更好的中斷控制:提供可中斷和不可中斷的等待
- 可以設置超時時間:更靈活的超時機制(支持時間單位)
- 可以實現公平鎖:使用 ReentrantLock 的公平性特性
- 通過獨立等待隊列實現精準喚醒:僅通知目標線程組,避免喚醒無關線程(如生產者不喚醒其他生產者),從而減少 CPU 資源浪費
3.5 精確通知示例
下面是一個使用 Condition 實現的生產者-消費者模式,支持精確通知:
public class ConditionExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition(); // 隊列不滿條件
private final Condition notEmpty = lock.newCondition(); // 隊列不空條件
private final Queue<String> queue = new LinkedList<>();
private final int MAX_SIZE = 5;
public void produce(String data) throws InterruptedException {
lock.lock();
try {
// 隊列已滿,等待不滿條件
while (queue.size() == MAX_SIZE) {
System.out.println("隊列已滿,生產者等待...");
notFull.await(); // 生產者在notFull條件上等待
}
queue.add(data);
System.out.println("生產數據: " + data + ", 當前隊列大小: " + queue.size());
// 通知消費者隊列不為空 - 精確通知,只喚醒消費者線程
notEmpty.signal();
} finally {
// 必須在finally中釋放鎖,確保鎖一定被釋放
lock.unlock();
}
}
public String consume() throws InterruptedException {
lock.lock();
try {
// 隊列為空,等待不空條件
while (queue.isEmpty()) {
System.out.println("隊列為空,消費者等待...");
notEmpty.await(); // 消費者在notEmpty條件上等待
}
String data = queue.poll();
System.out.println("消費數據: " + data + ", 當前隊列大小: " + queue.size());
// 通知生產者隊列不滿 - 精確通知,只喚醒生產者線程
notFull.signal();
return data;
} finally {
lock.unlock();
}
}
// 使用可中斷鎖嘗試獲取數據,帶超時控制
public String consumeWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {
// 嘗試獲取鎖,可設置超時
if (!lock.tryLock(timeout, unit)) {
System.out.println("獲取鎖超時,放棄消費");
return null;
}
try {
// 使用超時等待
if (queue.isEmpty() && !notEmpty.await(timeout, unit)) {
System.out.println("等待數據超時,放棄消費");
return null;
}
if (!queue.isEmpty()) {
String data = queue.poll();
System.out.println("消費數據: " + data + ", 當前隊列大小: " + queue.size());
notFull.signal();
return data;
}
return null;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ConditionExample example = new ConditionExample();
// 創建生產者線程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
example.produce("數據-" + i);
Thread.sleep(new Random().nextInt(1000));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 創建消費者線程
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
example.consume();
Thread.sleep(new Random().nextInt(1000));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
3.6 使用 Condition 實現多條件協作
我們可以使用多個 Condition 實現更復雜的場景,比如一個緩衝區,有讀者、寫者和清理者三種角色:
public class MultiConditionExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition writerCondition = lock.newCondition(); // 寫入條件
private final Condition readerCondition = lock.newCondition(); // 讀取條件
private final Condition cleanerCondition = lock.newCondition(); // 清理條件
private boolean hasData = false;
private boolean needClean = false;
private String data;
// 寫入數據
public void write(String data) throws InterruptedException {
lock.lock();
try {
// 已有數據或需要清理,等待
while (hasData || needClean) {
System.out.println(Thread.currentThread().getName() + " 等待寫入條件...");
writerCondition.await();
}
this.data = data;
hasData = true;
System.out.println(Thread.currentThread().getName() + " 寫入數據: " + data);
// 通知讀者可以讀取數據
readerCondition.signal();
} finally {
lock.unlock();
}
}
// 讀取數據
public String read() throws InterruptedException {
lock.lock();
try {
// 沒有數據或需要清理,等待
while (!hasData || needClean) {
System.out.println(Thread.currentThread().getName() + " 等待讀取條件...");
readerCondition.await();
}
String result = this.data;
hasData = false;
needClean = true;
System.out.println(Thread.currentThread().getName() + " 讀取數據: " + result);
// 通知清理者可以清理
cleanerCondition.signal();
return result;
} finally {
lock.unlock();
}
}
// 清理操作
public void clean() throws InterruptedException {
lock.lock();
try {
// 不需要清理,等待
while (!needClean) {
System.out.println(Thread.currentThread().getName() + " 等待清理條件...");
cleanerCondition.await();
}
this.data = null;
needClean = false;
System.out.println(Thread.currentThread().getName() + " 清理完成");
// 通知寫者可以寫入數據
writerCondition.signal();
} finally {
lock.unlock();
}
}
// 測試方法
public static void main(String[] args) {
MultiConditionExample example = new MultiConditionExample();
// 寫入線程
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
example.write("數據-" + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "寫入線程").start();
// 讀取線程
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
example.read();
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "讀取線程").start();
// 清理線程
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
example.clean();
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "清理線程").start();
}
}
3.7 Condition 與虛假喚醒
需要注意的是,Condition 的 await()方法同樣會發生虛假喚醒,與 wait()方法類似。虛假喚醒是所有線程等待機制的固有特性,而不是特定通知機制的缺陷。即使使用 Condition 的精確通知機制,仍然需要使用循環檢查等待條件:
// 正確使用Condition的方式
lock.lock();
try {
while (!condition) { // 使用循環檢查條件
condition.await();
}
// 處理條件滿足的情況
} finally {
lock.unlock();
}
Condition 雖然通過獨立的等待隊列減少了"無效喚醒"(非目標線程的喚醒),但無法消除操作系統層面的虛假喚醒可能性。
3.8 wait/notify 與 Condition 性能對比
雖然 Condition 在功能上更加強大,但實際性能與 wait/notify 非常接近,因為兩者底層都依賴於操作系統的線程阻塞機制。兩者的性能差異在高併發場景下可忽略,功能匹配度是首要考慮因素。主要區別在於:
- Condition 需要顯式管理鎖:Lock.lock()和 lock.unlock(),增加了代碼複雜度
- Condition 提供更多控制能力:超時、中斷、多條件等
- Lock 支持非阻塞嘗試獲取鎖:tryLock()可避免長時間阻塞
選擇標準:功能需求應優先於性能考慮,複雜的線程間協作場景下首選 Condition。
四、第三種:管道通信
4.1 管道通信基本概念
Java IO 提供了管道流,專門用於線程間的數據傳輸,適用於單 JVM 內的線程間通信。主要涉及以下幾個類:
- PipedOutputStream 和 PipedInputStream
- PipedWriter 和 PipedReader
這些類構成了線程之間的管道通信通道,一個線程向管道寫入數據,另一個線程從管道讀取數據。
4.2 管道通信的核心特性
-
阻塞機制:
- 管道緩衝區滿時,write()操作會阻塞
- 管道緩衝區空時,read()操作會阻塞
- 這一特性自動實現了生產者-消費者模式的流控制
-
字節流與字符流:
- 字節流:PipedInputStream/PipedOutputStream - 處理二進制數據
- 字符流:PipedReader/PipedWriter - 處理文本數據(帶字符編碼)
-
內部緩衝區:
- 默認大小為 1024 字節
- 可以在構造函數中指定緩衝區大小
4.3 管道通信的使用場景
管道通信特別適合於:
- 需要傳輸原始數據或字符流的場景
- 生產者-消費者模式中的數據傳輸
- 多個處理階段之間的流水線處理
- 日誌記錄器、數據過濾、實時數據處理
4.4 字節管道示例
下面是一個使用 PipedInputStream 和 PipedOutputStream 的示例:
public class PipedStreamExample {
public static void main(String[] args) throws Exception {
// 創建管道輸出流和輸入流
PipedOutputStream output = new PipedOutputStream();
PipedInputStream input = new PipedInputStream(output); // 直接在構造器中連接
// 創建寫入線程
Thread writerThread = new Thread(() -> {
try {
System.out.println("寫入線程啓動");
for (int i = 1; i <= 10; i++) {
String message = "數據-" + i;
output.write(message.getBytes());
System.out.println("寫入: " + message);
// 如果註釋掉sleep,可能會因為管道緩衝區滿而阻塞
Thread.sleep(500);
}
// 關閉輸出流,表示不再寫入數據
output.close();
} catch (Exception e) {
e.printStackTrace();
}
});
// 創建讀取線程
Thread readerThread = new Thread(() -> {
try {
System.out.println("讀取線程啓動");
byte[] buffer = new byte[100]; // 小於完整消息長度,演示多次讀取
int len;
// read方法在管道沒有數據時會阻塞,直到有數據或管道關閉
while ((len = input.read(buffer)) != -1) {
String message = new String(buffer, 0, len);
System.out.println("讀取: " + message);
// 如果註釋掉sleep,可能會因為消費太快而導致管道經常為空
Thread.sleep(1000);
}
input.close();
} catch (Exception e) {
e.printStackTrace();
}
});
// 啓動線程
writerThread.start();
readerThread.start();
}
}
4.5 字符管道示例
下面是使用 PipedWriter 和 PipedReader 的字符流管道示例:
public class PipedReaderWriterExample {
public static void main(String[] args) throws Exception {
// 創建管道寫入器和讀取器
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader(writer, 1024); // 指定緩衝區大小
// 創建寫入線程
Thread writerThread = new Thread(() -> {
try {
System.out.println("寫入線程啓動");
for (int i = 1; i <= 10; i++) {
String message = "字符數據-" + i + "\n";
writer.write(message);
writer.flush(); // 確保數據立即寫入管道
System.out.println("寫入: " + message);
Thread.sleep(500);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
}
});
// 創建讀取線程
Thread readerThread = new Thread(() -> {
try {
System.out.println("讀取線程啓動");
char[] buffer = new char[1024];
int len;
// 演示按行讀取
BufferedReader bufferedReader = new BufferedReader(reader);
String line;
while ((line = bufferedReader.readLine()) != null) {
System.out.println("讀取一行: " + line);
Thread.sleep(700);
}
reader.close();
} catch (Exception e) {
e.printStackTrace();
}
});
// 啓動線程
writerThread.start();
readerThread.start();
}
}
4.6 管道通信的注意事項
-
管道流容量有限:
- 默認容量為 1024 字節
- 寫入過多數據而沒有及時讀取,寫入方會阻塞
- 讀取空管道時,讀取方會阻塞
-
連接機制:
- 使用前必須先調用 connect()方法連接兩個流,或在構造時指定
- 多次 connect 會拋出異常
-
單向通信:
- 管道是單向的,需要雙向通信時需要創建兩對管道
- 分清楚誰是生產者(寫入方),誰是消費者(讀取方)
-
關閉管理:
- 必須正確關閉管道流(在 finally 塊中)
- 一端關閉後另一端會收到-1 或 null,表示流結束
-
線程安全性:
- 單個管道的寫入/讀取操作是線程安全的(即單個寫入線程和單個讀取線程無需額外同步)
- 多個線程同時寫入/讀取同一個管道仍需外部同步
- 管道不支持多寫多讀模式,設計上就是一個線程寫,一個線程讀
五、輔助通信方式:volatile 變量
5.1 volatile 基本原理
volatile 是 Java 提供的輕量級線程間通信機制,它保證了變量的可見性和有序性,但不保證原子性。
5.2 volatile 的實現原理
-
可見性保證:
- volatile 變量的寫操作會強制刷新到主內存
- volatile 變量的讀操作會強制從主內存獲取最新值
- 保證一個線程對變量的修改對其他線程立即可見
-
內存屏障:
- volatile 變量的讀寫操作會插入內存屏障指令,禁止指令重排序
- 保證程序執行的有序性,防止編譯器和 CPU 的優化破壞併發安全
- 在 x86 架構上,寫操作會生成鎖前綴指令(LOCK prefix)
-
無鎖機制:
- 不會導致線程阻塞
- 比 synchronized 更輕量級,性能更好
- 適合一寫多讀場景
5.3 volatile 與原子性
volatile 不保證原子性,這意味着:
// 以下操作在多線程環境中不安全,即使counter是volatile
volatile int counter = 0;
counter++; // 非原子操作:讀取-修改-寫入
對於需要原子性的場景,可以結合原子類使用:
// 使用原子類保證原子性
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet(); // 原子操作
或者使用 synchronized:
volatile int counter = 0;
synchronized void increment() {
counter++; // 在同步塊中安全
}
5.4 使用 volatile 實現線程間通信示例
下面是一個使用 volatile 變量實現線程間通信的簡單例子:
public class VolatileCommunicationExample {
// 使用volatile標記共享變量
private static volatile boolean flag = false;
private static volatile int counter = 0;
public static void main(String[] args) throws InterruptedException {
// 創建Writer線程
Thread writerThread = new Thread(() -> {
System.out.println("寫入線程開始運行");
try {
Thread.sleep(1000); // 模擬耗時操作
counter = 100; // 更新數據
// volatile變量的寫操作會強制刷新到主內存
// 其他線程的讀取會從主內存獲取最新值
// 內存屏障確保以下操作不會被重排序到上面操作之前
flag = true; // 設置標誌位
System.out.println("寫入線程完成數據更新: counter = " + counter);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 創建Reader線程
Thread readerThread = new Thread(() -> {
System.out.println("讀取線程開始運行");
// 不使用sleep等待,volatile的可見性使其工作
while (!flag) {
// 自旋等待標誌位變化
Thread.yield(); // 減少CPU佔用
}
// 由於volatile的可見性保證,這裏讀取到的一定是最新值100
System.out.println("讀取線程讀取到數據: counter = " + counter);
});
// 啓動線程
readerThread.start();
writerThread.start();
// 等待線程結束
writerThread.join();
readerThread.join();
}
}
5.5 volatile 的適用場景與侷限性
適用場景:
- 狀態標誌:如開關變量、完成標誌等
- 一寫多讀:一個線程寫,多個線程讀的場景
- 無原子操作:變量操作不需要保證原子性的場景
- 雙重檢查鎖定模式:在單例模式中用於安全發佈
侷限性:
- 不保證原子性:對於
i++這樣的複合操作無法保證 - 不能替代鎖:對於複雜共享狀態的控制還是需要鎖
- 性能考慮:頻繁修改的變量使用 volatile 可能導致總線流量增加
六、三種通信方式的對比與選擇
不同的線程間通信方式有各自的特點和適用場景,下表對比了它們的關鍵特性:
| 特性 | wait/notify | Condition | 管道通信 | volatile |
|---|---|---|---|---|
| 線程安全級別 | 高(內置鎖) | 高(顯式鎖) | 中(緩衝區) | 低(僅可見性) |
| 數據傳輸能力 | 通過共享對象 | 通過共享對象 | 流式傳輸 | 單個變量 |
| 適用場景 | 線程間協作 | 複雜線程間協作 | 數據流傳輸 | 狀態標誌 |
| 實現複雜度 | 簡單 | 中等 | 中等 | 簡單 |
| 控制精度 | 一般 | 高 | 不適用 | 不適用 |
| 阻塞特性 | 阻塞 | 阻塞 | 阻塞 | 非阻塞 |
| 鎖機制 | synchronized | ReentrantLock | 內部同步 | 無鎖 |
| 通信方向 | 多向 | 多向 | 單向 | 多向 |
| 通知精確性 | 不精確 | 精確 | 不適用 | 不適用 |
| 適用數據類型 | 任意對象 | 任意對象 | 支持連續的二進制數據或文本數據,適合流式處理(如日誌、文件內容),不適合離散的對象傳輸 | 基本類型/對象引用 |
七、線程間通信實戰案例:日誌收集器
下面是一個綜合應用案例,實現一個簡單的日誌收集器:
public class LogCollector {
// 日誌隊列 - 內部已實現線程安全
private final BlockingQueue<String> logQueue = new LinkedBlockingQueue<>(1000);
// 停止標誌
private volatile boolean stopped = false;
// 用於管道通信的字符寫入器與讀取器
private PipedWriter logWriter;
private PipedReader logReader;
// 線程管理
private Thread collectorThread; // 日誌收集線程
private Thread processorThread; // 日誌處理線程
private Thread outputThread; // 日誌輸出線程
public LogCollector() throws IOException {
// 初始化管道
this.logWriter = new PipedWriter();
this.logReader = new PipedReader(logWriter);
}
public void start() {
// 創建日誌收集線程
collectorThread = new Thread(() -> {
System.out.println("日誌收集線程啓動");
try {
while (!stopped) {
// 模擬生成日誌
String log = "INFO " + new Date() + ": " + "系統運行正常,內存使用率: "
+ new Random().nextInt(100) + "%";
// BlockingQueue的put方法在隊列滿時會自動阻塞
logQueue.put(log);
System.out.println("收集日誌: " + log);
// 控制日誌生成速度
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("日誌收集線程結束");
});
// 創建日誌處理線程
processorThread = new Thread(() -> {
System.out.println("日誌處理線程啓動");
try {
while (!stopped || !logQueue.isEmpty()) {
// BlockingQueue的poll方法在隊列空時會阻塞指定時間
String log = logQueue.poll(500, TimeUnit.MILLISECONDS);
if (log != null) {
// 處理日誌(這裏簡單加上處理標記)
String processedLog = "已處理: " + log + "\n";
// 通過管道發送到輸出線程
logWriter.write(processedLog);
logWriter.flush();
}
}
// 處理完所有日誌後關閉寫入器
logWriter.close();
} catch (InterruptedException | IOException e) {
Thread.currentThread().interrupt();
}
System.out.println("日誌處理線程結束");
});
// 創建日誌輸出線程
outputThread = new Thread(() -> {
try {
System.out.println("日誌輸出線程啓動");
BufferedReader reader = new BufferedReader(logReader);
String line;
// 從管道中讀取處理後的日誌並輸出
while ((line = reader.readLine()) != null) {
System.out.println("輸出處理後的日誌: " + line);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("日誌輸出線程結束");
});
// 啓動線程
collectorThread.start();
processorThread.start();
outputThread.start();
}
public void stop() {
stopped = true; // volatile保證可見性
System.out.println("日誌收集器停止中...");
}
public static void main(String[] args) throws InterruptedException, IOException {
LogCollector collector = new LogCollector();
collector.start();
// 運行10秒後停止
Thread.sleep(10000);
collector.stop();
}
}
這個案例綜合使用了多種線程間通信方式:
- BlockingQueue: 作為線程安全的日誌隊列,自動實現生產者-消費者模式
- Piped 流: 將處理後的日誌傳輸到輸出線程
- volatile 變量: 用作停止標誌控制線程終止
設計説明:這個實例展示瞭如何組合不同的線程間通信機制實現複雜功能:
- BlockingQueue 處理生產者-消費者數據傳遞(收集線程與處理線程)
- 管道通信處理字符流傳輸(處理線程與輸出線程)
- volatile 變量處理狀態同步(停止信號)
八、總結與常見問題
8.1 線程間通信方式總結
| 通信方式 | 核心 API | 使用場景 | 注意事項 |
|---|---|---|---|
| wait/notify | Object.wait()
Object.notify() Object.notifyAll() |
簡單同步
生產者-消費者 |
必須在 synchronized 中使用
使用 while 循環檢查條件 防止虛假喚醒 |
| Condition | lock.newCondition()
condition.await() condition.signal() |
複雜多條件
精確通知 |
必須與 Lock 配合使用
需手動加解鎖 使用 try/finally 保證鎖釋放 同樣需防範虛假喚醒 |
| 管道通信 | PipedInputStream
PipedOutputStream PipedReader PipedWriter |
數據傳輸
流處理 |
需要 connect 連接
單向通信 注意關閉資源 一寫一讀模式 |
| volatile | volatile 關鍵字 | 狀態標誌
一寫多讀 |
不保證原子性
適合簡單狀態同步 |
8.2 常見問題解答
-
wait()和 sleep()的區別是什麼?
- wait()釋放鎖,sleep()不釋放鎖
- wait()需要在 synchronized 塊中調用,sleep()不需要
- wait()需要被 notify()/notifyAll()喚醒,sleep()時間到自動恢復
- wait()是 Object 類方法,sleep()是 Thread 類方法
-
為什麼 wait()需要在 synchronized 塊中調用?
- 確保線程在檢查條件和調用 wait()期間持有鎖,避免競態條件
- 調用 wait()前必須獲得對象的監視器鎖,這是 JVM 層面的要求
- 確保線程放棄鎖並進入等待狀態的操作是原子的
-
如何處理虛假喚醒問題?
// 正確做法:使用while循環 synchronized (obj) { while (!condition) { // 循環檢查 obj.wait(); } // 處理條件滿足情況 } // 錯誤做法:使用if語句 synchronized (obj) { if (!condition) { // 只檢查一次 obj.wait(); } // 可能在條件仍不滿足時執行 } -
Condition 相比 wait/notify 的優勢在哪裏?
- 可以創建多個等待隊列,實現精確通知
- 可以實現不可中斷的等待(awaitUninterruptibly)
- 支持更靈活的超時控制(可指定時間單位)
- 與 ReentrantLock 結合可實現公平鎖
-
如何選擇合適的線程間通信方式?
- 簡單狀態同步:volatile 變量
- 一個等待條件:wait/notify
- 多個等待條件:Condition
- 數據流傳輸:管道通信
- 隊列操作:BlockingQueue
-
volatile 與 AtomicInteger 的區別?
- volatile 只保證可見性和有序性,不保證原子性
- AtomicInteger 通過 CAS(Compare-And-Swap)操作保證原子性
- 對於
i++這樣的操作,需要使用 AtomicInteger 而非 volatile - 兩者結合使用可以實現高效的線程安全代碼
-
管道通信與消息隊列有什麼區別?
- 管道是 Java 內置的線程間通信機制,限於單 JVM 內
- 消息隊列通常指分佈式消息系統(如 Kafka),可跨進程/服務器
- 管道適合輕量級的線程間流數據傳輸
- 消息隊列適合更大規模的分佈式系統組件間通信
感謝您耐心閲讀到這裏!如果覺得本文對您有幫助,歡迎點贊 👍、收藏 ⭐、分享給需要的朋友,您的支持是我持續輸出技術乾貨的最大動力!
如果想獲取更多 Java 技術深度解析,歡迎點擊頭像關注我,後續會每日更新高質量技術文章,陪您一起進階成長~