“不要通過共享內存來通信,而要通過通信來共享內存”。這句話精準概括了 Go 併發模型的核心哲學——而承載這一哲學的核心原語,正是 channel(通道)。
要深入理解 channel,我們需要從 runtime 包的源碼層面分析其核心結構、關鍵操作(創建、發送/接收、關閉)的實現邏輯,以及底層如何通過同步機制(鎖、等待隊列)實現協程(Goroutine)間的安全通信。
以下源碼基於 go1.24.5/runtime/chan.go 。
一、核心結構體:hchan
源代碼如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
synctest bool // true if created in a synctest bubble
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
關鍵字段解析
buf:緩衝區是channel實現異步通信的核心。它是基於數組的環形隊列(循環隊列),用於存儲待傳遞的元素。僅當channel有緩衝 (make(chan T, n)中n>0)時,buf才非空。sendx和recvx:分別是發送和接收操作在緩衝區中的索引(環形隊列的寫指針和讀指針)。當發送元素時,數據寫入sendx位置,sendx遞增並取模dataqsiz;接收時類似。recvq和sendq:等待隊列,分別存儲因緩衝區滿而無法發送的goroutine(sendq)和因緩衝區空而無法接收的goroutine(recvq)。每個等待的goroutine被封裝為sudog結構體(包含goroutine指針、元素指針等信息)。lock:互斥鎖,保證對hchan所有字段的併發訪問是安全的(如修改sendx、操作等待隊列等)。closed:標記通道是否已關閉,防止重複關閉或在關閉後繼續操作。
二、創建:makechan
channel 通過 make(chan T, n) 創建,底層調用運行時函數 runtime.makechan 。其核心邏輯是根據元素類型和緩衝區大小分配內存,並初始化 hchan 結構體。
關鍵步驟
- 參數校驗(元素大小與對齊)
- 元素大小限制:
elem.Size_ >= 1<<16時拋出異常。
Go 通道設計時限制了元素類型的最大大小(65536 字節),避免因過大元素導致的性能問題(如緩存不友好)或內存浪費。 - 內存對齊檢查:
hchanSize%maxAlign != 0 || elem.Align_ > maxAlign時拋出異常。內存對齊是硬件要求(如 CPU 訪問對齊的內存更快),maxAlign是運行時定義的最大對齊值(通常為 8 或 16 字節)。若通道結構體或元素類型未對齊,可能導致程序崩潰或性能下降。
- 內存分配策略(三種情況)
makechan根據元素類型是否包含指針、緩衝區大小,選擇不同的內存分配方式,核心目的是優化內存使用效率和便於垃圾回收:
-
情況 1(
mem == 0):無緩衝通道或元素大小為 0(如空結構體struct{})。- 僅分配
hchan結構體內存(mallocgc(hchanSize, nil, true))。 c.buf = c.raceaddr():將緩衝區指針指向hchan結構體中的競態檢測字段地址(用於跨協程同步檢測)。
- 僅分配
-
情況 2(元素無指針):元素類型為值類型(如
int、struct{})。- 分配連續內存(
hchanSize + mem):hchan結構體與緩衝區內存連續。 - 優勢:減少內存碎片,提高 CPU 緩存命中率(連續內存訪問更快)。
- 分配連續內存(
-
情況 3(元素有指針):元素類型含指針(如
[]int、map[string]int)。- 單獨分配
hchan結構體和緩衝區:c = new(hchan)分配結構體,c.buf = mallocgc(mem, elem, true)分配緩衝區。 - 原因:包含指針的內存需被 GC 管理,單獨分配可使 GC 更高效地掃描和回收指針指向的內存(避免因內存連續導致的大範圍掃描)。
- 單獨分配
- 初始化通道字段
elemsize:記錄元素的大小(字節),用於後續數據拷貝(如chansend/chanrecv中的內存複製)。elemtype:保存元素類型的元信息(如類型的指針、大小、對齊等),用於類型檢查(如防止向chan int發送string)。dataqsiz:緩衝區容量(用户傳入的size),決定通道能緩衝多少元素。
4. 鎖初始化(lockInit)
- 為通道的互斥鎖(
c.lock)初始化,用於保護對通道的併發訪問(如發送、接收、關閉操作)。 lockRankHchan是鎖的優先級(用於鎖競爭時的排序),確保關鍵操作的原子性。
源代碼及註解:
// makechan 根據類型 t 和緩衝區大小 size 創建一個新的通道(hchan 結構體)。
// 參數:
// - t: 通道元素類型的元信息(chantype 指針)
// - size: 緩衝區的容量(若為 0,則創建無緩衝通道)
// 返回值:指向新創建的 hchan 結構體的指針
func makechan(t *chantype, size int) *hchan {
elem := t.Elem // 獲取通道元素的類型元信息(如 int、struct 等)
// 編譯器已檢查,但為安全起見:若元素大小超過 1<<16(65536 字節),拋出異常
// 原因:Go 通道設計限制元素類型大小,避免過大的內存分配和性能問題
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 檢查通道結構體對齊是否符合要求:
// 1. hchan 結構體本身的大小需滿足最大對齊(maxAlign)
// 2. 元素類型的對齊(elem.Align_)不能超過最大對齊
// 原因:內存對齊是硬件要求,確保數據訪問的效率和正確性
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
// 計算緩衝區所需內存大小:元素大小 × 緩衝區容量
// 檢查是否溢出或超過最大允許分配內存(maxAlloc - hchanSize)
// 原因:防止內存分配過大導致程序崩潰或性能下降
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 聲明 hchan 指針 c,用於後續初始化
var c *hchan
// 根據元素類型是否包含指針、緩衝區大小,分三種情況分配內存:
switch {
case mem == 0:
// 情況 1:緩衝區大小為 0(無緩衝通道)或元素大小為 0(罕見,如空結構體)
// 分配僅包含 hchan 結構體的內存(不包含獨立緩衝區)
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 競態檢測需要:將 hchan 結構體的地址作為競態地址(用於跨協程同步檢測)
// raceaddr() 返回 hchan 結構體中用於競態檢測的字段地址
c.buf = c.raceaddr()
case !elem.Pointers():
// 情況 2:元素類型不包含指針(如 int、bool 等值類型)
// 優勢:hchan 結構體和緩衝區內存連續,減少內存碎片,提高訪問效率
// 分配內存大小 = hchan 結構體大小 + 緩衝區大小(mem)
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 緩衝區地址 = hchan 結構體地址 + hchan 結構體大小(偏移量)
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 情況 3:元素類型包含指針(如切片、map、自定義結構體含指針字段)
// 限制:緩衝區需單獨分配(不能與 hchan 結構體連續)
// 原因:包含指針的內存需被垃圾回收器(GC)管理,單獨分配便於 GC 掃描和回收
c = new(hchan) // 初始化 hchan 結構體(內存由 runtime 分配,可能不連續)
// 單獨為緩衝區分配內存(類型為 elem,大小為 mem)
c.buf = mallocgc(mem, elem, true)
}
// 初始化 hchan 結構體的核心字段:
c.elemsize = uint16(elem.Size_) // 元素大小(字節)
c.elemtype = elem // 元素類型元信息(用於類型檢查)
c.dataqsiz = uint(size) // 緩衝區容量(用户傳入的 size)
// 同步測試標記:若當前 Goroutine 屬於同步測試組,標記通道為同步測試通道
// 用途:限制同步測試通道的外部訪問(避免測試污染)
if getg().syncGroup != nil {
c.synctest = true
}
// 初始化通道的互斥鎖(用於保護併發訪問)
// lockRankHchan 是通道鎖的優先級(用於鎖競爭時的排序)
lockInit(&c.lock, lockRankHchan)
// 調試模式:打印通道創建信息(僅調試用)
if debugChan {
print("makechan: chan=", c,
"; elemsize=", elem.Size_,
"; dataqsiz=", size, "\n")
}
return c // 返回新創建的通道指針
}
三、發送操作:chansend
發送操作 ch <- x 底層調用 runtime.chansend,其核心邏輯是:嘗試將元素放入緩衝區,若緩衝區滿則阻塞當前 goroutine。
關鍵步驟:
- 鎖保護:所有對通道狀態的修改(如緩衝區、等待隊列)均在互斥鎖保護下完成,確保併發安全。
- 接收隊列處理:若有等待的接收者(因緩衝區空被阻塞),發送者直接將數據傳遞給接收者(無需經過緩衝區),減少一次數據拷貝。
- 緩衝區寫入:緩衝區未滿時(
c.qcount < c.dataqsiz),數據直接存入環形緩衝區(buf[sendx]),更新發送索引(sendx = (sendx + 1) % dataqsiz)和已存儲元素數量(qcount++)。 - 阻塞機制:緩衝區已滿(
qcount == dataqsiz)且無接收者時,則將當前發送者goroutine封裝為sudog,加入sendq隊列,調用gopark阻塞當前goroutine,直到被接收者喚醒(接收者騰出空間)或通道關閉。
源代碼及註解
// chansend 向通道 c 發送數據,數據來源為 ep(若 ep 非空)。
// 參數説明:
// - c:目標通道指針
// - ep:發送數據的內存地址(若為 nil 則忽略數據)
// - block:是否阻塞等待(true=阻塞,false=非阻塞)
// - callerpc:調用者的程序計數器(用於競態檢測)
// 返回值:
// - bool:是否成功發送(true=成功,false=失敗)
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 處理通道為 nil 的異常情況
if c == nil {
if !block {
// 非阻塞模式下,直接返回發送失敗
return false
}
// 阻塞模式下,永久掛起(錯誤場景,因通道不能為 nil)
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable") // 理論上不可達,防止死鎖
}
// 調試模式:打印通道信息(僅調試用)
if debugChan {
print("chansend: chan=", c, "\n")
}
// 競態檢測:記錄發送操作的調用棧(用於跨協程數據競爭檢測)
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// 同步測試檢查:禁止從同步測試通道外部發送數據
if c.synctest && getg().syncGroup == nil {
panic(plainError("send on synctest channel from outside bubble"))
}
// 快速路徑:非阻塞模式下,無需加鎖直接檢查通道是否已關閉或緩衝區已滿
//
// 邏輯説明:
// - 若通道未關閉(c.closed == 0)且緩衝區已滿(full(c)),則非阻塞模式下直接返回失敗。
// - 此檢查避免了不必要的鎖競爭,但需注意:若在檢查後通道狀態變化(如被其他協程關閉),可能導致誤判。
// - 依賴後續鎖內的重試邏輯保證正確性(見後續註釋)。
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64 // 用於性能分析的時間戳(若啓用了阻塞分析)
if blockprofilerate > 0 {
t0 = cputicks() // 記錄當前 CPU 時間(納秒級)
}
// 獲取通道的互斥鎖(關鍵:所有對 hchan 的修改需加鎖保護)
lock(&c.lock)
// 情況 1:通道已關閉(加鎖後再次檢查,避免競態條件)
if c.closed != 0 {
unlock(&c.lock) // 提前解鎖(後續無共享操作)
panic(plainError("send on closed channel")) // 向已關閉通道發送數據,觸發 panic
}
// 情況 2:接收等待隊列(recvq)中有等待的接收者(因緩衝區空被阻塞)
if sg := c.recvq.dequeue(); sg != nil {
/*
* 找到等待的接收者(Goroutine),直接將數據傳遞給接收者,無需經過緩衝區。
* 這是“同步發送”的核心邏輯:接收者已阻塞等待,發送者無需等待緩衝區空間。
*/
send(c, sg, ep, func() { unlock(&c.lock) }, 3) // 調用 send 處理髮送邏輯
return true // 發送成功,返回 true
}
// 情況 3:緩衝區中有剩餘空間(未填滿)
if c.qcount < c.dataqsiz {
// 獲取緩衝區中發送位置的指針(環形隊列的寫指針 sendx)
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil) // 競態檢測:標記發送位置已訪問
}
// 將數據從發送者內存(ep)複製到緩衝區(qp)
typedmemmove(c.elemtype, qp, ep) // 類型安全的內存複製(根據元素類型)
// 更新發送索引(環形隊列循環遞增)
c.sendx++
if c.sendx == c.dataqsiz { // 若到達緩衝區末尾,回到起點
c.sendx = 0
}
// 增加緩衝區中已存儲的元素數量
c.qcount++
unlock(&c.lock) // 解鎖(操作完成)
return true // 發送成功,返回 true
}
// 情況 4:非阻塞模式且緩衝區已滿(無接收者等待)
if !block {
unlock(&c.lock) // 提前解鎖
return false // 未阻塞但無法發送(緩衝區滿且無接收者),返回 false
}
// 情況 5:阻塞模式且無接收者/緩衝區已滿(需掛起當前 Goroutine)
// 獲取當前 Goroutine 的 G 結構體指針
gp := getg()
// 分配/獲取一個 sudog(用於記錄等待狀態的 Goroutine 上下文)
mysg := acquireSudog()
mysg.releasetime = 0 // 初始化釋放時間(用於性能分析)
if t0 != 0 {
mysg.releasetime = -1 // 標記為非定時阻塞(若 t0=0 表示未啓用性能分析)
}
// 記錄發送目標內存地址(ep)到 sudog(用於後續喚醒時傳遞數據)
mysg.elem = ep
mysg.waitlink = nil // 等待鏈表指針(初始為 nil)
// 將 sudog 關聯到當前 Goroutine 的等待列表
gp.waiting = mysg
// 完善 sudog 的上下文信息
mysg.g = gp // 當前 Goroutine
mysg.isSelect = false // 非 select 語句觸發的發送(select 會單獨處理)
mysg.c = c // 關聯的通道
gp.param = nil // 參數(未使用)
// 將 sudog 加入通道的發送等待隊列(sendq)
c.sendq.enqueue(mysg)
// 若通道關聯定時器,阻塞定時器(防止超時後未喚醒)
if c.timer != nil {
blockTimerChan(c)
}
// 標記當前 Goroutine 正在等待通道(用於棧收縮檢測)
gp.parkingOnChan.Store(true)
// 設置阻塞原因(普通通道發送或同步測試通道發送)
reason := waitReasonChanSend
if c.synctest {
reason = waitReasonSynctestChanSend
}
// 掛起當前 Goroutine(釋放 CPU,進入等待狀態)
// 參數:喚醒時的回調函數、鎖指針、阻塞原因、跟蹤信息
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
// 被喚醒後的處理邏輯(可能被接收者喚醒或超時喚醒)
// 檢查等待列表是否被篡改(防止併發錯誤)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 清除當前 Goroutine 的等待狀態
gp.waiting = nil
gp.activeStackChans = false // 標記不再活躍在通道上
// 記錄阻塞時間(用於性能分析)
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 獲取發送結果(是否成功發送,由接收者是否完成操作決定)
closed := !mysg.success
gp.param = nil // 清理參數
// 解除 sudog 與通道的關聯
mysg.c = nil
releaseSudog(mysg) // 釋放 sudog 資源
// 處理喚醒後的錯誤情況
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup") // 無理由喚醒,拋出異常
}
panic(plainError("send on closed channel")) // 通道已關閉,觸發 panic
}
return true // 發送成功,返回 true
}
四、接收操作:chanrecv
接收操作 <-ch 或 x, ok := <-ch 底層調用 runtime.chanrecv,其核心邏輯是:嘗試從緩衝區讀取元素,若緩衝區空則阻塞當前 Goroutine。
關鍵步驟:
- 鎖保護:所有對通道狀態的修改(如緩衝區、等待隊列)均在互斥鎖保護下完成,確保併發安全。
- 關閉處理:若通道已關閉且緩衝區無數據,接收操作返回零值和
false;若緩衝區有數據,仍能正常接收。 - 無緩衝區:
sendq中若有等待的發送者,直接交換數據(發送者和接收者同步阻塞/喚醒) - 緩衝區讀取:優先從環形緩衝區讀取已有數據,保證接收順序與發送順序一致(邏輯 FIFO)。
- 發送隊列處理:若有等待的發送者(因緩衝區滿被阻塞),接收者騰出空間後會喚醒寫入者,從而將數據寫入緩衝區尾部(環形隊列特性)。。
- 阻塞機制:緩衝區為空且無發送者時,則將當前接收者
goroutine封裝為sudog,加入recvq隊列,調用gopark阻塞當前goroutine(釋放鎖並掛起)。
源代碼及註解:
// chanrecv 從通道 c 接收數據,並將數據寫入 ep(若 ep 非空)。
// 參數説明:
// - c:目標通道指針
// - ep:接收數據的目標內存地址(若為 nil 則忽略數據)
// - block:是否阻塞等待(true=阻塞,false=非阻塞)
// 返回值:
// - selected:是否選中通道(總是 true,除非通道為 nil 或已關閉)
// - received:是否成功接收到數據(true=接收到,false=未接收到)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 調試模式:打印通道信息(僅調試用)
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 處理通道為 nil 的異常情況
if c == nil {
if !block {
// 非阻塞模式下,直接返回(無數據可接收)
return
}
// 阻塞模式下,永久掛起(錯誤場景,因通道不能為 nil)
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable") // 理論上不可達,防止死鎖
}
// 同步測試檢查:禁止從同步測試通道外部接收數據
if c.synctest && getg().syncGroup == nil {
panic(plainError("receive on synctest channel from outside bubble"))
}
// 若通道關聯定時器,嘗試觸發定時器邏輯(如超時接收)
if c.timer != nil {
c.timer.maybeRunChan()
}
// 快速路徑:非阻塞模式下,無需加鎖直接檢查通道是否為空
if !block && empty(c) {
/*
* 關鍵:避免競態條件(關閉與空檢查的順序)
* 使用原子操作確保:先檢查關閉狀態,再檢查是否為空
*/
if atomic.Load(&c.closed) == 0 {
// 通道未關閉且為空,非阻塞模式下直接返回未接收到
return
}
// 通道已關閉,再次檢查是否為空(防止關閉前有數據但被其他協程取走)
if empty(c) {
// 通道已關閉且為空,清空接收內存(避免髒讀)
if raceenabled {
raceacquire(c.raceaddr()) // 競態檢測:標記接收地址已訪問
}
if ep != nil {
typedmemclr(c.elemtype, ep) // 將 ep 指向的內存置為零值(類型由 c.elemtype 決定)
}
return true, false // 已關閉且無數據,返回 (true, false)
}
}
var t0 int64 // 用於性能分析的時間戳(若啓用了阻塞分析)
if blockprofilerate > 0 {
t0 = cputicks() // 記錄當前 CPU 時間(納秒級)
}
// 獲取通道的互斥鎖(關鍵:所有對 hchan 的修改需加鎖保護)
lock(&c.lock)
// 情況 1:通道已關閉
if c.closed != 0 {
if c.qcount == 0 {
// 通道已關閉且緩衝區無數據:清空接收內存並返回
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock) // 提前解鎖(後續無共享操作)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false // 已關閉且無數據,返回 (true, false)
}
// 通道已關閉但緩衝區有數據:繼續處理(允許接收剩餘數據)
} else {
// 情況 2:通道未關閉,檢查發送等待隊列(sendq)是否有等待的發送者
if sg := c.sendq.dequeue(); sg != nil {
/*
* 找到等待的發送者(因緩衝區滿被阻塞的 Goroutine)
* 若緩衝區大小為 0(無緩衝通道),直接從發送者獲取數據;
* 否則,從緩衝區頭部接收數據,並將發送者的數據添加到緩衝區尾部(環形隊列特性)。
*/
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 調用 recv 處理接收邏輯
return true, true // 接收成功,返回 (true, true)
}
}
// 情況 3:緩衝區中有數據(無論通道是否關閉)
if c.qcount > 0 {
// 獲取緩衝區中接收位置的指針(環形隊列的讀指針 recvx)
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil) // 競態檢測:標記接收位置已訪問
}
// 將數據從緩衝區複製到接收者的內存(ep)
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // 類型安全的內存複製(根據元素類型)
}
// 清空緩衝區中已讀取的位置(避免髒數據殘留)
typedmemclr(c.elemtype, qp)
// 更新接收索引(環形隊列循環遞增)
c.recvx++
if c.recvx == c.dataqsiz { // 若到達緩衝區末尾,回到起點
c.recvx = 0
}
// 減少緩衝區中已存儲的元素數量
c.qcount--
// 解鎖並返回成功(已從緩衝區讀取數據)
unlock(&c.lock)
return true, true
}
// 情況 4:非阻塞模式且緩衝區無數據(無發送者等待)
if !block {
unlock(&c.lock) // 提前解鎖
return false, false // 未阻塞但無數據,返回 (false, false)
}
// 情況 5:阻塞模式且無發送者/數據(需掛起當前 Goroutine)
// 獲取當前 Goroutine 的 G 結構體指針
gp := getg()
// 分配/獲取一個 sudog(用於記錄等待狀態的 Goroutine 上下文)
mysg := acquireSudog()
mysg.releasetime = 0 // 初始化釋放時間(用於性能分析)
if t0 != 0 {
mysg.releasetime = -1 // 標記為非定時阻塞(若 t0=0 表示未啓用性能分析)
}
// 記錄接收目標內存地址(ep)到 sudog
mysg.elem = ep
mysg.waitlink = nil // 等待鏈表指針(初始為 nil)
// 將 sudog 關聯到當前 Goroutine 的等待列表
gp.waiting = mysg
// 完善 sudog 的上下文信息
mysg.g = gp // 當前 Goroutine
mysg.isSelect = false // 非 select 語句觸發的接收(select 會單獨處理)
mysg.c = c // 關聯的通道
gp.param = nil // 參數(未使用)
// 將 sudog 加入通道的接收等待隊列(recvq)
c.recvq.enqueue(mysg)
// 若通道關聯定時器,阻塞定時器(防止超時後未喚醒)
if c.timer != nil {
blockTimerChan(c)
}
// 標記當前 Goroutine 正在等待通道(用於棧收縮檢測)
gp.parkingOnChan.Store(true)
// 設置阻塞原因(普通通道接收或同步測試通道接收)
reason := waitReasonChanReceive
if c.synctest {
reason = waitReasonSynctestChanReceive
}
// 掛起當前 Goroutine(釋放 CPU,進入等待狀態)
// 參數:喚醒時的回調函數、鎖指針、阻塞原因、跟蹤信息
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)
// 被喚醒後的處理邏輯(可能被髮送者喚醒或超時喚醒)
// 檢查等待列表是否被篡改(防止併發錯誤)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 若通道有關聯定時器,解除定時器阻塞
if c.timer != nil {
unblockTimerChan(c)
}
// 清除當前 Goroutine 的等待狀態
gp.waiting = nil
gp.activeStackChans = false // 標記不再活躍在通道上
// 記錄阻塞時間(用於性能分析)
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 獲取接收結果(是否成功接收到數據)
success := mysg.success
// 清理上下文
gp.param = nil
mysg.c = nil // 解除 sudog 與通道的關聯
releaseSudog(mysg) // 釋放 sudog 資源
// 返回最終結果(總是 true,received 由 success 決定)
return true, success
}
五、關閉操作:closechan
關閉操作 close(ch) 底層調用 runtime.closechan,其核心邏輯是:標記通道為已關閉,並喚醒所有等待的 Goroutine。
關鍵步驟:
- 鎖保護:所有對通道狀態的修改(如緩衝區、等待隊列)均在互斥鎖保護下完成,確保併發安全。
- 標記關閉:關閉前檢查通道是否已關閉(
c.closed != 0),如果重複關閉則觸發 panic。如未關閉,則設置c.closed = 1。 -
喚醒所有等待的
Goroutine:- 遍歷
recvq隊列,喚醒所有等待接收的Goroutine(它們會收到零值和 false)。 - 遍歷
sendq隊列,喚醒所有等待發送的Goroutine(它們會拋出 panic)。
- 遍歷
源代碼及註解:
// closechan 關閉通道 c,並喚醒所有等待的發送者和接收者。
// 注意:關閉已關閉的通道或 nil 通道會觸發 panic。
func closechan(c *hchan) {
// 處理通道為 nil 的異常情況
if c == nil {
panic(plainError("close of nil channel")) // 關閉 nil 通道,觸發 panic
}
// 獲取通道的互斥鎖(關鍵:所有對 hchan 的修改需加鎖保護)
lock(&c.lock)
// 情況 1:通道已被關閉(避免重複關閉)
if c.closed != 0 {
unlock(&c.lock) // 提前解鎖(後續無共享操作)
panic(plainError("close of closed channel")) // 重複關閉已關閉的通道,觸發 panic
}
// 競態檢測:記錄關閉操作的調用棧
if raceenabled {
callerpc := sys.GetCallerPC() // 獲取當前調用者的程序計數器(用於競態檢測)
// 標記通道的競態地址(c.raceaddr())的寫操作,記錄調用棧
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
// 釋放競態檢測的鎖(確保後續操作不被競態檢測干擾)
racerelease(c.raceaddr())
}
// 標記通道為已關閉(原子操作,確保多協程可見性)
c.closed = 1 // 0=未關閉,1=已關閉
// 初始化一個 Goroutine 列表(gList),用於暫存等待的發送者和接收者
var glist gList
// 釋放所有等待接收的 Goroutine(處理 recvq 隊列)
for {
sg := c.recvq.dequeue() // 從接收等待隊列中取出隊首元素(FIFO)
if sg == nil { // 隊列為空時退出循環
break
}
// 清理接收者的數據指針(避免懸垂指針)
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem) // 將接收者內存置為零值(類型安全)
sg.elem = nil // 解除接收者對數據的引用
}
// 記錄喚醒時間,用於性能分析
if sg.releasetime != 0 {
sg.releasetime = cputicks() // 更新為當前 CPU 時間(納秒級)
}
// 獲取接收者關聯的 Goroutine(G 結構體)
gp := sg.g
// 將通道指針(sg)存入 Goroutine 的 param 字段(用於喚醒時傳遞上下文)
gp.param = unsafe.Pointer(sg)
// 標記接收者未成功接收
sg.success = false
// 競態檢測:獲取 Goroutine 的鎖(確保併發安全)
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 將 Goroutine 加入暫存列表(gList),避免在持有通道鎖時喚醒
glist.push(gp)
}
// 釋放所有等待發送的 Goroutine(處理 sendq 隊列)
for {
sg := c.sendq.dequeue() // 從發送等待隊列中取出隊首元素(FIFO)
if sg == nil { // 隊列為空時退出循環
break
}
// 清理髮送者的數據指針(避免懸垂指針)
sg.elem = nil // 發送者的數據已無需傳遞(通道關閉)
// 記錄喚醒時間,用於性能分析
if sg.releasetime != 0 {
sg.releasetime = cputicks() // 更新為當前 CPU 時間(納秒級)
}
// 獲取發送者關聯的 Goroutine(G 結構體)
gp := sg.g
// 將通道指針(sg)存入 Goroutine 的 param 字段(用於喚醒時傳遞上下文)
gp.param = unsafe.Pointer(sg)
// 標記發送者未成功發送
sg.success = false
// 競態檢測:獲取 Goroutine 的鎖(確保併發安全)
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 將 Goroutine 加入暫存列表(gList),避免在持有通道鎖時喚醒
glist.push(gp)
}
// 解鎖通道(此時已處理完所有等待隊列的邏輯,可安全釋放鎖)
unlock(&c.lock)
// 喚醒所有暫存的 Goroutine(此時已釋放通道鎖,避免死鎖)
for !glist.empty() {
gp := glist.pop() // 從暫存列表中取出 Goroutine
gp.schedlink = 0 // 清除調度鏈接(避免殘留調度信息)
goready(gp, 3) // 將 Goroutine 標記為可運行狀態(3 表示喚醒原因與通道相關)
}
}
六、底層同步機制
channel 的併發安全性依賴於以下機制:
- 互斥鎖(
lock):所有對hchan關鍵字段(如sendx、recvx、buf、closed)的修改都必須在lock保護下進行,避免競態條件。 - 等待隊列(
recvq和sendq):通過隊列管理阻塞的Goroutine,確保喚醒順序(FIFO),避免飢餓。 gopark和goready:Go運行時的協程調度原語,用於掛起(gopark)和恢復(goready)阻塞的Goroutine。
七、性能優化與設計哲學
channel 的設計充分考慮了性能和易用性的平衡:
- 無緩衝
channel:適用於強同步場景(如協程間即時協作),避免不必要的內存拷貝(數據直接交換)。 - 有緩衝
channel:適用於流量削峯填谷(如生產者-消費者模型),減少協程阻塞次數(緩衝區滿才會阻塞發送者)。 - 內存佈局:緩衝區使用環形隊列(循環數組),內存連續,緩存友好(
CPU緩存命中率高)。 - 類型安全:通過
elemtype和elemsize元信息,在運行時檢查發送/接收的數據類型是否匹配,避免類型錯誤。
總結
從源碼角度看,channel 是 Go 運行時通過 hchan 結構體實現的併發安全通信原語,核心依賴:
- 環形緩衝區(
buf)實現FIFO數據傳遞。 - 等待隊列(
recvq、sendq)管理阻塞的 Goroutine。 - 互斥鎖(
lock)保證操作的原子性。 gopark、goready調度協程的阻塞與喚醒。
理解這些底層機制有助於寫出更高效的 Go 代碼(如合理選擇有緩衝/無緩衝 channel、避免通道濫用導致的性能問題),並更好地調試併發相關的 bug(如死鎖、數據競爭)。