博客 / 詳情

返回

Golang基礎筆記十五之sync

本文首發於公眾號:Hunter後端

原文鏈接:Golang基礎筆記十五之sync

這一篇筆記介紹 Golang 中的 sync 模塊。

sync 包主要提供了基礎的同步原語,比如互斥鎖,讀寫鎖,等待組等,用於解決併發編程中的線程安全問題,以下是本篇筆記目錄:

  1. WaitGroup-等待組
  2. sync.Mutex-互斥鎖
  3. sync.RWMutex-讀寫鎖
  4. sync.Once-一次性執行
  5. sync.Pool-對象池
  6. sync.Cond-條件變量
  7. sync.Map

1、WaitGroup-等待組

前面在第十篇我們介紹 goroutine 和 channel 的時候,在使用 goroutine 的時候介紹有一段代碼如下:

package main

import (
    "fmt"
    "time"
)

func PrintGoroutineInfo() {
    fmt.Println("msg from goroutine")
}

func main() {
    go PrintGoroutineInfo()
    time.Sleep(1 * time.Millisecond)
    fmt.Println("msg from main")
}

在這裏,我們開啓了一個協程調用 PrintGoroutineInfo() 函數,然後使用 time.Sleep() 來等待它調用結束。

然而在開發中,我們不能確定這個函數多久才能調用完畢,也無法使用準確的 sleep 時間來等待,那麼這裏就可以使用到 sync 模塊的 WaitGroup 函數來等待一個或多個 goroutine 執行完畢。

下面是使用示例:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func SleepRandSeconds(wg *sync.WaitGroup) {
    defer wg.Done()
    sleepSeconds := rand.Intn(3)
    fmt.Printf("sleep %d seconds\n", sleepSeconds)
    time.Sleep(time.Duration(sleepSeconds) * time.Second)
}

func main() {
    var wg sync.WaitGroup

    wg.Add(2)
    go SleepRandSeconds(&wg)
    go SleepRandSeconds(&wg)

    wg.Wait()
    fmt.Println("函數執行完畢")
}

在這裏,我們通過 var wg sync.WaitGroup 定義了一個等待組,並通過 wg.Add(2) 表示添加了需要等待的併發數,在併發中我們將 &wg 傳入並通過 wg.Done() 減少需要等待的併發數。

wg.Done() 函數內部,使用 wg.Add(-1) 減少需要等待的併發數,在 main 函數中,使用 wg.Wait() 進入阻塞狀態,當等待的併發都完成後,此函數就會返回,完成等待並接着往後執行。

2、sync.Mutex-互斥鎖

1. 數據競態與互斥鎖

當多個 goroutine 併發訪問同一個共享資源,且至少有一個訪問是寫操作時,就會發生數據競態,造成的結果就是程序每次運行的結果表現可能會不一致。

比如下面的示例:

var balance int

func AddFunc() {
    balance += 1
}
func main() {
    for range 100 {
        go AddFunc()
    }
    time.Sleep(5 * time.Second)
    fmt.Println("balance is: ", balance)
}

多次執行上面的代碼,最終輸出的 balance 的值可能都不一致。

如果一個變量在多個 goroutine 同時訪問時,不會出現比如數據不一致或程序崩潰的情況,那麼我們就稱其是併發安全的。

我們可以使用 go run -race main.go 的方式來檢測數據競態,執行檢測後,會輸出數據競態的一些信息,比如發生在代碼的多少行,一共發生了多少次數據競態:

==================
WARNING: DATA RACE
Read at 0x000003910df8 by goroutine 7:
  main.AddFunc()
      /../main.go:13 +0x24

Previous write at 0x000003910df8 by goroutine 6:
  main.AddFunc()
      /../main.go:13 +0x3c

Goroutine 7 (running) created at:
  main.main()
      /../main.go:18 +0x32

Goroutine 6 (finished) created at:
  main.main()
   /../main.go:18 +0x32
    
==================
balance is:  98
Found 3 data race(s)
exit status 66

而要避免這種數據競態的發生,我們可以限制在同一時間只能有一個 goroutine 訪問同一個變量,這種方法稱為互斥機制。

我們可以通過緩衝通道和 sync.Mutex 來實現這種互斥鎖的操作。

2. 緩衝通道實現互斥鎖

我們可以通過容量為 1 的緩衝通道來實現互斥鎖的操作,保證同一時間只有一個 goroutine 訪問同一個變量,下面是修改後的代碼:

var sema = make(chan struct{}, 1)
var balance int

func AddFunc() {
    sema <- struct{}{}
    balance += 1
    <-sema
}
func main() {
    for range 100 {
        go AddFunc()
    }

    time.Sleep(3 * time.Second)
    fmt.Println("balance is: ", balance)
}

在上面這段代碼裏,我們定義了 sema 這個容量為 1 的通道,在每個 AddFunc() 併發中,對變量 balance 執行讀寫操作前,我們先往通道里寫入了一條數據,這樣其他併發在執行該函數時,由於也會先往通道里寫入數據,而這個時候通道已經滿了,所以會處於堵塞狀態,這就相當於獲取鎖。

直到 balance 寫操作完成,從通道里讀取數據,通道為空,相當於釋放鎖,這個時候其他併發才可以往通道里寫入數據重新拿到鎖。

這樣我們通過往通道里寫入和讀取數據保證了同一時間只有一個 goroutine 在對 balance 進行寫操作,從而實現互斥鎖的操作。

3. sync.Mutex

在 sync 包中,sync.Mutex 直接為我們實現了互斥鎖的操作,它的操作如下:

var mutex sync.Mutex  // 互斥鎖的定義
mutex.Lock()  // 獲取鎖
mutex.Unlock()  // 釋放鎖

那麼使用 sync.Mutex 實現上面的邏輯,代碼如下:

var mutex sync.Mutex
var balance int

func AddFunc() {
    mutex.Lock()
    defer mutex.Unlock()
    balance += 1
}
func main() {
    for range 100 {
        go AddFunc()
    }

    time.Sleep(3 * time.Second)
    fmt.Println("balance is: ", balance)
}

3、sync.RWMutex-讀寫鎖

在上面介紹的 sync.Mutex 互斥鎖中,限制了同一時間只能有一個 goroutine 訪問某個變量,包括讀和寫,但這種情況並非是最理想的,比如在讀多寫少的場景下。

那麼 Golang 裏的 sync.RWMutex 為我們提供了讀寫鎖的操作,它會允許多個讀操作的併發,而寫操作會阻塞所有讀和寫。

讀寫鎖的基本規則如下:

  1. 當一個 goroutine 獲取了讀鎖後,其他 goroutine 仍然可以獲取讀鎖,但不能獲取寫鎖。
  2. 當一個 goroutine 獲取了寫鎖後,其他 goroutine 無論是讀鎖還是寫鎖都不能獲取,必須等待該寫鎖釋放。
  3. 當有寫操作在等待時,避免寫操作長期飢餓,會優先處理寫鎖請求。

下面是讀寫鎖的用法:

var rwMu sync.RWMutex

rwMu.RLock()  // 獲取讀鎖
rwMu.RUnlock()  // 釋放讀鎖

rwMu.Lock()  // 獲取寫鎖
rwMu.Unlock()  // 釋放寫鎖

下面是讀寫鎖在函數中的用法示例:

func ReadBalance() {
    rwMu.RLock()
    fmt.Println("get read lock")
    defer rwMu.RUnlock()
    fmt.Println("balance: ", balance)
}

func WriteBalance() {
    rwMu.Lock()
    fmt.Println("get write lock")
    defer rwMu.Unlock()
    balance += 1
}

4、sync.Once-一次性執行

sync.Once 可用於確保函數只被執行一次,常用於初始化操作,且可以用於延遲加載。

提供的方法是 Do(f func()),參數內容是一個需要被執行的函數 f,這個方法實現的功能是隻有在第一次被調用的時候會執行 f 函數進行初始化。

下面是該方法的使用示例:

import (
    "fmt"
    "sync"
)

type Config struct {
    // 配置信息
}

var (
    instance *Config
    once     sync.Once
)

func LoadConfig() {
    fmt.Println("初始化配置...")
    instance = &Config{}
    // 加載配置的邏輯
}

func GetConfig() *Config {
    once.Do(LoadConfig)
    return instance
}

func main() {
    c1 := GetConfig()
    c2 := GetConfig()
    fmt.Println(c1 == c2) // 輸出: true(同一個實例)
}

在這裏,雖然 GetConfig() 函數執行了兩遍,但是其內部的調用的 LoadConfig 函數卻只執行了一次,因為 sync.Once 會在內部記錄該函數是否已經初始化。

sync.Once 是個結構體,其結構如下:

type Once struct {
    done atomic.Uint32
    m    Mutex
}

其中,done 字段用於記錄需要執行的函數 f 是否已經被執行,其對應的 Do() 方法內部會先根據 done 字段判斷,如果已經被執行過則直接返回,而如果沒有則會先執行一次。

m 字段表示的互斥鎖則用於在 Do() 方法內部調用的 doSlow() 中使用,用於確保併發情況下目標函數只被執行一次,在 f 函數執行結束後,done 參數會被置為 1,表示該函數已經被執行,這樣再次調用 Do() 方法時,判斷 done 字段的值為 1 則不會再執行此函數。

5、sync.Pool-對象池

sync.Pool,對象池,我們可以將一些生命週期短且創建成本高的對象存在其中,從而避免頻繁的創建和銷燬對象,以減少內存分配和垃圾回收壓力。

簡單地説就是複用對象。

1. 基礎用法

下面以複用一個字節緩衝區為例介紹一下對象池的基礎用法。

1) 創建對象池

創建對象池的操作如下:

var bufferPool = sync.Pool{
    New: func() interface{} {
        return &bytes.Buffer{}
    },
}

可以看到,這裏對 sync.PoolNew 字段賦值了一個函數,返回的是一個字節緩衝區。

2) 從池中獲取對象

從對象池中獲取該對象的操作使用 Get() 操作:

buf := bufferPool.Get().(*bytes.Buffer)
3) 將對象放回池中

對該字節緩衝區使用完畢後可以將該對象再放回池中:

bufferPool.Put(buf)

2. 使用示例

import (
    "bytes"
    "fmt"
    "sync"
)

var bufferPool = sync.Pool{
    New: func() interface{} {
        fmt.Println("create bytes buffer")
        return &bytes.Buffer{}
    },
}

func LogMessage(msg string) {
    buf := bufferPool.Get().(*bytes.Buffer)
    defer bufferPool.Put(buf)

    buf.Reset()
    buf.WriteString(msg)
    fmt.Println(buf.String())
}

func main() {
    LogMessage("hello world")
    LogMessage("hello world")
    LogMessage("hello world")
}

在上面的代碼中,我們先定義了 bufferPool,然後在 LogMessage() 函數中,先使用 Get() 獲取該字節緩衝對象,因為這裏返回的數據是接口類型,所以這裏將其轉為了對應的類型,然後使用 buf.Reset() 重置了之前的記錄後寫入新的數據,最後使用的 defer 操作將此對象又放回了對象池。

6、sync.Cond-條件變量

sync.Cond 用於等待特定條件發生後再繼續執行,可用於生產者-消費者的模式。

創建一個條件變量,參數只有一個,那就是鎖,下面代碼裏用的是互斥鎖:

cond = sync.NewCond(&sync.Mutex{})

返回的 cond 對外暴露的字段 L 就是我們輸入的鎖。

下面用一個生產者的代碼示例來介紹 cond 的幾個相關函數。

在這裏定義了 queue 作為隊列,其中擁有需要處理的數據,queueMaxSize 字段為限制的最大隊列長度。

var (
    queue        []int
    queueMaxSize int = 5
    cond             = sync.NewCond(&sync.Mutex{})
)

func Producer() {
    for {
        cond.L.Lock()
        for len(queue) == queueMaxSize {
            fmt.Println("produce queue max size, wait")
            cond.Wait()
        }
        queue = append(queue, 1)
        fmt.Println("produce queue")
        cond.Signal() // 通知消費者
        cond.L.Unlock()
        time.Sleep(100 * time.Millisecond)
    }
}

在定義的 Producer() 函數中,有一個死循環,內部先使用 cond.L.Lock() 獲取鎖,然後判斷生產的數據是否有消費者消費,如果隊列滿了的話,則進入等待。

1. cond.Wait()

在上面的代碼中,我們使用 cond.Wait() 進入了等待狀態。

Wait() 函數內部,先對前面的鎖進行釋放操作,然後進入阻塞狀態,直到其他 gouroutine 通過 Signal() 函數喚醒後重新獲取鎖。

2. cond.Signal()

前面往隊列裏添加數據後,通過 cond.Signal() 函數通知消費者,消費者在另一個函數中就可以被喚醒,然後進行處理,同時這個函數後面將鎖釋放 cond.L.Unlock()

3. cond.Broadcast()

前面的 Signal() 函數是喚醒一個等待的 goroutine,cond.Broadcast() 函數則可以喚醒所有等待的 goroutine。

下面提供一下生產者-消費者的全部處理代碼:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    queue        []int
    queueMaxSize int = 5
    cond             = sync.NewCond(&sync.Mutex{})
)

func Producer() {
    for {
        cond.L.Lock()
        for len(queue) == queueMaxSize {
            fmt.Println("produce queue max size, wait")
            cond.Wait()
        }
        queue = append(queue, 1)
        fmt.Println("produce queue")
        cond.Signal() // 通知消費者
        cond.L.Unlock()
        time.Sleep(100 * time.Millisecond)
    }
}

func Consumer() {
    for {
        cond.L.Lock()
        for len(queue) == 0 {
            fmt.Println("wait for produce")
            cond.Wait() // 等待並釋放鎖
        }
        fmt.Println("consume queue")
        item := queue[0]
        queue = queue[1:]
        cond.Signal() // 通知生產者
        cond.L.Unlock()
        ProcessItem(item)
    }
}

func ProcessItem(i int) {
    fmt.Println("process i: ", i)
}

func main() {
    go Producer()
    go Consumer()
    time.Sleep(1 * time.Second)
}

7、sync.Map

sync 模塊提供了 sync.Map 用來存儲鍵值對,但是和之前介紹的 map 不一樣的是,sync.Map 是併發安全的,而且無需初始化,並且在操作方法上與原來的 map 不一樣。

1. 併發安全

原生的 map 是非併發安全的,如果多個 goroutine 對其進行同時讀寫會觸發錯誤,比如下面的操作:

import (
    "fmt"
    "time"
)

var originMap = make(map[string]int)

func UpdateMapKey() {
    originMap["a"] += 1
}

func GetMapKey() {
    a := originMap["a"]
    fmt.Println(a)
}

func main() {
    originMap["a"] = 0
    for range 100 {
        go UpdateMapKey()
        go GetMapKey()
    }
    time.Sleep(1 * time.Second)
    fmt.Println("originMap: ", originMap)
}

但是 sync.Map 是併發安全的,內部會通過互斥鎖的操作允許多個 goroutine 安全地讀寫,下面是使用 sync.Map 對上面邏輯的改寫,後面我們會具體介紹其操作方法:

import (
    "fmt"
    "sync"
    "time"
)

var originMap sync.Map

func UpdateMapKey() {
    for {
        oldValue, loaded := originMap.Load("a")
        if !loaded {
            if _, ok := originMap.LoadOrStore("a", 1); ok {
                return
            }
        } else {
            newValue := oldValue.(int) + 1
            if originMap.CompareAndSwap("a", oldValue, newValue) {
                return
            }
        }
    }
}

func GetMapKey() {
    a, _ := originMap.Load("a")
    fmt.Println(a)
}

func main() {
    originMap.Store("a", 0)
    for range 100 {
        go UpdateMapKey()
        go GetMapKey()
    }
    time.Sleep(1 * time.Second)
    a, _ := originMap.Load("a")
    fmt.Println("originMap: ", a)
}

2. 初始化

原生的 map 進行初始化,有下面兩種操作方法:

var originMap = make(map[string]int)
var originMap = map[string]int{}

sync.Map 可以直接聲明使用:

var m sync.Map
m.Store("a", 0)

3. 操作方法

這裏先介紹 sync.Map 增刪改查的基礎操作:

1) 增

增加一個 key 的操作如下:

originMap.Store("a", 1)
2) 刪

刪除一個 key 的操作如下:

originMap.Delete("a", 1)
3) 改

修改操作還是可以用 Store() 方法,而且可以修改為不同數據類型:

m.Store("a", "123")
4) 查

查詢操作可以使用 Load() 方法,返回對應的 value 值以及是否存在:

m.Store("a", 1)
v, ok := m.Load("a")
if ok {
    fmt.Printf("exist value:%v\n", v.(int))
} else {
    fmt.Printf("key not exists")
}
5) 遍歷

遍歷操作如下:

m.Store("a", 1)
m.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true
})

4. 原子性條件操作

上面的這些方法可以實現基礎的增刪改查操作,但是如果我們有一個需求,比如前面的獲取一個 key 的 value,然後在原值的基礎上 +1 再存入,大概邏輯如下:

v, ok := m.Load(key)
v = v.(int)
v +=1
m.Store(key, v)

但是在這個操作中,如果有其他 goroutine 已經修改了 v 的值,那麼我們這裏的操作就相當於污染了源數據,而為了避免這個可能,我們可以使用一些原子性條件操作,以實現併發操作。

1) CompareAndSwap()

CompareAndSwap() 是一個更新操作,傳入 3 個參數,key,oldValue 和 newValue,僅當 key 的結果為 oldValue 的時候,將結果更新為 newValue,使用示例如下:

key := "a"
m.Store(key, 1)

swapped := m.CompareAndSwap(key, 1, 2)
fmt.Printf("當 value 為 1 的時候,將 value 從 1 修改為 2, 是否更新結果 %v\n", swapped)
swapped = m.CompareAndSwap(key, 1, 3)
fmt.Printf("當 value 為 1 的時候,將 value 從 1 修改為 3, 是否更新結果 %v\n", swapped)

所以在上面我們要對結果進行 +1 的代碼操作為:

newValue := oldValue.(int) + 1
if originMap.CompareAndSwap("a", oldValue, newValue) {
    return
}
2) CompareAndDelete()

CompareAndDelete 是一個原子性的刪除操作,接受兩個參數,key 和 oldValue,僅當 key 的值為 oldValue 時刪除該 key,返回結果為是否刪除:

key := "a"
m.Store(key, 1)
deleted := m.CompareAndDelete(key, 1)
if deleted {
    fmt.Printf("當 key 的 value 為 %v 時,刪除\n", 1)
} else {
    fmt.Printf(" key 的 value 不為 %v 時,不執行刪除\n", 1)
}
3) LoadAndDelete()

LoadAndDelete 表示是否加載某個 key 的值並刪除該 key,無論該 key 是否存在,參數為 key,返回值為 value 和是否存在該 key:

key := "a"
m.Store(key, 1)

value, loaded := m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 為 %v\n", loaded, value)
value, loaded = m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 為 %v\n", loaded, value)
4) LoadOrStore()

LoadOrStore 方法為不存在 key 則存入,存在的話則返回該值:

key := "a"

value, loaded := m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)
value, loaded = m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)
user avatar hebeiniunai 頭像 c9eulmav 頭像 u_15288318 頭像
3 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.