動態

詳情 返回 返回

Goroutine間的“靈魂管道”:Channel如何實現數據同步與因果傳遞? - 動態 詳情

Channel是連接Goroutine的“管道”,是CSP理念在Golang中的具象化實現。它不僅是數據傳遞的隊列,更是Goroutine間同步的天然工具,讓開發者無需訴諸顯式的鎖或條件變量。

func main() {
	ch := make(chan int, 1) // 創建一個int,緩衝區大小為1的Channel
	ch <- 2                 // 將2發送到ch

	go func() {  // 開啓一個異步Goroutine
		n, ok := <-ch // n接收從ch發出的值,如果沒有接收到數據,將會阻塞等待
		if ok {
			fmt.Println(n) // 2
		}
	}()

	close(ch) // 關閉Channel
}

Channel數據結構
Channel 在運行時使用src/runtime/chan.go 結構體表示。我們在 Go 語言中創建新的 Channel 時,實際上創建的是如下所示的結構。

type hchan struct {
	qcount   uint           // 隊列中所有數據總數
	dataqsiz uint           // 環形隊列的 size
	buf      unsafe.Pointer // 指向 dataqsiz 長度的數組
	elemsize uint16         // 元素大小
	closed   uint32
	elemtype *_type         // 元素類型
	sendx    uint           // 已發送的元素在環形隊列中的位置
	recvx    uint           // 已接收的元素在環形隊列中的位置
	recvq    waitq          // 接收者的等待隊列
	sendq    waitq          // 發送者的等待隊列

	lock mutex
}

image

runtime.hchan 結構體中的五個字段 qcount、dataqsiz、buf、sendx、recv 構建底層的循環隊列。除此之外,elemsize 和 elemtype 分別表示當前 Channel 能夠收發的元素類型和大小。
sendq 和 recvq 存儲了當前 Channel 由於緩衝區空間不足而阻塞的 Goroutine 列表,這些等待隊列使用雙向鏈表 runtime.waitq表示,鏈表中所有的元素都是runtime.sudog 結構。

type waitq struct {
    first *sudog 
    last  *sudog
}

runtime.sudog(Scheduling Unit Descriptor)是用於實現Goroutine調度的一種數據結構。它包含了與Goroutine相關的信息,如Goroutine的狀態、等待的條件、等待的時間等。
當一個Goroutine需要等待某個事件或條件時,它會創建一個runtime.sudog,並將其加入到等待隊列中。當事件或條件滿足時,等
待隊列中的runtime.sudog會被喚醒,從而允許對應的Goroutine繼續執行。
Channel發送數據
1)如果等待接收的隊列recvq中存在Goroutine,那麼直接把正在發送的值發送給等待接收的Goroutine。
image

2)當緩衝區未滿時,找到sendx所指向的緩衝區數組的位置,將正在發送的值拷貝到該位置,並增加sendx索引以及釋放鎖。

image

3)如果是阻塞發送,那麼就將當前的Goroutine打包成一個sudog結構體,並加入到Channel的發送隊列sendq裏。

image

之後則調用goparkunlock將當前Goroutine設置為_Gwaiting狀態並解鎖,進入阻塞狀態等待被喚醒;如果被調度器喚醒,執行清理
工作並最終釋放對應的sudog結構體。

Channel接收數據
1)如果等待發送的隊列sendq裏存在掛起的Goroutine,那麼有兩種情況:當前Channel無緩衝區,或者當前Channel已滿。從sendq中取出最先阻塞的Goroutine,然後調用recv方法,此時需做如下判斷:

  1. 如果無緩衝區,那麼直接從sendq接收數據;
  2. 如果緩衝區已滿,從buf隊列的頭部接收數據,並把數據加到buf隊列的尾部;
  3. 最後調用goready函數將等待發送數據的Goroutine的狀態從_Gwaiting置為_Grunnable,等待下一次調度。
    當緩衝區已滿時的處理過程。

image

2)如果緩衝區buf中還有元素,那麼就走正常的接收,將從buf中取出的元素拷貝到當前協程的接收數據目標內存地址中。值得注意的是,即使此時Channel已經關閉,仍然可以正常地從緩衝區buf中接收數據。
3)如果是阻塞模式,且當前沒有數據可以接收,那麼就需要將當前Goroutine打包成一個sudog加入到Channel的等待接收隊列recvq中,將當前Goroutine的狀態置為_Gwaiting,等待喚醒。

image

Channel與happens-before 關係
Channel happens-before 規則有 4 條。
1)對一個元素的send操作happens-before對應的receive 完成操作。

var c = make(chan int, 10) // buffered或者unbuffered
var a string

func f() {
   // a 的初始化 happens-before 往ch中發送數據
	a = "hello, world"
   c <- 0
}

func main() {
	go f()
    // 往ch發送數據 happens-before 從ch中讀取出數據
	<-c
   // 打印a的值 happens-after 第12行
   // 打印a的結果值“hello world”
	print(a)
}

2)對Channel的close操作happens-before receive 端的收到關閉通知操作。

var c = make(chan int, 10) // buffered或者unbuffered
var a string

func f() {
   // a 的初始化 happens-before close ch
	a = "hello, world"
   close(c)
}

func main() {
	go f()
    // close ch happens-before 從ch中讀取出數據
	<-c
   // 打印a的值 happens-after 第12行
   // 打印a的結果值“hello world”
	print(a)
}

3)對於Unbuffered Channel,對一個元素的receive 操作happens-before對應的send完成操作。

var c = make(chan int) // unbuffered
var a string

func f() {
   // a 的初始化 happens-before 從ch中讀取出數據
	a = "hello, world"
   <-c
}

func main() {
	go f()
    // 從ch中讀取出數據 happens-before 往ch發送數據
	c <- 0   
   // 打印a的值 happens after 第12行
   // 打印a的結果值“hello world”
	print(a)
}

4)如果 Channel 的容量是 c(c>0),那麼,第 n 個 receive 操作 happens-before 第 n+c 個 send 的完成操作。規則3是規則4 c=0時的特例。

Channel使用場景
1)併發控制:通過控制帶緩衝的Channel 的隊列大小來限制併發的數量。

func worker(id int, sem chan struct{}) {
	// 獲取許可
	sem <- struct{}{}
	time.Sleep(time.Second) // 模擬耗時操作
	// 釋放許可
	<-sem
}

func main() {
	// 創建一個緩衝區為2的Channel
	sem := make(chan struct{}, 2)

	for i := 0; i < 5; i++ {
		go worker(i, sem)
	}
}

2)信號通知:使用一個無緩衝的 Channel 來通知一個 Goroutine 任務已經完成。

func main() {
	done := make(chan bool)

	go func() {
		time.Sleep(2 * time.Second) // 模擬耗時操作
		// 發送信號表示工作已完成
		done <- true
	}()

	<-done // 等待信號
}

3)異步操作結果獲取:在一個 Goroutine 中執行異步操作,然後通過 Channel 將結果發送到另一個 Goroutine。

func asyncTask() <-chan int {
	ch := make(chan int)
	go func() {
		// 模擬異步操作
		time.Sleep(2 * time.Second)
		ch <- 1 // 發送結果
		close(ch)
	}()
	return ch
}

func main() {
	ch := asyncTask()
	time.Sleep(1 * time.Second) // 模擬其他操作
	result := <-ch // 獲取異步操作的結果
}

總結:控制與編排,殊途同歸
Java 與 Golang 在併發模型上的差異,深刻地體現了兩種構建程序確定性的不同哲學:
1)Java (共享內存):採用顯式同步的路徑。它為開發者提供了強大的底層控制能力(鎖、內存屏障),但要求開發者必須承擔起預見並管理資源競態的心智負擔。確定性來自於對臨界區和內存可見性的嚴格手工控制。
2)Golang (消息傳遞):採用隱式因果的路徑。它通過 Channel 將數據的所有權在 Goroutine 間傳遞,將併發問題從“共享數據訪問”轉化為“數據流設計”。確定性來自於消息傳遞建立的自然因果順序,從而在結構上規避了競態。
Java的路徑是“先有併發,後加約束”,而Golang的路徑是“通過約束,實現併發”。兩者並非優劣之分,而是針對不同問題域和開發哲學的選擇。Java的完備工具集賦予了處理極端複雜場景的靈活性,而Golang的簡約設計則為構建清晰、可靠、易於推理的併發系統提供了優雅的範式。
最終,無論是顯式的同步約束,還是隱式的因果傳遞,它們都通向併發編程的聖盃——在多核時代,構建出可預測、可維護且高性能的軟件系統。這兩種思想的碰撞與融合,正持續推動着現代併發編程的演進。

很高興與你相遇!如果你喜歡本文內容,記得關注哦

Add a new 評論

Some HTML is okay.