本文首發於公眾號:Hunter後端
原文鏈接:Golang基礎筆記十五之sync
這一篇筆記介紹 Golang 中的 sync 模塊。
sync 包主要提供了基礎的同步原語,比如互斥鎖,讀寫鎖,等待組等,用於解決併發編程中的線程安全問題,以下是本篇筆記目錄:
- WaitGroup-等待組
- sync.Mutex-互斥鎖
- sync.RWMutex-讀寫鎖
- sync.Once-一次性執行
- sync.Pool-對象池
- sync.Cond-條件變量
- sync.Map
1、WaitGroup-等待組
前面在第十篇我們介紹 goroutine 和 channel 的時候,在使用 goroutine 的時候介紹有一段代碼如下:
package main
import (
"fmt"
"time"
)
func PrintGoroutineInfo() {
fmt.Println("msg from goroutine")
}
func main() {
go PrintGoroutineInfo()
time.Sleep(1 * time.Millisecond)
fmt.Println("msg from main")
}
在這裏,我們開啓了一個協程調用 PrintGoroutineInfo() 函數,然後使用 time.Sleep() 來等待它調用結束。
然而在開發中,我們不能確定這個函數多久才能調用完畢,也無法使用準確的 sleep 時間來等待,那麼這裏就可以使用到 sync 模塊的 WaitGroup 函數來等待一個或多個 goroutine 執行完畢。
下面是使用示例:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func SleepRandSeconds(wg *sync.WaitGroup) {
defer wg.Done()
sleepSeconds := rand.Intn(3)
fmt.Printf("sleep %d seconds\n", sleepSeconds)
time.Sleep(time.Duration(sleepSeconds) * time.Second)
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go SleepRandSeconds(&wg)
go SleepRandSeconds(&wg)
wg.Wait()
fmt.Println("函數執行完畢")
}
在這裏,我們通過 var wg sync.WaitGroup 定義了一個等待組,並通過 wg.Add(2) 表示添加了需要等待的併發數,在併發中我們將 &wg 傳入並通過 wg.Done() 減少需要等待的併發數。
在 wg.Done() 函數內部,使用 wg.Add(-1) 減少需要等待的併發數,在 main 函數中,使用 wg.Wait() 進入阻塞狀態,當等待的併發都完成後,此函數就會返回,完成等待並接着往後執行。
2、sync.Mutex-互斥鎖
1. 數據競態與互斥鎖
當多個 goroutine 併發訪問同一個共享資源,且至少有一個訪問是寫操作時,就會發生數據競態,造成的結果就是程序每次運行的結果表現可能會不一致。
比如下面的示例:
var balance int
func AddFunc() {
balance += 1
}
func main() {
for range 100 {
go AddFunc()
}
time.Sleep(5 * time.Second)
fmt.Println("balance is: ", balance)
}
多次執行上面的代碼,最終輸出的 balance 的值可能都不一致。
如果一個變量在多個 goroutine 同時訪問時,不會出現比如數據不一致或程序崩潰的情況,那麼我們就稱其是併發安全的。
我們可以使用 go run -race main.go 的方式來檢測數據競態,執行檢測後,會輸出數據競態的一些信息,比如發生在代碼的多少行,一共發生了多少次數據競態:
==================
WARNING: DATA RACE
Read at 0x000003910df8 by goroutine 7:
main.AddFunc()
/../main.go:13 +0x24
Previous write at 0x000003910df8 by goroutine 6:
main.AddFunc()
/../main.go:13 +0x3c
Goroutine 7 (running) created at:
main.main()
/../main.go:18 +0x32
Goroutine 6 (finished) created at:
main.main()
/../main.go:18 +0x32
==================
balance is: 98
Found 3 data race(s)
exit status 66
而要避免這種數據競態的發生,我們可以限制在同一時間只能有一個 goroutine 訪問同一個變量,這種方法稱為互斥機制。
我們可以通過緩衝通道和 sync.Mutex 來實現這種互斥鎖的操作。
2. 緩衝通道實現互斥鎖
我們可以通過容量為 1 的緩衝通道來實現互斥鎖的操作,保證同一時間只有一個 goroutine 訪問同一個變量,下面是修改後的代碼:
var sema = make(chan struct{}, 1)
var balance int
func AddFunc() {
sema <- struct{}{}
balance += 1
<-sema
}
func main() {
for range 100 {
go AddFunc()
}
time.Sleep(3 * time.Second)
fmt.Println("balance is: ", balance)
}
在上面這段代碼裏,我們定義了 sema 這個容量為 1 的通道,在每個 AddFunc() 併發中,對變量 balance 執行讀寫操作前,我們先往通道里寫入了一條數據,這樣其他併發在執行該函數時,由於也會先往通道里寫入數據,而這個時候通道已經滿了,所以會處於堵塞狀態,這就相當於獲取鎖。
直到 balance 寫操作完成,從通道里讀取數據,通道為空,相當於釋放鎖,這個時候其他併發才可以往通道里寫入數據重新拿到鎖。
這樣我們通過往通道里寫入和讀取數據保證了同一時間只有一個 goroutine 在對 balance 進行寫操作,從而實現互斥鎖的操作。
3. sync.Mutex
在 sync 包中,sync.Mutex 直接為我們實現了互斥鎖的操作,它的操作如下:
var mutex sync.Mutex // 互斥鎖的定義
mutex.Lock() // 獲取鎖
mutex.Unlock() // 釋放鎖
那麼使用 sync.Mutex 實現上面的邏輯,代碼如下:
var mutex sync.Mutex
var balance int
func AddFunc() {
mutex.Lock()
defer mutex.Unlock()
balance += 1
}
func main() {
for range 100 {
go AddFunc()
}
time.Sleep(3 * time.Second)
fmt.Println("balance is: ", balance)
}
3、sync.RWMutex-讀寫鎖
在上面介紹的 sync.Mutex 互斥鎖中,限制了同一時間只能有一個 goroutine 訪問某個變量,包括讀和寫,但這種情況並非是最理想的,比如在讀多寫少的場景下。
那麼 Golang 裏的 sync.RWMutex 為我們提供了讀寫鎖的操作,它會允許多個讀操作的併發,而寫操作會阻塞所有讀和寫。
讀寫鎖的基本規則如下:
- 當一個 goroutine 獲取了讀鎖後,其他 goroutine 仍然可以獲取讀鎖,但不能獲取寫鎖。
- 當一個 goroutine 獲取了寫鎖後,其他 goroutine 無論是讀鎖還是寫鎖都不能獲取,必須等待該寫鎖釋放。
- 當有寫操作在等待時,避免寫操作長期飢餓,會優先處理寫鎖請求。
下面是讀寫鎖的用法:
var rwMu sync.RWMutex
rwMu.RLock() // 獲取讀鎖
rwMu.RUnlock() // 釋放讀鎖
rwMu.Lock() // 獲取寫鎖
rwMu.Unlock() // 釋放寫鎖
下面是讀寫鎖在函數中的用法示例:
func ReadBalance() {
rwMu.RLock()
fmt.Println("get read lock")
defer rwMu.RUnlock()
fmt.Println("balance: ", balance)
}
func WriteBalance() {
rwMu.Lock()
fmt.Println("get write lock")
defer rwMu.Unlock()
balance += 1
}
4、sync.Once-一次性執行
sync.Once 可用於確保函數只被執行一次,常用於初始化操作,且可以用於延遲加載。
提供的方法是 Do(f func()),參數內容是一個需要被執行的函數 f,這個方法實現的功能是隻有在第一次被調用的時候會執行 f 函數進行初始化。
下面是該方法的使用示例:
import (
"fmt"
"sync"
)
type Config struct {
// 配置信息
}
var (
instance *Config
once sync.Once
)
func LoadConfig() {
fmt.Println("初始化配置...")
instance = &Config{}
// 加載配置的邏輯
}
func GetConfig() *Config {
once.Do(LoadConfig)
return instance
}
func main() {
c1 := GetConfig()
c2 := GetConfig()
fmt.Println(c1 == c2) // 輸出: true(同一個實例)
}
在這裏,雖然 GetConfig() 函數執行了兩遍,但是其內部的調用的 LoadConfig 函數卻只執行了一次,因為 sync.Once 會在內部記錄該函數是否已經初始化。
sync.Once 是個結構體,其結構如下:
type Once struct {
done atomic.Uint32
m Mutex
}
其中,done 字段用於記錄需要執行的函數 f 是否已經被執行,其對應的 Do() 方法內部會先根據 done 字段判斷,如果已經被執行過則直接返回,而如果沒有則會先執行一次。
而 m 字段表示的互斥鎖則用於在 Do() 方法內部調用的 doSlow() 中使用,用於確保併發情況下目標函數只被執行一次,在 f 函數執行結束後,done 參數會被置為 1,表示該函數已經被執行,這樣再次調用 Do() 方法時,判斷 done 字段的值為 1 則不會再執行此函數。
5、sync.Pool-對象池
sync.Pool,對象池,我們可以將一些生命週期短且創建成本高的對象存在其中,從而避免頻繁的創建和銷燬對象,以減少內存分配和垃圾回收壓力。
簡單地説就是複用對象。
1. 基礎用法
下面以複用一個字節緩衝區為例介紹一下對象池的基礎用法。
1) 創建對象池
創建對象池的操作如下:
var bufferPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
可以看到,這裏對 sync.Pool 的 New 字段賦值了一個函數,返回的是一個字節緩衝區。
2) 從池中獲取對象
從對象池中獲取該對象的操作使用 Get() 操作:
buf := bufferPool.Get().(*bytes.Buffer)
3) 將對象放回池中
對該字節緩衝區使用完畢後可以將該對象再放回池中:
bufferPool.Put(buf)
2. 使用示例
import (
"bytes"
"fmt"
"sync"
)
var bufferPool = sync.Pool{
New: func() interface{} {
fmt.Println("create bytes buffer")
return &bytes.Buffer{}
},
}
func LogMessage(msg string) {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
buf.Reset()
buf.WriteString(msg)
fmt.Println(buf.String())
}
func main() {
LogMessage("hello world")
LogMessage("hello world")
LogMessage("hello world")
}
在上面的代碼中,我們先定義了 bufferPool,然後在 LogMessage() 函數中,先使用 Get() 獲取該字節緩衝對象,因為這裏返回的數據是接口類型,所以這裏將其轉為了對應的類型,然後使用 buf.Reset() 重置了之前的記錄後寫入新的數據,最後使用的 defer 操作將此對象又放回了對象池。
6、sync.Cond-條件變量
sync.Cond 用於等待特定條件發生後再繼續執行,可用於生產者-消費者的模式。
創建一個條件變量,參數只有一個,那就是鎖,下面代碼裏用的是互斥鎖:
cond = sync.NewCond(&sync.Mutex{})
返回的 cond 對外暴露的字段 L 就是我們輸入的鎖。
下面用一個生產者的代碼示例來介紹 cond 的幾個相關函數。
在這裏定義了 queue 作為隊列,其中擁有需要處理的數據,queueMaxSize 字段為限制的最大隊列長度。
var (
queue []int
queueMaxSize int = 5
cond = sync.NewCond(&sync.Mutex{})
)
func Producer() {
for {
cond.L.Lock()
for len(queue) == queueMaxSize {
fmt.Println("produce queue max size, wait")
cond.Wait()
}
queue = append(queue, 1)
fmt.Println("produce queue")
cond.Signal() // 通知消費者
cond.L.Unlock()
time.Sleep(100 * time.Millisecond)
}
}
在定義的 Producer() 函數中,有一個死循環,內部先使用 cond.L.Lock() 獲取鎖,然後判斷生產的數據是否有消費者消費,如果隊列滿了的話,則進入等待。
1. cond.Wait()
在上面的代碼中,我們使用 cond.Wait() 進入了等待狀態。
在 Wait() 函數內部,先對前面的鎖進行釋放操作,然後進入阻塞狀態,直到其他 gouroutine 通過 Signal() 函數喚醒後重新獲取鎖。
2. cond.Signal()
前面往隊列裏添加數據後,通過 cond.Signal() 函數通知消費者,消費者在另一個函數中就可以被喚醒,然後進行處理,同時這個函數後面將鎖釋放 cond.L.Unlock()。
3. cond.Broadcast()
前面的 Signal() 函數是喚醒一個等待的 goroutine,cond.Broadcast() 函數則可以喚醒所有等待的 goroutine。
下面提供一下生產者-消費者的全部處理代碼:
package main
import (
"fmt"
"sync"
"time"
)
var (
queue []int
queueMaxSize int = 5
cond = sync.NewCond(&sync.Mutex{})
)
func Producer() {
for {
cond.L.Lock()
for len(queue) == queueMaxSize {
fmt.Println("produce queue max size, wait")
cond.Wait()
}
queue = append(queue, 1)
fmt.Println("produce queue")
cond.Signal() // 通知消費者
cond.L.Unlock()
time.Sleep(100 * time.Millisecond)
}
}
func Consumer() {
for {
cond.L.Lock()
for len(queue) == 0 {
fmt.Println("wait for produce")
cond.Wait() // 等待並釋放鎖
}
fmt.Println("consume queue")
item := queue[0]
queue = queue[1:]
cond.Signal() // 通知生產者
cond.L.Unlock()
ProcessItem(item)
}
}
func ProcessItem(i int) {
fmt.Println("process i: ", i)
}
func main() {
go Producer()
go Consumer()
time.Sleep(1 * time.Second)
}
7、sync.Map
sync 模塊提供了 sync.Map 用來存儲鍵值對,但是和之前介紹的 map 不一樣的是,sync.Map 是併發安全的,而且無需初始化,並且在操作方法上與原來的 map 不一樣。
1. 併發安全
原生的 map 是非併發安全的,如果多個 goroutine 對其進行同時讀寫會觸發錯誤,比如下面的操作:
import (
"fmt"
"time"
)
var originMap = make(map[string]int)
func UpdateMapKey() {
originMap["a"] += 1
}
func GetMapKey() {
a := originMap["a"]
fmt.Println(a)
}
func main() {
originMap["a"] = 0
for range 100 {
go UpdateMapKey()
go GetMapKey()
}
time.Sleep(1 * time.Second)
fmt.Println("originMap: ", originMap)
}
但是 sync.Map 是併發安全的,內部會通過互斥鎖的操作允許多個 goroutine 安全地讀寫,下面是使用 sync.Map 對上面邏輯的改寫,後面我們會具體介紹其操作方法:
import (
"fmt"
"sync"
"time"
)
var originMap sync.Map
func UpdateMapKey() {
for {
oldValue, loaded := originMap.Load("a")
if !loaded {
if _, ok := originMap.LoadOrStore("a", 1); ok {
return
}
} else {
newValue := oldValue.(int) + 1
if originMap.CompareAndSwap("a", oldValue, newValue) {
return
}
}
}
}
func GetMapKey() {
a, _ := originMap.Load("a")
fmt.Println(a)
}
func main() {
originMap.Store("a", 0)
for range 100 {
go UpdateMapKey()
go GetMapKey()
}
time.Sleep(1 * time.Second)
a, _ := originMap.Load("a")
fmt.Println("originMap: ", a)
}
2. 初始化
原生的 map 進行初始化,有下面兩種操作方法:
var originMap = make(map[string]int)
var originMap = map[string]int{}
sync.Map 可以直接聲明使用:
var m sync.Map
m.Store("a", 0)
3. 操作方法
這裏先介紹 sync.Map 增刪改查的基礎操作:
1) 增
增加一個 key 的操作如下:
originMap.Store("a", 1)
2) 刪
刪除一個 key 的操作如下:
originMap.Delete("a", 1)
3) 改
修改操作還是可以用 Store() 方法,而且可以修改為不同數據類型:
m.Store("a", "123")
4) 查
查詢操作可以使用 Load() 方法,返回對應的 value 值以及是否存在:
m.Store("a", 1)
v, ok := m.Load("a")
if ok {
fmt.Printf("exist value:%v\n", v.(int))
} else {
fmt.Printf("key not exists")
}
5) 遍歷
遍歷操作如下:
m.Store("a", 1)
m.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})
4. 原子性條件操作
上面的這些方法可以實現基礎的增刪改查操作,但是如果我們有一個需求,比如前面的獲取一個 key 的 value,然後在原值的基礎上 +1 再存入,大概邏輯如下:
v, ok := m.Load(key)
v = v.(int)
v +=1
m.Store(key, v)
但是在這個操作中,如果有其他 goroutine 已經修改了 v 的值,那麼我們這裏的操作就相當於污染了源數據,而為了避免這個可能,我們可以使用一些原子性條件操作,以實現併發操作。
1) CompareAndSwap()
CompareAndSwap() 是一個更新操作,傳入 3 個參數,key,oldValue 和 newValue,僅當 key 的結果為 oldValue 的時候,將結果更新為 newValue,使用示例如下:
key := "a"
m.Store(key, 1)
swapped := m.CompareAndSwap(key, 1, 2)
fmt.Printf("當 value 為 1 的時候,將 value 從 1 修改為 2, 是否更新結果 %v\n", swapped)
swapped = m.CompareAndSwap(key, 1, 3)
fmt.Printf("當 value 為 1 的時候,將 value 從 1 修改為 3, 是否更新結果 %v\n", swapped)
所以在上面我們要對結果進行 +1 的代碼操作為:
newValue := oldValue.(int) + 1
if originMap.CompareAndSwap("a", oldValue, newValue) {
return
}
2) CompareAndDelete()
CompareAndDelete 是一個原子性的刪除操作,接受兩個參數,key 和 oldValue,僅當 key 的值為 oldValue 時刪除該 key,返回結果為是否刪除:
key := "a"
m.Store(key, 1)
deleted := m.CompareAndDelete(key, 1)
if deleted {
fmt.Printf("當 key 的 value 為 %v 時,刪除\n", 1)
} else {
fmt.Printf(" key 的 value 不為 %v 時,不執行刪除\n", 1)
}
3) LoadAndDelete()
LoadAndDelete 表示是否加載某個 key 的值並刪除該 key,無論該 key 是否存在,參數為 key,返回值為 value 和是否存在該 key:
key := "a"
m.Store(key, 1)
value, loaded := m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 為 %v\n", loaded, value)
value, loaded = m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 為 %v\n", loaded, value)
4) LoadOrStore()
LoadOrStore 方法為不存在 key 則存入,存在的話則返回該值:
key := "a"
value, loaded := m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)
value, loaded = m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)