博客 / 詳情

返回

Condition底層機制剖析:多線程等待與通知機制

概述

Condition 是一個多線程協調通信的工具類,可以讓某些線程一起等待某個條件(condition),只有滿足條件時,線程才會被喚醒。

  • 在使用Lock之前,使用的最多的同步方式應該是synchronized關鍵字來實現同步方式了。配合Object的wait()、notify()系列方法可以實現等待/通知模式。

  • Condition接口也提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式,但是這兩者在使用方式以及功能特性上還是有差別的。

Object和Condition接口的一些對比。

對比項 Object 監視器方法 Condition
前置條件 獲取對象的監視器鎖 調用 Lock.lock() 獲取鎖調用 Lock.newCondition() 獲取 Condition 對象
調用方法 直接調用如:object.wait() 直接調用如:condition.await()
等待隊列個數 一個 多個
當前線程釋放鎖並進入等待隊列 支持 支持
當前線程釋放鎖並進入等待隊列,在等待狀態中不響應中斷 不支持 支持
當前線程釋放鎖並進入超時等待狀態 支持 支持
當前線程釋放鎖並進入等待狀態到將來的某個時間 不支持 支持
喚醒等待隊列中的一個線程 支持 支持
喚醒等待隊列中的全部線程 支持 支持

接口的介紹與示例

首先需要明白condition對象是依賴於lock對象的,意思就是説condition對象需要通過lock對象進行創建出來(調用Lock對象的newCondition()方法)。condition的使用方式非常的簡單。但是需要注意在調用方法前獲取鎖。

/**
 * condition使用示例:
 * 1、condition的使用必須要配合鎖使用,調用方法時必須要獲取鎖
 * 2、condition的創建依賴於Lock lock.newCondition();
 */
public class ConditionUseCase {
    /**
     * 創建鎖
     */
    public Lock readLock = new ReentrantLock();
    /**
     * 創建條件
     */
    public Condition condition = readLock.newCondition();

    public static void main(String[] args) {
        ConditionUseCase useCase = new ConditionUseCase();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            //獲取鎖進行等待
            useCase.conditionWait();
        });
        executorService.execute(() -> {
            //獲取鎖進行喚起讀鎖
            useCase.conditionSignal();
        });
    }

    /**
     * 等待線程
     */
    public void conditionWait() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "拿到鎖了");
            System.out.println(Thread.currentThread().getName() + "等待信號");
            condition.await();
            System.out.println(Thread.currentThread().getName() + "拿到信號");
        } catch (Exception e) {

        } finally {
            readLock.unlock();
        }
    }

    /**
     * 喚起線程
     */
    public void conditionSignal() {
        readLock.lock();
        try {
            //睡眠5s 線程1啓動
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + "拿到鎖了");
            condition.signal();
            System.out.println(Thread.currentThread().getName() + "發出信號");
        } catch (Exception e) {

        } finally {
            //釋放鎖
            readLock.unlock();
        }
    }

}

//執行結果
1 pool-1-thread-1拿到鎖了
 2 pool-1-thread-1等待信號 ---釋放鎖-線程等待 t1
 3 pool-1-thread-2拿到鎖了
 4 pool-1-thread-2發出信號 --- 喚起線程t2釋放鎖
 5 pool-1-thread-1拿到信號---t1繼續執行

如示例所示,一般都會將Condition對象作為成員變量。當調用await()方法後,當前線程會釋放鎖並在此等待,而其他線程調用Condition對象的signal()方法,通知當前線程後,當前線程才從await()方法返回,並且在返回前已經獲取了鎖。

接口常用方法

condition可以通俗的理解為條件隊列。當一個線程在調用了await方法以後,直到線程等待的某個條件為真的時候才會被喚醒。這種方式為線程提供了更加簡單的等待/通知模式。Condition必須要配合鎖一起使用,因為對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,因此Condition一般都是作為Lock的內部實現。

  1. await() :造成當前線程在接到信號或被中斷之前一直處於等待狀態。
  2. boolean await(long time, TimeUnit unit) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態---》是否超時,超時異常
  3. awaitNanos(long nanosTimeout) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩餘時間,如果在nanosTimesout之前喚醒,那麼返回值 = nanosTimeout - 消耗時間,如果返回值 <= 0 ,則可以認定它已經超時了。
  4. awaitUninterruptibly() :造成當前線程在接到信號之前一直處於等待狀態。【注意:該方法對中斷不敏感】。
  5. awaitUntil(Date deadline) :造成當前線程在接到信號、被中斷或到達指定最後期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回true,否則表示到了指定時間,返回返回false。
  6. signal() :喚醒一個等待線程。該線程從等待方法返回前必須獲得與Condition相關的鎖。
  7. signalAll() :喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關的鎖。

這裏順便回顧一下 Object 類的主要方法:

  1. wait():線程等待直到被通知或者中斷。
  2. wait(long timeout):線程等待指定的時間,或被通知,或被中斷。
  3. wait(long timeout, int nanos):線程等待指定的時間,或被通知,或被中斷。
  4. notify():喚醒一個等待的線程。
  5. notifyAll():喚醒所有等待的線程。

原理解析

Condition是AQS的內部類。可以通過 Lock.newCondition() 方法獲取 Condition 對象,而 Lock 對於同步狀態的實現都是通過內部的自定義同步器實現的,newCondition() 方法也不例外,所以,Condition 接口的唯一實現類是同步器 AQS 的內部類 ConditionObject,因為 Condition 的操作需要獲取相關的鎖,所以作為同步器的內部類也比較合理,該類定義如下:

public class ConditionObject implements Condition, java.io.Serializable

等待隊列

前面我們學過,AQS 內部維護了一個先進先出(FIFO)的雙端隊列,並使用了兩個引用 head 和 tail 用於標識隊列的頭部和尾部。

Condition 內部也使用了同樣的方式,內部維護了一個先進先出(FIFO)的單向隊列,我們把它稱為等待隊列。該隊列是 Condition 對象實現等待 / 通知功能的關鍵。等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了Condition.await()方法,那麼該線程將會釋放鎖、構造成節點加入等待隊列並進入等待狀態。

事實上,節點的定義複用了 AQS 中 Node 節點的定義,也就是説,同步隊列和等待隊列中節點類型都是 AQS 的靜態內部類 AbstractQueuedSynchronized.Node。一個 Condition 包含一個等待隊列,Condition 擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前線程調用 Condition.await() 方法之後,將會以當前線程構造節點,並將節點從尾部加入等待隊列,等待隊列的基本結構如下所示。

  • 等待隊列分為首節點和尾節點。當一個線程調用Condition.await()方法,將會以當前線程構造節點,並將節點從尾部加入等待隊列。
  • 新增節點就是將尾部節點指向新增的節點。節點引用更新本來就是在獲取鎖以後的操作,所以不需要CAS保證。同時也是線程安全的操作。
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

在 Object 的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而併發包中的 Lock(更確切地説是同步器)擁有一個同步隊列和多個等待隊列

等待 await 方法

  • 當線程調用了await方法以後。線程就作為隊列中的一個節點被加入到等待隊列中去了。同時會釋放鎖的擁有。

  • 當從await方法返回的時候。一定會獲取condition相關聯的鎖。當等待隊列中的節點被喚醒的時候,則喚醒節點的線程開始嘗試獲取同步狀態。

    • 如果不是通過 其他線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException異常信息。

    • 通知調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列裏的首節點),在喚醒節點前,會將節點移到同步隊列中。

當前線程加入到等待隊列中如圖所示:

源碼如下:

public final void await() throws InterruptedException {
    // 檢測線程中斷狀態
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將當前線程包裝為Node節點加入等待隊列
    Node node = addConditionWaiter();
    // 釋放同步狀態,也就是釋放鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 檢測該節點是否在同步隊中,如果不在,則説明該線程還不具備競爭鎖的資格,則繼續等待
    while (!isOnSyncQueue(node)) {
        // 掛起線程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 競爭同步狀態
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清理條件隊列中的不是在等待條件的節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

調用該方法的線程是成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法會將當前線程構造節點並加入等待隊列中,然後釋放同步狀態,喚醒同步隊列中的後繼節點,然後當前線程會進入等待狀態。

可能會有這樣幾個問題:

  1. 怎樣將當前線程添加到等待隊列中?
  2. 釋放鎖的過程是?
  3. 怎樣才能從 await 方法中退出?

問題1:怎樣將當前線程添加到等待隊列中?

調用 addConditionWaiter 方法會將當前線程添加到等待隊列中,源碼如下:

private Node addConditionWaiter() {
    // 尾節點
    Node t = lastWaiter;
    // 尾節點如果不是CONDITION狀態,則表示該節點不處於等待狀態,需要清理節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 根據當前線程創建Node節點
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 將該節點加入等待隊列的末尾
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

首先將 t 指向尾節點,如果尾節點不為空並且它的waitStatus!=-2(-2 為 CONDITION,表示正在等待 Condition 條件),則將不處於等待狀態的節點從等待隊列中移除,並且將 t 指向新的尾節點。

然後將當前線程封裝成 waitStatus 為-2 的節點追加到等待隊列尾部。

如果尾節點為空,則表明隊列為空,將首尾節點都指向當前節點。

img

如果尾節點不為空,表明隊列中有其他節點,則將當前尾節點的 nextWaiter 指向當前節點,將當前節點置為尾節點。

img

簡單總結一下,這段代碼的作用就是通過尾插入的方式將當前線程封裝的 Node 插入到等待隊列中,同時可以看出,Condtion 的等待隊列是一個不帶頭節點的鏈式隊列,之前我們學習 AQS 時知道同步隊列是一個帶頭節點的鏈式隊列,這是兩者的一個區別。

關於頭節點的作用,我們這裏簡單説明一下。

不帶頭節點是指在鏈表數據結構中,鏈表的第一個節點就是實際存儲的第一個數據元素,而不是一個特定的"頭"節點,該節點不包含實際的數據。

1)不帶頭節點的鏈表:

  • 鏈表的第一個節點就是第一個實際的數據節點。
  • 當鏈表為空時,頭引用(通常稱為 head)指向 null。

2)帶頭節點的鏈表:

  • 鏈表有一個特殊的節點作為鏈表的開頭,這個特殊的節點稱為頭節點。
  • 頭節點通常不存儲任何實際數據,或者它的數據字段不被使用。
  • 無論鏈表是否為空,頭節點總是存在的。當鏈表為空時,頭節點的下一個節點指向 null。
  • 使用頭節點可以簡化某些鏈表操作,因為你不必特殊處理第一個元素的插入和刪除。

為了更好地解釋這兩種鏈表結構,我將為每種結構提供一個簡單的整數鏈表插入方法的示例。

1)不帶頭節點的鏈表

public class Node {
    public int data;
    public Node next;

    public Node(int data) {
        this.data = data;
        this.next = null;
    }
}

public class LinkedListWithoutHead {
    public Node head;

    public void insert(int value) {
        Node newNode = new Node(value);
        if (head == null) {
            head = newNode;
        } else {
            Node temp = head;
            while (temp.next != null) {
                temp = temp.next;
            }
            temp.next = newNode;
        }
    }
}

2)帶頭節點的鏈表

public class NodeWithHead {
    public int data;
    public NodeWithHead next;

    public NodeWithHead(int data) {
        this.data = data;
        this.next = null;
    }
}

public class LinkedListWithHead {
    private NodeWithHead head;

    public LinkedListWithHead() {
        head = new NodeWithHead(-1);  // 初始化頭節點
    }

    public void insert(int value) {
        NodeWithHead newNode = new NodeWithHead(value);
        NodeWithHead temp = head;
        while (temp.next != null) {
            temp = temp.next;
        }
        temp.next = newNode;
    }
}

這下是不是就徹底明白了?説明白了頭節點,我們再回到 Condition 的 await 方法。

問題 2:釋放鎖的過程是?

將當前線程加入到等待隊列之後,需要釋放同步狀態,該操作通過 fullyRelease(Node) 方法來完成:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取同步狀態
        int savedState = getState();
        // 釋放鎖
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

這段代碼也很容易理解,調用 AQS 的模板方法 release 釋放 AQS 的同步狀態並且喚醒在同步隊列中頭節點的後繼節點引用的線程,如果釋放成功則正常返回,若失敗的話就拋出異常。

問題3:怎樣才能從 await 方法中退出?

怎樣從 await 方法退出呢?現在回過頭再來看 await 方法,其中有這樣一段邏輯:

while (!isOnSyncQueue(node)) {
	// 3. 當前線程進入到等待狀態
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

isOnSyncQueue 方法用於判斷當前線程所在的 Node 是否在同步隊列中:

final boolean isOnSyncQueue(Node node) {
    // 節點狀態為CONDITION,或者前驅節點為null,返回false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 後繼節點不為null,那麼肯定在同步隊列中
    if (node.next != null) // If has successor, it must be on queue
        return true;
    
    return findNodeFromTail(node);
}

如果當前節點的 waitStatus=-2,説明它在等待隊列中,返回 false;如果當前節點有前驅節點,則證明它在 AQS 隊列中,但是前驅節點為空,説明它是頭節點,而頭節點是不參與鎖競爭的,也返回 false。

如果當前節點既不在等待隊列中,又不是 AQS 中的頭節點且存在 next 節點,説明它存在於 AQS 中,直接返回 true。

這裏有必要給大家看一下同步隊列與等待隊列的關係圖了。

img

當線程第一次調用 condition.await 方法時,會進入到這個 while 循環,然後通過 LockSupport.park(this) 使當前線程進入等待狀態,那麼要想退出 await,第一個前提條件就是要先退出這個 while 循環,出口就只兩個地方:

  1. 走到 break 退出 while 循環;
  2. while 循環中的邏輯判斷為 false。

出現第 1 種情況的條件是,當前等待的線程被中斷後代碼會走到 break 退出,第 2 種情況是當前節點被移動到了同步隊列中(即另外一個線程調用了 condition 的 signal 或者 signalAll 方法),while 中邏輯判斷為 false 後結束 while 循環。

總結一下,退出 await 方法的前提條件是當前線程被中斷或者調用 condition.signal 或者 condition.signalAll 使當前節點移動到同步隊列後

當退出 while 循環後會調用acquireQueued(node, savedState),該方法的作用是在自旋過程中線程不斷嘗試獲取同步狀態,直到成功(線程獲取到 lock)。這樣也説明了退出 await 方法必須是已經獲得了 condition 引用(關聯)的 lock

到目前為止,上文提到的三個問題,我們都通過閲讀源碼的方式找到了答案,也加深了對 await 方法的理解。await 方法示意圖如下:

await方法示意圖

如圖,調用 condition.await 方法的線程必須是已經獲得了 lock 的線程,也就是當前線程是同步隊列中的頭節點。調用該方法後會使得當前線程所封裝的 Node 尾插入到等待隊列中。

超時機制的支持

condition 還額外支持超時機制,使用者可調用 awaitNanos、awaitUtil 這兩個方法,實現原理基本上與 AQS 中的 tryAcquire 方法如出一轍。

不響應中斷的支持

要想不響應中斷可以調用 condition.awaitUninterruptibly() 方法,該方法的源碼如下:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

這段方法與上面的 await 方法基本一致,只不過減少了對中斷的處理。

通知-signal/signalAll 實現原理

調用 condition 的 signal 或者 signalAll 方法可以將等待隊列中等待時間最長的節點移動到同步隊列中,使得該節點能夠有機會獲得 lock。等待隊列是先進先出(FIFO)的,所以等待隊列的頭節點必然會是等待時間最長的節點,也就是每次調用 condition 的 signal 方法都會將頭節點移動到同步隊列中。

在調用signal()方法之前必須先判斷是否獲取到了鎖。接着獲取等待隊列的首節點,將其移動到同步隊列並且利用LockSupport喚醒節點中的線程。節點從等待隊列移動到同步隊列如下圖所示:

被喚醒的線程將從await方法中的while循環中退出。隨後加入到同步狀態的競爭當中去。成功獲取到競爭的線程則會返回到await方法之前的狀態。

源碼如下:

調用 Condition 的 signal() 方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列中。Condition 的 signal() 方法如下所示:

public final void signal() {
    // 判斷是否是當前線程獲取了鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 喚醒等待隊列的首節點
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

該方法最終調用 doSignal(Node) 方法來喚醒節點:

private void doSignal(Node first) {
    do {
        // 把等待隊列的首節點移除之後,要修改首結點
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

真正對頭節點做處理的邏輯,將節點移動到同步隊列是通過 transferForSignal(Node) 方法完成的:

final boolean transferForSignal(Node node) {
    // 嘗試將該節點的狀態從CONDITION修改為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    
    // 將節點加入到同步隊列尾部,返回該節點的前驅節點
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果前驅節點的狀態為CANCEL或者修改waitStatus失敗,則直接喚醒當前線程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

這段代碼主要做了兩件事情:

  1. 將頭節點的狀態更改為 CONDITION;
  2. 調用 enq 方法,將該節點尾插入到同步隊列中,關於 enq 方法請看 AQS 的底層實現這篇文章。

節點從等待隊列移動到同步隊列的過程如下圖所示:

被喚醒後的線程,將從 await() 方法中的 while 循環中退出(因為此時 isOnSyncQueue(Node) 方法返回 true),進而調用 acquireQueued() 方法加入到獲取同步狀態的競爭中。

成功獲取了鎖之後,被喚醒的線程將從先前調用的 await() 方法返回,此時,該線程已經成功獲取了鎖。

signalAll()

sigllAll 與 sigal 方法的區別體現在 doSignalAll 方法上,前面我們已經知道 doSignal 方法只會對等待隊列的頭節點進行操作, signalAll() 方法相當於對等待隊列的每個節點均執行一次 signal() 方法,效果就是將等待隊列中的所有節點移動到同步隊列中。doSignalAll 的源碼如下:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

該方法會將等待隊列中的每一個節點都移入到同步隊列中,即“通知”當前調用 condition.await() 方法的每一個線程。

await 與 signal/signalAll

文章開篇提到的等待/通知機制,通過 condition 的 await 和 signal/signalAll 方法就可以實現,而這種機制能夠解決最經典的問題就是“生產者與消費者問題”

await、signal 和 signalAll 方法就像一個開關,控制着線程 A(等待方)和線程 B(通知方)。它們之間的關係可以用下面這幅圖來説明,會更貼切:

線程 awaitThread 先通過 lock.lock() 方法獲取鎖,成功後調用 condition.await 方法進入等待隊列,而另一個線程 signalThread 通過 lock.lock() 方法獲取鎖成功後調用了 condition.signal 或者 signalAll 方法,使得線程 awaitThread 能夠有機會移入到同步隊列中,當其他線程釋放 lock 後使得線程 awaitThread 能夠有機會獲取 lock,從而使得線程 awaitThread 能夠從 await 方法中退出並執行後續操作。如果 awaitThread 獲取 lock 失敗會直接進入到同步隊列。

總結

  • 調用await方法後,將當前線程加入Condition等待隊列中。當前線程釋放鎖。否則別的線程就無法拿到鎖而發生死鎖。自旋(while)掛起,不斷檢測節點是否在同步隊列中了,如果是則嘗試獲取鎖,否則掛起。

  • 當線程被signal方法喚醒,被喚醒的線程將從await()方法中的while循環中退出來,然後調用acquireQueued()方法競爭同步狀態。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.