成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

自己動手用Go語言寫三種實用隊列

開發 前端
今天給大家演示下三種隊列的使用方法與相應的使用場景,大家在工作中可以直接Copy這些代碼,加速自己項目的開發。

背景

我們在使用kubernetes的客戶端k8s.io/client-go 進行開發的時候,比如寫個CRD的operator, 經常會用到隊列這種數據結構。并且很多時候,我們在做服務器端后臺開發的時候,需要用到任務隊列,進行任務的異步處理與任務管理。k8s.io/client-go中的workqueue包里面提供了三種常用的隊列。今天給大家演示下三種隊列的使用方法與相應的使用場景,大家在工作中可以直接copy這些代碼,加速自己項目的開發。這三個隊列的關系如下圖所示:

k8s隊列關系

隊列

type (基礎隊列)

下面給出了數據結構,其中dirty,processing兩個集合分別存儲的是需要處理的任務和正在處理的任務,queue[]t按序存放的是所有添加的任務。這三個屬性的關系很有意思,dirty用于快速判斷queue中是否存在相應的任務,這樣有以下兩個用處:

1. 在Add的時候,可以防止重復添加。(代碼查看:?https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120?)。

2.由于在任務完成后要調用Done方法,把任務從processing集合中刪除掉,那么如果在完成前(即調用Done方法之前),把任務再次添加進dirty集合,那么在完成調用Done方法的時候,會再次把任務重新添加進queue隊列,進行處理(代碼查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L180)。

而processing集合存放的是當前正在執行的任務,它的作用有以下幾點。

1.在Add的時候,如果任務正在處理,就直接返回。這樣在任務調用Done的時候,由于dirty集合中有,會把這個任務再次放在隊列的尾部。(代碼查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120)。

2.用于判斷隊列中是否還有任務正在執行,這樣在shutdown的時候,可以有的放矢。(代碼查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L221)。

整個queue隊列工作模式是,你的工作線程通過Get方法從隊列中獲取任務(如果隊列長度為0,需要q.cond.Wait()),然后處理任務(你自己的業務邏輯),處理完后調用Done方法,表明任務完成了,同時調用q.cond.Signal(),喚醒等待的工作線程?。

type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t

// dirty defines all of the items that need to be processed.
dirty set

// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set

cond *sync.Cond

shuttingDown bool
drain bool

metrics queueMetrics

unfinishedWorkUpdatePeriod time.Duration
clock clock.WithTicker
}

delaying_queue(延遲隊列)

這個延遲隊列繼承了上面的基礎隊列,同時提供了addAfter函數,實現根據延遲時間把元素增加進延遲隊列。其中的waitForPriorityQueue實現了一個用于waitFor元素的優先級隊列,其實就是一個最小堆。

func (q *delayingType) AddAfter(item interface{}, duration time.Duration)這個函數(代碼https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L162)。

當duration為0,就直接通過q.add放到它繼承的基礎執行隊列里面,如果有延遲值,就放在q.waitingForAddCh通道里面,等待readyAt時機成熟,再放到隊列中。那這個通道里面的元素當readyAt后,如何加入到基礎執行隊列?下面的截圖給出了答案,便是啟動的ret.waitingLoop協程。這個方法的具體代碼(https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L189),具體思路就是利用了上面的waitForPriorityQueue最小堆,還有等待加入隊列通道q.waitingForAddCh,大家可以看看給出的具體代碼,大致的思想就會了解。

創建延遲隊列

// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface

// clock tracks time for delayed firing
clock clock.Clock

// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once

// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker

// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor

// metrics counts the number of retries
metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
data t
readyAt time.Time
// index in the priority queue (heap)
index int
}
type waitForPriorityQueue []*waitFor

元素添加邏輯

下面是測試代碼,大家可以看看如何創建延遲隊列,還有添加任務。

下面的代碼,在延遲隊列里面增加了一個字符串"foo",延遲執行的時間是50毫秒。然后差不多50毫秒后,延遲隊列長度為0
fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "")

first := "foo"

q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}

if q.Len() != 0 {
t.Errorf("should not have added")
}

fakeClock.Step(60 * time.Millisecond)

if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)

// step past the next heartbeat
fakeClock.Step(10 * time.Second)

err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
if q.Len() > 0 {
return false, fmt.Errorf("added to queue")
}

return false, nil
})
if err != wait.ErrWaitTimeout {
t.Errorf("expected timeout, got: %v", err)
}

if q.Len() != 0 {
t.Errorf("should not have added")
}

func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}

return false, nil
})
}

func waitForWaitingQueueToFill(q DelayingInterface) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if len(q.(*delayingType).waitingForAddCh) == 0 {
return true, nil
}

return false, nil
})
}

rate_limiting_queue(限速隊列)?

限速隊列是利用延遲隊列的延遲特性,延遲某個元素的插入FIFO隊列的時間,達到限速的目的

workqueue包下面的rateLimiter有多種,下面的代碼顯示的是
ItemExponentialFailureRateLimiter(排隊指數算法)。

type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int

baseDelay time.Duration
maxDelay time.Duration
}

它有個基礎延遲時間,加入到延遲隊列后,被執行的延遲時間的計算公式是如下所示。另外它還有個最大延遲時間的參數。

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

exp := r.failures[item]
r.failures[item] = r.failures[item] + 1

// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}

calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}

return calculated
}
下面的測試代碼,顯示的是創建了一個1毫秒基礎延遲,最大1秒的延遲隊列。它在延遲隊列中增加了
一個"one"字符串,由于是第一次添加,所以基于上面的公式它的延遲時間是1毫秒,再次增加"one"
后,它的延遲時間是2*1毫秒,即2毫秒,對于增加的字符串"two"也是一樣,當我們調用forget
方法后ItemExponentialFailureRateLimiter中的計數器會重置,再次增加"one"字符串后,
它的延遲時間又變成了1毫秒

limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := testingclock.NewFakeClock(time.Now())
delayingQueue := &delayingType{
Interface: New(),
clock: fakeClock,
heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue

queue.AddRateLimited("one")
waitEntry := <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, queue.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}

queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}

queue.Forget("one")
if e, a := 0, queue.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}

此外這個包下面還有ItemFastSlowRateLimiter,BucketRateLimiter等。具體的大家可以查看default_rate_limiters.go(代碼:https://github.com/kubernetes/client-go/blob/master/util/workqueue/default_rate_limiters.go)。

應用場景

延遲隊列場景:

1、訂單延遲支付關閉

常見的打車軟件都會有匹配司機,這個可以用延遲隊列來實現;處理已提交訂單超過30分鐘未付款失效的訂單,延遲隊列可以很好的解決;又或者注冊了超過30天的用戶,發短信撩動等。

2、定時任務調度

比如使用DelayQueue保存當天將會執行的任務和執行時間,或是需要設置一個倒計時,倒計時結束后更新數據庫中某個表狀態

限速隊列場景:

比如限制數據隊列的寫入速度。

責任編輯:姜華 來源: 今日頭條
相關推薦

2020-03-31 20:23:46

C語言TCP服務器

2020-08-12 08:51:19

Go語言Concurrency后臺

2023-08-15 08:01:07

Go 語言排序

2009-10-28 09:25:18

VB.NET List

2010-09-01 12:50:04

CSS清除浮動

2023-03-07 10:32:34

Go語言結構體

2024-04-01 08:53:50

分庫分表分片算法

2025-06-06 00:00:00

JavaHTTPMCP

2019-04-16 11:21:50

Linux動態庫軟連接

2016-09-30 01:10:12

R語言聚類方法

2022-05-31 16:00:46

Go 編程語言復制文件Go 標準庫

2009-07-17 09:47:41

Linux RootLinux使用Linux開發

2023-10-10 13:28:44

Pythonpygame

2011-01-18 15:35:59

jQueryJavaScriptweb

2024-09-11 14:57:00

Redis消費線程模型

2010-09-24 19:18:22

SQL索引

2021-01-12 08:43:29

Redis ListStreams

2024-12-03 00:40:55

2020-10-28 10:10:03

Java單鏈表數據結構

2022-06-20 08:50:16

TypeScript類型語法
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 毛片日韩| 日韩精品一区二区三区四区 | 在线视频 欧美日韩 | 久久久精彩视频 | 久久久久黄色 | 日本不卡免费新一二三区 | 欧美日韩视频网站 | 一区二区三区四区在线视频 | 成人av网站在线观看 | 国产精品免费一区二区三区 | 日韩精品免费在线观看 | 蜜臀av日日欢夜夜爽一区 | 激情在线视频网站 | 91在线区 | 亚洲精品国产第一综合99久久 | 国产一区精品在线 | 三级成人片 | av在线播放网址 | 久久精品天堂 | 亚洲欧美高清 | 精品国产乱码久久久久久图片 | 欧美亚洲国产一区 | 久久久不卡网国产精品一区 | 亚洲精品久久久一区二区三区 | 国产成人精品午夜 | 日韩有码一区 | 日韩午夜| 欧美激情一区二区三区 | 成人高潮片免费视频欧美 | 国产精品永久久久久久久www | 国产不卡在线播放 | 国产激情免费视频 | 久久婷婷国产麻豆91 | 国产91久久久久蜜臀青青天草二 | 欧美激情一区二区三区 | 亚洲免费婷婷 | 91在线视频免费观看 | 一区二区三区视频在线观看 | 亚洲欧美成人影院 | 色播视频在线观看 | 久久er99热精品一区二区 |