成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于Dispatcher模式的事件與數據分發處理器的Go語言實現

開發 前端
本文以Eosc(一個高性能中間件開發框架)中的代碼為例子,看看如何在我們的實際項目中,實現這樣的功能。

背景

在實際項目中,我們經常需要異步處理事件與數據。比如MVC模型中處理請求的Filter鏈,又如在nginx中或是linux的iptables中,都會有一個處理鏈條,來一步步的順序處理一個請求。此外基于集中存儲與分發的模式,實現事件與數據的異步處理,對于提升系統響應程度,實現業務處理的解耦至關重要。本文以eosc(一個高性能中間件開發框架)中的代碼為例子,看看如何在我們的實際項目中,實現這樣的功能

代碼

eosc提供了關于dispatcher的關鍵實現的兩個文件,分別是dispatch.go和data-dispatch.go,具體的代碼地址是https://github.com/eolinker/eosc/tree/main/common/dispatcher。

這兩個文件中實現的結構體與接口的關系如圖所示:

dispatcher關鍵接口與結構體的關系

dispatch.go文件

在dispatch.go文件中,esco提供了IEvent、CallBackHandler、IListener三個重要的接口。

同時通過CallBackFunc來實現接口CallBackHandler, tListener來實現IListener。

//2個接口
type CallBackHandler interface {
DataEvent(e IEvent) error
}

type IListener interface {
Leave()
Event() <-chan IEvent
}
/*CallBackFunc實現了CallBackHandler,同時CallBackFunc又是一個接受IEvent為參數,
返回error的函數*/
type CallBackFunc func(e IEvent) error

func (f CallBackFunc) DataEvent(e IEvent) error {
return f(e)
}
//實現了IListener接口
func (t *tListener) Leave() {
t.Once.Do(func() {
atomic.StoreUint32(&t.closed, 1)
close(t.c)
})
}

func (t *tListener) Event() <-chan IEvent {
return t.c
}

注意:tListener還提供了一個Handler方法,這個方法的參數與返回結果與CallBackFunc一樣,也就是說它實現的Handler方法是一種CallBackFunc,這個在后面的分發處理邏輯的注冊中會用到。

func (t *tListener) Handler(e IEvent) error {
if atomic.LoadUint32(&t.closed) == 0 {
t.c <- e
return nil
}
return ErrorIsClosed
}

data-dispatch.go文件

該文件提供了兩種dispatcher創建方法,分別是NewDataDispatchCenter、NewEventDispatchCenter。這兩個方法都是創建了DataDispatchCenter結構體(這個結構體后面會講到),但是啟動的處理協程不同,NewDataDispatchCenter啟動的是doDataLoop,NewEventDispatchCenter啟動的是doEventLoop。

//兩種DispatchCenter創建方法
func NewDataDispatchCenter() IDispatchCenter {
ctx, cancelFunc := context.WithCancel(context.Background())
center := &DataDispatchCenter{
ctx: ctx,
cancelFunc: cancelFunc,
addChannel: make(chan *_CallbackBox, 10),
eventChannel: make(chan IEvent),
}
go center.doDataLoop()
return center
}

func NewEventDispatchCenter() IDispatchCenter {
ctx, cancelFunc := context.WithCancel(context.Background())
center := &DataDispatchCenter{
ctx: ctx,
cancelFunc: cancelFunc,
addChannel: make(chan *_CallbackBox, 10),
eventChannel: make(chan IEvent),
}
go center.doEventLoop()
return center
}

//DataDispatchCenter 數據廣播中心
type DataDispatchCenter struct {
addChannel chan *_CallbackBox
eventChannel chan IEvent

ctx context.Context
cancelFunc context.CancelFunc
}

DataDispatchCenter這個結構體中有兩個chan,一個是addChannel,一個是eventChannel。

addChannel

接受_CallbackBox,這個_CallbackBox提供了邏輯處理Handler

eventChannel

接受IEvent,觸發

doEventLoop邏輯:

NewEventDispatchCenter方法中啟動的doEventLoop,邏輯相對簡單,創建的channels用于存儲addChannel發送過來的_CallbackBox,即事件處理Handler.當eventChannel收到事件后,遍歷channels中的每一個_CallbackBox,并調用相應的Handler處理。

doEventLoop狀態圖

具體代碼可以查看https://github.com/eolinker/eosc/blob/main/common/dispatcher/data-dispatch.go#L48。

doDataLoop邏輯:

NewDataDispatchCenter方法中啟動的doDataLoop,這個邏輯稍微復雜點。其實它的大致流程和doEventLoop,不同的是每個新增加的_CallbackBox,需要對當前接收并緩存的所有Event鍵值對進行處理。而doEventLoop是不會的,新增加的_CallbackBox,只會對在它之后接收的Event生效。下面的代碼InitEvent(data.GET())很有意思。

  1. 首先InitEvent實現了IEvent接口,是一種IEvent。
  2. type InitEvent map[string]map[string][]byte (代碼鏈接:https://github.com/eolinker/eosc/blob/main/common/dispatcher/data.go#L88)InitEvent是一個map,可以通過InitEvent(data.GET())初始化。
func (d *DataDispatchCenter) doDataLoop() {
data := NewMyData(nil)
channels := make([]*_CallbackBox, 0, 10)
isInit := false
for {
select {
case event, ok := <-d.eventChannel:
if ok {
isInit = true
data.DoEvent(event)
next := channels[:0]
for _, c := range channels {
if err := c.handler(event); err != nil {
close(c.closeChan)
continue
}
next = append(next, c)
}
channels = next
}
case hbox, ok := <-d.addChannel:
{
if ok {
if !isInit {
channels = append(channels, hbox)
} else {
if err := hbox.handler(InitEvent(data.GET())); err == nil {
channels = append(channels, hbox)
}
}
}
}
}

}
}

應用

創建EventServer。

type EventServer struct {
IDispatchCenter
}
func NewEventServer() *EventServer {
es := &EventServer{
IDispatchCenter: NewDataDispatchCenter(),
}
return es
}

定義事件。

type MyEvent struct {
namespace string
key string
event string
data []byte
}

func (m *MyEvent) Namespace() string {
return m.namespace
}

func (m *MyEvent) Event() string {
return m.event
}

func (m *MyEvent) Key() string {
return m.key
}

func (m *MyEvent) Data() []byte {
return m.data
}

定義Handler并注冊。

func Handler(e IEvent) error {
//根據自己的業務要求
}
es.Register(Handler)

發送事件。

es.Send(&MyEvent{
namespace: "a",
key: "b",
event: "set",
data: []byte(fmt.Sprint(index)),
})
責任編輯:姜華 來源: 今日頭條
相關推薦

2023-04-03 08:39:33

中間件go語言

2024-06-06 09:47:56

2023-03-27 00:20:48

2024-06-11 00:05:00

CasaOS云存儲管理

2022-05-19 14:14:26

go語言限流算法

2024-08-09 10:59:01

KubernetesSidecar模式

2024-03-19 10:45:31

HTTPSGo語言

2022-11-01 18:29:25

Go語言排序算法

2020-08-12 08:56:30

代碼凱撒密碼函數

2023-05-08 07:55:05

快速排序Go 語言

2024-08-29 13:23:04

WindowsGo語言

2021-07-26 09:47:38

Go語言C++

2024-07-01 08:27:05

KeyAndroid按鍵事件

2017-03-14 13:51:23

AndroidView事件分發和處理

2024-07-30 08:12:04

Java消息go

2022-07-19 12:25:29

Go

2012-03-13 10:40:58

Google Go

2014-12-26 09:52:08

Go

2024-08-26 14:32:43

2021-03-01 21:59:25

編程語言GoCX
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 一区二区av | 日韩成人在线观看 | 超碰一区二区 | 久久久久久久久久久久久9999 | 一区二区三区免费 | 久久久久久高清 | 国产福利资源在线 | 黄色成人国产 | 成年人黄色免费视频 | 成人a免费 | 国产色| 亚洲精品天堂 | 成人午夜电影在线观看 | 紧缚调教一区二区三区视频 | 久久精品久久久久久 | 亚洲三区在线观看 | 成年人在线观看视频 | 亚洲视频免费在线 | 国产激情视频在线免费观看 | va精品 | 久久久久免费精品国产小说色大师 | 一区免费观看 | 欧洲精品视频一区 | 久久成人18免费网站 | 国产69久久精品成人看动漫 | 久久久久成人精品 | 欧美三区视频 | 精品久久久一区 | 久久精品亚洲精品国产欧美 | 国产精品精品视频一区二区三区 | 啪一啪在线视频 | 亚洲欧美中文日韩在线v日本 | 日本三级电影免费观看 | 99免费在线 | 亚洲人人 | 精品亚洲国产成av人片传媒 | 久久久久久国产 | 天堂一区在线观看 | 精品国产精品国产偷麻豆 | 久草网址 | 色99视频|