博客 / 詳情

返回

golang中的幾種併發模式

0.1、索引

https://blog.waterflow.link/articles/1663551951058

1、for- select模式

這種模式通常用在從多個通道讀取數據

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1, ch2 := make(chan int), make(chan int)

  // 每2秒不斷往通道1寫數據
    go func() {
        i := 0
        for {
            i += 2
            ch1 <- i
            time.Sleep(2 * time.Second)
        }
    }()

  // 每2秒不斷往通道2寫數據
    go func() {
        i := 1
        for {
            i += 2
            ch2 <- i
            time.Sleep(2 * time.Second)
        }
    }()

  // 不斷從通道讀數據
    for {
        select {
        case v := <-ch1:
            fmt.Println("ch1:", v)
            time.Sleep(time.Second)
        case v := <-ch2:
            fmt.Println("ch2:", v)
            time.Sleep(time.Second)
        default:
            fmt.Println("default")
            time.Sleep(time.Second)
        }
    }

}

如果ch1和ch2沒數據,會走default

如果ch1和ch2都有數據會隨機選擇一個執行,之所以隨機是為了避免只執行第一個case導致飢餓

2、done-channel模式

由於goroutine不會被垃圾回收,因此很可能導致內存泄漏。

為了避免內存泄漏,goroutine應該有被觸發取消的機制。父 Goroutine 需要通過一個名為 done 的只讀通道向其子 Goroutine 發送取消信號。按照慣例,它被設置為第一個參數。

這種模式在其他模式中也被大量使用。

package main

import (
    "fmt"
)

func main() {
    jobs := make(chan int, 5)
    done := make(chan bool)

    go doWork(done, jobs)

    for j := 1; j <= 3; j++ {

        fmt.Println("sent job", j)
        jobs <- j
    }
    close(jobs)
    fmt.Println("sent all jobs")

  // 任務結束
    done <- true
}

func doWork(done chan bool, jobs chan int) {
    for {
        select {
        case j, more := <-jobs:
            if more {
                fmt.Println("received job", j)
            } else {
                fmt.Println("received all jobs")
            }
        case <-done: // 任務結束,關閉子協程
            return
        default:
        }
    }
}

3、or-done模式

該模式旨在將多個完成通道組合成一個 agg_done;這意味着如果一個 done 通道發出信號,則整個 agg_done 通道也將關閉。然而,我們不知道在運行時完成通道的數量。

or-done 模式可以通過使用 goroutine 和 遞歸 來實現。

示例中 使上下遞歸函數像樹一樣相互依賴。上部將自己的 orDone 通道注入下部。然後下層也將自己的 orDone 返回給上層。

如果任何 orDone 通道關閉,則通知上層和下層。

這點和上面done-channel模式是不同的,上面是所有goroutine完成任務,這裏是只要有1個goroutine完成就結束所有goroutine。

就好比發送一個請求到多個微服務節點,只要有1個返回就算完成。

package main

import (
    "fmt"
    "time"
)

func main() {
    var or func(channels ...<-chan interface{}) <-chan interface{}
    // 只要有1個結束阻塞,關閉orDone並返回
    or = func(channels ...<-chan interface{}) <-chan interface{} {
        // 小於2個通道直接返回
        switch len(channels) {
        case 0:
            return nil
        case 1:
            return channels[0]
        }
        // 聲明一個orDone
        orDone := make(chan interface{})
        go func() {
            // 完成關閉orDone
            defer close(orDone)
            switch len(channels) {
            case 2: // 如果是2個channel,只需要監聽這兩個
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                // 二分法遞歸
                m := len(channels) / 2
                select {
                case <-or(channels[:m]...):
                case <-or(channels[m:]...):
                }
            }
        }()
        return orDone
    }

    // 傳入一個時間模擬請求時長,時間到了就close掉,結束當前channel的阻塞
    sig := func(after time.Duration) <-chan interface{} {
        c := make(chan interface{})
        go func() {
            defer close(c)
            time.Sleep(after)
        }()
        return c
    }

    start := time.Now()
    // 這裏orDone開始是阻塞的,裏面開了5個channel
    <-or(
        sig(2*time.Hour),
        sig(5*time.Minute),
        sig(1*time.Second),
    )
    fmt.Printf("done after %v\n", time.Since(start))
}

4、fanout-channel模式

意思是隻有1個輸入channel,有多個輸出channel,經常用在設計模式中的觀察者模式。觀察者模式中,當數據發生變動後,多個觀察者都會收到這個信號。

package main

import (
    "fmt"
    "time"
)

func main() {
    // 輸入的channel,相當於被觀察者
    ch := make(chan interface{})
    go func() {
        for {
            ch <- time.Now()
            time.Sleep(3 * time.Second)
        }
    }()

    // 觀察者
    out := make([]chan interface{}, 2)
    for k := range out {
        out[k] = make(chan interface{})
    }

    go fanout(ch, out)

    // 是否觀察到數據變化
    for {
        select {
        case res := <-out[0]:
            fmt.Println(res)
        case res := <-out[1]:
            fmt.Println(res)
        }
    }
}

func fanout(ch <-chan interface{}, out []chan interface{}) {
    defer func() {
        for i := 0; i < len(out); i++ {
            close(out[i])
        }
    }()

    // 訂閲被觀察者
    for v := range ch {
        v := v
        for i := 0; i < len(out); i++ {
            i := i
            out[i] <- v
        }
    }

}

5、fan-in-channel模式

和上面的相反,這個是指多個源channel輸入,一個目標channel輸出的情況。

package main

import (
    "fmt"
    "time"
)

func main() {
    // 輸入的channel
    in := make([]chan interface{}, 2)
    in2 := make([]<-chan interface{}, 2)

    for k := range in {
        k := k
        in[k] = make(chan interface{})
        var inin <-chan interface{} = in[k]
        in2[k] = inin

        go func() {
            for {
                in[k] <- time.Now()
                time.Sleep(3 * time.Second)
            }
        }()

    }

    // 打印輸出的channel
    for v := range fanIn(in2...) {
        fmt.Println(v)
    }

}

func fanIn(chans ...<-chan interface{}) <-chan interface{} {
    switch len(chans) {
    case 0:
        c := make(chan interface{})
        close(c)
        return c
    case 1:
        return chans[0]
    case 2:
        return mergeTwo(chans[0], chans[1])
    default: // 多個channel二分法
        m := len(chans) / 2
        return mergeTwo(fanIn(chans[:m]...), fanIn(chans[m:]...))
    }

}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
    // 針對2個channel輸出
    c := make(chan interface{})
    go func() {
        defer close(c)
        for a != nil || b != nil {
            select {
            case v, ok := <-a:
                if !ok {
                    a = nil
                    continue
                }
                c <- v
            case v, ok := <-b:
                if !ok {
                    b = nil
                    continue
                }
                c <- v
            }

        }
    }()
    return c
}
user avatar rwxe 頭像
1 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.