成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

用 Go 實現一個支持任務下發的后臺服務:Gin + Machinery v2 完整實戰

開發 后端
本文將帶你一步一步使用Go語言,結合Gin框架與Machinery v2實現一個完整的 “任務下發服務”,并支持通過REST API發起任務。

在日常開發當中,我們經常希望通過消息通信機制來異步執行任務,例如發郵件、生成報表、風控計算等。這種場景中,使用“任務隊列”框架來解耦主業務流程是一種最佳實踐。

本文將帶你一步一步使用Go語言,結合Gin框架與Machinery v2實現一個完整的 “任務下發服務”,并支持通過REST API發起任務。廢話不多說,開始今天的內容吧,let's Go!!!

核心目標

項目開始前我們先設定一個小目標,具體項如下所示:

  • 使用 Gin 提供一個 HTTP 接口,用于接收任務參數
  • 使用 Machinery v2 執行后臺任務(通過 Redis 通信)
  • 使用消息隊列解耦 API 層與實際執行邏輯

整體流程

用戶請求 -> Gin Server -> Redis Machinery -> Worker

用戶通過HTTP請求提交任務參數,Gin服務將任務發送到Machinery任務隊列中,后續由Worker異步消費任務并執行。

準備工作

(1) 安裝 Redis

本地環境可以直接通過 Docker 運行:

docker run -d --name redis -p 6379:6379 redis

(2) 初始化 Go 項目

go mod init machinery-gin
go get github.com/gin-gonic/gin
go get github.com/RichardKnop/machinery/v2

(3) 創建對應目錄和代碼文件

?  machinery-gin tree .
.
├── cmd
│   ├── api
│   │   └── main.go
│   └── worker
│       └── main.go
├── config
│   └── config.go
├── controller
│   └── task_controller.go
├── go.mod
├── go.sum
├── router
│   └── router.go
├── scheduler
│   └── manager.go
├── service
│   └── task_service.go
└── tasks
    ├── handler.go
    └── registry.go


10 directories, 11 files

核心模塊代碼實現

(1) config/config.go

package config


import "github.com/RichardKnop/machinery/v2/config"


func GetMachineryConfig() *config.Config {
	return &config.Config{
		Broker:        "redis://localhost:6379",
		DefaultQueue:  "machinery_tasks",
		ResultBackend: "redis://localhost:6379",
	}
}

(2) tasks/handler.go

package tasks


import (
	"fmt"
	"time"
)


func PrintMessage(msg string) error {
	fmt.Printf("?? Task Received: %s at %s\n", msg, time.Now().Format(time.RFC3339))
	return nil
}

(3) tasks/registry.go

package tasks


import "github.com/RichardKnop/machinery/v2"


func RegisterTasks(server *machinery.Server) error {
	return server.RegisterTasks(map[string]interface{}{
		"print_message": PrintMessage,
	})
}

(4) scheduler/manager.go

package scheduler


import (
	"sync"
	"time"


	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/tasks"
)


type ScheduledTask struct {
	Name     string
	Interval time.Duration
	Msg      string
	Paused   bool
	StopChan chan struct{}
}


var (
	tasksMap = make(map[string]*ScheduledTask)
	mu       sync.Mutex
)


func AddScheduledTask(server *machinery.Server, name, msg string, interval time.Duration) {
	mu.Lock()
	defer mu.Unlock()


	if _, exists := tasksMap[name]; exists {
		return
	}


	t := &ScheduledTask{
		Name:     name,
		Interval: interval,
		Msg:      msg,
		StopChan: make(chan struct{}),
	}
	tasksMap[name] = t


	go func(task *ScheduledTask) {
		ticker := time.NewTicker(task.Interval)
		defer ticker.Stop()


		for {
			select {
			case <-ticker.C:
				if !task.Paused {
					signature := &tasks.Signature{
						Name: "print_message",
						Args: []tasks.Arg{
							{Type: "string", Value: task.Msg},
						},
					}
					server.SendTask(signature)
				}
			case <-task.StopChan:
				return
			}
		}
	}(t)
}


func PauseTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = true
	}
}


func ResumeTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = false
	}
}


func StopTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		close(task.StopChan)
		delete(tasksMap, name)
	}
}

(5) service/task_service.go

package service


import (
	"time"


	"github.com/RichardKnop/machinery/v2"
	"machinery-gin/scheduler"
)


func ScheduleNewTask(server *machinery.Server, name, msg string, intervalSec int) {
	scheduler.AddScheduledTask(server, name, msg, time.Duration(intervalSec)*time.Second)
}


func PauseScheduledTask(name string) {
	scheduler.PauseTask(name)
}


func ResumeScheduledTask(name string) {
	scheduler.ResumeTask(name)
}


func StopScheduledTask(name string) {
	scheduler.StopTask(name)
}

(6) controller/task_controller.go

package controller


import (
	"net/http"


	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/service"
)


func TaskHandler(server *machinery.Server) gin.HandlerFunc {
	return func(c *gin.Context) {
		var req struct {
			Name     string `json:"name"`
			Interval int    `json:"interval"`
			Msg      string `json:"msg"`
		}


		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}


		service.ScheduleNewTask(server, req.Name, req.Msg, req.Interval)
		c.JSON(http.StatusOK, gin.H{"status": "task scheduled"})
	}
}


func PauseHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.PauseScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "paused"})
	}
}


func ResumeHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.ResumeScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "resumed"})
	}
}


func StopHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.StopScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "stopped"})
	}
}

(7) router/router.go

package routes


import (
	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/controller"
)


func SetupRouter(server *machinery.Server) *gin.Engine {
	r := gin.Default()


	r.POST("/task/start", controller.TaskHandler(server))
	r.POST("/task/pause", controller.PauseHandler())
	r.POST("/task/resume", controller.ResumeHandler())
	r.POST("/task/stop", controller.StopHandler())


	return r
}

(8) cmd/api/main.go (啟動Gin API服務)

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/router"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()


	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)


	_ = tasks.RegisterTasks(machineryServer)
	r := routes.SetupRouter(machineryServer)
	r.Run(":9311")
}

(9) cmd/worker/main.go (啟動Worker消費者)

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()
	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)
	_ = tasks.RegisterTasks(machineryServer)


	worker := machineryServer.NewWorker("worker_name", 10)
	_ = worker.Launch()
}

測試程序

啟動 API 服務和 Worker:

go run cmd/api/main.go
go run cmd/worker/main.go

測試命令如下所示:

?  ~ curl -X POST http://localhost:9311/task/start -H 'Content-Type: application/json' -d '{
  "name": "hello-task",
  "interval": 5,
  "msg": "Hello from Machinery"
}'
{"status":"task scheduled"}%                                                       
 ?  ~ curl -X POST http://localhost:9311/task/pause\?name\=hello-task
{"status":"paused"}%
?  ~ curl -X POST http://localhost:9311/task/resume\?name\=hello-task
{"status":"resumed"}%                                                                 
?  ~ curl -X POST http://localhost:9311/task/stop\?name\=hello-task
{"status":"stopped"}%

測試結果如下所示:

總結

我們已經實現了任務的“下發與執行”,“暫停/恢復”后續可以進一步擴展:

  • 支持任務列表,任務詳情
  • 支持“周期定時任務(調度器)”
  • 支持任務執行狀態查詢/UI管理面板
責任編輯:趙寧寧 來源: 馬嘍編程筆記
相關推薦

2022-05-22 13:55:30

Go 語言

2024-01-08 08:36:29

HTTPGo代理服務器

2025-05-20 09:39:57

GogRPC微服務

2024-01-02 13:58:04

GoREST API語言

2024-05-10 08:47:22

標準庫v2Go

2024-03-15 15:20:10

并發服務IP

2025-03-06 08:54:24

泛型類型MapGo1

2010-08-05 17:00:04

RIP V2協議

2010-08-06 14:07:21

RIP V2

2014-04-14 15:54:00

print()Web服務器

2022-03-06 19:57:50

狀態機easyfsm項目

2023-05-10 08:05:41

GoWeb應用

2021-09-27 09:55:06

Chrome瀏覽器Manifest V2

2012-04-24 18:10:56

華為E5

2017-05-08 15:00:20

H5代碼服務器

2020-07-03 10:21:48

Go框架Docker

2021-08-23 15:14:09

Linuxat命令任務

2023-02-26 01:37:57

goORM代碼

2009-08-14 17:04:19

Windows后臺服務

2023-03-01 09:39:40

調度系統
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品2 | 亚洲人成网亚洲欧洲无码 | 欧美看片 | 亚洲日日夜夜 | 亚洲激精日韩激精欧美精品 | 男女羞羞视频在线免费观看 | 国产高清精品一区 | 亚洲色综合 | 精品国产青草久久久久96 | 亚洲精品1区 | 国产免费一区二区 | 精品久久久久久国产 | 国产精品久久久久久久久久 | 日本高清aⅴ毛片免费 | 精品成人免费视频 | 久久中文视频 | 毛片区| 欧美日韩福利 | 欧美 日韩 亚洲91麻豆精品 | 亚洲欧美日韩在线 | 黄色欧美 | 久久av一区 | 国产美女免费视频 | 亚洲情综合五月天 | 久久国产视频播放 | 丁香五月网久久综合 | 三级黄片毛片 | 99热播放| 99久久婷婷国产精品综合 | 精品网| 国产一区二区三区高清 | 亚洲成人精品 | 九九伊人sl水蜜桃色推荐 | 一区二区三区精品视频 | 久久久久免费精品国产 | 色偷偷噜噜噜亚洲男人 | 午夜久久久久久久久久一区二区 | 国产999在线观看 | 人人人人干 | 天天草天天 | 久久大陆 |