公眾號首發地址:https://mp.weixin.qq.com/s/c1C2rv3nGCtnfm2n34K6VQ
在 Go 中因為 channel 的存在,sync.Cond 併發原語並不常用。不過在一些開源組件中還能能見到 sync.Cond 的應用,比如 Kubernetes 用它來實現併發等待隊列,這也是 sync.Cond 的典型應用場景。本文將通過源碼和示例帶你學會 sync.Cond 的正確用法。
源碼解讀
我們可以在 sync.Cond 文檔 https://pkg.go.dev/sync@go1.23.0#Cond 中看到其定義和實現的 exported 方法:
type Cond
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
sync.Cond 是一個結構體,NewCond 函數接收一個互斥鎖對象 sync.Locker 並構造一個 sync.Cond 結構體指針。這裏的互斥鎖對象 sync.Locker 是一個接口,sync.Mutex 和 sync.RWMutex 都實現了此接口。這把互斥鎖是 sync.Cond 實現的關鍵所在,也是導致 sync.Cond 經常容易用錯的“罪魁禍首”。其他 3 個方法,就是 sync.Cond 提供給我們用來進行併發控制的全部方法了。
你可能猜到了,Cond 這個單詞是 Condition 的縮寫,所以 sync.Cond 併發原語是圍繞着一個條件來設計的,它的主要功能就是通過一個條件來實現阻塞和喚醒一組需要協作的 goroutine。
當調用 Wait 方法時,當前 goroutine 會被阻塞,直到被其他 goroutine 中調用的 Broadcast 或 Signal 方法喚醒。這看起來有點類似 sync.WaitGroup,不過卻不太一樣,等閲讀完了本文,你就能明白二者的差異了。
NOTE:
如果你對
sync.WaitGroup不熟悉,可以參考我的另一篇文章《Go 併發控制:sync.WaitGroup 詳解》。
目前,我們並沒有看到所謂的條件到底是什麼,這個概念似乎有點抽象,彆着急,一會你就明白了。那麼接下來,我們對 sync.Cond 源碼進行進一步解讀。
sync.Cond 構造函數實現如下:
https://github.com/golang/go/blob/go1.23.0/src/sync/cond.go
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
這裏沒什麼邏輯,只是記錄了傳進來的 sync.Locker 對象。
sync.Cond 結構體定義如下:
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
首先,看過我寫的前幾篇併發編程相關文章的讀者對 noCopy 屬性應該再熟悉不過了,就是用來防止 sync.Cond 結構體被複制用的。不瞭解的讀者可以看一下我的另一篇文章《Go 中空結構體慣用法,我幫你總結全了!》。
接着,就是互斥鎖屬性字段 L,根據註釋可知,在檢查或者修改 condition(即條件)時需要持有鎖。
notify 屬性則是用來記錄被阻塞等待的隊列,它的主要作用是維護一個通知列表,用於在調用 Wait 和 Signal/Broadcast 時高效地協調 goroutines 的阻塞和喚醒。其定義如下:
type notifyList struct {
wait uint32 // 當前進入等待狀態的 goroutine 數量
notify uint32 // 當前已被通知可以繼續運行的 goroutine 數量
lock uintptr // 用於保護 wait/notify 的鎖
head unsafe.Pointer // 等待隊列的頭指針
tail unsafe.Pointer // 等待隊列的尾指針
}
可以看出 notifyList 是一個鏈表實現。
現在還剩下最後一個屬性 checker,這個屬性同樣用於防止 sync.Cond 結構體被複制。noCopy 類型可以輔助 go vet 工具在編譯時做靜態類型檢查,而 copyChecker 則可以在運行時進行動態檢查。其實現如下:
type copyChecker uintptr
// 檢查是否被複制,如果發生複製,則觸發 panic
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
接下來我們看下 sync.Cond 實現的 3 個方法:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
沒錯,sync.Cond 這 3 個方法竟然只有這麼幾行代碼。這得益於 Go 將複雜的邏輯都交給更底層的 runtime 包去實現了。不過我們大可不必深究更底層的實現,我們只需要梳理清楚這裏面的邏輯,就能理解 sync.Cond 的設計和用法了。
首先,這 3 個方法有一個共性,就是都會調用 c.checker.check() 來檢查對象 Cond 是否被複制,檢查過後才是主邏輯。
Wait 方法會先調用 runtime_notifyListAdd 函數將調用者加入到通知列表中,接着通過 c.L.Unlock() 釋放鎖,然後再調用 runtime_notifyListWait 函數阻塞並等待通知,收到通知被喚醒後,則繼續執行 c.L.Lock() 進行加鎖操作。
所以,根據 Wait 方法的實現,我們大概可以猜到它的使用方法:
c.L.Lock() // 調用 Wait 前先加鎖
// ...
c.Wait() // 調用 Wait 阻塞等待通知,其內部先釋放鎖,然後阻塞等待,最後被喚醒時重新加鎖
// ...
c.L.Unlock() // 調用 Wait 後需要釋放鎖
Signal 方法內部調用了 runtime_notifyListNotifyOne 函數來通知喚醒一個在通知列表中的調用者,並將其從通知列表中移除。
而 Broadcast 方法內部則調用了 runtime_notifyListNotifyAll 函數來通知喚醒整個通知列表中的全部調用者,並清空通知列表。
sync.Cond 源碼要講解的內容就這麼多,現在我再來講解 sync.Cond 的用法你就好理解了。
示例代碼如下:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/main.go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
c := sync.NewCond(&sync.Mutex{})
go func() { // 啓動一個子 goroutine 進行等待
fmt.Println("wait before")
c.L.Lock()
c.Wait() // 阻塞並等待通知
c.L.Unlock()
fmt.Println("wait after")
}()
time.Sleep(time.Second)
fmt.Println("signal before")
c.Signal() // 通知喚醒一個阻塞的 goroutine
fmt.Println("signal after")
time.Sleep(time.Second) // 確保子 goroutine 執行完成再退出
}
這個示例非常簡單,啓動一個子 goroutine 並調用 c.Wait() 進行等待,這會使當前子 goroutine 被加入到通知列表並阻塞。在主 goroutine 中等待 1 秒以後調用 c.Signal() 來通知喚醒一個阻塞的 goroutine,這個示例中只有一個 goroutine 在通知列表中,即上面啓動的子 goroutine,所以必然是它被喚醒。程序最後會等待 1 秒中,確保子 goroutine 執行完成再退出。
執行示例代碼,得到如下輸出:
$ go run main.go
wait before
signal before
signal after
wait after
輸出結果符合預期。
但是,上面的示例並沒有發揮 sync.Cond 併發原語真正的作用,這個簡單的示例使用 channel 實現似乎更加合理。並且,你有沒有注意到,這裏根本就沒體現出條件這個概念。
我們重新實現一個 sync.Cond 使用示例如下:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/main.go
func main() {
c := sync.NewCond(&sync.Mutex{})
condition := false // 定義一個條件變量
go func() { // 啓動一個子 goroutine 進行等待
fmt.Println("wait before")
c.L.Lock()
for !condition { // 通過循環檢查條件是否滿足
c.Wait() // 阻塞並等待通知
}
fmt.Println("condition met, continue execution")
c.L.Unlock()
fmt.Println("wait after")
}()
time.Sleep(time.Second)
fmt.Println("signal before")
c.L.Lock()
condition = true // 改變條件變量的狀態
c.L.Unlock()
c.Signal() // 通知喚醒一個阻塞的 goroutine
fmt.Println("signal after")
time.Sleep(time.Second) // 確保子 goroutine 執行完成再退出
}
這個示例程序,在原有示例代碼的基礎上,增加了條件變量 condition。在子 goroutine 中調用 c.Wait() 之前使用 for 循環檢查條件是否滿足,如果不滿足,才會調用 c.Wait() 進行阻塞等待,否則,條件滿足則直接執行後續邏輯。
在主 goroutine 中調用 c.Signal() 之前,加鎖保護並修改了條件變量 condition 的值為 true,然後才會通知喚醒被阻塞的子 goroutine。
子 goroutine 被喚醒後,for !condition 判斷條件不在成立,程序會退出循環向下繼續執行。
這才是 sync.Cond 的典型實用場景。
執行示例代碼,得到如下輸出:
$ go run main.go
wait before
signal before
signal after
condition met, continue execution
wait after
其實,在 sync.Cond 源碼中,Wait 方法註釋部分已經給出正確用法:
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
所以,Wait 方法的正確用法是結合互斥鎖和條件變量一起來使用的。
那麼,Wait 方法為什麼要這樣設計呢?其實就是為了讓用户能夠在併發安全的情況下,對某個條件進行檢查,來決定是否繼續等待還是向下執行代碼。而在調用 Signal/Broadcast 方法前,能夠在併發安全的情況下對條件變量進行修改。
那麼到現在,再回過頭理解為什麼構造 sync.Cond 對象時需要一把互斥鎖這件事,也就説的通了。這把鎖由調用方傳遞進來,那麼調用方就可以藉助這把鎖,併發安全的修改條件變量。而在 Wait 方法內部,會在調用 runtime_notifyListWait 函數阻塞當前 goroutine 之前釋放鎖,這樣用户才有機會使用同一把鎖,在業務代碼中,加鎖並修改條件變量。然後調用 Signal/Broadcast 來通知喚醒 Wait 方法。Wait 方法喚醒後,會再次加鎖,這樣我們在外層使用 for !condition 檢查條件變量時就是併發安全的。如果條件成立,則由調用方釋放鎖,並繼續執行業務代碼。
我們可以總結出正確使用 sync.Cond 的套路:
- 要在調用
Wait方法之前加鎖,調用後釋放鎖,並且需要一個for循環來不斷的檢測條件變量是否滿足。 - 在業務代碼中加鎖來併發安全的修改條件變量。
- 每次修改條件變量後,都要調用
Signal/Broadcast方法來喚醒被Wait方法阻塞的 goroutine。
你一定要把這個套路當作 sync.Cond 的使用模板,印在你的腦海中。
那麼接下來,我們以一個更加真實的案例,來體會 sync.Cond 的用法。
使用示例
我帶你使用 sync.Cond 實現一個併發等待隊列,以此來徹底掌握 sync.Cond。
這裏聲明瞭一個接口,來定義隊列需要實現的幾個方法:
type Interface interface {
Add(item any) // 元素入隊
Get() (item any, shutdown bool) // 元素出隊
Len() int // 獲取隊列長度
ShutDown() // 關閉隊列
ShuttingDown() bool // 隊列是否已經關閉
}
既然是隊列,那麼就要有入隊操作 Add 和出隊操作 Get,Len 用來獲取隊列當前長度,調用 ShutDown 方法可以關閉隊列,關閉後的隊列無法再進行入隊操作,ShuttingDown 方法則返回隊列是否已經關閉。
接下來,我們就來實現這個接口。
首先,我們定義一個結構體 Queue 作為併發等待隊列的實現:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue.go
// Queue 併發等待隊列
type Queue struct {
// 條件變量
cond *sync.Cond
// 隊列
queue []any
// 隊列是否關閉的標識
shuttingDown bool
}
Queue 有 3 個屬性:
使用指針類型的 sync.Cond 作為 Queue 的第一個屬性,因為 sync.NewCond 返回的就是指針類型。
使用一個支持任意類型的切片 []any 作為隊列,用於保存隊列中每一項元素。
最後的 bool 類型屬性 shuttingDown 用來標識隊列是否已經被關閉,為 true 表示關閉。
我們再為併發等待隊列定義一個構造函數 New:
// New 創建一個併發等待隊列
func New() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
接着,實現入隊方法 Add:
// Add 元素入隊,如果隊列已經關閉,則直接返回,無法入隊
func (q *Queue) Add(item any) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown { // 如果隊列已經關閉,則直接返回,不再入隊
return
}
q.queue = append(q.queue, item) // 入隊
q.cond.Signal() // 喚醒一個等待者,通知隊列中有數據了
}
當我們將元素 item 加入到隊列 q.queue 中後,就會調用 q.cond.Signal() 來喚醒一個等待者。所以 q.queue 就是我們的條件。注意,Add 方法內部的邏輯都進行了加鎖操作。
然後,就應該要實現出隊方法 Get 了:
// Get 從隊列中獲取一個元素,如果隊列為空則阻塞等待
// 第二個返回值標識隊列是否已經關閉,已關閉則返回 true,無法獲取到數據
func (q *Queue) Get() (item any, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait() // 如果隊列為空且未關閉,阻塞等待隊列中有數據時被喚醒
}
if len(q.queue) == 0 { // 如果此時隊列為空,那麼 q.shuttingDown 必然為 true,説明隊列已經被關閉了
return nil, true
}
// NOTE: 如果 queue 不為空,shuttingDown 可能為 true 也可能為 false,都繼續往下執行
// 即使標記隊列已經被關閉了,也要清空 queue
// 出隊邏輯
item = q.queue[0]
q.queue[0] = nil // 主動清除引用,幫助 GC 回收
q.queue = q.queue[1:]
return item, false
}
Get 方法返回兩個值,第一個值 item 是出隊元素,第二個值 shutdown 標識隊列是否已經關閉。
這裏在是否調用 q.cond.Wait() 進行阻塞的判斷條件是 for len(q.queue) == 0 && !q.shuttingDown,因為不止 q.queue 這一個判別條件,當隊列被關閉,也不應該阻塞在這裏,因為永遠也不會獲得元素。
這裏有一個值得注意的點,就是出隊時,我們將 q.queue[0] 置為了 nil,這樣會解除切片底層數組對 item 的引用,讓 GC 儘早對其進行回收,避免極端情況出現內存泄漏。
現在,我們已經實現了一個併發等待隊列的核心邏輯,入隊和出隊。入隊時會檢測隊列是否已經關閉,如果隊列已經關閉,則直接返回,不再入隊。出隊時會檢測隊列是否為空,同時檢測隊列是否已經關閉,如果隊列未關閉,且為空,則阻塞等待,直到有值或隊列被關閉。
其他幾個方法都比較簡單,我一併列出來:
// ShutDown 關閉隊列
func (q *Queue) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true // 標記隊列關閉
q.cond.Broadcast()
}
// ShuttingDown 隊列是否關閉
func (q *Queue) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown // 返回隊列是否關閉標識
}
// Len 獲取隊列長度
func (q *Queue) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue) // 返回隊列當前長度
}
至此,一個併發等待隊列就實現完成了。
我們可以為這個併發等待隊列編寫一個測試用例:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue_test.go
func TestBasic(t *testing.T) {
tests := []struct {
queue *queue.Queue
}{
{
queue: queue.New(),
},
{
queue: queue.New(),
},
}
for _, test := range tests {
// If something is seriously wrong this test will never complete.
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
for j := 0; j < 50; j++ {
test.queue.Add(i)
time.Sleep(time.Millisecond)
}
}(i)
}
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
for {
item, quit := test.queue.Get()
if item == "added after shutdown!" {
t.Errorf("Got an item added after shutdown.")
}
if quit {
return
}
t.Logf("Worker %v: processing %v", i, item)
}
}(i)
}
producerWG.Wait()
test.queue.ShutDown()
test.queue.Add("added after shutdown!")
consumerWG.Wait()
if test.queue.Len() != 0 {
t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
}
}
}
這裏併發啓動了 50 個生產者 goroutine,每個 goroutine 向隊列中寫入 50 個元素。啓動了 10 個消費者 goroutine,來併發的消費隊列。在生產者生產完成後,會調用 test.queue.ShutDown() 關閉隊列,然後再次嘗試向隊列中添加一個元素,等待消費者消費完成,最終判斷隊列長隊是否為 0。
執行這個測試用例,得到如下輸出:
$ go test -v -run=TestBasic
=== RUN TestBasic
queue_test.go:58: Worker 9: processing 2
queue_test.go:58: Worker 9: processing 16
...
queue_test.go:58: Worker 1: processing 36
queue_test.go:58: Worker 5: processing 13
queue_test.go:58: Worker 7: processing 24
--- PASS: TestBasic (0.26s)
PASS
ok github.com/jianghushinian/blog-go-example/sync/cond/queue 0.679s
這個輸出日誌中省略了大量的 t.Logf() 打印,不過這足以演示隊列的正確性。
更多測試用例,你可以在這裏看到:https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue_test.go。
其實,我們實現的這個併發等待隊列,正是 Kubernetes client-go 中非常關鍵的一個組件 workqueue 的精簡版。搞懂了這個併發等待隊列的實現,再去看 workqueue 的源碼就很容易上手了,祝你好運 :)。
總結
本文對 Go 中的 sync.Cond 併發原語進行了講解,並帶你看了其源碼的實現,以及介紹瞭如何使用。
你一定要記得我們總結出來的 sync.Cond 使用套路,不要用錯。我們最終實現的併發等待隊列 Queue 是 Kubernetes client-go 中關鍵組件 workqueue 的微小實現,你也一定要掌握。
本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。
希望此文能對你有所啓發。
聯繫我
- 公眾號:Go編程世界
- 微信:jianghushinian
- 郵箱:jianghushinian007@outlook.com
- 博客:https://jianghushinian.cn
- GitHub:https://github.com/jianghushinian