Golang 中主流程要控制某個協程的暫停與繼續,需要兩個通道分別接收來自主流程的通知,並在協程中始終監聽這兩個通知。例如:
package main
import (
"fmt"
"time"
)
func main() {
// 創建一個通道
chPause := make(chan struct{})
chResume := make(chan struct{})
// 啓動一個 goroutine
go func() {
// 初始化 isPaused
isPaused := false
// 等待通知
for {
select {
case <-chPause:
// 暫停執行
isPaused = true
fmt.Println("已暫停。")
case <-chResume:
// 繼續執行
isPaused = false
fmt.Println("繼續執行。")
default:
// 暫停執行
if isPaused {
continue
}
// 執行任務
fmt.Println("執行任務...")
time.Sleep(time.Millisecond * 500)
}
}
}()
// 發送通知
time.Sleep(time.Second * 2)
chPause <- struct{}{}
time.Sleep(time.Second * 2)
chResume <- struct{}{}
time.Sleep(time.Second)
}
以上代碼中,對於已經開始執行的任務無法暫停,只能做到收到暫停通知後不再繼續執行後續任務。再考慮到chPause和chResume均為無緩衝通道,這意味着任務未執行完畢時,暫時不接收chPause和chResume,即發送端會被阻塞。此時,可以認為,只要暫停信號成功送入,即表示之前的任務已暫停,且不會執行後續的任務。若之前任務未暫停,則需要待之前的任務執行完畢後才會送入。即,暫停信號的送入和成功暫停是同步的。
如果想要無阻塞送入暫停信號,可以為chPause設置緩衝,並在成功發送通知後阻止再次送入信號。
相應地,由於送入暫停信號和實際暫停相分離,需要設置回調機制通知送入信號一方已成功暫停,以實現送入暫停信號和成功暫停的完整異步邏輯。
例如:
package main
import (
"context"
"fmt"
"sync"
"time"
)
type CoroutineControl struct {
pauseChan chan struct{}
resumeChan chan struct{}
pauseNotify chan struct{}
resumeNotify chan struct{}
isPaused bool
mu sync.Mutex
}
func NewCoroutineControl() *CoroutineControl {
return &CoroutineControl{
pauseChan: make(chan struct{}),
resumeChan: make(chan struct{}),
pauseNotify: make(chan struct{}),
resumeNotify: make(chan struct{}),
}
}
func (cc *CoroutineControl) Run(ctx context.Context) {
go func() {
defer fmt.Println("canceled.")
for {
select {
case <-cc.pauseChan:
cc.mu.Lock()
cc.isPaused = true
fmt.Println("Paused.")
close(cc.pauseNotify)
cc.mu.Unlock()
select {
case <-cc.resumeChan:
cc.mu.Lock()
cc.isPaused = false
fmt.Println("Resumed.")
close(cc.resumeNotify)
cc.mu.Unlock()
case <-ctx.Done():
return
}
case <-ctx.Done():
return
default:
cc.mu.Lock()
if !cc.isPaused {
fmt.Println("Working...")
}
cc.mu.Unlock()
time.Sleep(time.Millisecond * 500)
}
}
}()
}
func (cc *CoroutineControl) Pause() {
cc.mu.Lock()
if !cc.isPaused {
cc.pauseNotify = make(chan struct{})
cc.mu.Unlock()
cc.pauseChan <- struct{}{}
<-cc.pauseNotify
} else {
cc.mu.Unlock()
}
}
func (cc *CoroutineControl) Resume() {
cc.mu.Lock()
if cc.isPaused {
cc.resumeNotify = make(chan struct{})
cc.mu.Unlock()
cc.resumeChan <- struct{}{}
<-cc.resumeNotify
} else {
cc.mu.Unlock()
}
}
func main() {
control := NewCoroutineControl()
ctx, cancel := context.WithCancel(context.Background())
control.Run(ctx)
time.Sleep(time.Second)
control.Pause()
time.Sleep(time.Second)
control.Resume()
time.Sleep(time.Second * 2)
control.Pause()
fmt.Println("waiting for being canceled.")
// 模擬在其他地方調用cancel來取消協程
time.Sleep(time.Second * 2)
cancel()
// 等待協程退出
time.Sleep(time.Second)
}