動態

詳情 返回 返回

學習Go語言併發編程 - 動態 詳情

關於併發

Go 語言的創始人Rob Pike 曾説過:並行關乎執行,併發關乎結構。他認為:
• 併發是一種程序設計方法:將一個程序分解成多個小片段,每個小片段獨立執行;併發程序的小片段之間可以通過通信相互協作。
• 並行是有關執行的,它表示同時進行一些計算任務。

程序小片段之間通訊不同語言實現不同,比如:傳統語言Java使用共享內存方式達到線程之間通訊,而Go語言channel來進行通訊。

原生線程、Java線程、Goroutine

Java中的多線程,由 JVM 在 Java 堆中分配內存來存儲線程的相關信息,包括線程棧、程序計數器等。當需要執行 Java 線程時,它會向操作系統請求分配一個或多個原生線程(例如 POSIX 線程或 Windows 線程),操作系統分配成功後,JVM 會將 Java 線程與這些原生線程進行映射,並建立關聯,並在需要時將 Java 線程的狀態同步到相應的原生線程中。

由此可以看出,Java線程和原生線程1:1對應,由操作系統(OS)調度算法執行,該併發以下特點:

  • 線程棧默認空間大且不支持動態伸縮,Java 默認最小都是1MB,Linux 默認 8MB;
  • 線程切換創建、銷燬以及線程間上下文切換的代價都較大。
  • 線程通過共享內存進行通訊,

POSIX線程(Pthreads)是C函數、類型和常數的集合,用於創建和管理線程。它是POSIX標準的一個子集,提供在BeagleBone Black上使用C/C++應用程序實現線程所需的一切。

原生線程就是操作系統線程或叫系統線程。

Go語言引入用户層輕量級線程(Goroutine),它由Go運行時負責調度。Goroutine相比傳統操作系統線程而言有如下優勢。

  • 資源佔用小,每個Goroutine的初始棧大小僅為2KB,且支持動態伸縮,避免內存浪費;
  • 由Go運行時而不是操作系統調度,goroutine上下文切換代價較小;
  • 內置channel作為goroutine間通信原語,為併發設計提供強大支撐。

瞭解Go調度原理

Go 語言實現了調度器(scheduler),它負責將 goroutine 分配到原生線程上執行。

G-P-M模型

Go 語言中的調度模型(G-P-M模型)它包含了三個重要組件:G(goroutine)、P(processor)、M(machine)。

GPM

  • G(goroutine):一個執行單元,這裏也就是 goroutine,它包含了執行代碼所需的信息,比如棧空間、程序計數器等。
  • P(processor):P 一個邏輯處理器,它負責執行 goroutine。每個 P 維護了一個 goroutine 隊列,它可以將 goroutine 分配到 M(系統線程)上執行。P 的數量由 GOMAXPROCS 環境變量決定,默認值為 CPU 的邏輯核心數。
  • M(machine):一個系統線程(machine),它負責執行 goroutine 的真正計算工作。M 與操作系統的線程直接綁定,負責實際的計算任務,比如執行 goroutine 的函數、系統調用等。Go 語言的調度器會將多個 goroutine 映射到少量的系統線程上執行。

搶佔式調度

在上面模型中,如果某個G處於死循環或長時間執行(比如:進行系統調用,IO操作),那麼P隊列裏面的G就長時間得不到執行,為了解決此問題,需要使用搶佔式調度。

Java 中有以下兩種搶佔式調度算法

  1. 優先級調度(Priority Scheduling)

    • 每個線程都有一個優先級,高優先級的線程會比低優先級的線程更容易獲得CPU的執行權(注意:設置了優先級不是絕對優先執行,只是概率上高)。
    • 在Java中,線程的優先級範圍是從Thread.MIN_PRIORITY(1)到Thread.MAX_PRIORITY(10),默認是Thread.NORM_PRIORITY(5)。
  2. 時間片輪轉調度(Round Robin Scheduling)

    • 每個線程被分配一個固定的時間片,當該線程的時間片用完時,操作系統會暫停它的執行,將CPU控制權交給下一個線程。
    • 在Java中,時間片輪轉調度通過yield()方法來實現。當線程調用yield()時,它就會主動放棄CPU的執行權,讓其他線程有機會執行。

Go 語言與Java搶佔調度不同,Java是實際上是操作系統時間片輪轉調度,發生在內核層。Go 搶佔調度是發生在用户層,由 Go 運行時管理,通過軟件定時器和搶佔點來實現搶佔。

Go 程序啓動時會創建一個線程(稱為監控線程),該線程運行一個內部函數 sysmon ,用來進行系統監控任務,如垃圾回收、搶佔調度、監視死鎖等。這個函數在後台運行,確保 Go 程序的正常運行。

func main() {
  ...
    if GOARCH != "wasm" { 
     // 系統棧上的函數執行
        systemstack(func() {  
            newm(sysmon, nil, -1) // 用於創建新的 M(機器,代表一個操作系統線程)。
        })
    } 
  ...
}

sysmon 每20us~10ms啓動一次,大體工作:

  • 釋放閒置超過5分鐘的span物理內存;
  • 如果超過2分鐘沒有垃圾回收,強制執行;
  • 將長時間未處理的netpoll結果添加到任務隊列;
  • 向長時間運行的G任務發出搶佔調度;
  • 收回因syscall長時間阻塞的P。

具體來説,以下情況會觸發搶佔式調度:

  1. 系統調用:當一個 goroutine 執行系統調用時,調度器會將該 goroutine 暫停,並將處理器分配給其他可運行的 goroutine。一旦系統調用完成,被暫停的 goroutine 可以繼續執行。
  2. 函數調用:當一個 goroutine 調用一個阻塞的函數(如通道的發送和接收操作、鎖的加鎖和解鎖操作等)時,調度器會將該 goroutine 暫停,並將處理器分配給其他可運行的 goroutine。一旦被阻塞的函數可以繼續執行,被暫停的 goroutine 可以繼續執行。
  3. 時間片耗盡:每個 goroutine 在運行一段時間後都會消耗一個時間片。當時間片耗盡時,調度器會將當前正在運行的 goroutine 暫停,並將處理器分配給其他可運行的 goroutine。被暫停的 goroutine 將會被放入到就緒隊列中,等待下一次調度。

GO併發模型

Go 使用 CSP(Communicating Sequential Processes,通信順序進程)併發編程模型,該模型由計算機科學家 Tony Hoare 在 1978 年提出。

在Go中,針對CSP模型提供了三種併發原語:

  • goroutine:對應CSP模型中的P(原意是進程,在這裏也就是goroutine),封裝了數據的處理邏輯,是Go運行時調度的基本執行單元。
  • channel:對應CSP模型中的輸入/輸出原語,用於goroutine之間的通信和同步。
  • select:用於應對多路輸入/輸出,可以讓goroutine同時協調處理多個channel操作。

Go 奉行“不要通過共享內存來通信,而應通過通信來共享內存。”,也就是推薦通過channel來傳遞值,讓goroutine相互通訊協作。

channel 分為無緩衝和有緩衝,使用通道時遵循以下規範:

  1. 在無緩衝通道上,每一次發送操作都有對應匹配的接收操作。
  2. 對於從無緩衝通道進行的接收,發生在對該通道進行的發送完成之前。
  3. 對於帶緩衝的通道(緩存大小為C),通道中的第K個接收完成操作發生在第K+C個發送操作完成之前。
  4. 如果將C=0就是無緩衝的通道,也就是第K個接收完成在第K個發送完成之前。
func sender(ch chan<- int, done chan<- bool) {
    fmt.Println("Sending...")
    ch <- 42 // 發送數據到無緩衝通道
    fmt.Println("Sent")
    done <- true // 發送完成信號
}

func receiver(ch <-chan int, done <-chan bool) {
    <-done // 等待發送操作完成信號
    fmt.Println("Receiving...")
    val := <-ch // 從無緩衝通道接收數據
    fmt.Println("Received:", val)
}

func main() {
    ch := make(chan int) // 創建無緩衝通道
    done := make(chan bool) // 用於發送操作完成信號

    go sender(ch, done)   // 啓動發送goroutine
    go receiver(ch, done) // 啓動接收goroutine

    time.Sleep(2 * time.Second) // 等待一段時間以觀察結果
}

有緩衝通道

func sender(ch chan<- int) {
    for i := 0; i < 5; i++ {
        fmt.Println("Sending:", i)
        ch <- i // 發送數據到通道
        fmt.Println("Sent:", i)
    }
    close(ch)
}

func receiver(ch <-chan int) {
    for {
        val, ok := <-ch // 從通道接收數據
        if !ok {
            fmt.Println("Channel closed")
            return
        }
        fmt.Println("Received:", val)
        time.Sleep(1 * time.Second) // 模擬接收操作耗時
    }
}

func main() {
    ch := make(chan int, 2) // 創建帶緩衝大小為2的通道

    go sender(ch)   // 啓動發送goroutine
    go receiver(ch) // 啓動接收goroutine

    time.Sleep(10 * time.Second) // 等待一段時間以觀察結果
}

Go併發場景

並行計算

利用goroutine併發執行任務,加速計算過程。

// calculateSquare 是一個計算數字平方的函數,它模擬了一個耗時的計算過程。
func calculateSquare(num int, resultChan chan<- int) {
    time.Sleep(1 * time.Second) // 模擬耗時計算
    resultChan <- num * num
}

func main() {
    nums := []int{1, 2, 3, 4, 5}
    resultChan := make(chan int)

    // 啓動多個goroutine併發計算數字的平方
    for _, num := range nums {
        go calculateSquare(num, resultChan)
    }

    // 從通道中接收計算結果並打印
    for range nums {
        result := <-resultChan
        fmt.Println("Square:", result)
    }
    close(resultChan)
}

IO密集型任務

在處理IO密集型任務時,可以使用goroutine和channel實現併發讀寫操作,提高IO效率。


// fetchURL 函數用於獲取指定URL的內容,並將結果發送到通道resultChan中。
func fetchURL(url string, resultChan chan<- string) {
    resp, err := http.Get(url)
    if err != nil {
        resultChan <- fmt.Sprintf("Error fetching %s: %s", url, err)
        return
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        resultChan <- fmt.Sprintf("Error reading response from %s: %s", url, err)
        return
    }
    resultChan <- string(body)
}

func main() {
    urls := []string{"https://example.com", "https://example.org", "https://example.net"}
    resultChan := make(chan string)

    // 啓動多個goroutine併發獲取URL的內容
    for _, url := range urls {
        go fetchURL(url, resultChan)
    }

    // 從通道中接收結果並打印
    for range urls {
        result := <-resultChan
        fmt.Println("Response:", result)
    }
    close(resultChan)
}

併發數據處理

對於需要同時處理多個數據流的情況,可以使用goroutine和channel實現併發數據處理,例如數據流的合併、拆分、過濾等操作。

// processData 函數用於處理從dataStream中接收的數據,並將處理結果發送到resultChan中。
func processData(dataStream <-chan int, resultChan chan<- int) {
    for num := range dataStream {
        resultChan <- num * 2 // 假設處理數據是將數據乘以2
    }
}

func main() {
    dataStream := make(chan int)
    resultChan := make(chan int)

    // 產生數據併發送到dataStream中
    go func() {
        for i := 1; i <= 5; i++ {
            dataStream <- i
        }
        close(dataStream)
    }()

    // 啓動goroutine併發處理數據
    go processData(dataStream, resultChan)

    // 從通道中接收處理結果並打印
    for range dataStream {
        result := <-resultChan
        fmt.Println("Processed Data:", result)
    }
    close(resultChan)
}

併發網絡編程

編寫網絡服務器或客户端時,可以利用goroutine處理每個連接,實現高併發的網絡應用。

// handler 是一個HTTP請求處理函數,它會向客户端發送"Hello, World!"的響應。
func handler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello, World!")
}

func main() {
    // 註冊HTTP請求處理函數
    http.HandleFunc("/", handler)

    // 啓動HTTP服務器並監聽端口8080
    go http.ListenAndServe(":8080", nil)
    fmt.Println("Server started on port 8080")

    // 使用select{}使主goroutine保持運行狀態,以便HTTP服務器能夠處理請求
    select {}
}

定時任務和週期性任務

// task 是一個需要定時執行的任務函數。
func task() {
    fmt.Println("Task executed at:", time.Now())
}

func main() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    // 循環等待定時器的觸發並執行任務
    for {
        select {
        case <-ticker.C:
            task()
        }
    }
}

工作池

通過創建一組goroutine來處理任務池中的任務,可以有效地控制併發數量,適用於需要限制併發的情況。

// worker 是一個工作函數,它會從jobs通道中接收任務,並將處理結果發送到results通道中。
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, job)
        time.Sleep(1 * time.Second) // 模擬工作時間
        fmt.Printf("Worker %d finished job %d\n", id, job)
        results <- job * 2 // 假設工作的結果是輸入的兩倍
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)    // 緩衝channel用於發送任務
    results := make(chan int, numJobs) // 用於接收任務結果

    // 啓動多個worker goroutine
    var wg sync.WaitGroup
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }

    // 發送任務到jobs channel
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 關閉jobs channel

    // 等待所有worker完成並收集結果
    go func() {
        wg.Wait()
        close(results)
    }()

    // 從通道中接收處理結果並打印
    for result := range results {
        fmt.Println("Result:", result)
    }
}

Add a new 評論

Some HTML is okay.