概述
AQS ( Abstract Queued Synchronizer )是一個抽象的隊列同步器,通過維護一個共享資源狀態( Volatile Int State )來表示同步狀態 和一個先進先出( FIFO )的線程等待隊列來完成資源獲取的排隊工作,通過CAS完成對State值的修改。
AQS整體框架如下:
當有自定義同步器接入時,只需重寫第一層所需要的部分方法即可,不需要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入AQS內部方法,然後經過第二層進行鎖的獲取,接着對於獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴於第五層的基礎數據提供層
原理
AQS 為每個共享資源都設置一個共享資源鎖,線程在需要訪問共享資源時首先需要獲取共享資源鎖,如果獲取到了共享資源鎖,便可以在當前線程中使用該共享資源,如果獲取不到,則將該線程放入線程等待隊列,等待下一次資源調度,流程圖如下所示:
Java中的大部分同步類(Lock、Semaphore、ReentrantLock等)都是基於AbstractQueuedSynchronizer(簡稱為AQS)實現的。
底層結構
state:狀態
Abstract Queued Synchronizer 維護了 volatile int 類型的變量,用於表示當前的同步狀態。volatile雖然不能保證操作的原子性,但是能保證當前變量state的可見性。
state的訪問方式有三種: getState()、setState()和 compareAndSetState(),均是原子操作,其中,compareAndSetState的實現依賴於 Unsafe的compareAndSwaplnt()
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
CLH隊列
Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。
AQS使用一個Volatile的int類型的成員變量來表示同步狀態,通過內置的FIFO隊列來完成資源獲取的排隊工作,通過CAS完成對State值的修改。
AQS的獨佔式和共享式
-
獨佔式:只有一個線程能執行,具體的 Java 實現有 ReentrantLock。
-
共享式:多個線程可同時執行,具體的 Java 實現有 Semaphore和CountDownLatch。
AQS只是一個框架 ,只定義了一個接口,具體資源的獲取、釋放都由自定義同步器去實現。不同的自定義同步器爭用共享資源的方式也不同,自定義同步器在實現時只需實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護,如獲取資源失敗入隊、喚醒出隊等, AQS 已經在頂層實現好(就是模板方法模式),不需要具體的同步器再做處理。自定義同步器實現時主要實現以下幾種方法:
-
以ReentrantLock為例,ReentrantLock中的state初始值為0表示無鎖狀態。在線程執行 tryAcquire()獲取該鎖後ReentrantLock中的state+1,這時該線程獨佔ReentrantLock鎖,其他線程在通過tryAcquire() 獲取鎖時均會失敗,直到該線程釋放鎖後state再次為0,其他線程才有機會獲取該鎖。該線程在釋放鎖之前可以重複獲取此鎖,每獲取一次便會執行一次state+1, 因此ReentrantLock也屬於可重入鎖。 但獲取多少次鎖就要釋放多少次鎖,這樣才能保證state最終為0。如果獲取鎖的次數多於釋放鎖的次數,則會出現該線程一直持有該鎖的情況;如果獲取鎖的次數少於釋放鎖的次數,則運行中的程序會報鎖異常。
-
以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是並行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數返回,繼續後面的動作。
-
以Semaphore為例,state則代表可以同時訪問的線程數量,也可能理解為訪問的許可證(permit)數量。每個線程訪問(acquire)時需要拿到對應的許可證,否則進行阻塞,訪問結束則返還(release)許可證。state只能在Semaphore的構造方法中進行初始化,後續不能進行修改。
一般來説,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。
Node節點
Node即為上面CLH變體隊列中的節點。
Node結點是每一個等待獲取資源的線程的封裝,其包含了需要同步的線程本身及其等待狀態waitStatus
Node中幾個方法和屬性值的含義:
-
waitStatus:當前節點在隊列中的狀態
-
thread:表示處於該節點的線程
-
prev:前驅指針
-
predecessor:返回前驅節點,沒有的話拋出npe
-
nextWaiter:指向下一個處於CONDITION狀態的節點(由於本篇文章不講述Condition Queue隊列,這個指針不多介紹)
-
next:後繼指針
等待狀態waitStatus
waitStatus有下面幾個枚舉值:如是否被阻塞、是否等待喚醒、是否已經被取消等。共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
-
CANCELLED(1):表示當前結點已取消調度,不再想去獲取資源了。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態後的結點將不會再變化。
-
SIGNAL(-1):表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為SIGNAL。
-
CONDITION(-2):表示結點等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
-
PROPAGATE(-3):共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。
-
0:新結點入隊時的默認狀態。
注意,負值表示結點處於有效等待狀態,而正值表示結點已被取消。所以源碼中很多地方用>0、<0來判斷結點的狀態是否正常。
源碼
以ReentrantLock的非公平鎖為例,將加鎖和解鎖的交互流程單獨拎出來強調一下
加鎖:
- 通過ReentrantLock的加鎖方法Lock進行加鎖操作。
- 會調用到內部類 Sync的Lock方法,由於Sync#lock是抽象方法,根據 ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關內部類的Lock方法,本質上都會執行AQS的 Acquire 方法。
- AQS的 Acquire 方法會執行 tryAcquire 方法,但是由於tryAcquire需要自定義同步器實現,因此執行了ReentrantLock中的tryAcquire方法,由於ReentrantLock是通過公平鎖和非公平鎖內部類實現的tryAcquire方法,因此會根據鎖類型不同,執行不同的tryAcquire。
- tryAcquire是獲取鎖邏輯,獲取失敗後,會執行框架AQS的後續邏輯,跟ReentrantLock自定義同步器無關。
解鎖:
- 通過ReentrantLock的解鎖方法Unlock進行解鎖。
- Unlock會調用內部類Sync的Release方法,該方法繼承於AQS。
- Release中會調用tryRelease方法,tryRelease需要自定義同步器實現,tryRelease只在ReentrantLock中的Sync實現,因此可以看出,釋放鎖的過程,並不區分是否為公平鎖。
- 釋放成功後,所有處理由AQS框架完成,與自定義同步器無關。
acquire(int)
此方法是獨佔模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅只限於lock()。獲取到資源後,線程就可以去執行其臨界區代碼了。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
函數流程如下:
- tryAcquire()嘗試直接去獲取資源,如果成功則直接返回(這裏體現了非公平鎖,每個線程獲取鎖時會嘗試直接搶佔加塞一次,而CLH隊列中可能還有別的線程在等待);
- addWaiter()將該線程加入等待隊列的尾部,並標記為獨佔模式;
- acquireQueued()使線程阻塞在等待隊列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
- 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。
關於整個函數流程詳解,可以往下看
tryAcquire(int)
此方法嘗試去獲取獨佔資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義,當然不僅僅只限於tryLock()。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這裏是AQS的方法,所以直接throw異常,而沒有具體的實現。原因就在於AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現。
這裏之所以沒有定義成abstract,是因為獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實現另一模式下的接口。
ReentrantLock實現公平鎖非公平鎖則主要體現在tryAcquire的實現上:
公平鎖中實現的tryAcquire:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平鎖中實現的tryAcquire:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
-
公平鎖中多了一層 !hasQueuedPredecessors() 的判斷,這是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。如果返回False,説明當前線程可以獲取共享資源;如果返回True,説明隊列中存在有效節點,當前線程必須加入到等待隊列中。
-
而在非公平鎖中,沒有這個判斷,直接嘗試獲取鎖,能獲取到鎖則不用加入等待隊列。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
這裏的判斷 h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什麼要判斷的頭結點的下一個節點?第一個節點儲存的數據是什麼?
雙向鏈表中,第一個節點為虛節點,其實並不存儲任何信息,只是佔位。真正的第一個有數據的節點,是在第二個節點開始的。當h != t時: 如果(s = h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,需要返回True。 如果(s = h.next) != null,説明此時隊列中至少有一個有效節點。如果此時s.thread == Thread.currentThread(),説明等待隊列的第一個有效節點中的線程與當前線程相同,那麼當前線程是可以獲取資源的;如果s.thread != Thread.currentThread(),説明等待隊列的第一個有效節點線程與當前線程不同,當前線程必須加入進等待隊列。
addWaiter(Node)
此方法用於將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點。
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
主要的流程如下:
- 通過當前的線程和鎖模式新建一個節點。
- Pred指針指向尾節點Tail。
- 將New中Node的Prev指針指向Pred。
- 通過compareAndSetTail方法,完成尾節點的設置。這個方法主要是對tailOffset和Expect進行比較,如果tailOffset的Node和Expect的Node地址是相同的,那麼設置Tail的值為Update的值。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) {
throw new Error(ex);
}
}
從AQS的靜態代碼塊可以看出,都是獲取一個對象的屬性相對於該對象在內存當中的偏移量,這樣我們就可以根據這個偏移量在對象內存當中找到這個屬性。tailOffset指的是tail對應的偏移量,所以這個時候會將new出來的Node置為當前隊列的尾節點。同時,由於是雙向鏈表,也需要將前一個節點指向尾節點。
如果Pred指針是Null(説明等待隊列中沒有元素),或者當前Pred指針和Tail指向的位置不同(説明被別的線程已經修改),就需要enq入隊
private Node enq(final Node node) {
//CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) { // 隊列為空,創建一個空的標誌結點作為head結點,並將tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果沒有被初始化,需要進行初始化一個頭結點出來。但請注意,初始化的頭結點並不是當前線程節點,而是調用了無參構造函數的節點。如果經歷了初始化或者併發導致隊列中有元素,則與之前的方法相同。其實,addWaiter就是一個在雙端鏈表添加尾節點的操作,需要注意的是,雙端鏈表的頭結點是一個無參構造函數的頭結點。
acquireQueued(Node, int)
通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。addWaiter()返回的是一個包含該線程的Node。而這個Node會作為參數,進入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進行“獲鎖”操作。那麼下一步就是:如果獲取不到鎖,那麼就進入阻塞狀態休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。
acquireQueued:在等待隊列中排隊拿號(中間沒其它事幹可以阻塞休息),直到拿到號後再返回。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//標記是否成功拿到資源
try {
boolean interrupted = false;//標記等待過程中是否被中斷過
//CAS“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驅
//如果前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源,也就是當前節點在真實數據隊列的首部,就嘗試獲取鎖(別忘了頭結點是虛節點)。
if (p == head && tryAcquire(arg)) {
setHead(node);// 獲取鎖成功,頭指針移動到當前node
p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
failed = false; // 成功獲取資源
return interrupted;//返回等待過程中是否被中斷過
}
// 説明p為頭節點且當前沒有獲取到鎖(可能是非公平鎖被搶佔了)或者 是p不為頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限循環浪費資源。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待過程中被中斷過,哪怕只有那麼一次,就將interrupted標記為true
}
} finally {
if (failed) //説明發生了意料之外的異常,將節點移除,避免影響到其他節點
cancelAcquire(node);
}
}
setHead方法是把當前節點置為虛節點,但並沒有修改waitStatus,因為它是一直需要用的數據。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
acquireQueued函數的具體流程:
從上圖可以看出,跳出當前循環的條件是當“前置節點是頭結點,且當前線程獲取鎖成功”。為了防止因死循環導致CPU資源被浪費,我們會判斷前置節點的狀態來決定是否要將當前線程掛起,shouldParkAfterFailedAcquire代碼:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 靠前驅節點判斷當前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取頭結點的節點狀態
int ws = pred.waitStatus;
// 説明頭結點處於喚醒狀態
if (ws == Node.SIGNAL)
return true;
// 通過枚舉值我們知道waitStatus>0是取消狀態
if (ws > 0) {
do {
// 循環向前查找取消節點,把取消節點從隊列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 設置前任節點等待狀態為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt主要用於掛起當前線程,阻塞調用棧,返回當前線程的中斷狀態。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//調用park()使線程進入waiting狀態
return Thread.interrupted();//如果被喚醒,查看自己是不是被中斷的。
}
具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):
整個流程中,如果前驅結點的狀態不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。
park()會讓當前線程進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會清除當前線程的中斷標記位。
那麼shouldParkAfterFailedAcquire中取消節點是怎麼生成的呢?什麼時候會把一個節點的waitStatus設置為-1?
是在什麼時間釋放節點通知到被掛起的線程呢?
CANCELLED狀態節點生成
回看acquireQueued方法中的Finally代碼:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
...
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
...
failed = false;
...
}
...
} finally {
if (failed)
cancelAcquire(node);
}
}
顯然,當failed為true時才會執行方法cancelAcquire,那什麼情況下failed為true呢?try代碼段執行過程中出現異常。
這裏不知道哪裏會出現異常?假設tryAcquire出現的異常,那麼acquire方法就已經不會往後執行,也就不會執行到acquireQueued
通過cancelAcquire方法,將Node的狀態標記為CANCELLED。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
// 將無效節點過濾
if (node == null)
return;
// 設置該節點不關聯任何線程,也就是虛節點
node.thread = null;
Node pred = node.prev;
// 通過前驅節點,跳過取消狀態的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取過濾後的前驅節點的後繼節點
Node predNext = pred.next;
// 把當前node的狀態設置為CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置為尾節點
// 更新失敗的話,則進入else,如果更新成功,將tail的後繼節點設置為null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果當前節點不是head的後繼節點,1:判斷當前節點前驅節點的是否為SIGNAL,2:如果不是,則把前驅節點設置為SINGAL看是否成功
// 如果1和2中有一個為true,再判斷當前節點的線程是否為null
// 如果上述條件都滿足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果當前節點是head的後繼節點,或者上述條件不滿足,那就喚醒當前節點的後繼節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
cancelAcquire方法的流程:
-
獲取當前節點的前驅節點,如果前驅節點的狀態是CANCELLED,那就一直往前遍歷,找到第一個waitStatus <= 0的節點,將找到的Pred節點和當前Node關聯,將當前Node設置為CANCELLED。
-
根據當前節點的位置,考慮以下三種情況:
-
當前節點是尾節點。
-
當前節點是Head的後繼節點。
-
當前節點不是Head的後繼節點,也不是尾節點。
-
當前節點是尾節點:
當前節點是Head的後繼節點:
當前節點不是Head的後繼節點,也不是尾節點:
通過上面的流程,我們對於CANCELLED節點狀態的產生和變化已經有了大致的瞭解,但是為什麼所有的變化都是對Next指針進行了操作,而沒有對Prev指針進行操作呢?什麼情況下會對Prev指針進行操作?
執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時修改Prev指針,有可能會導致Prev指向另一個已經移除隊列的Node,因此這塊變化Prev指針不安全。
shouldParkAfterFailedAcquire方法中,會執行下面的代碼,其實就是在處理Prev指針。shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會執行,進入該方法後,説明共享資源已被獲取,當前節點之前的節點都不會出現變化,因此這個時候變更Prev指針比較安全。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
release(int)
此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裏的其他線程來獲取資源。這也正是unlock()的語義,當然不僅僅只限於unlock()。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到頭結點
// 頭結點不為空並且頭結點的waitStatus不是初始化節點情況,解除線程掛起狀態
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//喚醒等待隊列裏的下一個線程
return true;
}
return false;
}
根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自定義同步器在設計tryRelease()
這裏的判斷條件為什麼是h != null && h.waitStatus != 0?
-
h null Head還沒初始化。初始情況下,head null,第一個節點入隊,Head會被初始化一個虛擬節點。所以説,這裏如果還沒來得及入隊,就會出現head == null 的情況。
-
h != null && waitStatus == 0 表明後繼節點對應的線程仍在運行中,不需要喚醒。
-
h != null && waitStatus < 0 表明後繼節點可能被阻塞了,需要喚醒。
tryRelease(int)
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
跟tryAcquire()一樣,這個方法是需要獨佔模式的自定義同步器去實現的。正常來説,tryRelease()都會成功的,因為這是獨佔模式,該線程來釋放資源,那麼它肯定已經拿到獨佔資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。
// java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//在未重入的情況下,getState() = 1,減去releases 1,因此c 為 0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);//獨佔鎖線程設置為null
}
setState(c);//恢復默認
return free;
}
unparkSuccessor(Node)
此方法用於喚醒等待隊列中下一個線程。
private void unparkSuccessor(Node node) {
//這裏,node一般為當前線程所在的結點。
int ws = node.waitStatus;
if (ws < 0)//置零當前線程所在的結點狀態,允許失敗。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一個需要喚醒的結點s
if (s == null || s.waitStatus > 0) {//如果為空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
if (t.waitStatus <= 0)//從這裏可以看出,<=0的結點,都是還有效的結點。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
這個函數並不複雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程s。此時,再和acquireQueued()聯繫起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關係,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裏既然s已經是等待隊列中最前邊的那個未放棄線程了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立了),然後s把自己設置成head標杆結點,表示自己已經獲取到資源了,acquire()也返回了!
在隊列中查找時是從後向前找的,為什麼這麼做?
從源碼上看,先找到後繼結點s,如果s狀態正常那麼直接喚醒。但有兩種異常情況,會導致next鏈不一致:
- s==null,在新結點入隊時可能會出現
- s.waitStatus > 0,中間有節點取消時會出現(如超時)
關於併發問題,addWaiter()入隊操作和cancelAcquire()取消排隊操作都會造成next鏈的不一致,而prev鏈是強一致的,所以這時從後往前找是最安全的。
為什麼prev鏈是強一致的?
因為addWaiter()裏每次compareAndSetTail(pred, node)之前都有node.prev = pred,即使compareAndSetTail失敗,enq()會反覆嘗試,直到成功。一旦compareAndSetTail成功,該node.prev就成功掛在之前的tail結點上了,而且是唯一的,這時其他新結點的prev只能嘗試往新tail結點上掛。這裏的組合用法非常巧妙,能保證CAS之前的prev鏈強一致,但不能保證CAS後的next鏈強一致。
acquireShared(int)
此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
這裏tryAcquireShared()依然需要自定義同步器去實現。但是AQS已經把其返回值的語義定義好了:負值代表獲取失敗;0代表獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其他線程還可以去獲取。所以這裏acquireShared()的流程就是:
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過doAcquireShared()進入等待隊列,直到獲取到資源為止才返回。
doAcquireShared(int)
此方法用於將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入隊列尾部
boolean failed = true;//是否成功標誌
try {
boolean interrupted = false;//等待過程中是否被中斷過的標誌
for (;;) {
final Node p = node.predecessor();//前驅
if (p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的
int r = tryAcquireShared(arg);//嘗試獲取資源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//將head指向自己,還有剩餘資源可以再喚醒之後的線程
p.next = null; // help GC
if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。
selfInterrupt();
failed = false;
return;
}
}
//判斷狀態,尋找安全點,進入waiting狀態,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這裏跟acquireQueued()的流程並沒有太大區別。只不過這裏將補中斷的selfInterrupt()放到doAcquireShared()裏了,而獨佔模式是放到acquireQueued()之外,但實際上都一樣。
跟獨佔模式比,還有一點需要注意的是,這裏只有線程是head.next時(“老二”),才會去嘗試獲取資源,有剩餘的話還會喚醒之後的隊友。
那麼問題就來了,假如老大用完後釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續park()等待其他線程釋放資源,也更不會去喚醒老三和老四了。獨佔模式,同一時刻只有一個線程去執行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執行的,現在因為老二的資源需求量大,而把後面量小的老三和老四也都卡住了。當然,這並不是問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了併發)。
setHeadAndPropagate(Node, int):此方法在setHead()的基礎上多了一步,就是自己甦醒的同時,如果條件符合(比如還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式!
private void setHeadAndPropagate(Node node, int propagate) {
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//head指向自己
//如果還有剩餘量,繼續喚醒下一個鄰居線程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
releaseShared()
此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列裏的其他線程來獲取資源。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試釋放資源
doReleaseShared();//喚醒後繼結點
return true;
}
return false;
}
此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨佔模式下的release()相似,但有一點稍微需要注意:獨佔模式下的tryRelease()在完全釋放掉資源(state=0)後,才會返回true去喚醒其他線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就可以喚醒後繼等待結點。例如,資源總量是13,A(5)和B(7)分別獲取到資源併發運行,C(4)來時只剩1個資源就需要等待。A在運行過程中釋放掉2個資源量,然後tryReleaseShared(2)返回true喚醒C,C一看只有3個仍不夠繼續等待;隨後B又釋放2個,tryReleaseShared(2)返回true喚醒C,C一看有5個夠自己用了,然後C就可以跟A和B一起運行。而ReentrantReadWriteLock讀鎖的tryReleaseShared()只有在完全釋放掉資源(state=0)才返回true,所以自定義同步器可以根據需要決定tryReleaseShared()的返回值
doReleaseShared()
此方法主要用於喚醒後繼
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//喚醒後繼
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head發生變化
break;
}
}
應用
Mutex是一個不可重入的互斥鎖實現。鎖資源(AQS裏的state)只有兩種狀態:0表示未鎖定,1表示鎖定。核心源碼:
class Mutex implements Lock, java.io.Serializable {
// 自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 判斷是否鎖定狀態
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 嘗試獲取資源,立即返回。成功則返回true,否則false。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 這裏限定只能為1個量
if (compareAndSetState(0, 1)) {//state為0才設置為1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程獨佔資源
return true;
}
return false;
}
// 嘗試釋放資源,立即返回。成功則為true,否則false。
protected boolean tryRelease(int releases) {
assert releases == 1; // 限定為1個量
if (getState() == 0)//既然來釋放,那肯定就是已佔有狀態了。只是為了保險,多層判斷!
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);//釋放資源,放棄佔有狀態
return true;
}
}
// 真正同步類的實現都依賴繼承於AQS的自定義同步器!
private final Sync sync = new Sync();
//lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。
public void lock() {
sync.acquire(1);
}
//tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。
public boolean tryLock() {
return sync.tryAcquire(1);
}
//unlock<-->release。兩者語文一樣:釋放資源。
public void unlock() {
sync.release(1);
}
//鎖是否佔有狀態
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。
ReentrantLock 的使用
ReentrantLock 的使用方式與 synchronized 關鍵字類似,都是通過加鎖和釋放鎖來實現同步的。我們來看看 ReentrantLock 的使用方式,以非公平鎖為例:
public class ReentrantLockTest {
private static final ReentrantLock lock = new ReentrantLock();
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
代碼很簡單,兩個線程分別對 count 變量進行 10000 次累加操作,最後輸出 count 的值。我們來看看運行結果:
20000
可以看到,兩個線程對 count 變量進行了 20000 次累加操作,説明 ReentrantLock 是支持重入性的。再來看看公平鎖的使用方式,只需要將 ReentrantLock 的構造方法改為公平鎖即可:
private static final ReentrantLock lock = new ReentrantLock(true);
運行結果為:
20000
可以看到,公平鎖的運行結果與非公平鎖的運行結果一致,這是因為公平鎖的實現方式與非公平鎖的實現方式基本一致,只是在獲取鎖時增加了判斷當前節點是否有前驅節點的邏輯判斷。
- 公平鎖: 按照線程請求鎖的順序獲取鎖,即先到先得。
- 非公平鎖: 線程獲取鎖的順序可能與請求鎖的順序不同,可能導致某些線程獲取鎖的速度較快。
需要注意的是,使用 ReentrantLock 時,鎖必須在 try 代碼塊開始之前獲取,並且加鎖之前不能有異常拋出,否則在 finally 塊中就無法釋放鎖(ReentrantLock 的鎖必須在 finally 中手動釋放)。
錯誤示例:
Lock lock = new XxxLock();
// ...
try {
// 如果在此拋出異常,會直接執行 finally 塊的代碼
doSomething();
// 不管鎖是否成功,finally 塊都會執行
lock.lock();
doOthers();
} finally {
lock.unlock();
}
正確示例:
Lock lock = new XxxLock();
// ...
lock.lock();
try {
doSomething();
doOthers();
} finally {
lock.unlock();
}