博客 / 詳情

返回

AQS深度探索:以ReentrantLock看Java併發編程的高效實現

概述

AQS ( Abstract Queued Synchronizer )是一個抽象的隊列同步器,通過維護一個共享資源狀態( Volatile Int State )來表示同步狀態 和一個先進先出( FIFO )的線程等待隊列來完成資源獲取的排隊工作,通過CAS完成對State值的修改。

AQS整體框架如下:

當有自定義同步器接入時,只需重寫第一層所需要的部分方法即可,不需要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入AQS內部方法,然後經過第二層進行鎖的獲取,接着對於獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴於第五層的基礎數據提供層

原理

AQS 為每個共享資源都設置一個共享資源鎖,線程在需要訪問共享資源時首先需要獲取共享資源鎖,如果獲取到了共享資源鎖,便可以在當前線程中使用該共享資源,如果獲取不到,則將該線程放入線程等待隊列,等待下一次資源調度,流程圖如下所示:

Java中的大部分同步類(Lock、Semaphore、ReentrantLock等)都是基於AbstractQueuedSynchronizer(簡稱為AQS)實現的。

底層結構

state:狀態

Abstract Queued Synchronizer 維護了 volatile int 類型的變量,用於表示當前的同步狀態。volatile雖然不能保證操作的原子性,但是能保證當前變量state的可見性。

state的訪問方式有三種: getState()、setState()和 compareAndSetState(),均是原子操作,其中,compareAndSetState的實現依賴於 Unsafe的compareAndSwaplnt()

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

CLH隊列

Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。

AQS使用一個Volatile的int類型的成員變量來表示同步狀態,通過內置的FIFO隊列來完成資源獲取的排隊工作,通過CAS完成對State值的修改。

AQS的獨佔式和共享式

  • 獨佔式:只有一個線程能執行,具體的 Java 實現有 ReentrantLock。

  • 共享式:多個線程可同時執行,具體的 Java 實現有 Semaphore和CountDownLatch。

AQS只是一個框架 ,只定義了一個接口,具體資源的獲取、釋放都由自定義同步器去實現。不同的自定義同步器爭用共享資源的方式也不同,自定義同步器在實現時只需實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護,如獲取資源失敗入隊、喚醒出隊等, AQS 已經在頂層實現好(就是模板方法模式),不需要具體的同步器再做處理。自定義同步器實現時主要實現以下幾種方法:

  • 以ReentrantLock為例,ReentrantLock中的state初始值為0表示無鎖狀態。在線程執行 tryAcquire()獲取該鎖後ReentrantLock中的state+1,這時該線程獨佔ReentrantLock鎖,其他線程在通過tryAcquire() 獲取鎖時均會失敗,直到該線程釋放鎖後state再次為0,其他線程才有機會獲取該鎖。該線程在釋放鎖之前可以重複獲取此鎖,每獲取一次便會執行一次state+1, 因此ReentrantLock也屬於可重入鎖。 但獲取多少次鎖就要釋放多少次鎖,這樣才能保證state最終為0。如果獲取鎖的次數多於釋放鎖的次數,則會出現該線程一直持有該鎖的情況;如果獲取鎖的次數少於釋放鎖的次數,則運行中的程序會報鎖異常。

  • 以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是並行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數返回,繼續後面的動作。

  • 以Semaphore為例,state則代表可以同時訪問的線程數量,也可能理解為訪問的許可證(permit)數量。每個線程訪問(acquire)時需要拿到對應的許可證,否則進行阻塞,訪問結束則返還(release)許可證。state只能在Semaphore的構造方法中進行初始化,後續不能進行修改。

一般來説,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

Node節點

Node即為上面CLH變體隊列中的節點。

Node結點是每一個等待獲取資源的線程的封裝,其包含了需要同步的線程本身及其等待狀態waitStatus

Node中幾個方法和屬性值的含義:

  • waitStatus:當前節點在隊列中的狀態

  • thread:表示處於該節點的線程

  • prev:前驅指針

  • predecessor:返回前驅節點,沒有的話拋出npe

  • nextWaiter:指向下一個處於CONDITION狀態的節點(由於本篇文章不講述Condition Queue隊列,這個指針不多介紹)

  • next:後繼指針

等待狀態waitStatus

waitStatus有下面幾個枚舉值:如是否被阻塞、是否等待喚醒、是否已經被取消等。共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示當前結點已取消調度,不再想去獲取資源了。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態後的結點將不會再變化。

  • SIGNAL(-1):表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為SIGNAL。

  • CONDITION(-2):表示結點等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。

  • PROPAGATE(-3):共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。

  • 0:新結點入隊時的默認狀態。

注意,負值表示結點處於有效等待狀態,而正值表示結點已被取消。所以源碼中很多地方用>0、<0來判斷結點的狀態是否正常。

源碼

以ReentrantLock的非公平鎖為例,將加鎖和解鎖的交互流程單獨拎出來強調一下

加鎖:

  1. 通過ReentrantLock的加鎖方法Lock進行加鎖操作。
  2. 會調用到內部類 Sync的Lock方法,由於Sync#lock是抽象方法,根據 ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關內部類的Lock方法,本質上都會執行AQS的 Acquire 方法。
  3. AQS的 Acquire 方法會執行 tryAcquire 方法,但是由於tryAcquire需要自定義同步器實現,因此執行了ReentrantLock中的tryAcquire方法,由於ReentrantLock是通過公平鎖和非公平鎖內部類實現的tryAcquire方法,因此會根據鎖類型不同,執行不同的tryAcquire。
  4. tryAcquire是獲取鎖邏輯,獲取失敗後,會執行框架AQS的後續邏輯,跟ReentrantLock自定義同步器無關。

解鎖:

  1. 通過ReentrantLock的解鎖方法Unlock進行解鎖。
  2. Unlock會調用內部類Sync的Release方法,該方法繼承於AQS。
  3. Release中會調用tryRelease方法,tryRelease需要自定義同步器實現,tryRelease只在ReentrantLock中的Sync實現,因此可以看出,釋放鎖的過程,並不區分是否為公平鎖。
  4. 釋放成功後,所有處理由AQS框架完成,與自定義同步器無關。

acquire(int)

此方法是獨佔模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅只限於lock()。獲取到資源後,線程就可以去執行其臨界區代碼了。

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
}

函數流程如下:

  1. tryAcquire()嘗試直接去獲取資源,如果成功則直接返回(這裏體現了非公平鎖,每個線程獲取鎖時會嘗試直接搶佔加塞一次,而CLH隊列中可能還有別的線程在等待);
  2. addWaiter()將該線程加入等待隊列的尾部,並標記為獨佔模式;
  3. acquireQueued()使線程阻塞在等待隊列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  4. 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

關於整個函數流程詳解,可以往下看

tryAcquire(int)

此方法嘗試去獲取獨佔資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義,當然不僅僅只限於tryLock()。

protected boolean tryAcquire(int arg) {
     throw new UnsupportedOperationException();
}

這裏是AQS的方法,所以直接throw異常,而沒有具體的實現。原因就在於AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現。

這裏之所以沒有定義成abstract,是因為獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實現另一模式下的接口。

ReentrantLock實現公平鎖非公平鎖則主要體現在tryAcquire的實現上:

公平鎖中實現的tryAcquire:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
           if (!hasQueuedPredecessors() &&  //公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
           }
     }
     else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
     }
     return false;
}

非公平鎖中實現的tryAcquire:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
     final Thread current = Thread.currentThread();
     int c = getState();
     if (c == 0) {
           if (compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
      }
      else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
      }
      return false;
}
  • 公平鎖中多了一層 !hasQueuedPredecessors() 的判斷,這是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。如果返回False,説明當前線程可以獲取共享資源;如果返回True,説明隊列中存在有效節點,當前線程必須加入到等待隊列中。

  • 而在非公平鎖中,沒有這個判斷,直接嘗試獲取鎖,能獲取到鎖則不用加入等待隊列。

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

這裏的判斷 h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什麼要判斷的頭結點的下一個節點?第一個節點儲存的數據是什麼?

雙向鏈表中,第一個節點為虛節點,其實並不存儲任何信息,只是佔位。真正的第一個有數據的節點,是在第二個節點開始的。當h != t時: 如果(s = h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,需要返回True。 如果(s = h.next) != null,説明此時隊列中至少有一個有效節點。如果此時s.thread == Thread.currentThread(),説明等待隊列的第一個有效節點中的線程與當前線程相同,那麼當前線程是可以獲取資源的;如果s.thread != Thread.currentThread(),説明等待隊列的第一個有效節點線程與當前線程不同,當前線程必須加入進等待隊列。

addWaiter(Node)

此方法用於將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點。

private Node addWaiter(Node mode) {
    //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //嘗試快速方式直接放到隊尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //上一步失敗則通過enq入隊。
    enq(node);
    return node;
}

主要的流程如下:

  1. 通過當前的線程和鎖模式新建一個節點。
  2. Pred指針指向尾節點Tail。
  3. 將New中Node的Prev指針指向Pred。
  4. 通過compareAndSetTail方法,完成尾節點的設置。這個方法主要是對tailOffset和Expect進行比較,如果tailOffset的Node和Expect的Node地址是相同的,那麼設置Tail的值為Update的值。
// java.util.concurrent.locks.AbstractQueuedSynchronizer

static {
    try {
        stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
    } catch (Exception ex) { 
    throw new Error(ex); 
  }
}

從AQS的靜態代碼塊可以看出,都是獲取一個對象的屬性相對於該對象在內存當中的偏移量,這樣我們就可以根據這個偏移量在對象內存當中找到這個屬性。tailOffset指的是tail對應的偏移量,所以這個時候會將new出來的Node置為當前隊列的尾節點。同時,由於是雙向鏈表,也需要將前一個節點指向尾節點。

如果Pred指針是Null(説明等待隊列中沒有元素),或者當前Pred指針和Tail指向的位置不同(説明被別的線程已經修改),就需要enq入隊

private Node enq(final Node node) {
    //CAS"自旋",直到成功加入隊尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 隊列為空,創建一個空的標誌結點作為head結點,並將tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入隊尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如果沒有被初始化,需要進行初始化一個頭結點出來。但請注意,初始化的頭結點並不是當前線程節點,而是調用了無參構造函數的節點。如果經歷了初始化或者併發導致隊列中有元素,則與之前的方法相同。其實,addWaiter就是一個在雙端鏈表添加尾節點的操作,需要注意的是,雙端鏈表的頭結點是一個無參構造函數的頭結點。

acquireQueued(Node, int)

通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。addWaiter()返回的是一個包含該線程的Node。而這個Node會作為參數,進入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進行“獲鎖”操作。那麼下一步就是:如果獲取不到鎖,那麼就進入阻塞狀態休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。

acquireQueued:在等待隊列中排隊拿號(中間沒其它事幹可以阻塞休息),直到拿到號後再返回。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//標記是否成功拿到資源
    try {
        boolean interrupted = false;//標記等待過程中是否被中斷過

        //CAS“自旋”!
        for (;;) {
            final Node p = node.predecessor();//拿到前驅
            //如果前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源,也就是當前節點在真實數據隊列的首部,就嘗試獲取鎖(別忘了頭結點是虛節點)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);// 獲取鎖成功,頭指針移動到當前node
                p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
                failed = false; // 成功獲取資源
                return interrupted;//返回等待過程中是否被中斷過
            }

            // 説明p為頭節點且當前沒有獲取到鎖(可能是非公平鎖被搶佔了)或者 是p不為頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限循環浪費資源。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待過程中被中斷過,哪怕只有那麼一次,就將interrupted標記為true
        }
    } finally {
        if (failed) //説明發生了意料之外的異常,將節點移除,避免影響到其他節點
            cancelAcquire(node);
    }
}

setHead方法是把當前節點置為虛節點,但並沒有修改waitStatus,因為它是一直需要用的數據。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

acquireQueued函數的具體流程:

從上圖可以看出,跳出當前循環的條件是當“前置節點是頭結點,且當前線程獲取鎖成功”。為了防止因死循環導致CPU資源被浪費,我們會判斷前置節點的狀態來決定是否要將當前線程掛起,shouldParkAfterFailedAcquire代碼:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驅節點判斷當前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 獲取頭結點的節點狀態
        int ws = pred.waitStatus;
        // 説明頭結點處於喚醒狀態
        if (ws == Node.SIGNAL)
            return true; 
        // 通過枚舉值我們知道waitStatus>0是取消狀態
        if (ws > 0) {
            do {
                // 循環向前查找取消節點,把取消節點從隊列中剔除
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 設置前任節點等待狀態為SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}

parkAndCheckInterrupt主要用於掛起當前線程,阻塞調用棧,返回當前線程的中斷狀態。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//調用park()使線程進入waiting狀態
    return Thread.interrupted();//如果被喚醒,查看自己是不是被中斷的。
}

具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):

整個流程中,如果前驅結點的狀態不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。

park()會讓當前線程進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會清除當前線程的中斷標記位。

那麼shouldParkAfterFailedAcquire中取消節點是怎麼生成的呢?什麼時候會把一個節點的waitStatus設置為-1?

是在什麼時間釋放節點通知到被掛起的線程呢?

CANCELLED狀態節點生成

回看acquireQueued方法中的Finally代碼:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        ...
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    ...
                    failed = false;
                    ...
                }
                ...
        } finally {
            if (failed)
                cancelAcquire(node);
            }
}

顯然,當failed為true時才會執行方法cancelAcquire,那什麼情況下failed為true呢?try代碼段執行過程中出現異常。

這裏不知道哪裏會出現異常?假設tryAcquire出現的異常,那麼acquire方法就已經不會往後執行,也就不會執行到acquireQueued

通過cancelAcquire方法,將Node的狀態標記為CANCELLED。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
  // 將無效節點過濾
    if (node == null)
        return;
  // 設置該節點不關聯任何線程,也就是虛節點
    node.thread = null;
    Node pred = node.prev;
  // 通過前驅節點,跳過取消狀態的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
  // 獲取過濾後的前驅節點的後繼節點
    Node predNext = pred.next;
  // 把當前node的狀態設置為CANCELLED
    node.waitStatus = Node.CANCELLED;
  // 如果當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置為尾節點
  // 更新失敗的話,則進入else,如果更新成功,將tail的後繼節點設置為null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
    // 如果當前節點不是head的後繼節點,1:判斷當前節點前驅節點的是否為SIGNAL,2:如果不是,則把前驅節點設置為SINGAL看是否成功
    // 如果1和2中有一個為true,再判斷當前節點的線程是否為null
    // 如果上述條件都滿足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
      // 如果當前節點是head的後繼節點,或者上述條件不滿足,那就喚醒當前節點的後繼節點
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

cancelAcquire方法的流程:

  1. 獲取當前節點的前驅節點,如果前驅節點的狀態是CANCELLED,那就一直往前遍歷,找到第一個waitStatus <= 0的節點,將找到的Pred節點和當前Node關聯,將當前Node設置為CANCELLED。

  2. 根據當前節點的位置,考慮以下三種情況:

    1. 當前節點是尾節點。

    2. 當前節點是Head的後繼節點。

    3. 當前節點不是Head的後繼節點,也不是尾節點。

當前節點是尾節點:

當前節點是Head的後繼節點:

當前節點不是Head的後繼節點,也不是尾節點:

通過上面的流程,我們對於CANCELLED節點狀態的產生和變化已經有了大致的瞭解,但是為什麼所有的變化都是對Next指針進行了操作,而沒有對Prev指針進行操作呢?什麼情況下會對Prev指針進行操作?

執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時修改Prev指針,有可能會導致Prev指向另一個已經移除隊列的Node,因此這塊變化Prev指針不安全。

shouldParkAfterFailedAcquire方法中,會執行下面的代碼,其實就是在處理Prev指針。shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會執行,進入該方法後,説明共享資源已被獲取,當前節點之前的節點都不會出現變化,因此這個時候變更Prev指針比較安全。

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

release(int)

此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裏的其他線程來獲取資源。這也正是unlock()的語義,當然不僅僅只限於unlock()。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到頭結點
        // 頭結點不為空並且頭結點的waitStatus不是初始化節點情況,解除線程掛起狀態
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//喚醒等待隊列裏的下一個線程
        return true;
    }
    return false;
}

根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自定義同步器在設計tryRelease()

這裏的判斷條件為什麼是h != null && h.waitStatus != 0?

  • h null Head還沒初始化。初始情況下,head null,第一個節點入隊,Head會被初始化一個虛擬節點。所以説,這裏如果還沒來得及入隊,就會出現head == null 的情況。

  • h != null && waitStatus == 0 表明後繼節點對應的線程仍在運行中,不需要喚醒。

  • h != null && waitStatus < 0 表明後繼節點可能被阻塞了,需要喚醒。

tryRelease(int)

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

跟tryAcquire()一樣,這個方法是需要獨佔模式的自定義同步器去實現的。正常來説,tryRelease()都會成功的,因為這是獨佔模式,該線程來釋放資源,那麼它肯定已經拿到獨佔資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。

// java.util.concurrent.locks.ReentrantLock.Sync#tryRelease

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;//在未重入的情況下,getState() = 1,減去releases 1,因此c 為 0
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);//獨佔鎖線程設置為null
    }
    setState(c);//恢復默認
    return free;
}

unparkSuccessor(Node)

此方法用於喚醒等待隊列中下一個線程。

private void unparkSuccessor(Node node) {
    //這裏,node一般為當前線程所在的結點。
    int ws = node.waitStatus;
    if (ws < 0)//置零當前線程所在的結點狀態,允許失敗。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一個需要喚醒的結點s
    if (s == null || s.waitStatus > 0) {//如果為空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
            if (t.waitStatus <= 0)//從這裏可以看出,<=0的結點,都是還有效的結點。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}

這個函數並不複雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程s。此時,再和acquireQueued()聯繫起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關係,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裏既然s已經是等待隊列中最前邊的那個未放棄線程了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立了),然後s把自己設置成head標杆結點,表示自己已經獲取到資源了,acquire()也返回了!

在隊列中查找時是從後向前找的,為什麼這麼做?

從源碼上看,先找到後繼結點s,如果s狀態正常那麼直接喚醒。但有兩種異常情況,會導致next鏈不一致:

  1. s==null,在新結點入隊時可能會出現

  1. s.waitStatus > 0,中間有節點取消時會出現(如超時)

關於併發問題,addWaiter()入隊操作和cancelAcquire()取消排隊操作都會造成next鏈的不一致,而prev鏈是強一致的,所以這時從後往前找是最安全的。

為什麼prev鏈是強一致的?

因為addWaiter()裏每次compareAndSetTail(pred, node)之前都有node.prev = pred,即使compareAndSetTail失敗,enq()會反覆嘗試,直到成功。一旦compareAndSetTail成功,該node.prev就成功掛在之前的tail結點上了,而且是唯一的,這時其他新結點的prev只能嘗試往新tail結點上掛。這裏的組合用法非常巧妙,能保證CAS之前的prev鏈強一致,但不能保證CAS後的next鏈強一致。

acquireShared(int)

此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

這裏tryAcquireShared()依然需要自定義同步器去實現。但是AQS已經把其返回值的語義定義好了:負值代表獲取失敗;0代表獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其他線程還可以去獲取。所以這裏acquireShared()的流程就是:

  1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
  2. 失敗則通過doAcquireShared()進入等待隊列,直到獲取到資源為止才返回。

doAcquireShared(int)

此方法用於將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//加入隊列尾部
    boolean failed = true;//是否成功標誌
    try {
        boolean interrupted = false;//等待過程中是否被中斷過的標誌
        for (;;) {
            final Node p = node.predecessor();//前驅
            if (p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的
                int r = tryAcquireShared(arg);//嘗試獲取資源
                if (r >= 0) {//成功
                    setHeadAndPropagate(node, r);//將head指向自己,還有剩餘資源可以再喚醒之後的線程
                    p.next = null; // help GC
                    if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            //判斷狀態,尋找安全點,進入waiting狀態,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這裏跟acquireQueued()的流程並沒有太大區別。只不過這裏將補中斷的selfInterrupt()放到doAcquireShared()裏了,而獨佔模式是放到acquireQueued()之外,但實際上都一樣。

跟獨佔模式比,還有一點需要注意的是,這裏只有線程是head.next時(“老二”),才會去嘗試獲取資源,有剩餘的話還會喚醒之後的隊友。

那麼問題就來了,假如老大用完後釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續park()等待其他線程釋放資源,也更不會去喚醒老三和老四了。獨佔模式,同一時刻只有一個線程去執行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執行的,現在因為老二的資源需求量大,而把後面量小的老三和老四也都卡住了。當然,這並不是問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了併發)。

setHeadAndPropagate(Node, int):此方法在setHead()的基礎上多了一步,就是自己甦醒的同時,如果條件符合(比如還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式!

private void setHeadAndPropagate(Node node, int propagate) {

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);//head指向自己
     //如果還有剩餘量,繼續喚醒下一個鄰居線程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

releaseShared()

此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列裏的其他線程來獲取資源。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//嘗試釋放資源
        doReleaseShared();//喚醒後繼結點
        return true;
    }
    return false;
}

此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨佔模式下的release()相似,但有一點稍微需要注意:獨佔模式下的tryRelease()在完全釋放掉資源(state=0)後,才會返回true去喚醒其他線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就可以喚醒後繼等待結點。例如,資源總量是13,A(5)和B(7)分別獲取到資源併發運行,C(4)來時只剩1個資源就需要等待。A在運行過程中釋放掉2個資源量,然後tryReleaseShared(2)返回true喚醒C,C一看只有3個仍不夠繼續等待;隨後B又釋放2個,tryReleaseShared(2)返回true喚醒C,C一看有5個夠自己用了,然後C就可以跟A和B一起運行。而ReentrantReadWriteLock讀鎖的tryReleaseShared()只有在完全釋放掉資源(state=0)才返回true,所以自定義同步器可以根據需要決定tryReleaseShared()的返回值

doReleaseShared()

此方法主要用於喚醒後繼

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//喚醒後繼
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head發生變化
            break;
    }
}

應用

Mutex是一個不可重入的互斥鎖實現。鎖資源(AQS裏的state)只有兩種狀態:0表示未鎖定,1表示鎖定。核心源碼:

class Mutex implements Lock, java.io.Serializable {
    // 自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判斷是否鎖定狀態
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 嘗試獲取資源,立即返回。成功則返回true,否則false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 這裏限定只能為1個量
            if (compareAndSetState(0, 1)) {//state為0才設置為1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程獨佔資源
                return true;
            }
            return false;
        }

        // 嘗試釋放資源,立即返回。成功則為true,否則false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定為1個量
            if (getState() == 0)//既然來釋放,那肯定就是已佔有狀態了。只是為了保險,多層判斷!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//釋放資源,放棄佔有狀態
            return true;
        }
    }

    // 真正同步類的實現都依賴繼承於AQS的自定義同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。兩者語文一樣:釋放資源。
    public void unlock() {
        sync.release(1);
    }

    //鎖是否佔有狀態
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。

ReentrantLock 的使用

ReentrantLock 的使用方式與 synchronized 關鍵字類似,都是通過加鎖和釋放鎖來實現同步的。我們來看看 ReentrantLock 的使用方式,以非公平鎖為例:

public class ReentrantLockTest {
    private static final ReentrantLock lock = new ReentrantLock();
    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                lock.lock();
                try {
                    count++;
                } finally {
                    lock.unlock();
                }
            }
        });
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                lock.lock();
                try {
                    count++;
                } finally {
                    lock.unlock();
                }
            }
        });
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(count);
    }
}

代碼很簡單,兩個線程分別對 count 變量進行 10000 次累加操作,最後輸出 count 的值。我們來看看運行結果:

20000

可以看到,兩個線程對 count 變量進行了 20000 次累加操作,説明 ReentrantLock 是支持重入性的。再來看看公平鎖的使用方式,只需要將 ReentrantLock 的構造方法改為公平鎖即可:

private static final ReentrantLock lock = new ReentrantLock(true);

運行結果為:

20000

可以看到,公平鎖的運行結果與非公平鎖的運行結果一致,這是因為公平鎖的實現方式與非公平鎖的實現方式基本一致,只是在獲取鎖時增加了判斷當前節點是否有前驅節點的邏輯判斷。

  • 公平鎖: 按照線程請求鎖的順序獲取鎖,即先到先得。
  • 非公平鎖: 線程獲取鎖的順序可能與請求鎖的順序不同,可能導致某些線程獲取鎖的速度較快。

需要注意的是,使用 ReentrantLock 時,鎖必須在 try 代碼塊開始之前獲取,並且加鎖之前不能有異常拋出,否則在 finally 塊中就無法釋放鎖(ReentrantLock 的鎖必須在 finally 中手動釋放)。

錯誤示例:

Lock lock = new XxxLock();
// ...
try {
    // 如果在此拋出異常,會直接執行 finally 塊的代碼
    doSomething();
    // 不管鎖是否成功,finally 塊都會執行
    lock.lock();
    doOthers();

} finally {
    lock.unlock();
}

正確示例:

Lock lock = new XxxLock();
// ...
lock.lock();
try {
    doSomething();
    doOthers();
} finally {
    lock.unlock();
}
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.