在 AQS (AbstractQueuedSynchronizer) 中,這些方法涉及到同步的獲取和排隊機制,它們實現了類似於鎖(Lock)和信號量(Semaphore)的功能。AQS 通過內部維護一個 FIFO 隊列和一些節點來管理線程的同步。下面逐個解釋這些方法的作用:
AQS 核心方法和源碼
1. acquire(int arg)
- 作用:嘗試獲取同步狀態,如果失敗,則加入隊列並阻塞線程。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
-
流程:
- 調用
tryAcquire(arg)嘗試獲取鎖。 - 如果獲取失敗,則調用
addWaiter()將線程加入隊列。 - 調用
acquireQueued()進行排隊等待。 - 如果等待過程中被中斷,調用
selfInterrupt()重新設置中斷狀態。
- 調用
2. tryAcquire(int arg)
- 作用:嘗試獲取同步狀態。這個方法是抽象方法,子類需要實現。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- 注意:具體的同步工具(如
ReentrantLock)會重寫此方法來定義獲取同步狀態的邏輯。
3. addWaiter(Node.EXCLUSIVE)
- 作用:將當前線程封裝成一個
Node,並加入等待隊列末尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
-
流程:
- 封裝當前線程為一個
Node節點。 - 嘗試將節點加入等待隊列尾部,如果失敗則調用
enq()進行入隊。
- 封裝當前線程為一個
4. enq(Node node)
- 作用:將節點安全地加入等待隊列末尾(無限重試,直到成功)。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 隊列未初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
-
流程:
- 檢查隊列是否存在頭節點,如果不存在,初始化。
- 使用 CAS 操作將新節點加入隊列尾部,保證線程安全。
5. acquireQueued(Node node, int arg)
- 作用:讓線程在隊列中等待,直到獲取到同步狀態。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // 幫助 GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) cancelAcquire(node);
}
}
-
流程:
- 獲取前驅節點。
- 如果前驅節點是頭節點,則嘗試獲取同步狀態。
- 如果獲取失敗,調用
shouldParkAfterFailedAcquire()判斷是否需要掛起線程。 - 掛起線程並檢查中斷狀態。
6. release(int arg)
- 作用:釋放同步狀態,喚醒後繼線程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) unparkSuccessor(h);
return true;
}
return false;
}
-
流程:
- 調用
tryRelease(arg)釋放同步狀態。 - 如果釋放成功,則喚醒等待隊列中的後繼線程。
- 調用
7. tryRelease(int arg)
- 作用:嘗試釋放同步狀態。這個方法是抽象方法,需要子類實現。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
- 注意:子類重寫此方法,定義釋放邏輯。
8. unparkSuccessor(Node node)
- 作用:喚醒等待隊列中的下一個線程。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
-
流程:
- 將當前節點的
waitStatus重置。 - 找到下一個需要喚醒的線程節點。
- 使用
LockSupport.unpark()喚醒線程。
- 將當前節點的
9. shouldParkAfterFailedAcquire()
- 作用:檢查節點的狀態,決定線程是否需要掛起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
總結核心方法
| 方法名 | 作用 |
|---|---|
acquire(int arg) |
獲取同步狀態,失敗則排隊等待。 |
tryAcquire(int arg) |
嘗試獲取同步狀態,子類實現。 |
addWaiter(Node.EXCLUSIVE) |
添加節點到等待隊列末尾。 |
enq(Node node) |
使用 CAS 操作將節點加入隊列末尾。 |
acquireQueued() |
線程在隊列中等待,直到獲取同步狀態。 |
release(int arg) |
釋放同步狀態並喚醒等待線程。 |
tryRelease(int arg) |
嘗試釋放同步狀態,子類實現。 |
unparkSuccessor(Node node) |
喚醒等待隊列中的下一個線程。 |
shouldParkAfterFailedAcquire() |
判斷線程是否需要掛起。 |
其他核心概念:
- Node 類:代表等待隊列中的線程節點。
-
狀態管理:
waitStatus:節點狀態,包括SIGNAL(等待喚醒)和CANCELLED等。head和tail:管理等待隊列的頭尾節點。
- CAS 操作:確保節點入隊等操作的線程安全。
這些方法共同配合,實現了 AQS 的底層同步控制機制,包括線程的排隊、阻塞、喚醒等功能。