在當今雲計算與微服務盛行的時代,分佈式任務系統已成為支撐大規模業務的核心基礎設施。今天就來為大家分享下如何基於 Go 語言從零設計和實現一個架構簡潔且擴展性強的分佈式任務系統。
前置概念
本文會設計並實現一個分佈式任務系統,這裏我們要先明確兩個概念。
- 分佈式:在我們將要實現的分佈式任務系統中,分佈式是指我們的服務可以部署多個副本,這樣才能確保服務更加穩定。
- 任務:這裏的任務是指異步任務,可能是定時或需要週期性運行的任務。
有了這兩個前置概念,我們再來分析下在 Go 中如何實現分佈式和如何處理異步任務。
異步任務
在 Go 中,要處理異步任務有多種方式,比如原生支持的 time.Sleep、time.Timer 或 time.Ticke,再比如一些第三方包 go-co-op/gocron/v2、robfig/cron/v3 或 bamzi/jobrunner 等。本項目在調研過後決定採用 robfig/cron/v3 包(以下簡稱 cron)來處理異步任務,原因如下:
cron是一個非常流行的包,支持標準crontab表達式(並且可精確到秒),支持時區、任務鏈等高級功能。- 提供秒級精度的任務調度。
- 輕量級,且<font style="color:rgb(33, 33, 33);">能輕鬆應對各種複雜的定時任務場景</font>。
對於 cron 包的使用,可以參考我的另一篇文章「在 Go 中使用 cron 執行定時任務」,裏面有詳細説明。
分佈式
既然我們的任務系統是分佈式的,那麼必然要考慮併發安全問題。當多個副本同時讀寫系統資源時,很容易產生競太問題。在分佈式場景中,解決競太問題最常用的手段當然是分佈式鎖。
Go 中的分佈式鎖解決方案也很多,常見的有基於 etcd、Redis、ZooKeeper 等中間件來實現的,因為 Redis 在系統中更加常用,所以本項目採用基於 Redis 實現分佈式鎖的解決方案。Go 中有兩個比較常用的第三方包 bsm/redislock 和 go-redsync/redsync 都是基於 Redis 的分佈式鎖實現。本項目在調研過後決定採用 go-redsync/redsync 包(以下簡稱 redsync),原因如下:
redsync遵循 Redis 官方推薦的 Redlock 算法,支持多節點,容忍部分節點故障,避免單點問題。- 通過多數派機制確保鎖的全局唯一性,降低鎖衝突風險。
redsync是 Redis 官方 唯一推薦的 Go Redis 分佈式鎖解決方案,由 Redis 社區背書,長期維護,可靠性高。
對於 redsync 包的使用,可以參考我的另一篇文章「在 Go 中如何使用分佈式鎖解決併發問題?」,裏面有詳細説明。
分佈式任務系統
現在我們對分佈式任務系統中的分佈式和任務都有了明確的認識,並且找到了解決方案。那麼接下來就可以設計並實現分佈式任務系統了。
功能介紹
我們要實現的分佈式任務系統叫 nightwatch,nightwatch 是守夜、值班的意思,那麼這套系統的功能也就一目瞭然了,就是用來 24 小時不停機的執行異步任務的。
nightwatch 要實現的主要功能如下:
現在我們有一個系統,用户可以在 Web 頁面通過表單提交一個“任務”到關係型數據庫的任務表中。然後 nightwatch 系統會定時的掃描任務表,取出待執行的任務,並根據任務中的配置,到 Kubernetes 中拉起 Job 資源對象,真正的執行任務。此外,nightwatch 還會取出已經開始執行的任務,然後去 Kubernetes 中獲取當前任務對應的 Job 實時狀態,並回寫到數據庫中。直到 Kubernetes 中的 Job 執行完成(或失敗),nightwatch 會標記 Job 在數據庫表中的任務狀態為完成(或失敗)。當任務狀態為完成(或失敗),則任務任務終止,nightwatch 不再掃描出這種狀態的數據。
系統整體架構如下:
nightwatch 是系統中一個非常核心的組件,用來控制任務的執行,並同步任務狀態。
架構設計
現在我們知道了 nightwatch 的作用,那麼就可以設計其實現架構了。
nightwatch 架構設計如下:
首先,我們需要思考一個問題,分佈式鎖應該在何時使用?
在分佈式任務系統中,我們有兩種方式使用分佈式鎖來保證併發安全。一種是在執行具體的定時任務時,多個副本之間進行競爭,誰搶到鎖,誰就可以執行任務,未搶到鎖的副本可以選擇性的跳過此次執行週期。另一種是在 nigthwatch 啓動時,就開始搶鎖,多個副本之間誰搶到鎖,誰就去執行任務調度,未搶到鎖的副本則進行週期性的嘗試搶鎖操作,如果當前執行任務調度的副本被終止,那麼其他副本就有機會搶到鎖,並執行任務調度。
這兩種方式個各自有不同的使用場景,第一種方式的優勢是能夠實現多副本之間的負載均衡,多個副本都在工作,都有可能搶到鎖並執行任務,不過這種方式不能嚴格控制執行任務的間隔時間,比較適合對間隔時間要求不嚴格的任務。第二種方式實際上只有一個副本在執行任務調度,其他副本是空載狀態,是主備設計,這種方式的好處是能夠嚴格控制任務執行的間隔時間。
nigthwatch 採用第二種方式來使用分佈式鎖保證併發安全。所以在 nigthwatch 的架構設計中,在啓動 nigthwatch 時,先將所有的定時任務註冊到任務調度器中,接着就會進行搶鎖操作,只有搶到鎖的副本才能夠執行任務調度。未搶到鎖,則使用一個循環週期性的嘗試搶鎖,直到搶鎖成功。對於搶到鎖的副本,當註冊的任務定時策略達到時,任務調度器就會執行任務。架構圖中的 task 就是我們要實現的異步任務,也是主要業務邏輯,task 組件會從數據庫表中讀取任務,然後在 Kubernetes 中啓動 Job,並同步數據庫和 Kubernetes 資源之間的狀態。
目錄結構
我們現在已經設計好了 nigthwatch 的架構,可以動手進行開發實現了。
以下是 nigthwatch 項目的目錄和文件:
$ tree nightwatch
nightwatch # 項目目錄
├── README.md # README 文件
├── assets # 項目相關的資源目錄
│ ├── docker-compose.yaml # 用於啓動項目依賴的 MariaDB 和 Redis
│ └── schema.sql # 測試數據 SQL
├── cmd # 項目啓動入口
│ └── main.go
├── go.mod
├── go.sum
├── internal # 項目內部包
│ ├── logger.go # 定製日誌
│ ├── nightwatch.go # nightwatch 的實現和啓動入口
│ └── watcher # 任務接口和實現
│ ├── all # 任務註冊入口
│ │ └── all.go
│ ├── config.go # 任務配置
│ ├── task # 任務實現,一個可以定時同步 MariaDB 和 Kubernetes 任務狀態的示例程序
│ │ ├── task.go
│ │ └── watcher.go
│ └── watcher.go # 任務接口
└── pkg # 項目公共包
├── db # 數據庫實例
│ ├── mysql.go
│ └── redis.go
├── meta # 元信息
│ └── where.go # MariaDB where 查詢條件元信息封裝
├── model # 任務模型
│ └── task.go
├── store # 數據庫操作接口
│ ├── helper.go
│ ├── store.go
│ └── task.go
└── util # 工具包
└── reflect
└── reflect.go
14 directories, 21 files
這裏主要的目錄和文件我都標明瞭其用途,不必完全記住,你先有個印象,大概知道整個項目的結構。
調用鏈路
為了便於你理解代碼,我畫了一張 nigthwatch 項目的調用鏈路圖:
這個調用鏈路圖指明瞭 nigthwatch 項目中所有目錄之間的代碼調用關係。根據這張圖,可以看出這是一個非常簡潔的架構。cmd 中的入口函數 main 會調用 internal 中的 nigthwatch 包,nigthwatch 是分佈式系統實現的關鍵所在,這裏實現了任務的註冊和調度,watcher 定義了任務的接口,task 就是任務的具體實現,task 的業務邏輯中會依賴 store 層來讀寫數據庫,所以 store 會依賴 model 和 db。
代碼實現
接下來就進入到真正的編碼階段了。
首先我們需要為 nigthwatch 項目的業務設計一張任務表,建表 SQL 語句如下:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/assets/schema.sql
CREATE TABLE IF NOT EXISTS `task` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(45) NOT NULL DEFAULT '' COMMENT '任務名稱',
`namespace` varchar(45) NOT NULL DEFAULT '' COMMENT 'k8s namespace 名稱',
`info` TEXT NOT NULL COMMENT '任務 k8s 相關信息',
`status` varchar(45) NOT NULL DEFAULT '' COMMENT '任務狀態',
`user_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '用户 ID',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name_namespace` (`name`, `namespace`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任務表';
為了簡化你理解項目的成本,這裏僅定義了最小需要字段。
同時我們可以插入兩條測試數據,用來後續項目功能的驗證:
INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (1, 'demo-task-1', 'default', '{"image":"alpine","command":["sleep"],"args":["60"]}', 'Normal', 1);
INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (2, 'demo-task-2', 'demo', '{"image":"busybox","command":["sleep"],"args":["3600"]}', 'Normal', 2);
拿 ID 為 1 的 task 數據舉例,任務名是 demo-task-1,namespace 是 default,鏡像是 alpine,執行命令是 sleep,命令參數是 60,狀態為 Normal 表示待執行。當 nigthwatch 服務掃描到這條數據時,就會在 Kubernetes 中 default 這個 namespace 下創建一個 name 為 demo-task-1 的 Job,其啓動鏡像為 alpine,啓動命令為 sleep 60,即睡眠 60 秒然後退出。
現在有了數據庫表和測試數據,我們來看看 nigthwatch 代碼是如何實現的。
入口文件 cmd/main.go 實現如下:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/cmd/main.go
package main
import (
"flag"
"log/slog"
"path/filepath"
"time"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"github.com/jianghushinian/blog-go-example/nightwatch/internal"
"github.com/jianghushinian/blog-go-example/nightwatch/pkg/db"
)
func main() {
slog.SetLogLoggerLevel(slog.LevelDebug)
var kubecfg *string
if home := homedir.HomeDir(); home != "" {
kubecfg = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "Optional absolute path to kubeconfig")
} else {
kubecfg = flag.String("kubeconfig", "", "Absolute path to kubeconfig")
}
config, err := clientcmd.BuildConfigFromFlags("", *kubecfg)
if err != nil {
slog.Error(err.Error())
return
}
config.QPS = 50
config.Burst = 100
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
slog.Error(err.Error())
return
}
cfg := nightwatch.Config{
MySQLOptions: &db.MySQLOptions{
Host: "127.0.0.1:33306",
Username: "root",
Password: "nightwatch",
Database: "nightwatch",
MaxIdleConnections: 100,
MaxOpenConnections: 100,
MaxConnectionLifeTime: time.Duration(10) * time.Second,
},
RedisOptions: &db.RedisOptions{
Addr: "127.0.0.1:36379",
Username: "",
Password: "nightwatch",
Database: 0,
MaxRetries: 3,
MinIdleConns: 0,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolSize: 10,
},
Clientset: clientset,
}
nw, err := cfg.New()
if err != nil {
slog.Error(err.Error())
return
}
stopCh := genericapiserver.SetupSignalHandler()
nw.Run(stopCh)
}
因為 main.go 非常重要,是整個程序的入口,所以我就把完整代碼都貼出來了,包括 import 部分,這是為了讓你對項目文件之間的依賴關係有一個更清晰的認知,後續講解的其他模塊我就只會貼出核心代碼。
main 函數的核心功能如下:
首先會初始化各種依賴包,初始化 Kubernetes clientset 用於後續操作 Job,初始化 MySQL 用於從中讀取任務和更新任務狀態,初始化 Redis 用於實現分佈式鎖。接着會使用這些初始化的對象創建一個配置對象 nightwatch.Config。然後使用 cfg.New() 創建一個 nightwatch 實例對象 nw。最後調用 nw.Run(stopCh) 啓動服務。這裏為了做優雅退出,還引用了 Kubernetes genericapiserver 優雅退出機制,你可以參考我的文章「Go 程序如何實現優雅退出?來看看 K8s 是怎麼做的——上篇」、「Go 程序如何實現優雅退出?來看看 K8s 是怎麼做的——下篇」查看其實現原理。
這裏涉及到的 Kubernetes clientset、MySQL 和 Redis 相關的具體配置細節我就不詳細講解了,咱們還是將主要精力聚焦在 nigthwatch 的主脈絡上。
接下來看下 cfg.New() 代碼實現如下:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go
// New 通過配置構造一個 nightWatch 對象
func (c *Config) New() (*nightWatch, error) {
rdb, err := db.NewRedis(c.RedisOptions)
if err != nil {
slog.Error(err.Error(), "Failed to create Redis client")
return nil, err
}
logger := newCronLogger()
runner := cron.New(
cron.WithSeconds(),
cron.WithLogger(logger),
cron.WithChain(cron.SkipIfStillRunning(logger), cron.Recover(logger)),
)
pool := goredis.NewPool(rdb)
lockOpts := []redsync.Option{
redsync.WithRetryDelay(50 * time.Microsecond),
redsync.WithTries(3),
redsync.WithExpiry(defaultExpiration),
}
locker := redsync.New(pool).NewMutex(lockName, lockOpts...)
cfg, err := c.CreateWatcherConfig()
if err != nil {
return nil, err
}
nw := &nightWatch{runner: runner, locker: locker, config: cfg}
if err := nw.addWatchers(); err != nil {
return nil, err
}
return nw, nil
}
*Config.New 方法會通過配置信息構造一個 nightWatch 對象並返回。這裏的 runner 就是異步任務調度器,使用 cron 包實現,用來調度和執行定時任務。並且這個方法內部還實例化了一個 redsync 分佈式鎖對象 locker。nightWatch 對象就是通過 runner、locker 和 cfg 來構造的。
這裏的核心部分是 addWatchers 的邏輯,其實現如下:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go
// 註冊所有 Watcher 實例到 nightWatch
func (nw *nightWatch) addWatchers() error {
for n, w := range watcher.ListWatchers() {
if err := w.Init(context.Background(), nw.config); err != nil {
slog.Error(err.Error(), "Failed to construct watcher", "watcher", n)
return err
}
spec := watcher.Every3Seconds
if obj, ok := w.(watcher.ISpec); ok {
spec = obj.Spec()
}
if _, err := nw.runner.AddJob(spec, w); err != nil {
slog.Error(err.Error(), "Failed to add job to the cron", "watcher", n)
return err
}
}
return nil
}
*nightWatch.addWatchers 方法用來註冊所有 Watcher 對象到調度器 runner 中。
Watcher 是一個接口,定義了異步任務應該實現的方法。Watcher 接口定義如下:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/watcher.go
type Watcher interface {
Init(ctx context.Context, config *Config) error
cron.Job
}
type ISpec interface {
Spec() string
}
var (
registryLock = new(sync.Mutex)
registry = make(map[string]Watcher)
)
func Register(watcher Watcher) {
registryLock.Lock()
defer registryLock.Unlock()
name := reflectutil.StructName(watcher)
if _, ok := registry[name]; ok {
panic("duplicate watcher entry: " + name)
}
registry[name] = watcher
}
func ListWatchers() map[string]Watcher {
registryLock.Lock()
defer registryLock.Unlock()
return registry
}
可以看到,要實現一個異步任務,需要實現 Init 方法以及 cron.Job 接口。cron.Job 接口其實只有一個方法定義如下:
type Job interface {
Run()
}
只要滿足 Watcher 接口的任務,就可以通過 Register 函數註冊到 registry 中。ListWatchers 函數則可以返回註冊到 registry 中全部任務。而 ListWatchers 函數正是在前文講解的 *nightWatch.addWatchers 方法中調用的。
到目前為止,任務如何被註冊到 nightWatch.runner 的過程我們就串起來了。接下來需要關注的兩個點是,調度器 runner 是何時啓動的,以及是何時調用 Register 函數註冊任務的。
我們先來看調度器 runner 是何時啓動的:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go
// Run 執行異步任務,此方法會阻塞直到關閉 stopCh
func (nw *nightWatch) Run(stopCh <-chan struct{}) {
ctx := wait.ContextForChannel(stopCh)
// 循環加鎖,直到加鎖成功,再去啓動任務
ticker := time.NewTicker(defaultExpiration + (5 * time.Second))
defer ticker.Stop()
for {
err := nw.locker.LockContext(ctx)
if err == nil {
slog.Debug("Successfully acquired lock", "lockName", lockName)
break
}
slog.Debug("Failed to acquire lock", "lockName", lockName, "err", err)
<-ticker.C
}
// 看門狗,實現鎖自動續約
ticker = time.NewTicker(extendExpiration)
defer ticker.Stop()
go func() {
for {
select {
case <-ticker.C:
if ok, err := nw.locker.ExtendContext(ctx); !ok || err != nil {
slog.Debug("Failed to extend lock", "err", err, "status", ok)
}
case <-ctx.Done():
slog.Debug("Exiting lock watchdog")
return
}
}
}()
// 啓動定時任務
nw.runner.Start()
slog.Info("Successfully started nightwatch server")
// 阻塞等待退出信號
<-stopCh
nw.stop()
}
在 *nightWatch.Run 方法中,首先會啓動一個無限循環,定時執行嘗試搶鎖操作,直到搶鎖成功。這與前文中講解的 nightwatch 架構設計是一致的。搶到鎖後,就可以執行 nw.runner.Start() 啓動調度器,執行定時任務了。
此外,在 nightwatch 架構圖中沒有體現的一點是,這裏為分佈式鎖實現了看門狗機制,用來自動續約。關於 redsync 分佈式鎖的自動續約,在我的文章「在 Go 中如何使用分佈式鎖解決併發問題?」中有詳細講解。
而這個 Run 方法,就是在 main 函數中通過 nw.Run(stopCh) 調用的。
我們還剩下一個最後要看的核心邏輯是 task 在何時會調用 Register 註冊到 registry 變量中。
還記得前文中講解的 Watcher 接口麼,*taskWatcher 實現了這個接口:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go
var _ watcher.Watcher = (*taskWatcher)(nil)
type taskWatcher struct {
store store.IStore
clientset kubernetes.Interface
wg sync.WaitGroup
}
func (w *taskWatcher) Init(ctx context.Context, config *watcher.Config) error {
w.store = config.Store
w.clientset = config.Clientset
return nil
}
func (w *taskWatcher) Spec() string {
return "@every 30s"
}
func init() {
watcher.Register(&taskWatcher{})
}
taskWatcher 就是 task 任務的具體對象,它實現了 watcher.Watcher 接口。可以發現,Register 函數是在 init 函數中調用的,即 task 包被導入時實現自動註冊。
task 包會在 nightwatch/internal/watcher/all/all.go 文件被導入:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/all/all.go
package all
import (
// 觸發所有 Watcher 的 init 函數進行註冊
_ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/task"
)
這裏以匿名包的方式導入 task 包。如果我們還有其他的任務實現,則同樣可以參考 task 包的註冊方式,在這裏以匿名包形式導入,這也是 all 包名的由來,可以註冊全部的任務。
然後會在 nightwatch 中再次以匿名包的方式導入 all 包:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go
package nightwatch
import (
...
// 觸發 init 函數
_ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/all"
)
我們可以總結出任務的註冊流程是,nightwatch 包導入 all 包,all 包會導入 task 包,task 包的 init 函數執行就會完成註冊。所以在入口文件 main.go 導入 nightwatch 包的時候,就會觸發任務的註冊。在調用 nw.Run(stopCh) 啓動服務時,所有的任務已經註冊完成了。
taskWatcher 對象的核心邏輯當然就是 Run 方法了:
https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go
// Run 運行 task watcher 任務
func (w *taskWatcher) Run() {
w.wg.Add(2)
slog.Debug("Current sync period is start")
// NOTE: 將 Normal 狀態任務在 Kubernetes 中啓動
go func() {
defer w.wg.Done()
ctx := context.Background()
_, tasks, err := w.store.Tasks().List(ctx, meta.WithFilter(map[string]any{
"status": model.TaskStatusNormal,
}))
if err != nil {
slog.Error(err.Error(), "Failed to list tasks")
return
}
var wg sync.WaitGroup
wg.Add(len(tasks))
for _, task := range tasks {
go func(task *model.Task) {
defer wg.Done()
job, err := w.clientset.BatchV1().Jobs(task.Namespace).Create(ctx, toJob(task), metav1.CreateOptions{})
if err != nil {
slog.Error(err.Error(), "Failed to create job")
return
}
task.Status = model.TaskStatusPending
if err := w.store.Tasks().Update(ctx, task); err != nil {
slog.Error(err.Error(), "Failed to update task status")
return
}
slog.Info("Successfully created job", "namespace", job.Namespace, "name", job.Name)
}(task)
}
wg.Wait()
}()
// NOTE: 同步中間狀態的任務在 Kubernetes 中的狀態到表中
go func() {
defer w.wg.Done()
ctx := context.Background()
_, tasks, err := w.store.Tasks().List(ctx, meta.WithFilterNot(map[string]any{
// 排除這幾個狀態
"status": []model.TaskStatus{model.TaskStatusNormal, model.TaskStatusSucceeded, model.TaskStatusFailed},
}))
if err != nil {
slog.Error(err.Error(), "Failed to list tasks")
return
}
var wg sync.WaitGroup
wg.Add(len(tasks))
for _, task := range tasks {
go func(task *model.Task) {
defer wg.Done()
job, err := w.clientset.BatchV1().Jobs(task.Namespace).Get(ctx, task.Name, metav1.GetOptions{})
if err != nil {
slog.Error(err.Error(), "Failed to get task")
return
}
task.Status = toTaskStatus(job)
if err := w.store.Tasks().Update(ctx, task); err != nil {
slog.Error(err.Error(), "Failed to update task status")
return
}
slog.Info("Successfully sync job status to task", "namespace", job.Namespace, "name", job.Name, "status", task.Status)
}(task)
}
wg.Wait()
}()
w.wg.Wait()
slog.Debug("Current sync period is complete")
}
Run 方法就是用來實現每個 watcher 對象的業務邏輯。比如這裏就實現了 task 任務的業務邏輯,它包含兩個功能,在 Run 方法的上半部分代碼中啓動了第一個 goroutine 用來實現將 Normal 狀態任務在 Kubernetes 中啓動,下半部分代碼中啓動了第二個 goroutine 用來實現同步已運行的任務在 Kubernetes 中的 Job 狀態到數據庫表中。
至此,nightwatch 項目就講解完成了。我們一起實現了一個架構簡潔且擴展性強的分佈式任務系統。關於 nightwatch 項目中更多的代碼細節你可以跳轉到我的 GitHub 倉庫中查看。
總結
本文帶大家從技術選型到架構設計再到代碼實現,一步步完成了一個簡潔優雅的分佈式任務系統。這套系統不僅架構簡潔,擴展也非常方便,我們只需要按照 task 的套路實現更多的異步任務,都可以非常方便的方式註冊到 nightwatch 中。
我們實現的 nightwatch 項目實際上是開源項目 OneX 項目中 onex-nightwatch 組件的 copy 實現,其架構完全參考 onex-nightwatch 實現。本文介紹的 nightwatch 項目只是 OneX 其中的一個組件,OneX 所有源代碼的質量都超級高,並且相當規範,全部都是來自一線大廠的實踐經驗。
OneX 不僅是一個開源項目,更是一整套 Go + Kubernetes + LLMOps + MLOps 企業級落地方案,這個項目出自孔令飛老師的「雲原生 AI 實戰星球」。
OneX 項目技術棧如下:
OneX 項目完全開源免費,如果你想學習各種最佳實踐,非常建議你深入學習這個項目。如果你覺得自己一個人學習容易放棄,或者遇到問題無法解決,想找人一起討論,也歡迎你加入到孔令飛老師的「雲原生 AI 實戰星球」。這裏有一起進步的小夥伴,星球是圍繞 OneX 打造的,裏面會詳細講解 OneX 中用到的各種技術棧,極大縮短自學 OneX 項目的時間。
星球剛剛上線,原價 ¥1599,現在早鳥價 ¥1299,交個朋友,可以加我微信領取折扣優惠券(限時限量),星球值不值你可以自己先研究下 OneX 項目一探虛實。
掃碼直達星球:
掃碼加我微信領取星球大額優惠券(限時限量):
本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。
希望此文能對你有所啓發。
延伸閲讀
- Onex 項目 GitHub 源碼:https://github.com/onexstack/onex
- onex-nightwatch 組件 GitHub 源碼:https://github.com/onexstack/onex/tree/master/internal/nightwatch
- 在 Go 中使用 cron 執行定時任務:https://mp.weixin.qq.com/s/bCgGnw9QYTTBuoEktTYZcw
- 在 Go 中如何使用分佈式鎖解決併發問題?:https://mp.weixin.qq.com/s/8HJXRcCoyZqeC_b6l-TD0w
- Go 程序如何實現優雅退出?來看看 K8s 是怎麼做的——上篇:https://mp.weixin.qq.com/s/UR6Rf1ewthI8qU3A7Szfzg
- Go 程序如何實現優雅退出?來看看 K8s 是怎麼做的——下篇:https://mp.weixin.qq.com/s/oztrz9WfCDnyZC6s4b__LQ
- 本文 GitHub 示例代碼:https://github.com/jianghushinian/blog-go-example/tree/main/nightwatch
聯繫我
- 公眾號:Go編程世界
- 微信:jianghushinian
- 郵箱:jianghushinian007@outlook.com
- 博客:https://jianghushinian.cn
- GitHub:https://github.com/jianghushinian