成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

如何基于 Go 語(yǔ)言設(shè)計(jì)一個(gè)簡(jiǎn)潔優(yōu)雅的分布式任務(wù)系統(tǒng)

開(kāi)發(fā) 后端
本文帶大家從技術(shù)選型到架構(gòu)設(shè)計(jì)再到代碼實(shí)現(xiàn),一步步完成了一個(gè)簡(jiǎn)潔優(yōu)雅的分布式任務(wù)系統(tǒng)。

在當(dāng)今云計(jì)算與微服務(wù)盛行的時(shí)代,分布式任務(wù)系統(tǒng)已成為支撐大規(guī)模業(yè)務(wù)的核心基礎(chǔ)設(shè)施。今天就來(lái)為大家分享下如何基于 Go 語(yǔ)言從零設(shè)計(jì)和實(shí)現(xiàn)一個(gè)架構(gòu)簡(jiǎn)潔且擴(kuò)展性強(qiáng)的分布式任務(wù)系統(tǒng)。

一、前置概念

本文會(huì)設(shè)計(jì)并實(shí)現(xiàn)一個(gè)分布式任務(wù)系統(tǒng),這里我們要先明確兩個(gè)概念。

  • 分布式:在我們將要實(shí)現(xiàn)的分布式任務(wù)系統(tǒng)中,分布式是指我們的服務(wù)可以部署多個(gè)副本,這樣才能確保服務(wù)更加穩(wěn)定。
  • 任務(wù):這里的任務(wù)是指異步任務(wù),可能是定時(shí)或需要周期性運(yùn)行的任務(wù)。

有了這兩個(gè)前置概念,我們?cè)賮?lái)分析下在 Go 中如何實(shí)現(xiàn)分布式和如何處理異步任務(wù)。

二、異步任務(wù)

在 Go 中,要處理異步任務(wù)有多種方式,比如原生支持的 time.Sleep、time.Timer 或 time.Ticke,再比如一些第三方包 go-co-op/gocron/v2、robfig/cron/v3 或 bamzi/jobrunner 等。本項(xiàng)目在調(diào)研過(guò)后決定采用 robfig/cron/v3 包(以下簡(jiǎn)稱(chēng) cron)來(lái)處理異步任務(wù),原因如下:

  • cron 是一個(gè)非常流行的包,支持標(biāo)準(zhǔn) crontab 表達(dá)式(并且可精確到秒),支持時(shí)區(qū)、任務(wù)鏈等高級(jí)功能。
  • 提供秒級(jí)精度的任務(wù)調(diào)度。
  • 輕量級(jí),且能輕松應(yīng)對(duì)各種復(fù)雜的定時(shí)任務(wù)場(chǎng)景。

對(duì)于 cron 包的使用,可以參考我的另一篇文章「在 Go 中使用 cron 執(zhí)行定時(shí)任務(wù)」,里面有詳細(xì)說(shuō)明。

三、分布式

既然我們的任務(wù)系統(tǒng)是分布式的,那么必然要考慮并發(fā)安全問(wèn)題。當(dāng)多個(gè)副本同時(shí)讀寫(xiě)系統(tǒng)資源時(shí),很容易產(chǎn)生競(jìng)態(tài)問(wèn)題。在分布式場(chǎng)景中,解決競(jìng)態(tài)問(wèn)題最常用的手段當(dāng)然是分布式鎖。

Go 中的分布式鎖解決方案也很多,常見(jiàn)的有基于 etcd、Redis、ZooKeeper 等中間件來(lái)實(shí)現(xiàn)的,因?yàn)?Redis 在系統(tǒng)中更加常用,所以本項(xiàng)目采用基于 Redis 實(shí)現(xiàn)分布式鎖的解決方案。Go 中有兩個(gè)比較常用的第三方包 bsm/redislock 和 go-redsync/redsync 都是基于 Redis 的分布式鎖實(shí)現(xiàn)。本項(xiàng)目在調(diào)研過(guò)后決定采用 go-redsync/redsync 包(以下簡(jiǎn)稱(chēng) redsync),原因如下:

  • redsync 遵循 Redis 官方推薦的 Redlock 算法,支持多節(jié)點(diǎn),容忍部分節(jié)點(diǎn)故障,避免單點(diǎn)問(wèn)題。
  • 通過(guò)多數(shù)派機(jī)制確保鎖的全局唯一性,降低鎖沖突風(fēng)險(xiǎn)。
  • redsync 是 Redis 官方 唯一推薦的 Go Redis 分布式鎖解決方案,由 Redis 社區(qū)背書(shū),長(zhǎng)期維護(hù),可靠性高。

對(duì)于 redsync 包的使用,可以參考我的另一篇文章「在 Go 中如何使用分布式鎖解決并發(fā)問(wèn)題?」,里面有詳細(xì)說(shuō)明。

四、分布式任務(wù)系統(tǒng)

現(xiàn)在我們對(duì)分布式任務(wù)系統(tǒng)中的分布式和任務(wù)都有了明確的認(rèn)識(shí),并且找到了解決方案。那么接下來(lái)就可以設(shè)計(jì)并實(shí)現(xiàn)分布式任務(wù)系統(tǒng)了。

1. 功能介紹

我們要實(shí)現(xiàn)的分布式任務(wù)系統(tǒng)叫 nightwatch,nightwatch 是守夜、值班的意思,那么這套系統(tǒng)的功能也就一目了然了,就是用來(lái) 24 小時(shí)不停機(jī)的執(zhí)行異步任務(wù)的。

nightwatch 要實(shí)現(xiàn)的主要功能如下:

現(xiàn)在我們有一個(gè)系統(tǒng),用戶(hù)可以在 Web 頁(yè)面通過(guò)表單提交一個(gè)“任務(wù)”到關(guān)系型數(shù)據(jù)庫(kù)的任務(wù)表中。然后 nightwatch 系統(tǒng)會(huì)定時(shí)的掃描任務(wù)表,取出待執(zhí)行的任務(wù),并根據(jù)任務(wù)中的配置,到 Kubernetes 中拉起 Job 資源對(duì)象,真正的執(zhí)行任務(wù)。此外,nightwatch 還會(huì)取出已經(jīng)開(kāi)始執(zhí)行的任務(wù),然后去 Kubernetes 中獲取當(dāng)前任務(wù)對(duì)應(yīng)的 Job 實(shí)時(shí)狀態(tài),并回寫(xiě)到數(shù)據(jù)庫(kù)中。直到 Kubernetes 中的 Job 執(zhí)行完成(或失?。琻ightwatch 會(huì)標(biāo)記 Job 在數(shù)據(jù)庫(kù)表中的任務(wù)狀態(tài)為完成(或失?。?。當(dāng)任務(wù)狀態(tài)為完成(或失敗),則任務(wù)任務(wù)終止,nightwatch 不再掃描出這種狀態(tài)的數(shù)據(jù)。

系統(tǒng)整體架構(gòu)如下:

nightwatch-component

nightwatch 是系統(tǒng)中一個(gè)非常核心的組件,用來(lái)控制任務(wù)的執(zhí)行,并同步任務(wù)狀態(tài)。

2. 架構(gòu)設(shè)計(jì)

現(xiàn)在我們知道了 nightwatch 的作用,那么就可以設(shè)計(jì)其實(shí)現(xiàn)架構(gòu)了。

nightwatch 架構(gòu)設(shè)計(jì)如下:

nightwatch-architecture

首先,我們需要思考一個(gè)問(wèn)題,分布式鎖應(yīng)該在何時(shí)使用?

在分布式任務(wù)系統(tǒng)中,我們有兩種方式使用分布式鎖來(lái)保證并發(fā)安全。一種是在執(zhí)行具體的定時(shí)任務(wù)時(shí),多個(gè)副本之間進(jìn)行競(jìng)爭(zhēng),誰(shuí)搶到鎖,誰(shuí)就可以執(zhí)行任務(wù),未搶到鎖的副本可以選擇性的跳過(guò)此次執(zhí)行周期。另一種是在 nigthwatch 啟動(dòng)時(shí),就開(kāi)始搶鎖,多個(gè)副本之間誰(shuí)搶到鎖,誰(shuí)就去執(zhí)行任務(wù)調(diào)度,未搶到鎖的副本則進(jìn)行周期性的嘗試搶鎖操作,如果當(dāng)前執(zhí)行任務(wù)調(diào)度的副本被終止,那么其他副本就有機(jī)會(huì)搶到鎖,并執(zhí)行任務(wù)調(diào)度。

這兩種方式個(gè)各自有不同的使用場(chǎng)景,第一種方式的優(yōu)勢(shì)是能夠?qū)崿F(xiàn)多副本之間的負(fù)載均衡,多個(gè)副本都在工作,都有可能搶到鎖并執(zhí)行任務(wù),不過(guò)這種方式不能?chē)?yán)格控制執(zhí)行任務(wù)的間隔時(shí)間,比較適合對(duì)間隔時(shí)間要求不嚴(yán)格的任務(wù)。第二種方式實(shí)際上只有一個(gè)副本在執(zhí)行任務(wù)調(diào)度,其他副本是空載狀態(tài),是主備設(shè)計(jì),這種方式的好處是能夠嚴(yán)格控制任務(wù)執(zhí)行的間隔時(shí)間。

nigthwatch 采用第二種方式來(lái)使用分布式鎖保證并發(fā)安全。所以在 nigthwatch 的架構(gòu)設(shè)計(jì)中,在啟動(dòng) nigthwatch 時(shí),先將所有的定時(shí)任務(wù)注冊(cè)到任務(wù)調(diào)度器中,接著就會(huì)進(jìn)行搶鎖操作,只有搶到鎖的副本才能夠執(zhí)行任務(wù)調(diào)度。未搶到鎖,則使用一個(gè)循環(huán)周期性的嘗試搶鎖,直到搶鎖成功。對(duì)于搶到鎖的副本,當(dāng)注冊(cè)的任務(wù)定時(shí)策略達(dá)到時(shí),任務(wù)調(diào)度器就會(huì)執(zhí)行任務(wù)。架構(gòu)圖中的 task 就是我們要實(shí)現(xiàn)的異步任務(wù),也是主要業(yè)務(wù)邏輯,task 組件會(huì)從數(shù)據(jù)庫(kù)表中讀取任務(wù),然后在 Kubernetes 中啟動(dòng) Job,并同步數(shù)據(jù)庫(kù)和 Kubernetes 資源之間的狀態(tài)。

3. 目錄結(jié)構(gòu)

我們現(xiàn)在已經(jīng)設(shè)計(jì)好了 nigthwatch 的架構(gòu),可以動(dòng)手進(jìn)行開(kāi)發(fā)實(shí)現(xiàn)了。

以下是 nigthwatch 項(xiàng)目的目錄和文件:

$ tree nightwatch
nightwatch # 項(xiàng)目目錄
├── README.md # README 文件
├── assets # 項(xiàng)目相關(guān)的資源目錄
│   ├── docker-compose.yaml # 用于啟動(dòng)項(xiàng)目依賴(lài)的 MariaDB 和 Redis
│   └── schema.sql # 測(cè)試數(shù)據(jù) SQL
├── cmd # 項(xiàng)目啟動(dòng)入口
│   └── main.go
├── go.mod
├── go.sum
├── internal # 項(xiàng)目?jī)?nèi)部包
│   ├── logger.go # 定制日志
│   ├── nightwatch.go # nightwatch 的實(shí)現(xiàn)和啟動(dòng)入口
│   └── watcher # 任務(wù)接口和實(shí)現(xiàn)
│       ├── all # 任務(wù)注冊(cè)入口
│       │   └── all.go
│       ├── config.go # 任務(wù)配置
│       ├── task # 任務(wù)實(shí)現(xiàn),一個(gè)可以定時(shí)同步 MariaDB 和 Kubernetes 任務(wù)狀態(tài)的示例程序
│       │   ├── task.go
│       │   └── watcher.go
│       └── watcher.go # 任務(wù)接口
└── pkg # 項(xiàng)目公共包
    ├── db # 數(shù)據(jù)庫(kù)實(shí)例
    │   ├── mysql.go
    │   └── redis.go
    ├── meta # 元信息
    │   └── where.go # MariaDB where 查詢(xún)條件元信息封裝
    ├── model # 任務(wù)模型
    │   └── task.go
    ├── store # 數(shù)據(jù)庫(kù)操作接口
    │   ├── helper.go
    │   ├── store.go
    │   └── task.go
    └── util # 工具包
        └── reflect
            └── reflect.go

14 directories, 21 files

這里主要的目錄和文件我都標(biāo)明了其用途,不必完全記住,你先有個(gè)印象,大概知道整個(gè)項(xiàng)目的結(jié)構(gòu)。

4. 調(diào)用鏈路

為了便于你理解代碼,我畫(huà)了一張 nigthwatch 項(xiàng)目的調(diào)用鏈路圖:

nightwatch-flow

這個(gè)調(diào)用鏈路圖指明了 nigthwatch 項(xiàng)目中所有目錄之間的代碼調(diào)用關(guān)系。根據(jù)這張圖,可以看出這是一個(gè)非常簡(jiǎn)潔的架構(gòu)。cmd 中的入口函數(shù)main會(huì)調(diào)用internal中的nigthwatch包,nigthwatch是分布式系統(tǒng)實(shí)現(xiàn)的關(guān)鍵所在,這里實(shí)現(xiàn)了任務(wù)的注冊(cè)和調(diào)度,watcher定義了任務(wù)的接口,task就是任務(wù)的具體實(shí)現(xiàn),task的業(yè)務(wù)邏輯中會(huì)依賴(lài)store層來(lái)讀寫(xiě)數(shù)據(jù)庫(kù),所以store會(huì)依賴(lài)model和db。

5. 代碼實(shí)現(xiàn)

接下來(lái)就進(jìn)入到真正的編碼階段了。

首先我們需要為 nigthwatch 項(xiàng)目的業(yè)務(wù)設(shè)計(jì)一張任務(wù)表,建表 SQL 語(yǔ)句如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/assets/schema.sql

CREATE TABLE IF NOTEXISTS `task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(45) NOT NULLDEFAULT'' COMMENT '任務(wù)名稱(chēng)',
  `namespace` varchar(45) NOT NULLDEFAULT'' COMMENT 'k8s namespace 名稱(chēng)',
  `info` TEXT NOT NULL COMMENT '任務(wù) k8s 相關(guān)信息',
  `status` varchar(45) NOT NULLDEFAULT'' COMMENT '任務(wù)狀態(tài)',
  `user_id` bigint(20) NOT NULLDEFAULT'0' COMMENT '用戶(hù) ID',
  `created_at` datetime NOT NULLDEFAULTCURRENT_TIMESTAMP,
  `updated_at` datetime NOT NULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name_namespace` (`name`, `namespace`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任務(wù)表';

為了簡(jiǎn)化你理解項(xiàng)目的成本,這里僅定義了最小需要字段。

同時(shí)我們可以插入兩條測(cè)試數(shù)據(jù),用來(lái)后續(xù)項(xiàng)目功能的驗(yàn)證:

INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (1, 'demo-task-1', 'default', '{"image":"alpine","command":["sleep"],"args":["60"]}', 'Normal', 1);
INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (2, 'demo-task-2', 'demo', '{"image":"busybox","command":["sleep"],"args":["3600"]}', 'Normal', 2);

拿 ID為1的task數(shù)據(jù)舉例,任務(wù)名是demo-task-1,namespace是default,鏡像是alpine,執(zhí)行命令是sleep,命令參數(shù)是60,狀態(tài)為Normal表示待執(zhí)行。當(dāng) nigthwatch 服務(wù)掃描到這條數(shù)據(jù)時(shí),就會(huì)在 Kubernetes 中default這個(gè)namespace下創(chuàng)建一個(gè)name為demo-task-1的 Job,其啟動(dòng)鏡像為alpine,啟動(dòng)命令為sleep 60,即睡眠60 秒然后退出。

現(xiàn)在有了數(shù)據(jù)庫(kù)表和測(cè)試數(shù)據(jù),我們來(lái)看看 nigthwatch 代碼是如何實(shí)現(xiàn)的。

入口文件 cmd/main.go 實(shí)現(xiàn)如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/cmd/main.go

package main

import (
    "flag"
    "log/slog"
    "path/filepath"
    "time"

    genericapiserver "k8s.io/apiserver/pkg/server"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"

    "github.com/jianghushinian/blog-go-example/nightwatch/internal"
    "github.com/jianghushinian/blog-go-example/nightwatch/pkg/db"
)

func main() {
    slog.SetLogLoggerLevel(slog.LevelDebug)

    var kubecfg *string
    if home := homedir.HomeDir(); home != "" {
        kubecfg = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "Optional absolute path to kubeconfig")
    } else {
        kubecfg = flag.String("kubeconfig", "", "Absolute path to kubeconfig")
    }

    config, err := clientcmd.BuildConfigFromFlags("", *kubecfg)
    if err != nil {
        slog.Error(err.Error())
        return
    }
    config.QPS = 50
    config.Burst = 100
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        slog.Error(err.Error())
        return
    }

    cfg := nightwatch.Config{
        MySQLOptions: &db.MySQLOptions{
            Host:                  "127.0.0.1:33306",
            Username:              "root",
            Password:              "nightwatch",
            Database:              "nightwatch",
            MaxIdleConnections:    100,
            MaxOpenConnections:    100,
            MaxConnectionLifeTime: time.Duration(10) * time.Second,
        },
        RedisOptions: &db.RedisOptions{
            Addr:         "127.0.0.1:36379",
            Username:     "",
            Password:     "nightwatch",
            Database:     0,
            MaxRetries:   3,
            MinIdleConns: 0,
            DialTimeout:  5 * time.Second,
            ReadTimeout:  3 * time.Second,
            WriteTimeout: 3 * time.Second,
            PoolSize:     10,
        },
        Clientset: clientset,
    }

    nw, err := cfg.New()
    if err != nil {
        slog.Error(err.Error())
        return
    }

    stopCh := genericapiserver.SetupSignalHandler()
    nw.Run(stopCh)
}

因?yàn)?nbsp;main.go 非常重要,是整個(gè)程序的入口,所以我就把完整代碼都貼出來(lái)了,包括 import 部分,這是為了讓你對(duì)項(xiàng)目文件之間的依賴(lài)關(guān)系有一個(gè)更清晰的認(rèn)知,后續(xù)講解的其他模塊我就只會(huì)貼出核心代碼。

main 函數(shù)的核心功能如下:

首先會(huì)初始化各種依賴(lài)包,初始化 Kubernetes clientset 用于后續(xù)操作 Job,初始化 MySQL 用于從中讀取任務(wù)和更新任務(wù)狀態(tài),初始化 Redis 用于實(shí)現(xiàn)分布式鎖。接著會(huì)使用這些初始化的對(duì)象創(chuàng)建一個(gè)配置對(duì)象 nightwatch.Config。然后使用 cfg.New() 創(chuàng)建一個(gè) nightwatch 實(shí)例對(duì)象 nw。最后調(diào)用 nw.Run(stopCh) 啟動(dòng)服務(wù)。這里為了做優(yōu)雅退出,還引用了 Kubernetes genericapiserver 優(yōu)雅退出機(jī)制。

這里涉及到的 Kubernetes clientset、MySQL 和 Redis 相關(guān)的具體配置細(xì)節(jié)我就不詳細(xì)講解了,咱們還是將主要精力聚焦在 nigthwatch 的主脈絡(luò)上。

接下來(lái)看下 cfg.New() 代碼實(shí)現(xiàn)如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// New 通過(guò)配置構(gòu)造一個(gè) nightWatch 對(duì)象
func (c *Config) New() (*nightWatch, error) {
    rdb, err := db.NewRedis(c.RedisOptions)
    if err != nil {
        slog.Error(err.Error(), "Failed to create Redis client")
        returnnil, err
    }

    logger := newCronLogger()
    runner := cron.New(
        cron.WithSeconds(),
        cron.WithLogger(logger),
        cron.WithChain(cron.SkipIfStillRunning(logger), cron.Recover(logger)),
    )

    pool := goredis.NewPool(rdb)
    lockOpts := []redsync.Option{
        redsync.WithRetryDelay(50 * time.Microsecond),
        redsync.WithTries(3),
        redsync.WithExpiry(defaultExpiration),
    }
    locker := redsync.New(pool).NewMutex(lockName, lockOpts...)

    cfg, err := c.CreateWatcherConfig()
    if err != nil {
        returnnil, err
    }

    nw := &nightWatch{runner: runner, locker: locker, config: cfg}
    if err := nw.addWatchers(); err != nil {
        returnnil, err
    }

    return nw, nil
}

*Config.New 方法會(huì)通過(guò)配置信息構(gòu)造一個(gè) nightWatch 對(duì)象并返回。這里的 runner 就是異步任務(wù)調(diào)度器,使用 cron 包實(shí)現(xiàn),用來(lái)調(diào)度和執(zhí)行定時(shí)任務(wù)。并且這個(gè)方法內(nèi)部還實(shí)例化了一個(gè) redsync 分布式鎖對(duì)象 locker。nightWatch 對(duì)象就是通過(guò) runner、locker 和 cfg 來(lái)構(gòu)造的。

這里的核心部分是 addWatchers 的邏輯,其實(shí)現(xiàn)如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// 注冊(cè)所有 Watcher 實(shí)例到 nightWatch
func (nw *nightWatch) addWatchers() error {
    for n, w := range watcher.ListWatchers() {
        if err := w.Init(context.Background(), nw.config); err != nil {
            slog.Error(err.Error(), "Failed to construct watcher", "watcher", n)
            return err
        }

        spec := watcher.Every3Seconds
        if obj, ok := w.(watcher.ISpec); ok {
            spec = obj.Spec()
        }

        if _, err := nw.runner.AddJob(spec, w); err != nil {
            slog.Error(err.Error(), "Failed to add job to the cron", "watcher", n)
            return err
        }
    }

    returnnil
}

*nightWatch.addWatchers 方法用來(lái)注冊(cè)所有 Watcher 對(duì)象到調(diào)度器 runner 中。

Watcher 是一個(gè)接口,定義了異步任務(wù)應(yīng)該實(shí)現(xiàn)的方法。Watcher 接口定義如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/watcher.go

type Watcher interface {
    Init(ctx context.Context, config *Config) error
    cron.Job
}

type ISpec interface {
    Spec() string
}

var (
    registryLock = new(sync.Mutex)
    registry     = make(map[string]Watcher)
)

func Register(watcher Watcher) {
    registryLock.Lock()
    defer registryLock.Unlock()

    name := reflectutil.StructName(watcher)
    if _, ok := registry[name]; ok {
        panic("duplicate watcher entry: " + name)
    }

    registry[name] = watcher
}

func ListWatchers()map[string]Watcher {
    registryLock.Lock()
    defer registryLock.Unlock()

    return registry
}

可以看到,要實(shí)現(xiàn)一個(gè)異步任務(wù),需要實(shí)現(xiàn) Init 方法以及 cron.Job 接口。cron.Job 接口其實(shí)只有一個(gè)方法定義如下:

type Job interface {
    Run()
}

只要滿(mǎn)足 Watcher 接口的任務(wù),就可以通過(guò) Register 函數(shù)注冊(cè)到 registry 中。ListWatchers 函數(shù)則可以返回注冊(cè)到 registry 中全部任務(wù)。而 ListWatchers 函數(shù)正是在前文講解的 *nightWatch.addWatchers 方法中調(diào)用的。

到目前為止,任務(wù)如何被注冊(cè)到 nightWatch.runner 的過(guò)程我們就串起來(lái)了。接下來(lái)需要關(guān)注的兩個(gè)點(diǎn)是,調(diào)度器 runner 是何時(shí)啟動(dòng)的,以及是何時(shí)調(diào)用 Register 函數(shù)注冊(cè)任務(wù)的。

我們先來(lái)看調(diào)度器 runner 是何時(shí)啟動(dòng)的:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// Run 執(zhí)行異步任務(wù),此方法會(huì)阻塞直到關(guān)閉 stopCh
func (nw *nightWatch) Run(stopCh <-chanstruct{}) {
    ctx := wait.ContextForChannel(stopCh)

    // 循環(huán)加鎖,直到加鎖成功,再去啟動(dòng)任務(wù)
    ticker := time.NewTicker(defaultExpiration + (5 * time.Second))
    defer ticker.Stop()
    for {
        err := nw.locker.LockContext(ctx)
        if err == nil {
            slog.Debug("Successfully acquired lock", "lockName", lockName)
            break
        }
        slog.Debug("Failed to acquire lock", "lockName", lockName, "err", err)
        <-ticker.C
    }

    // 看門(mén)狗,實(shí)現(xiàn)鎖自動(dòng)續(xù)約
    ticker = time.NewTicker(extendExpiration)
    defer ticker.Stop()
    gofunc() {
        for {
            select {
            case <-ticker.C:
                if ok, err := nw.locker.ExtendContext(ctx); !ok || err != nil {
                    slog.Debug("Failed to extend lock", "err", err, "status", ok)
                }
            case <-ctx.Done():
                slog.Debug("Exiting lock watchdog")
                return
            }
        }
    }()

    // 啟動(dòng)定時(shí)任務(wù)
    nw.runner.Start()
    slog.Info("Successfully started nightwatch server")

    // 阻塞等待退出信號(hào)
    <-stopCh

    nw.stop()
}

在 *nightWatch.Run 方法中,首先會(huì)啟動(dòng)一個(gè)無(wú)限循環(huán),定時(shí)執(zhí)行嘗試搶鎖操作,直到搶鎖成功。這與前文中講解的 nightwatch 架構(gòu)設(shè)計(jì)是一致的。搶到鎖后,就可以執(zhí)行 nw.runner.Start() 啟動(dòng)調(diào)度器,執(zhí)行定時(shí)任務(wù)了。

此外,在 nightwatch 架構(gòu)圖中沒(méi)有體現(xiàn)的一點(diǎn)是,這里為分布式鎖實(shí)現(xiàn)了看門(mén)狗機(jī)制,用來(lái)自動(dòng)續(xù)約。關(guān)于 redsync 分布式鎖的自動(dòng)續(xù)約,在我的文章「在 Go 中如何使用分布式鎖解決并發(fā)問(wèn)題?」中有詳細(xì)講解。

而這個(gè) Run 方法,就是在 main 函數(shù)中通過(guò) nw.Run(stopCh) 調(diào)用的。

我們還剩下一個(gè)最后要看的核心邏輯是 task 在何時(shí)會(huì)調(diào)用 Register 注冊(cè)到 registry 變量中。

還記得前文中講解的 Watcher 接口么,*taskWatcher 實(shí)現(xiàn)了這個(gè)接口:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go

var _ watcher.Watcher = (*taskWatcher)(nil)

type taskWatcher struct {
    store     store.IStore
    clientset kubernetes.Interface

    wg sync.WaitGroup
}

func (w *taskWatcher) Init(ctx context.Context, config *watcher.Config) error {
    w.store = config.Store
    w.clientset = config.Clientset
    returnnil
}

func (w *taskWatcher) Spec() string {
    return"@every 30s"
}

func init() {
    watcher.Register(&taskWatcher{})
}

taskWatcher 就是 task 任務(wù)的具體對(duì)象,它實(shí)現(xiàn)了 watcher.Watcher 接口??梢园l(fā)現(xiàn),Register 函數(shù)是在 init 函數(shù)中調(diào)用的,即 task 包被導(dǎo)入時(shí)實(shí)現(xiàn)自動(dòng)注冊(cè)。

task 包會(huì)在 nightwatch/internal/watcher/all/all.go 文件被導(dǎo)入:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/all/all.go

package all

import (
    // 觸發(fā)所有 Watcher 的 init 函數(shù)進(jìn)行注冊(cè)
    _ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/task"
)

這里以匿名包的方式導(dǎo)入 task 包。如果我們還有其他的任務(wù)實(shí)現(xiàn),則同樣可以參考 task 包的注冊(cè)方式,在這里以匿名包形式導(dǎo)入,這也是 all 包名的由來(lái),可以注冊(cè)全部的任務(wù)。

然后會(huì)在 nightwatch 中再次以匿名包的方式導(dǎo)入 all 包:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

package nightwatch

import (
    ...
    // 觸發(fā) init 函數(shù)
    _ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/all"
)

我們可以總結(jié)出任務(wù)的注冊(cè)流程是,nightwatch 包導(dǎo)入 all 包,all 包會(huì)導(dǎo)入 task 包,task 包的 init 函數(shù)執(zhí)行就會(huì)完成注冊(cè)。所以在入口文件 main.go 導(dǎo)入 nightwatch 包的時(shí)候,就會(huì)觸發(fā)任務(wù)的注冊(cè)。在調(diào)用 nw.Run(stopCh) 啟動(dòng)服務(wù)時(shí),所有的任務(wù)已經(jīng)注冊(cè)完成了。

taskWatcher 對(duì)象的核心邏輯當(dāng)然就是 Run 方法了:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go

// Run 運(yùn)行 task watcher 任務(wù)
func (w *taskWatcher) Run() {
    w.wg.Add(2)

    slog.Debug("Current sync period is start")

    // NOTE: 將 Normal 狀態(tài)任務(wù)在 Kubernetes 中啟動(dòng)
    gofunc() {
        defer w.wg.Done()
        ctx := context.Background()

        _, tasks, err := w.store.Tasks().List(ctx, meta.WithFilter(map[string]any{
            "status": model.TaskStatusNormal,
        }))
        if err != nil {
            slog.Error(err.Error(), "Failed to list tasks")
            return
        }

        var wg sync.WaitGroup
        wg.Add(len(tasks))
        for _, task := range tasks {
            gofunc(task *model.Task) {
                defer wg.Done()
                job, err := w.clientset.BatchV1().Jobs(task.Namespace).Create(ctx, toJob(task), metav1.CreateOptions{})
                if err != nil {
                    slog.Error(err.Error(), "Failed to create job")
                    return
                }

                task.Status = model.TaskStatusPending
                if err := w.store.Tasks().Update(ctx, task); err != nil {
                    slog.Error(err.Error(), "Failed to update task status")
                    return
                }
                slog.Info("Successfully created job", "namespace", job.Namespace, "name", job.Name)
            }(task)
        }
        wg.Wait()
    }()

    // NOTE: 同步中間狀態(tài)的任務(wù)在 Kubernetes 中的狀態(tài)到表中
    gofunc() {
        defer w.wg.Done()
        ctx := context.Background()

        _, tasks, err := w.store.Tasks().List(ctx, meta.WithFilterNot(map[string]any{
            // 排除這幾個(gè)狀態(tài)
            "status": []model.TaskStatus{model.TaskStatusNormal, model.TaskStatusSucceeded, model.TaskStatusFailed},
        }))
        if err != nil {
            slog.Error(err.Error(), "Failed to list tasks")
            return
        }

        var wg sync.WaitGroup
        wg.Add(len(tasks))
        for _, task := range tasks {
            gofunc(task *model.Task) {
                defer wg.Done()
                job, err := w.clientset.BatchV1().Jobs(task.Namespace).Get(ctx, task.Name, metav1.GetOptions{})
                if err != nil {
                    slog.Error(err.Error(), "Failed to get task")
                    return
                }

                task.Status = toTaskStatus(job)
                if err := w.store.Tasks().Update(ctx, task); err != nil {
                    slog.Error(err.Error(), "Failed to update task status")
                    return
                }
                slog.Info("Successfully sync job status to task", "namespace", job.Namespace, "name", job.Name, "status", task.Status)
            }(task)
        }
        wg.Wait()
    }()

    w.wg.Wait()
    slog.Debug("Current sync period is complete")
}

Run 方法就是用來(lái)實(shí)現(xiàn)每個(gè) watcher 對(duì)象的業(yè)務(wù)邏輯。比如這里就實(shí)現(xiàn)了 task 任務(wù)的業(yè)務(wù)邏輯,它包含兩個(gè)功能,在 Run 方法的上半部分代碼中啟動(dòng)了第一個(gè) goroutine 用來(lái)實(shí)現(xiàn)將 Normal 狀態(tài)任務(wù)在 Kubernetes 中啟動(dòng),下半部分代碼中啟動(dòng)了第二個(gè) goroutine 用來(lái)實(shí)現(xiàn)同步已運(yùn)行的任務(wù)在 Kubernetes 中的 Job 狀態(tài)到數(shù)據(jù)庫(kù)表中。

至此,nightwatch 項(xiàng)目就講解完成了。我們一起實(shí)現(xiàn)了一個(gè)架構(gòu)簡(jiǎn)潔且擴(kuò)展性強(qiáng)的分布式任務(wù)系統(tǒng)。關(guān)于 nightwatch 項(xiàng)目中更多的代碼細(xì)節(jié)你可以跳轉(zhuǎn)到我的 GitHub 倉(cāng)庫(kù)中查看。

五、總結(jié)

本文帶大家從技術(shù)選型到架構(gòu)設(shè)計(jì)再到代碼實(shí)現(xiàn),一步步完成了一個(gè)簡(jiǎn)潔優(yōu)雅的分布式任務(wù)系統(tǒng)。這套系統(tǒng)不僅架構(gòu)簡(jiǎn)潔,擴(kuò)展也非常方便,我們只需要按照 task 的套路實(shí)現(xiàn)更多的異步任務(wù),都可以非常方便的方式注冊(cè)到 nightwatch 中。

責(zé)任編輯:趙寧寧 來(lái)源: 令飛編程
相關(guān)推薦

2016-09-30 10:13:07

分布式爬蟲(chóng)系統(tǒng)

2024-09-23 04:00:00

java架構(gòu)分布式系統(tǒng)

2021-11-01 12:25:56

Redis分布式

2023-03-06 08:14:48

MySQLRedis場(chǎng)景

2018-09-06 22:49:31

分布式架構(gòu)服務(wù)器

2022-08-01 08:01:04

ID發(fā)號(hào)器系統(tǒng)

2024-08-07 08:15:47

2019-07-19 15:51:11

框架選型分布式

2013-09-11 16:02:00

Spark分布式計(jì)算系統(tǒng)

2023-09-04 08:45:07

分布式配置中心Zookeeper

2019-01-28 11:46:53

架構(gòu)運(yùn)維技術(shù)

2023-10-08 10:49:16

搜索系統(tǒng)分布式系統(tǒng)

2018-05-09 09:44:51

Java分布式系統(tǒng)

2022-11-06 19:28:02

分布式鎖etcd云原生

2019-12-27 16:00:56

分布式事務(wù)框架Java

2024-05-07 09:00:41

Go語(yǔ)言令牌桶

2024-07-15 08:25:07

2024-10-29 14:32:45

Golang分布式系統(tǒng)

2022-12-29 08:32:50

xxl-job緩存Schedule

2022-06-27 08:36:27

分布式事務(wù)XA規(guī)范
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 日韩免费福利视频 | h视频在线免费 | 国产精品日产欧美久久久久 | 欧美日韩一 | 亚洲一二三区不卡 | 国产精品99久久久久久动医院 | 日韩成人av在线 | 久草.com| 日本久久www成人免 成人久久久久 | 午夜精品久久久久久久久久久久久 | 色香蕉在线 | 欧美三级久久久 | 九九久久这里只有精品 | av网站在线看 | 日韩视频中文字幕 | julia中文字幕久久一区二区 | 国产高清在线 | 日韩欧美国产成人一区二区 | 成人av激情 | 99国产精品99久久久久久 | 一区二区在线观看免费视频 | 在线视频 亚洲 | 亚洲午夜视频 | 成人综合在线视频 | 日韩在线高清 | 国产午夜精品一区二区三区四区 | 国产丝袜一区二区三区免费视频 | 国产在线观看不卡一区二区三区 | 国产一区二区久久久 | 91av在线免费看 | 九九热在线观看视频 | www国产亚洲精品久久网站 | 麻豆国产精品777777在线 | 懂色中文一区二区三区在线视频 | 美女午夜影院 | 国产精品爱久久久久久久 | 夜夜摸夜夜操 | 看一级黄色毛片 | 欧美精品一区二区免费 | 欧美性生活免费 | 一级毛片视频在线 |