K8s Informer 是如何保證事件不丟失的?
1、資源 Controller 主要作用
我們知道 k8s 里重要概念之一就是 聲明式 API,比如 kubectl apply 就是聲明式 API的實現。
效果就是資源對象的運行狀態要與我們聲明的一致。比如kubectl apply 一個 deployment 的 yml,他要求的狀態就是: 該 deployment 成功運行。
那么問題來了,k8s 是如何 "監視" 資源對象,以確保其始終保持我們聲明的狀態的呢?答案就是 -- Controller。除了組件中的 kube-controller-manager,我們可以編寫自己的 Controller,也叫自定義控制器(為了方便下文統稱為自定義 Controller)。
接下來,我們就來剖析一下 Controller 背后的"秘密"
2、流程大覽
我們先看看社區給出的 Controller 的架構圖:
其中有幾個主要對象(結構體) -- Reflector、Informer、Indexer。Reflector 和 Indexer 我們會在之后的文章中會一一講解 。
本文主要是講解一下 Informer。
從圖中可以看到主要有9個步驟,這里我將9個步驟合并成3個大步驟:
(畫的有點丑-__- !!!)
大步驟1: Reflector 將資源對象的事件添加進 Delta FIFO queue 中。
這里先提前介紹一下 Delta FIFO queue。所謂 Delta 就是變化的意思,什么的變化呢?就是資源對象的變化。
即 資源對象的變化都會被添加到 Delta FIFO queue 中!這樣是不是就很好理解了。
大步驟2: Informer 將 Delta FIFO queue 中的對象數據 添加到本地 cache 中。
補充一下這個本地 cache 緩存的就是監聽資源對象的最新版。就是緩存的當前集群里面的資源信息。
大步驟3: 使用 workqueue 處理業務邏輯。
3、步驟分析
咱們結合社區給的編寫的 自定義Controller用例 來做源碼分析。這里使用的版本是 client-go v0.20.5。
用例中用到的是普通 informer,介紹的也是普通 informer。但很更多用的是sharedInformer,比如 manager、SharedInformerFactory 都是對普通 informer 的一個再封裝,本質的東西是一樣的。感興趣的話,后面再出介紹 sharedInformer、manager 的文章。
大步驟1
我們看到架構圖中間有一個分界線,將流程分割為上下兩半, 而上半部主要包括大步驟 1、2。
這兩個步驟其實是連在一起的,其入口代碼就是這一行 : informer.Run(),可以先不管這。
我們先看用例中Informer的初始化入口代碼。
NewIndexerInformer 的代碼如下:
再真正的Informer初始化,就是 newInformer :
注意第381行 就是 Delta FIFO 的初始化,架構圖中的 Delta FIFO queue 就是在這實例化的。
我們發現 newInformer 返回的 是一個 low-level Controller 接口。這個接口抽象的很簡單,就三個方法:
Run(stopCh <-chan struct{}):
運行邏輯。
HasSynced() bool :
數據同步完成與否
LastSyncResourceVersion() string:
資源最近一次的ResourceVersion
接下來我們看看三個方法是如何在 controller 中看到這實現的。
咱們直接跳轉到 419 行里面的代碼,low-level Controller 的初始化, 可以很方便就看到了 Run 方法的實現:
大部分代碼是 Reflector 的初始化。
第152行 啟動了一個協程,*r.Run* 就是 Reflector 的執行邏輯:List & Watch 資源對象,然后 Add object to Delta FIFO queue 。
咱們點擊跳轉,直接跳到 ListAndWatch 方法中, 雖然這兒的代碼又多又亂(忍不住吐槽),但它要的做的事很簡單,就四件事。這里我們就把重點代碼拷貝出來說。
第一件事
用你初始化好的 cache.ListWatch 對象 的ListFunc拉取資源對象,然后將對象同步到 Delta FIFO queue:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
......
......
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// 拉取資源列表
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
.....
resourceVersion = listMetaInterface.GetResourceVersion()
.....
items, err := meta.ExtractList(list) // 轉換成對象
......
if err := r.syncWith(items, resourceVersion); err != nil { // 將拉取到資源對象都添加到 Delta FIFO queue
return fmt.Errorf("unable to sync list result: %v", err)
}
......
r.setLastSyncResourceVersion(resourceVersion) // 設置最近一次的版本
......
}
這里再簡單說明一下,r.syncWith(items, resourceVersion) 主要是通過 Delta FIFO queue 中的 Replace() 來同步資源。 其中有一個關鍵的邏輯如下:
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
f.populated =true 就是確定資源對象進入隊列的動作已經發生;f.initialPopulationCount 就是確定已經有多少對象在隊列中了。
然后我們看 informer HasSynced() 的底層邏輯:
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}
而 f.initialPopulationCount-- 發生在下文的 pop 中。
LastSyncResourceVersion() string 返回的版本,就是r.setLastSyncResourceVersion(resourceVersion) 設置的。
第二件事
再次同步資源。
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
用例代碼 中 cache.NewIndexerInformer() 會設置一個 resyncPeriod 參數就是在這起作用。
設置的是 0,所以這個協程會永遠阻塞在 case<-resyncCh。
這的詳細邏輯會放在之后講 Delta FIFO queue 的時候再講,簡單理解就是將 indexer 緩存的數據用同步到 Delta FIFO queue 中。
第三件事
用你初始化好的 cache.ListWatch 對象的 WatchFunc watch 對象。
這里的 watch 功能是底層就是 etcd 的 watch 特性功能,感興趣的同學可以自己了解一下,這里就不展開說明了。
w, err := r.listerWatcher.Watch(options)
if err != nil {
if utilnet.IsConnectionRefused(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
return err
}
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
......
......
}
第四件事
將watch到的對象,加入到Delta FIFO queue中。
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
....
....
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
....
....
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
}
......
r.setLastSyncResourceVersion(newResourceVersion) // 設置最近一次的版本
......
......
}
再簡單歸納一下,就兩件事:
- 一開始,拉取資源列表然后加入到Delta FIFO queue
- watch 資源對象的變化,加入到Delta FIFO queue
大步驟2
Indexerer 其實算是一個內存數據庫的抽象接口。其中Store當然就代表的存儲,其他的就是索引相關的。
// client-go/tools/cache/store.go
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}
cache 就是接口的實現,就是一個緩存。索引肯定是用作搜索的,其使用咱們下文在 作死的優化 那一節可以看到。
然后我們退回看 Run 方法截圖的第154 行代碼,看看第二大步驟的邏輯。
wait.Until 就是一個定時器,簡化成下面的代碼:
func Util(stopCh <-chan struct{}) {
dur := 1 * time.Second
timer := time.NewTimer(dur)
defer timer.Stop()
for {
select {
case <-stopCh:
return
case <-t.C():
f()
timer.Reset(dur)
}
}
}
執行的邏輯就是 c.processLoop:
其實代碼很容易理解,就是將隊列 (Delta FIFO queue)的item 彈出,然后調用處理函數執行ResourceEventHandler中的方法。
先看跳轉到 Pop 代碼:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
......
......
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount-- // 同步數據減1
}
......
item, ok := f.items[id]
......
err := process(item)
......
}
它的內容其實比較簡單,這里只羅列出了最主要的邏輯,相信大伙兒能看明白。而且也看到了上文提到確定同步的關鍵邏輯 f.initialPopulationCount--
也就是說只有 Delta FIFO queue 中的所有數據都同步到了 Indexer 中,informer 的數據同步才算完成。
然后咱們再來看 process ,就是 newInformer截圖 圖中我們第393行的 Process ,展開的方法:
Process: func(obj interface{}) error {
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
}
我們看 Process 匿名函數的行參,obj 就是 Pop 出的對象。根據 Delta類型d.Type 來判斷對對象的處理方式。clientState 就是 Indexer,h 就是 ResourceEventHandler。
所以 Pop出來的對象,馬上就進入了 Indexer中,然后再調用 ResourceEventHandler 對應的方法,這里我們就是將 object 的 key 加入到 workqueue 中。
它各種方法的對應的操作就是 這段代碼。
大步驟3
最后就是我們自己的應用程序,來處理各種資源事件(Add、Update、Delete)。由于Workqueue的存在,就簡化成處理隊列里面的元素。
我們直接可以看這個processNextItem 函數。
第55行,獲取隊列里面的數據。
第65行,就是我們處理對象的業務邏輯。syncToStdout 只是打印一些日志, 但其中 obj, exists, err := c.indexer.GetByKey(key) 這行代碼很關鍵,就是從 indexer 中獲取資源對象。有了它我們就能處理各種業務邏輯,比如我自己工作一般就是將與ResourceEventHandler定義的變化(AddFunc、UpdateFunc、DeleteFunc 你可以只有AddFunc)的對象寫回我們自己的云平臺。
類似代碼如下(syncToStdout 換成了 action):
func (d *Deployment) action(key string) error {
obj, exists, err := d.indexer.GetByKey(key)
if err != nil {
return fmt.Errorf("fetching object with key %s from store failed with %w", key, err)
}
ns, deploymentName, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
if exists {
deployment, ok := obj.(*apps_v1.Deployment) // 一定要斷言資源類型,這里類型要同 list & watch 方法中的一致。github的例子是pod,這里用的是deployment
if !ok {
return fmt.Errorf("type asset fault")
}
post(deployment) // 將資源傳回的偽代碼
}
return nil
}
到這,3大步驟就結束了。
4、補充一個知識:
第三大步驟主要就是對 workqueue 的調用。而 workqueue 有三大類:
- 普通隊列
- 延遲隊列
- 限速隊列
延遲隊列是對普通隊列的封裝。而限速隊列是對延遲對列的封裝,外加一個限速器。
我們一般使用限速隊列,方便我們在處理錯誤的時候重試。
處理完后,還要記得從隊列中移除正在處理的 key
defer c.queue.Done(key)
重試與移除在 用例代碼 中寫的非常清楚,一定不要漏掉這兩塊重要的邏輯。
作死的“優化”
我們可能會發現 workqueue 有點多余。我們完全可以直接在ResourceEventHandler中處理業務邏輯嘛!代碼如下:
func NewPodWithOutWorkQueue(ctx context.Context, clientset *kubernetes.Clientset) {
//workQueue := workqueue.NewDelayingQueue()
namespace := meta_v1.NamespaceAll
listWatcher := &cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime2.Object, error) {
//options.LabelSelector = requireLabel.String()
return clientset.CoreV1().Pods(namespace).List(ctx, options)
},
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
//options.LabelSelector = requireLabel.String()
return clientset.CoreV1().Pods(namespace).Watch(ctx, options)
},
}
indexer, informer := cache.NewIndexerInformer(listWatcher, &core_v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
fmt.Println("add: ", key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
fmt.Println("delete: ", key)
}
},
}, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
go informer.Run(ctx.Done())
go GetIndexer(indexer)
}
func GetIndexer(idx cache.Indexer) {
for {
time.Sleep( 3 * time.Second)
fmt.Println("GetIndexers:", idx.ListIndexFuncValues(cache.NamespaceIndex))
}
}
這里我們還借機查看了Indexer里面的信息。
其中 GetIndexer 就會打印以 namespace 聚合數據??梢院唵卫斫獬上旅娴?sql 語句select namespace from xx_table。
為什么說是作死呢?我們有些小伙伴就是這樣寫的,以為不依賴一個組件就是“優化”,但卻沒有思考過為什么官方用例、 manager 中都會用到 workqueue。
所以這就要引申一個問題 為什么要用 workqueue ?原因如下:
- 在不依賴 Delta FIFO queue 的情況下,將資源事件變得有序。
- workqueue 也可以當作緩存看。將要處理的事件以 key 的方式先緩存在 workqueue 中。
緩存的作用相信很多人都清楚:解決兩個組件處理速度不匹配的問題,如 cpu 和 硬盤之間經常是用 內存做緩存。
我們的業務處理邏輯大概率肯定是慢于事件的生成的,而且還延遲隊列類型做選擇
方便失敗后重試。
加個煎蛋
這可以算個番外系列,不感興趣的朋友可以直接跳過。
有些同學其實已經發現,我們完全不可以不用那么多隊列的(Delta FIFO queue,Workqueue),甚至還用了個小數據庫(Indexer)!
我們可不可以直接Watch對象?即相當于直接調用 etcd 的 watch API。答案是可以的。
我們借鑒一下這里的代碼。
實現一個pod的watch, 代碼如下:
func NewPodOnlyWithWatch(ctx context.Context, clientset *kubernetes.Clientset) {
onlyWatch := &cache.ListWatch{
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
//options.LabelSelector = requireLabel.String()
//options.ResourceVersion = ""
return clientset.CoreV1().Pods("devops").Watch(ctx, meta_v1.ListOptions{})
},
}
watcher, err := watch2.NewRetryWatcher("1", onlyWatch)
if err != nil {
panic(err)
}
// Give the watcher a chance to get to sending events (blocking)
time.Sleep(10 * time.Millisecond)
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
fmt.Println("ResultChan closed")
return
}
//fmt.Println("get event")
if pod, ok := event.Object.(*core_v1.Pod); ok {
switch event.Type {
case watch.Added:
fmt.Printf("新增事件:%s/%s\n", pod.Namespace, pod.Name)
case watch.Deleted:
fmt.Printf("刪除事件:%s/%s\n", pod.Namespace, pod.Name)
case watch.Modified:
fmt.Printf("更新事件:%s/%s\n", pod.Namespace, pod.Name)
default:
fmt.Printf("%s事件:%s\n", event.Type, pod.Name)
}
}
case <-watcher.Done():
fmt.Println("watcher down")
return
}
}
}
但不建議直接watcher。其中之一就是:從業務視角會看到的重復性事件。即資源對象的一個更新動作,收到多個事件。
5、總結
我們常說的Controller 他最核心的能力就是能監控到資源的任何變化,也就是 聲明式 概念中保證狀態的關鍵技術 -- Informer,流程是:
- Reflector 將對象加入到Delta FIFO queue中。
- 然后 informer 將其 pop 出,加入到 Indexer中,以及 resourceEventHandler。
- 最后就是我們自己的業務邏輯, 即:我們自己先到workqueue中,拿到 key,然后用 key 去Indexer 中換取對象,最后處理對象。
然后我們又通過 一個錯誤的*優化* 的例子,講清楚了 workqueue 的重要性。
我們還可以再 geek 一點,選擇直接watch對象變化的事件,但個人不建議這樣做。
這一篇文章主要是介紹了 資源事件通過 informer 扭轉到 ResourceEventHandler 中的大體流程,并沒有講很多細節的部分。
因為我們還需要掌握一些關鍵的組件:Delta FIFO queue、Indexer、workqueue
當這些都清楚了后,再來了解流程的細節,那就非常輕松了。
當然了除了知道了上面的內容,我們還應該掌握 sharedInformer 以及寫 Controller 的“神器” -- controller-runtime 再封裝的 manager。
如果大家感興趣的話再后面的文章再作詳細介紹。當了解完了這些后,相信 Controller 中的任何技術細節問題都難不倒你了。