动态

详情 返回 返回

Go 併發編程:如何實現一個併發安全的 map - 动态 详情

上週發佈的文章「Go 併發控制:sync.Map 詳解」有讀者反饋説我寫的太難了,上來就挑戰源碼,對新手不夠友好。所以這篇文章算作補充,從入門到進階的順序講解一下在 Go 中如何自己實現一個併發安全的 map

內置 map

首先,我們來測試一下 Go 語言內置 map 併發安全性,示例如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/map/concurrent-map/main.go
package main

import "fmt"

// NOTE: Go 內置 map 不支持併發讀寫

func main() {
    m := make(map[string]int)

    // 併發讀寫 map
    go func() {
        for {
            m["k"] = 1
            fmt.Println("set k:", 1)
        }
    }()

    go func() {
        for {
            v, _ := m["k"]
            fmt.Println("read k:", v)
        }
    }()

    select {}
}

這裏開了兩個 goroutine 分別用來讀寫 map 對象變量 m,並且每個 goroutine 中都採用無限循環來不停的讀寫 m,這樣就會造成大量併發操作。程序最後使用空的 select{} 語句來實現阻塞。

執行示例代碼,你將得到 panic,輸出日誌太長我就不貼出來了,主要報錯信息為 fatal error: concurrent map read and map write。這提示我們存在併發讀寫 map 的操作,所以説 Go 內置 map 不是併發安全的。

實現併發安全 map

要實現 map 的併發安全,我們首先想到的就是互斥鎖 sync.Mutex。不過 Go 語言還在 sync.Mutex 的基礎上提供了讀寫鎖 sync.RWMutex,能夠更大的程度的提升讀多寫少場景的併發性能,所以我們選用讀寫鎖來實現併發安全的 map

實現實例如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/map/concurrent-map/mutex_map.go
type RWMutexMap struct {
    rw sync.RWMutex
    m  map[string]int
}

func NewRWMutexMap() *RWMutexMap {
    return &RWMutexMap{
        m: make(map[string]int),
    }
}

func (m *RWMutexMap) Set(key string, v int) {
    m.rw.Lock()
    defer m.rw.Unlock()
    m.m[key] = v
}

func (m *RWMutexMap) Get(key string) (int, bool) {
    m.rw.RLock()
    defer m.rw.RUnlock()
    v, ok := m.m[key]
    return v, ok
}

func (m *RWMutexMap) Del(key string) {
    m.rw.Lock()
    defer m.rw.Unlock()
    delete(m.m, key)
}

這裏我們定義 RWMutexMap 結構用來表示併發安全的 map,其包含兩個屬性,字段 rw 即為讀寫鎖,用來保證併發安全,m 字段就是普通的 map,用來存取數據。

Set 方法和 Del 方法屬於寫操作,所以在修改 m 字段數據時都加了寫鎖 m.rw.Lock(),並在操作完成後釋放寫鎖 defer m.rw.Unlock()。而 Get 方法屬於讀操作,所以在獲取 m 字段數據時加了讀鎖 m.rw.RLock(),並在獲取完成後釋放讀鎖 defer m.rw.RUnlock()

這裏有必要解釋一下讀鎖和寫鎖的區別:

寫鎖就是互斥鎖,不可以被多個 goroutine 同時持有,比如在調用寫鎖 m.rw.Lock() 時,如果有讀鎖或寫鎖已經被其他 goroutine 持有,則當前 goroutine 會被阻塞。

讀鎖類似可重入鎖,多個 goroutine 可以同時持有讀鎖,比如在調用讀鎖 m.rw.RLock() 時,如果有其他 goroutine 也持有讀鎖,則當前 goroutine 還可以繼續獲取這把讀鎖,但此時如果有 goroutine 嘗試獲取寫鎖則會被阻塞。

所以説讀寫鎖對讀多寫少的場景更加適用,讀寫鎖允許多個 goroutine 同時持有讀鎖,但只允許一個 goroutine 持有寫鎖。

此時,我們可以寫出同樣的測試代碼,來測試這個使用讀寫鎖實現的併發安全 map 的效果:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/map/concurrent-map/mutex_map_test.go
// NOTE: 這個測試不會終止
func TestRWMutexMap(t *testing.T) {
    m := NewRWMutexMap()

    // 併發讀寫 map
    go func() {
        for {
            m.Set("k", 1)
            fmt.Println("set k:", 1)
        }
    }()

    go func() {
        for {
            v, _ := m.Get("k")
            fmt.Println("read k:", v)
        }
    }()

    select {}
}

這個示例不會終止,會交替打印 readset 的日誌。

沒錯,實現一個併發安全的 map 就是如此的簡單,這麼一對比,sync.Map 的實現確實很複雜。

分片 map

我們知道加鎖會使程序的性能急劇下降,讀寫鎖能夠在讀多寫少的場景中降低互斥鎖對性能的影響。那麼我們實現的併發安全 map 是否還有可優化空間呢?答案是肯定的。

我們可以採用分片的思想,將一個大的 map 拆成多個更小的 map,然後讓每個小 map 持有各自的鎖,以此來減小鎖的粒度,減少併發情況下加鎖的次數。

實現示例如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/map/concurrent-map/sharding_map.go
package main

import (
    "hash/maphash"
    "sync"
)

var seed = maphash.MakeSeed()

func hashKey(key string) uint64 {
    return maphash.String(seed, key)
}

type ShardingMap struct {
    locks  []sync.RWMutex
    shards []map[string]int
}

func NewShardingMap(size int) *ShardingMap {
    sm := &ShardingMap{
        locks:  make([]sync.RWMutex, size),
        shards: make([]map[string]int, size),
    }
    for i := 0; i < size; i++ {
        sm.shards[i] = make(map[string]int)
    }
    return sm
}

func (m *ShardingMap) getShardIdx(key string) uint64 {
    hash := hashKey(key)
    return hash % uint64(len(m.shards))
}

func (m *ShardingMap) Set(key string, value int) {
    idx := m.getShardIdx(key)
    m.locks[idx].Lock()
    defer m.locks[idx].Unlock()
    m.shards[idx][key] = value
}

func (m *ShardingMap) Get(key string) (int, bool) {
    idx := m.getShardIdx(key)
    m.locks[idx].RLock()
    defer m.locks[idx].RUnlock()
    value, ok := m.shards[idx][key]
    return value, ok
}

func (m *ShardingMap) Del(key string) {
    idx := m.getShardIdx(key)
    m.locks[idx].Lock()
    defer m.locks[idx].Unlock()
    delete(m.shards[idx], key)
}

ShardingMap 表示一個採用分片機制的併發安全 map,它有兩個屬性 locksshards,它們都是切片類型,locks 用來保存每個分片的讀寫鎖,shards 用來保存每個分片的數據。

通過函數 NewShardingMap 可以創建一個 ShardingMap 對象,它接收一個參數 size 用來初始化分片數量,即 locksshards 的切片長度。在構造 ShardingMap 對象時會為 shards 中的每個 map 進行初始化,而由於 sync.RWMutex 零值可用,所以不必對 locks 中的每把讀寫鎖進行初始化操作。

getShardIdx 方法非常重要,會被所有方法使用,它可以計算給定 key 所在的分片索引。首先它調用輔助函數 hashKey 對給定 key 計算出一個 hash 值,然後用這個 hash 值對分片長度做取模運算,這樣就能得到一個值必然會落到某個分片的索引。這也是分片 map 的精髓所在,給定一個 key 為其計算一個分片索引值,然後將其存入對應的分片,這個過程不用加鎖。如果 hashKey 函數實現的足夠好,那麼每個分片中分佈的數據量也是比較均勻的。

接下來就是 map 的基本操作方法 SetGetDel 的實現了,它們內部都會先調用 m.getShardIdx(key) 獲取分片索引,然後對指定的分片進行加鎖,最後再操作對應分片中的數據。這樣,當多個 goroutine 併發讀寫 map 時,只要對應的 key 不在同一個分片中,那麼就能降低加鎖次數,從而提升程序性能。只有對應的 key 計算後在同一個分片中,多個 goroutine 之間才需要搶鎖。

看到這,你應該能夠想到,如果我們要用 map 存儲大量數據,則使用分片機制的併發安全 map 實現效果更好。

符合 Go 哲學的併發安全 map?

文章寫到這裏,其實 Go 中常見的實現併發安全 map 的方式基本就介紹完了。不過,這時候好像缺了點什麼,Go 併發之道大家可是更推薦使用 channel 而非互斥鎖,那麼我們何不用 channel 來實現一個符合 Go 哲學的併發安全 map 呢?

實現示例如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/map/concurrent-map/channel_map.go
type ChannelMap struct {
    cmd chan command
    m   map[string]int
}

type command struct {
    action string // "get", "set", "delete"
    key    string
    value  int
    result chan<- result
}

type result struct {
    value int
    ok    bool
}

func NewChannelMap() *ChannelMap {
    sm := &ChannelMap{
        cmd: make(chan command),
        m:   make(map[string]int),
    }
    go sm.run()
    return sm
}

func (m *ChannelMap) run() {
    for cmd := range m.cmd {
        switch cmd.action {
        case "get":
            value, ok := m.m[cmd.key]
            cmd.result <- result{value, ok}
        case "set":
            m.m[cmd.key] = cmd.value
        case "delete":
            delete(m.m, cmd.key)
        }
    }
}

func (m *ChannelMap) Set(key string, value int) {
    m.cmd <- command{action: "set", key: key, value: value}
}

func (m *ChannelMap) Get(key string) (int, bool) {
    res := make(chan result)
    m.cmd <- command{action: "get", key: key, result: res}
    r := <-res
    return r.value, r.ok
}

func (m *ChannelMap) Del(key string) {
    m.cmd <- command{action: "delete", key: key}
}

ChannelMap 結構體表示我們用 channel 實現的併發安全 map,它有兩個屬性,cmd 字段是一個 chan command 類型用來傳遞操作指令和操作結果,m 字段用來存儲 map 數據。

command 結構是用來在 channel 中傳遞信息的,action 表示操作指令,比如 getsetdelete 操作,key 用來記錄我們在調用 SetGetDel 方法時要操作的 map 中的 key,而 value 則用來記錄調用 Set 方法時指定的 value 值,最後一個 result 字段用來記錄調用 SetGetDel 方法時返回的結果。

構造函數 NewChannelMap 可以創建一個 ChannelMap 對象,其內部不僅會初始化一個 ChannelMap 對象,還會調用 go sm.run() 啓動一個新的 goroutine 來執行後台任務。

這個後台任務就是 ChannelMap 的核心所在,在 run 方法內部,使用 for 循環來不斷的從 m.cmd 這個 channel 中獲取操作指令,根據 cmd.action 我們能夠判斷出用户是調用了 SetGetDel 中的哪個方法,接着做相應的操作。比如如果是調用了 Get 方法,就使用 m.m[cmd.key]map 中獲取 key 對應的值 value,然後構造 result 對象,並將其通過 cmd.result 這個 channel 傳遞給調用方。

接下來看到 SetGetDel 這幾個方法的實現,你就能徹底理解 ChannelMap 的設計思路了。還是拿 Get 方法舉例,這裏先是構造了一個 chan result 對象用來保存結果,並將它和給定的 key 一起構造出一個 command 對象傳遞給 m.cmd。當 command 被傳遞過去後,run 方法內部的 for 循環就能拿到這個 command 對象了,然後將結果通過我們構造的 chan result 對象再傳遞回來,一次 Get 操作就完成了。

其他兩個方法同理,這裏的核心就是通過 m.cmd 這個無緩衝的 channel 實現併發控制,來保證多個 goroutine 同時操作 map 時的併發安全。

如何選擇

現在,我們掌握了多種併發安全 map 的實現,比如互斥鎖、讀寫鎖、分片以及 channel 實現,還有不要忘記 Go 內置的 sync.Map 實現。那麼,我們到底該如何選擇使用哪種實現呢?

我們可以對其進行一輪基準測試,來測試下不同 map 實現的性能差距。示例如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/map/concurrent-map/sharding_map_test.go
// 讀寫次數一致
func BenchmarkSyncMap(b *testing.B) {
    var m sync.Map
    var wg sync.WaitGroup
    for i := 0; i < b.N; i++ {
        wg.Add(1)
        go func(k int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", k)
            m.Store(key, k)
            m.Load(key)
        }(i % 100000) // 使用 100000 個不同的 key
    }
    wg.Wait()
}

這是讀寫次數一致的場景下對 sync.Map 進行的基準測試代碼,其他幾種 map 的基準測試代碼一樣,我就不都貼出來了。

執行示例代碼,可以得到如下輸出:

$ go test -bench="Map$" -benchmem -run="^$"
goos: darwin
goarch: amd64
pkg: github.com/jianghushinian/blog-go-example/sync/concurrent-map
cpu: Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
BenchmarkChannelMap-4             551652              2670 ns/op             478 B/op          6 allocs/op
BenchmarkRWMutexMap-4            1000000              1474 ns/op             196 B/op          4 allocs/op
BenchmarkMutexMap-4              2156851               571.5 ns/op            68 B/op          4 allocs/op
BenchmarkShardingMap-4           2752004               444.5 ns/op            66 B/op          3 allocs/op
BenchmarkSyncMap-4               1765488               618.8 ns/op           115 B/op          7 allocs/op
PASS
ok      github.com/jianghushinian/blog-go-example/sync/concurrent-map   10.271s

根據這輪基準測試結果我們可以發現,在讀寫次數一致情況下,對併發安全的 map 進行 100000 次讀寫操作,使用 channel 實現的 map 性能最差,使用分片機制實現的 map 性能最好。

那麼我們再測一測讀多寫少的場景。示例如下:

https://github.com/jianghushinian/blog-go-example/tree/main/sync/map/concurrent-map/sharding_map_test.go
// 讀多寫少
func BenchmarkSyncMapReadHeavy(b *testing.B) {
    var m sync.Map
    var wg sync.WaitGroup
    for i := 0; i < b.N; i++ {
        for j := 0; j < 100; j++ {
            wg.Add(1)
            go func(k int) {
                defer wg.Done()
                m.Load(string(rune(k)))
            }(j)
        }
        for j := 0; j < 10; j++ {
            wg.Add(1)
            go func(k int) {
                defer wg.Done()
                m.Store(string(rune(k)), k)
            }(j)
        }
    }
    wg.Wait()
}

執行示例代碼,可以得到如下輸出:

$ go test -bench="MapReadHeavy$" -benchmem -run="^$"
goos: darwin
goarch: amd64
pkg: github.com/jianghushinian/blog-go-example/sync/concurrent-map
cpu: Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
BenchmarkChannelMapReadHeavy-4              8490            212698 ns/op           33944 B/op        489 allocs/op
BenchmarkRWMutexMapReadHeavy-4             30123             37994 ns/op            5342 B/op        230 allocs/op
BenchmarkMutexMapReadHeavy-4               33726             42250 ns/op            5352 B/op        230 allocs/op
BenchmarkShardingMapReadHeavy-4            31478             36449 ns/op            5320 B/op        230 allocs/op
BenchmarkSyncMapReadHeavy-4                35464             36445 ns/op            5640 B/op        250 allocs/op
PASS
ok      github.com/jianghushinian/blog-go-example/sync/concurrent-map   10.816s

從這一輪基準測試結果來看,依然是使用 channel 實現的 map 性能最差,而使用分片機制實現的 mapsync.Map 性能不相上下。

所以,這幾個併發安全 map 的實現該怎麼選擇,你心目中有了答案嗎?

我的答案是,不要看我的基準測試結果,很可能換一下數據量重新測試你會得到不一樣的結果。更不要人云亦云,比如不要被一些網上的言論所惑,並不像有些人鼓吹的那樣,Go 併發編程只用 channel,你看使用 channel 實現的併發安全 map 性能反而最差。我想表達的是,你應該根據你預估的數據量來跑基準測試,以你自己的基準測試結果為選擇依據。即使是 channel 也不是萬能的,Go 中每一種併發原語都有它的適用場景,實踐才是檢驗真理的唯一標準。

總結

Go 中內置的 map 不是併發安全的,而 map 是一種很常用的類型,為此 Go 在 sync 包中又內置了 sync.Map 來支持併發安全。

除此以外,我們還可以使用 Go 的併發原語自己來實現併發安全的 map,比如使用互斥鎖、讀寫鎖、分片機制以及 channel。每一種 map 的實現都有其特點和適用場景,我們應該根據自己的數據量對這些實現進行基準測試,以此來選擇合適的 map 實現。

另外,為了降低讀者閲讀代碼時的心智負擔,本文中沒有介紹泛型版本的併發安全 map 實現。本文原理已經講解清楚了,對於泛型版本的實現,就留作作業交給你自己去完成了。

本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。

希望此文能對你有所啓發。

聯繫我

  • 公眾號:Go編程世界
  • 微信:jianghushinian
  • 郵箱:jianghushinian007@outlook.com
  • 博客:https://jianghushinian.cn
  • GitHub:https://github.com/jianghushinian
user avatar u_14540126 头像
点赞 1 用户, 点赞了这篇动态!
点赞

评论

anonymousUser 头像 @anonymousUser

创建 时间 11:09 下午 · 11月 04 ,2025
Hi there, We run an Instagram growth service, which increases your number of followers both safely and practically. - Real, human followers: People follow you because they are interested in your business or niche. - Safe: All actions are made manually. We do not use any bots. - The price is from just $60 per month, and we can start immediately. If you'd like to see some of our previous work, let me know, and we can discuss it further. Kind Regards, Gemma

Add a new 评论

Some HTML is okay.