博客 / 詳情

返回

Go 併發控制:sync.Cond 詳解

公眾號首發地址:https://mp.weixin.qq.com/s/c1C2rv3nGCtnfm2n34K6VQ

在 Go 中因為 channel 的存在,sync.Cond 併發原語並不常用。不過在一些開源組件中還能能見到 sync.Cond 的應用,比如 Kubernetes 用它來實現併發等待隊列,這也是 sync.Cond 的典型應用場景。本文將通過源碼和示例帶你學會 sync.Cond 的正確用法。

源碼解讀

我們可以在 sync.Cond 文檔 https://pkg.go.dev/sync@go1.23.0#Cond 中看到其定義和實現的 exported 方法:

type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()

sync.Cond 是一個結構體,NewCond 函數接收一個互斥鎖對象 sync.Locker 並構造一個 sync.Cond 結構體指針。這裏的互斥鎖對象 sync.Locker 是一個接口,sync.Mutexsync.RWMutex 都實現了此接口。這把互斥鎖是 sync.Cond 實現的關鍵所在,也是導致 sync.Cond 經常容易用錯的“罪魁禍首”。其他 3 個方法,就是 sync.Cond 提供給我們用來進行併發控制的全部方法了。

你可能猜到了,Cond 這個單詞是 Condition 的縮寫,所以 sync.Cond 併發原語是圍繞着一個條件來設計的,它的主要功能就是通過一個條件來實現阻塞和喚醒一組需要協作的 goroutine

當調用 Wait 方法時,當前 goroutine 會被阻塞,直到被其他 goroutine 中調用的 BroadcastSignal 方法喚醒。這看起來有點類似 sync.WaitGroup,不過卻不太一樣,等閲讀完了本文,你就能明白二者的差異了。

NOTE:

如果你對 sync.WaitGroup 不熟悉,可以參考我的另一篇文章《Go 併發控制:sync.WaitGroup 詳解》。

目前,我們並沒有看到所謂的條件到底是什麼,這個概念似乎有點抽象,彆着急,一會你就明白了。那麼接下來,我們對 sync.Cond 源碼進行進一步解讀。

sync.Cond 構造函數實現如下:

https://github.com/golang/go/blob/go1.23.0/src/sync/cond.go
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

這裏沒什麼邏輯,只是記錄了傳進來的 sync.Locker 對象。

sync.Cond 結構體定義如下:

type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}

首先,看過我寫的前幾篇併發編程相關文章的讀者對 noCopy 屬性應該再熟悉不過了,就是用來防止 sync.Cond 結構體被複制用的。不瞭解的讀者可以看一下我的另一篇文章《Go 中空結構體慣用法,我幫你總結全了!》。

接着,就是互斥鎖屬性字段 L,根據註釋可知,在檢查或者修改 condition(即條件)時需要持有鎖。

notify 屬性則是用來記錄被阻塞等待的隊列,它的主要作用是維護一個通知列表,用於在調用 WaitSignal/Broadcast 時高效地協調 goroutines 的阻塞和喚醒。其定義如下:

type notifyList struct {
    wait   uint32         // 當前進入等待狀態的 goroutine 數量
    notify uint32         // 當前已被通知可以繼續運行的 goroutine 數量
    lock   uintptr        // 用於保護 wait/notify 的鎖
    head   unsafe.Pointer // 等待隊列的頭指針
    tail   unsafe.Pointer // 等待隊列的尾指針
}

可以看出 notifyList 是一個鏈表實現。

現在還剩下最後一個屬性 checker,這個屬性同樣用於防止 sync.Cond 結構體被複制。noCopy 類型可以輔助 go vet 工具在編譯時做靜態類型檢查,而 copyChecker 則可以在運行時進行動態檢查。其實現如下:

type copyChecker uintptr

// 檢查是否被複制,如果發生複製,則觸發 panic
func (c *copyChecker) check() {
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}

接下來我們看下 sync.Cond 實現的 3 個方法:

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

沒錯,sync.Cond 這 3 個方法竟然只有這麼幾行代碼。這得益於 Go 將複雜的邏輯都交給更底層的 runtime 包去實現了。不過我們大可不必深究更底層的實現,我們只需要梳理清楚這裏面的邏輯,就能理解 sync.Cond 的設計和用法了。

首先,這 3 個方法有一個共性,就是都會調用 c.checker.check() 來檢查對象 Cond 是否被複制,檢查過後才是主邏輯。

Wait 方法會先調用 runtime_notifyListAdd 函數將調用者加入到通知列表中,接着通過 c.L.Unlock() 釋放鎖,然後再調用 runtime_notifyListWait 函數阻塞並等待通知,收到通知被喚醒後,則繼續執行 c.L.Lock() 進行加鎖操作。

所以,根據 Wait 方法的實現,我們大概可以猜到它的使用方法:

c.L.Lock() // 調用 Wait 前先加鎖
// ...
c.Wait() // 調用 Wait 阻塞等待通知,其內部先釋放鎖,然後阻塞等待,最後被喚醒時重新加鎖
// ...
c.L.Unlock() // 調用 Wait 後需要釋放鎖

Signal 方法內部調用了 runtime_notifyListNotifyOne 函數來通知喚醒一個在通知列表中的調用者,並將其從通知列表中移除。

Broadcast 方法內部則調用了 runtime_notifyListNotifyAll 函數來通知喚醒整個通知列表中的全部調用者,並清空通知列表。

sync.Cond 源碼要講解的內容就這麼多,現在我再來講解 sync.Cond 的用法你就好理解了。

示例代碼如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/main.go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    c := sync.NewCond(&sync.Mutex{})

    go func() { // 啓動一個子 goroutine 進行等待
        fmt.Println("wait before")
        c.L.Lock()
        c.Wait() // 阻塞並等待通知
        c.L.Unlock()
        fmt.Println("wait after")
    }()

    time.Sleep(time.Second)

    fmt.Println("signal before")
    c.Signal() // 通知喚醒一個阻塞的 goroutine
    fmt.Println("signal after")

    time.Sleep(time.Second) // 確保子 goroutine 執行完成再退出
}

這個示例非常簡單,啓動一個子 goroutine 並調用 c.Wait() 進行等待,這會使當前子 goroutine 被加入到通知列表並阻塞。在主 goroutine 中等待 1 秒以後調用 c.Signal() 來通知喚醒一個阻塞的 goroutine,這個示例中只有一個 goroutine 在通知列表中,即上面啓動的子 goroutine,所以必然是它被喚醒。程序最後會等待 1 秒中,確保子 goroutine 執行完成再退出。

執行示例代碼,得到如下輸出:

$ go run main.go
wait before
signal before
signal after
wait after

輸出結果符合預期。

但是,上面的示例並沒有發揮 sync.Cond 併發原語真正的作用,這個簡單的示例使用 channel 實現似乎更加合理。並且,你有沒有注意到,這裏根本就沒體現出條件這個概念。

我們重新實現一個 sync.Cond 使用示例如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/main.go
func main() {
    c := sync.NewCond(&sync.Mutex{})
    condition := false // 定義一個條件變量

    go func() { // 啓動一個子 goroutine 進行等待
        fmt.Println("wait before")
        c.L.Lock()
        for !condition { // 通過循環檢查條件是否滿足
            c.Wait() // 阻塞並等待通知
        }
        fmt.Println("condition met, continue execution")
        c.L.Unlock()
        fmt.Println("wait after")
    }()

    time.Sleep(time.Second)

    fmt.Println("signal before")
    c.L.Lock()
    condition = true // 改變條件變量的狀態
    c.L.Unlock()
    c.Signal() // 通知喚醒一個阻塞的 goroutine
    fmt.Println("signal after")

    time.Sleep(time.Second) // 確保子 goroutine 執行完成再退出
}

這個示例程序,在原有示例代碼的基礎上,增加了條件變量 condition。在子 goroutine 中調用 c.Wait() 之前使用 for 循環檢查條件是否滿足,如果不滿足,才會調用 c.Wait() 進行阻塞等待,否則,條件滿足則直接執行後續邏輯。

在主 goroutine 中調用 c.Signal() 之前,加鎖保護並修改了條件變量 condition 的值為 true,然後才會通知喚醒被阻塞的子 goroutine。

子 goroutine 被喚醒後,for !condition 判斷條件不在成立,程序會退出循環向下繼續執行。

這才是 sync.Cond 的典型實用場景。

執行示例代碼,得到如下輸出:

$ go run main.go
wait before
signal before
signal after
condition met, continue execution
wait after

其實,在 sync.Cond 源碼中,Wait 方法註釋部分已經給出正確用法:

//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()

所以,Wait 方法的正確用法是結合互斥鎖條件變量一起來使用的。

那麼,Wait 方法為什麼要這樣設計呢?其實就是為了讓用户能夠在併發安全的情況下,對某個條件進行檢查,來決定是否繼續等待還是向下執行代碼。而在調用 Signal/Broadcast 方法前,能夠在併發安全的情況下對條件變量進行修改。

那麼到現在,再回過頭理解為什麼構造 sync.Cond 對象時需要一把互斥鎖這件事,也就説的通了。這把鎖由調用方傳遞進來,那麼調用方就可以藉助這把鎖,併發安全的修改條件變量。而在 Wait 方法內部,會在調用 runtime_notifyListWait 函數阻塞當前 goroutine 之前釋放鎖,這樣用户才有機會使用同一把鎖,在業務代碼中,加鎖並修改條件變量。然後調用 Signal/Broadcast 來通知喚醒 Wait 方法。Wait 方法喚醒後,會再次加鎖,這樣我們在外層使用 for !condition 檢查條件變量時就是併發安全的。如果條件成立,則由調用方釋放鎖,並繼續執行業務代碼。

我們可以總結出正確使用 sync.Cond 的套路:

  1. 要在調用 Wait 方法之前加鎖,調用後釋放鎖,並且需要一個 for 循環來不斷的檢測條件變量是否滿足。
  2. 在業務代碼中加鎖來併發安全的修改條件變量。
  3. 每次修改條件變量後,都要調用 Signal/Broadcast 方法來喚醒被 Wait 方法阻塞的 goroutine。

你一定要把這個套路當作 sync.Cond 的使用模板,印在你的腦海中。

那麼接下來,我們以一個更加真實的案例,來體會 sync.Cond 的用法。

使用示例

我帶你使用 sync.Cond 實現一個併發等待隊列,以此來徹底掌握 sync.Cond

這裏聲明瞭一個接口,來定義隊列需要實現的幾個方法:

type Interface interface {
    Add(item any)                   // 元素入隊
    Get() (item any, shutdown bool) // 元素出隊
    Len() int                       // 獲取隊列長度
    ShutDown()                      // 關閉隊列
    ShuttingDown() bool             // 隊列是否已經關閉
}

既然是隊列,那麼就要有入隊操作 Add 和出隊操作 GetLen 用來獲取隊列當前長度,調用 ShutDown 方法可以關閉隊列,關閉後的隊列無法再進行入隊操作,ShuttingDown 方法則返回隊列是否已經關閉。

接下來,我們就來實現這個接口。

首先,我們定義一個結構體 Queue 作為併發等待隊列的實現:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue.go
// Queue 併發等待隊列
type Queue struct {
    // 條件變量
    cond *sync.Cond

    // 隊列
    queue []any

    // 隊列是否關閉的標識
    shuttingDown bool
}

Queue 有 3 個屬性:

使用指針類型的 sync.Cond 作為 Queue 的第一個屬性,因為 sync.NewCond 返回的就是指針類型。

使用一個支持任意類型的切片 []any 作為隊列,用於保存隊列中每一項元素。

最後的 bool 類型屬性 shuttingDown 用來標識隊列是否已經被關閉,為 true 表示關閉。

我們再為併發等待隊列定義一個構造函數 New

// New 創建一個併發等待隊列
func New() *Queue {
    return &Queue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

接着,實現入隊方法 Add

// Add 元素入隊,如果隊列已經關閉,則直接返回,無法入隊
func (q *Queue) Add(item any) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    if q.shuttingDown { // 如果隊列已經關閉,則直接返回,不再入隊
        return
    }

    q.queue = append(q.queue, item) // 入隊
    q.cond.Signal()                 // 喚醒一個等待者,通知隊列中有數據了
}

當我們將元素 item 加入到隊列 q.queue 中後,就會調用 q.cond.Signal() 來喚醒一個等待者。所以 q.queue 就是我們的條件。注意,Add 方法內部的邏輯都進行了加鎖操作。

然後,就應該要實現出隊方法 Get 了:

// Get 從隊列中獲取一個元素,如果隊列為空則阻塞等待
// 第二個返回值標識隊列是否已經關閉,已關閉則返回 true,無法獲取到數據
func (q *Queue) Get() (item any, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait() // 如果隊列為空且未關閉,阻塞等待隊列中有數據時被喚醒
    }
    if len(q.queue) == 0 { // 如果此時隊列為空,那麼 q.shuttingDown 必然為 true,説明隊列已經被關閉了
        return nil, true
    }

    // NOTE: 如果 queue 不為空,shuttingDown 可能為 true 也可能為 false,都繼續往下執行
    // 即使標記隊列已經被關閉了,也要清空 queue

    // 出隊邏輯
    item = q.queue[0]
    q.queue[0] = nil // 主動清除引用,幫助 GC 回收
    q.queue = q.queue[1:]

    return item, false
}

Get 方法返回兩個值,第一個值 item 是出隊元素,第二個值 shutdown 標識隊列是否已經關閉。

這裏在是否調用 q.cond.Wait() 進行阻塞的判斷條件是 for len(q.queue) == 0 && !q.shuttingDown,因為不止 q.queue 這一個判別條件,當隊列被關閉,也不應該阻塞在這裏,因為永遠也不會獲得元素。

這裏有一個值得注意的點,就是出隊時,我們將 q.queue[0] 置為了 nil,這樣會解除切片底層數組對 item 的引用,讓 GC 儘早對其進行回收,避免極端情況出現內存泄漏。

現在,我們已經實現了一個併發等待隊列的核心邏輯,入隊和出隊。入隊時會檢測隊列是否已經關閉,如果隊列已經關閉,則直接返回,不再入隊。出隊時會檢測隊列是否為空,同時檢測隊列是否已經關閉,如果隊列未關閉,且為空,則阻塞等待,直到有值或隊列被關閉。

其他幾個方法都比較簡單,我一併列出來:

// ShutDown 關閉隊列
func (q *Queue) ShutDown() {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    q.shuttingDown = true // 標記隊列關閉
    q.cond.Broadcast()
}

// ShuttingDown 隊列是否關閉
func (q *Queue) ShuttingDown() bool {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    return q.shuttingDown // 返回隊列是否關閉標識
}

// Len 獲取隊列長度
func (q *Queue) Len() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    return len(q.queue) // 返回隊列當前長度
}

至此,一個併發等待隊列就實現完成了。

我們可以為這個併發等待隊列編寫一個測試用例:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue_test.go
func TestBasic(t *testing.T) {
    tests := []struct {
        queue *queue.Queue
    }{
        {
            queue: queue.New(),
        },
        {
            queue: queue.New(),
        },
    }
    for _, test := range tests {
        // If something is seriously wrong this test will never complete.

        // Start producers
        const producers = 50
        producerWG := sync.WaitGroup{}
        producerWG.Add(producers)
        for i := 0; i < producers; i++ {
            go func(i int) {
                defer producerWG.Done()
                for j := 0; j < 50; j++ {
                    test.queue.Add(i)
                    time.Sleep(time.Millisecond)
                }
            }(i)
        }

        // Start consumers
        const consumers = 10
        consumerWG := sync.WaitGroup{}
        consumerWG.Add(consumers)
        for i := 0; i < consumers; i++ {
            go func(i int) {
                defer consumerWG.Done()
                for {
                    item, quit := test.queue.Get()
                    if item == "added after shutdown!" {
                        t.Errorf("Got an item added after shutdown.")
                    }
                    if quit {
                        return
                    }
                    t.Logf("Worker %v: processing %v", i, item)
                }
            }(i)
        }

        producerWG.Wait()
        test.queue.ShutDown()
        test.queue.Add("added after shutdown!")
        consumerWG.Wait()
        if test.queue.Len() != 0 {
            t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
        }
    }
}

這裏併發啓動了 50 個生產者 goroutine,每個 goroutine 向隊列中寫入 50 個元素。啓動了 10 個消費者 goroutine,來併發的消費隊列。在生產者生產完成後,會調用 test.queue.ShutDown() 關閉隊列,然後再次嘗試向隊列中添加一個元素,等待消費者消費完成,最終判斷隊列長隊是否為 0。

執行這個測試用例,得到如下輸出:

$ go test -v -run=TestBasic
=== RUN   TestBasic
    queue_test.go:58: Worker 9: processing 2
    queue_test.go:58: Worker 9: processing 16
    ...
    queue_test.go:58: Worker 1: processing 36
    queue_test.go:58: Worker 5: processing 13
    queue_test.go:58: Worker 7: processing 24
--- PASS: TestBasic (0.26s)
PASS
ok      github.com/jianghushinian/blog-go-example/sync/cond/queue       0.679s

這個輸出日誌中省略了大量的 t.Logf() 打印,不過這足以演示隊列的正確性。

更多測試用例,你可以在這裏看到:https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue_test.go。

其實,我們實現的這個併發等待隊列,正是 Kubernetes client-go 中非常關鍵的一個組件 workqueue 的精簡版。搞懂了這個併發等待隊列的實現,再去看 workqueue 的源碼就很容易上手了,祝你好運 :)。

總結

本文對 Go 中的 sync.Cond 併發原語進行了講解,並帶你看了其源碼的實現,以及介紹瞭如何使用。

你一定要記得我們總結出來的 sync.Cond 使用套路,不要用錯。我們最終實現的併發等待隊列 Queue 是 Kubernetes client-go 中關鍵組件 workqueue 的微小實現,你也一定要掌握。

本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。

希望此文能對你有所啓發。

聯繫我

  • 公眾號:Go編程世界
  • 微信:jianghushinian
  • 郵箱:jianghushinian007@outlook.com
  • 博客:https://jianghushinian.cn
  • GitHub:https://github.com/jianghushinian
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.