Gin+robfig/cron/v3 實現(xiàn)任務(wù)調(diào)度系統(tǒng):定時、周期、立即執(zhí)行、重試與恢復全支持!
最近在開發(fā)自動化平臺、漏洞工單系統(tǒng)、監(jiān)控平臺等后臺服務(wù)時,我們經(jīng)常需要任務(wù)調(diào)度系統(tǒng),來定時執(zhí)行某些邏輯,比如周期性數(shù)據(jù)同步、定時告警、定時下發(fā)任務(wù)等。
本文將手把手教你如何使用Go語言的Gin 框架+robfig/cron/v3實現(xiàn)一個自定義任務(wù)調(diào)度系統(tǒng),支持以下核心功能:
- 任務(wù)定時執(zhí)行
- 周期執(zhí)行(Cron 表達式)
- 立即執(zhí)行任務(wù)
- 暫停/恢復任務(wù)
- 重試任務(wù)
技術(shù)棧
- Gin:HTTP 接口服務(wù)框架
- robfig/cron v3:任務(wù)調(diào)度庫,支持 Cron 表達式解析
- 原生 Goroutine 和上下文管理:控制任務(wù)生命周期
項目結(jié)構(gòu)
做一個簡單的demo項目,所以項目目錄結(jié)構(gòu)比較簡單,具體如下所示:
? robfig_cron_gin tree .
.
├── go.mod
├── go.sum
├── handler
│ └── task_handler.go
├── main.go
├── router
│ └── router.go
└── scheduler
├── scheduler.go
└── task.go
4 directories, 7 files
初始化項目,并添加依賴,具體如下所示:
go get github.com/robfig/cron/v3
go get github.com/gin-gonic/gin
原理簡析
(1) robfig/cron工作機制
cron/v3是Go最成熟的任務(wù)調(diào)度庫之一,支持標準Cron表達式(包括秒級),本質(zhì)上維護了一個調(diào)度器,通過 AddFunc() 添加任務(wù)后,每到設(shè)定的時間點自動執(zhí)行。
(2) 自定義調(diào)度器
我們封裝了一個 Scheduler:
- 內(nèi)部持有 *cron.Cron
- 每個任務(wù)用 map[string]EntryID 管理,支持增刪查
- 支持RunNow() 手動觸發(fā)
- 支持暫停(remove)和恢復(重新 add)
代碼實現(xiàn)
(1) 定義任務(wù)結(jié)構(gòu)體scheduler/task.go
package scheduler
import "context"
type Task struct {
ID string
Name string
Schedule string
JobFunc func(ctx context.Context)
Retry int
}
(2) 調(diào)度器核心邏輯scheduler/scheduler.go
package scheduler
import (
"context"
"sync"
"github.com/robfig/cron/v3"
)
type Scheduler struct {
c *cron.Cron
mu sync.Mutex
tasks map[string]cron.EntryID
funcs map[string]*Task
status map[string]string
}
func NewScheduler() *Scheduler {
return &Scheduler{
c: cron.New(cron.WithSeconds()),
tasks: make(map[string]cron.EntryID),
funcs: make(map[string]*Task),
status: make(map[string]string),
}
}
func (s *Scheduler) Start() {
s.c.Start()
}
func (s *Scheduler) Stop() {
s.c.Stop()
}
func (s *Scheduler) AddTask(t *Task) error {
s.mu.Lock()
defer s.mu.Unlock()
id, err := s.c.AddFunc(t.Schedule, func() {
ctx := context.Background()
t.JobFunc(ctx)
})
if err != nil {
return err
}
s.tasks[t.ID] = id
s.funcs[t.ID] = t
s.status[t.ID] = "running"
return nil
}
func (s *Scheduler) RunNow(taskID string) {
s.mu.Lock()
defer s.mu.Unlock()
if t, ok := s.funcs[taskID]; ok {
go t.JobFunc(context.Background())
}
}
func (s *Scheduler) PauseTask(taskID string) {
s.mu.Lock()
defer s.mu.Unlock()
if id, ok := s.tasks[taskID]; ok {
s.c.Remove(id)
s.status[taskID] = "paused"
}
}
func (s *Scheduler) ResumeTask(taskID string) error {
if t, ok := s.funcs[taskID]; ok {
return s.AddTask(t)
}
return nil
}
func (s *Scheduler) RetryTask(taskID string) {
s.RunNow(taskID) // 簡單實現(xiàn),等同于立即執(zhí)行
}
func (s *Scheduler) Status(taskID string) string {
s.mu.Lock()
defer s.mu.Unlock()
return s.status[taskID]
}
(3) 接口實現(xiàn)邏輯handler/task_handler.go
package handler
import (
"context"
"net/http"
"robfig_cron_gin/scheduler"
"github.com/gin-gonic/gin"
)
var sched *scheduler.Scheduler
func Init(s *scheduler.Scheduler) {
sched = s
}
func AddTask(c *gin.Context) {
var req struct {
ID string `json:"id"`
Name string `json:"name"`
Schedule string `json:"schedule"`
}
if err := c.BindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
task := &scheduler.Task{
ID: req.ID,
Name: req.Name,
Schedule: req.Schedule,
JobFunc: func(ctx context.Context) {
// 模擬任務(wù)執(zhí)行邏輯
println("Task executed:", req.ID)
},
}
err := sched.AddTask(task)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"message": "task added"})
}
func RunNow(c *gin.Context) {
id := c.Param("id")
sched.RunNow(id)
c.JSON(http.StatusOK, gin.H{"message": "task executed now"})
}
func PauseTask(c *gin.Context) {
id := c.Param("id")
sched.PauseTask(id)
c.JSON(http.StatusOK, gin.H{"message": "task paused"})
}
func ResumeTask(c *gin.Context) {
id := c.Param("id")
err := sched.ResumeTask(id)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"message": "task resumed"})
}
(4) Gin 路由配置router/router.go
package router
import (
"robfig_cron_gin/handler"
"github.com/gin-gonic/gin"
)
func SetupRouter() *gin.Engine {
r := gin.Default()
r.POST("/task", handler.AddTask)
r.POST("/task/:id/run", handler.RunNow)
r.POST("/task/:id/pause", handler.PauseTask)
r.POST("/task/:id/resume", handler.ResumeTask)
return r
}
(5) 服務(wù)入口main.go
package main
import (
"robfig_cron_gin/handler"
"robfig_cron_gin/router"
"robfig_cron_gin/scheduler"
)
func main() {
s := scheduler.NewScheduler()
s.Start()
defer s.Stop()
handler.Init(s)
r := router.SetupRouter()
r.Run(":9311")
}
運行并測試項目
運行程序命令如下所示:
go run .
測試命令如下所示:
? ~ curl -X POST http://localhost:9311/task \
-H 'Content-Type: application/json' \
-d '{"id":"task1", "name":"MyTask", "schedule":"*/10 * * * * *"}'
{"message":"task added"}%
? ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
? ~ curl -X POST http://localhost:9311/task/task1/run
{"message":"task executed now"}%
? ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
? ~ curl -X POST http://localhost:9311/task/task1/resume
{"message":"task resumed"}%
效果如下圖所示:
后續(xù)可擴展方向
- 支持任務(wù)持久化到數(shù)據(jù)庫(MySQL/Postgres)
- 支持失敗重試策略(使用backoff、retry)
- 支持任務(wù)執(zhí)行日志持久化 + WebSocket實時推送
- 支持多實例集群調(diào)度,分布式鎖控制任務(wù)唯一執(zhí)行
總結(jié)
通過Gin + robfig/cron,我們實現(xiàn)了一個輕量、功能靈活的任務(wù)調(diào)度系統(tǒng)。結(jié)構(gòu)清晰、接口豐富,非常適合內(nèi)嵌進后端服務(wù)系統(tǒng)中,如定時同步平臺、自動化任務(wù)管理系統(tǒng)、CI/CD執(zhí)行器等。