Channel
CSP(Communicating Sequential Process):CSP 允許使用進程組件來描述系統,它們獨立運行,並且只通過消息傳遞的方式通信
Channel 類型是 Go 語言內置的類型,可以直接使用
Don’t communicate by sharing memory, share memory by communicating. --Go Proverbs by Rob Pike
Channel用法:
- 數據交流:當作併發的 buffer 或者 queue,解決生產者 - 消費者問題。多個 goroutine 可以併發當作生產者(Producer)和消費者(Consumer)。
- 數據傳遞:一個 goroutine 將數據交給另一個 goroutine,相當於把數據的擁有權 (引用) 託付出去。
- 信號通知:一個 goroutine 可以將信號 (closing、closed、data ready 等) 傳遞給另一個或者另一組 goroutine 。
- 任務編排:可以讓一組 goroutine 按照一定的順序併發或者串行的執行,這就是編排的功能。
- 鎖:利用 Channel 也可以實現互斥鎖的機制。
chan string // 可以發送接收string
chan<- struct{} // 只能發送struct{}
<-chan int // 只能從chan接收int
ch <- 2000 // 發送數據
x := <-ch // 把接收的一條數據賦值給變量x foo(<-ch) // 把接收的一個的數據作為參數傳給函數 <-ch // 丟棄接收的一條數據
close:關閉 chan 關閉掉
cap:返回 chan 的容量
chan 還可以應用於 for-range 語句中:
for v := range ch {
fmt.Println(v)
}
使用 Channel 容易犯的錯誤
panic
- close 為 nil 的 chan;
- send 已經 close 的 chan;
- close 已經 close 的 chan。
goroutine 泄漏
unbuffered chan(初始化時不指定容量)Writer和reader,這兩個事件必須同時發生。如果一個先發生,它所在的 goroutine 就會被 Go 的調度器阻塞
併發原語和Channel的選擇方法
- 共享資源的併發訪問使用傳統併發原語;
- 複雜的任務編排和消息傳遞使用 Channel;
- 消息通知機制使用 Channel,除非只想 signal 一個 goroutine,才使用 Cond;
- 簡單等待所有任務的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
- 需要和 Select 語句結合,使用 Channel;
- 需要和超時配合時,使用 Channel 和 Context。
使用反射操作 Channel
通過 reflect.Select 函數,可以將一組運行時的 case clause 傳入,當作參數執行,無需寫100個Case,可以動態創建case
func main() {
var ch1 = make(chan int, 10)
var ch2 = make(chan int, 10)
// 創建SelectCase
var cases = createCases(ch1, ch2)
// 執行10次select
for i := 0; i < 10; i++ {
chosen, recv, ok := reflect.Select(cases)
if recv.IsValid() { // recv case
fmt.Println("recv:", cases[chosen].Dir, recv, ok)
} else { // send case
fmt.Println("send:", cases[chosen].Dir, ok)
}
}
}
func createCases(chs ...chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創建recv case
for _, ch := range chs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
// 創建send case
for i, ch := range chs {
v := reflect.ValueOf(i)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: v,
})
}
return cases
}
典型的應用場景
消息交流
chan的內部實現是循環隊列,所以有時會被當成線程安全的隊列和 buffer 使用
例子:
- 通過chan Job實現的worker 池
- etcd 中的 node節點
數據傳遞
令牌(token)傳遞
type Token struct{}
func newWorker(id int, ch chan Token, nextCh chan Token) {
for {
token := <-ch // 取得令牌
fmt.Println((id + 1)) // id從1開始
time.Sleep(time.Second)
nextCh <- token
}
}
func main() {
chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}
// 創建4個worker
for i := 0; i < 4; i++ {
go newWorker(i, chs[i], chs[(i+1)%4])
}
//首先把令牌交給第一個worker
chs[0] <- struct{}{}
select {}
}
信號通知
使用 chan 實現程序的 graceful shutdown,在退出之前執行一些連接關閉、文件 close、緩存落盤等一些動作
func main() {
var closing = make(chan struct{})
var closed = make(chan struct{})
go func() {
// 模擬業務處理
for {
select {
case <-closing:
return
default:
// ....... 業務計算
time.Sleep(100 * time.Millisecond)
}
}
}()
// 處理CTRL+C等中斷信號
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
close(closing)
// 執行退出之前的清理動作
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
fmt.Println("清理超時,不等了")
}
fmt.Println("優雅退出")
}
func doCleanup(closed chan struct{}) {
time.Sleep((time.Minute))
close(closed)
}
鎖
互斥鎖,利用 select+chan 的方式,很容易實現 TryLock、Timeout 的功能
任務編排
Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce
Or-Done 模式
如果有多個任務,只要有任意一個任務執行完,我們就想獲得這個信號
扇入模式
多個源 Channel 輸入、一個目的 Channel 輸出
扇出模式
扇出模式只有一個輸入源 Channel,有多個目標 Channel
Stream
流式管道,提供跳過幾個元素,或者是隻取其中的幾個元素等方法
map-reduce
第一步是映射(map),處理隊列中的數據,第二步是規約(reduce),把列表中的每一個元素按照一定的處理方式處理成結果,放入到結果隊列中
內存模型
多線程同時訪問同一個變量的可見性和順序
happens-before
👍在一個 goroutine 內部,程序的執行順序和它們的代碼指定的順序是一樣的,即使編譯器或者 CPU 重排了讀寫順序,從行為上來看,也和代碼指定的順序一樣
😡但Go 只保證 goroutine 內部重排對讀寫的順序沒有影響,如果要保證多個 goroutine 之間對一個共享變量的讀寫順序,在 Go 語言中,可以使用併發原語為讀寫操作建立 happens-before 關係
- 在 Go 語言中,對變量進行零值的初始化就是一個寫操作。
- 如果對超過機器 word(64bit、32bit 或者其它)大小的值進行讀寫,那麼,就可以看作是對拆成 word 大小的幾個讀寫無序進行。
- Go 並不提供直接的 CPU 屏障(CPU fence)來提示編譯器或者 CPU 保證順序性,而是使用不同架構的內存屏障指令來實現統一的併發原語。
Go 語言中保證的 happens-before 關係
init 函數
main 函數一定在導入的包的 init 函數之後執行
goroutine
啓動 goroutine 的 go 語句的執行,一定 happens before 此 goroutine 內的代碼執行
Channel
- 第 1 條規則是,往 Channel 中的發送操作,happens before 從該 Channel 接收相應數據的動作完成之前,即第 n 個 send 一定 happens before 第 n 個 receive 的完成。
- 第 2 條規則是,close 一個 Channel 的調用,肯定 happens before 從關閉的 Channel 中讀取出一個零值。
- 第 3 條規則是,對於 unbuffered 的 Channel,也就是容量是 0 的 Channel,從此 Channel 中讀取數據的調用一定 happens before 往此 Channel 發送數據的調用完成。
- 第 4 條規則是,如果 Channel 的容量是 m(m>0),那麼,第 n 個 receive 一定 happens before 第 n+m 個 send 的完成。
Mutex/RWMutex
- 第 n 次的 m.Unlock 一定 happens before 第 n+1 m.Lock 方法的返回;
- 對於讀寫鎖 RWMutex m,如果它的第 n 個 m.Lock 方法的調用已返回,那麼它的第 n 個 m.Unlock 的方法調用一定 happens before 任何一個 m.RLock 方法調用的返回,只要這些 m.RLock 方法調用 happens after 第 n 次 m.Lock 的調用的返回。這就可以保證,只有釋放了持有的寫鎖,那些等待的讀請求才能請求到讀鎖。
- 對於讀寫鎖 RWMutex m,如果它的第 n 個 m.RLock 方法的調用已返回,那麼它的第 k (k<=n)個成功的 m.RUnlock 方法的返回一定 happens before 任意的 m.RUnlockLock 方法調用,只要這些 m.Lock 方法調用 happens after 第 n 次 m.RLock。
WaitGroup
Wait 方法等到計數值歸零之後才返回
Once
對於 once.Do(f) 調用,f 函數的那個單次調用一定 happens before 任何 once.Do(f) 調用的返回