動態

詳情 返回 返回

Golang 使用通道實現流程的暫停與繼續 - 動態 詳情

Golang 中主流程要控制某個協程的暫停與繼續,需要兩個通道分別接收來自主流程的通知,並在協程中始終監聽這兩個通知。例如:

package main

import (
    "fmt"
    "time"
)

func main() {
    // 創建一個通道
    chPause := make(chan struct{})
    chResume := make(chan struct{})

    // 啓動一個 goroutine
    go func() {
        // 初始化 isPaused
        isPaused := false

        // 等待通知
        for {
            select {
            case <-chPause:
                // 暫停執行
                isPaused = true
                fmt.Println("已暫停。")
            case <-chResume:
                // 繼續執行
                isPaused = false
                fmt.Println("繼續執行。")
            default:
                // 暫停執行
                if isPaused {
                    continue
                }
                // 執行任務
                fmt.Println("執行任務...")
                time.Sleep(time.Millisecond * 500)
            }
        }
    }()

    // 發送通知
    time.Sleep(time.Second * 2)
    chPause <- struct{}{}
    time.Sleep(time.Second * 2)
    chResume <- struct{}{}
    time.Sleep(time.Second)
}

以上代碼中,對於已經開始執行的任務無法暫停,只能做到收到暫停通知後不再繼續執行後續任務。再考慮到chPausechResume均為無緩衝通道,這意味着任務未執行完畢時,暫時不接收chPausechResume,即發送端會被阻塞。此時,可以認為,只要暫停信號成功送入,即表示之前的任務已暫停,且不會執行後續的任務。若之前任務未暫停,則需要待之前的任務執行完畢後才會送入。即,暫停信號的送入和成功暫停是同步的。

如果想要無阻塞送入暫停信號,可以為chPause設置緩衝,並在成功發送通知後阻止再次送入信號。

相應地,由於送入暫停信號和實際暫停相分離,需要設置回調機制通知送入信號一方已成功暫停,以實現送入暫停信號和成功暫停的完整異步邏輯。

例如:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type CoroutineControl struct {
    pauseChan    chan struct{}
    resumeChan   chan struct{}
    pauseNotify  chan struct{}
    resumeNotify chan struct{}
    isPaused     bool
    mu           sync.Mutex
}

func NewCoroutineControl() *CoroutineControl {
    return &CoroutineControl{
        pauseChan:    make(chan struct{}),
        resumeChan:   make(chan struct{}),
        pauseNotify:  make(chan struct{}),
        resumeNotify: make(chan struct{}),
    }
}

func (cc *CoroutineControl) Run(ctx context.Context) {
    go func() {
        defer fmt.Println("canceled.")
        for {
            select {
            case <-cc.pauseChan:
                cc.mu.Lock()
                cc.isPaused = true
                fmt.Println("Paused.")
                close(cc.pauseNotify)
                cc.mu.Unlock()
                select {
                case <-cc.resumeChan:
                    cc.mu.Lock()
                    cc.isPaused = false
                    fmt.Println("Resumed.")
                    close(cc.resumeNotify)
                    cc.mu.Unlock()
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            default:
                cc.mu.Lock()
                if !cc.isPaused {
                    fmt.Println("Working...")
                }
                cc.mu.Unlock()
                time.Sleep(time.Millisecond * 500)
            }
        }
    }()
}

func (cc *CoroutineControl) Pause() {
    cc.mu.Lock()
    if !cc.isPaused {
        cc.pauseNotify = make(chan struct{})
        cc.mu.Unlock()
        cc.pauseChan <- struct{}{}
        <-cc.pauseNotify
    } else {
        cc.mu.Unlock()
    }
}

func (cc *CoroutineControl) Resume() {
    cc.mu.Lock()
    if cc.isPaused {
        cc.resumeNotify = make(chan struct{})
        cc.mu.Unlock()
        cc.resumeChan <- struct{}{}
        <-cc.resumeNotify
    } else {
        cc.mu.Unlock()
    }
}

func main() {
    control := NewCoroutineControl()
    ctx, cancel := context.WithCancel(context.Background())

    control.Run(ctx)

    time.Sleep(time.Second)
    control.Pause()

    time.Sleep(time.Second)
    control.Resume()

    time.Sleep(time.Second * 2)
    control.Pause()

    fmt.Println("waiting for being canceled.")
    // 模擬在其他地方調用cancel來取消協程
    time.Sleep(time.Second * 2)
    cancel()

    // 等待協程退出
    time.Sleep(time.Second)
}
user avatar kubeexplorer 頭像 zhaozixing 頭像 xiaolanbenlan 頭像 lixingning 頭像 weiwudejiqimao 頭像 shanliangdeshou_ccwzfd 頭像 hantianfeng 頭像
點贊 7 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.