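Understanding K8s controller-runtime in One Article
In K8s development you often hear about controllers. This article explains in detail how that concept is implemented under the hood.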
Controller
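In K8s, a custom controller is usually implemented with the controller-runtime framework (https://github.com/kubernetes-sigs/controller-runtime). Tools such as Kubebuilder and operator-sdk are only wrappers around controller-runtime that let developers quickly generate project scaffolding.
Controller is defined in pkg/internal/controller/controller. A controller mainly exposes two methods, Watch and Start, plus a reconciliation method, Reconcile. The definition appears to hold no Informer or Indexer for resource objects, even though in K8s all interaction with kube-apiserver resources goes through Informers. In fact, the Informer is wrapped one level down, behind the startWatches field shown below.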
type Controller struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler
// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.RateLimitingInterface
// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
// Deprecated: the caller should handle injected fields itself.
SetFields func(i interface{}) error
// mu is used to synchronize Controller setup
mu sync.Mutex
// Started is true if the Controller has been Started
Started bool
// ctx is the context that was passed to Start() and used when starting watches.
//
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
// while we usually always strive to follow best practices, we consider this a legacy case and it should
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
ctx context.Context
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
// Defaults to 2 minutes if not set.
CacheSyncTimeout time.Duration
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
// outside the context of a reconciliation.
LogConstructor func(request *reconcile.Request) logr.Logger
// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
RecoverPanic *bool
}
Watch()
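Watch first checks whether the controller has already started. If it has not, the watch is stored in startWatches until the controller starts. If it has, Watch calls src.Start(c.ctx, evthdler, c.Queue, prct...) directly. The Source can be an informer, a kind, a channel, and so on.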
// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
Taking informer as an example, the corresponding EventHandler is added as follows:
_, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
Taking kind as an example, the corresponding EventHandler is added as follows:
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
_, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
internal.EventHandler implements the OnAdd, OnUpdate, and OnDelete methods. In other words, src.Start obtains the corresponding informer and registers the corresponding EventHandler on it.
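The "store now, start later" behaviour of Watch can be illustrated with a small standard-library model. This is only a sketch under stated assumptions: miniController, source, and namedSource are hypothetical names invented here, a buffered channel stands in for the work queue, and none of it is the real controller-runtime code.

```go
package main

import (
	"fmt"
	"sync"
)

// source is a hypothetical stand-in for source.Source: something that can be started.
type source interface {
	Start(queue chan<- string) error
}

// namedSource records that it was started by writing its name into the queue.
type namedSource struct{ name string }

func (s namedSource) Start(queue chan<- string) error {
	queue <- s.name
	return nil
}

// miniController is a toy model, not the real controller-runtime type.
type miniController struct {
	mu      sync.Mutex
	started bool
	pending []source    // plays the role of startWatches
	queue   chan string // plays the role of Queue
}

// Watch defers the source if the controller has not started yet,
// otherwise it starts the source right away.
func (c *miniController) Watch(src source) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.started {
		c.pending = append(c.pending, src)
		return nil
	}
	return src.Start(c.queue)
}

// Start starts every deferred source and marks the controller as started.
func (c *miniController) Start() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, src := range c.pending {
		if err := src.Start(c.queue); err != nil {
			return err
		}
	}
	c.pending = nil
	c.started = true
	return nil
}

func main() {
	c := &miniController{queue: make(chan string, 8)}
	_ = c.Watch(namedSource{"early"}) // stored, not started
	fmt.Println("queued before Start:", len(c.queue))
	_ = c.Start()                    // starts "early"
	_ = c.Watch(namedSource{"late"}) // started immediately
	fmt.Println("queued after Start:", len(c.queue))
}
```

In this model a watch added before Start leaves the queue untouched, while a watch added afterwards takes effect at once, which is the distinction the Watch excerpt draws with its c.Started check.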
Start()
Start does two main things. First, it calls the Start method of every Source in startWatches, which registers the EventHandlers.
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
Second, it launches the workers that process resource objects.
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}()
}
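processNextWorkItem takes the next item from the Queue. reconcileHandler is where each item is actually processed; it contains both the event handling and the error handling. The real event handling is exposed to developers through c.Do.Reconcile(ctx, req), so developers only need to put their business logic in the Reconcile function.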
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
c.reconcileHandler(ctx, obj)
return true
}
// Reconcile implements reconcile.Reconciler.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
defer func() {
if r := recover(); r != nil {
if c.RecoverPanic != nil && *c.RecoverPanic {
for _, fn := range utilruntime.PanicHandlers {
fn(r)
}
err = fmt.Errorf("panic: %v [recovered]", r)
return
}
log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
panic(r)
}
}()
return c.Do.Reconcile(ctx, req)
}
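The Reconcile wrapper above relies on a deferred function plus a named return value to turn a panic into an ordinary error. A minimal standalone sketch of the same technique follows; safeReconcile is a hypothetical name, and the recoverPanic flag plays the role of the RecoverPanic option.

```go
package main

import (
	"errors"
	"fmt"
)

// safeReconcile calls fn. When recoverPanic is true, a panic inside fn is
// converted into an error through the named return value err.
// When recoverPanic is false, the panic is raised again.
func safeReconcile(recoverPanic bool, fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			if recoverPanic {
				err = fmt.Errorf("panic: %v [recovered]", r)
				return
			}
			panic(r)
		}
	}()
	return fn()
}

func main() {
	err := safeReconcile(true, func() error { panic("boom") })
	fmt.Println(err) // panic: boom [recovered]

	err = safeReconcile(true, func() error { return errors.New("plain failure") })
	fmt.Println(err) // plain failure
}
```

Because the recovered panic comes back as a normal error, the caller can treat it like any other failed reconcile instead of crashing the process.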
Reconcile
The controller's reconciliation logic runs in Reconcile.
type Reconciler interface {
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
Reconcile(context.Context, Request) (Result, error)
}
type Request struct {
// NamespacedName is the name and namespace of the object to reconcile.
types.NamespacedName
}
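The interface comment states that a Request is requeued when Reconcile returns a non-nil error or Result.Requeue is true. The toy loop below illustrates that contract with the standard library only. The request, result, reconciler, flaky, and drain names are all hypothetical, a slice stands in for the queue, and maxAttempts is an assumption added to keep the toy finite; the real queue relies on rate-limited back-off instead.

```go
package main

import (
	"errors"
	"fmt"
)

// request mirrors the shape of reconcile.Request: a namespace and a name.
type request struct{ Namespace, Name string }

// result mirrors the Requeue flag of reconcile.Result.
type result struct{ Requeue bool }

type reconciler interface {
	Reconcile(req request) (result, error)
}

// flaky fails on the first attempt for each request and succeeds afterwards.
type flaky struct{ attempts map[request]int }

func (f *flaky) Reconcile(req request) (result, error) {
	f.attempts[req]++
	if f.attempts[req] == 1 {
		return result{}, errors.New("transient failure")
	}
	return result{}, nil
}

// drain processes the queue until it is empty, re-adding items that returned
// an error or asked for a requeue. It returns the number of Reconcile calls.
func drain(queue []request, r reconciler, maxAttempts int) (calls int) {
	tries := map[request]int{}
	for len(queue) > 0 {
		req := queue[0]
		queue = queue[1:]
		calls++
		tries[req]++
		res, err := r.Reconcile(req)
		if (err != nil || res.Requeue) && tries[req] < maxAttempts {
			queue = append(queue, req) // put the item back for another attempt
		}
	}
	return calls
}

func main() {
	r := &flaky{attempts: map[request]int{}}
	calls := drain([]request{{"default", "web"}}, r, 3)
	fmt.Println("Reconcile calls:", calls)
}
```

Note that only the key is requeued, not the object, so each retry is expected to read the current state again rather than rely on what the previous attempt saw.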
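The Request passed to Reconcile comes from the controller's Queue. reconcileHandler checks whether the queued item is a reconcile.Request; if the type does not match, the Reconcile logic is not executed.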
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.Queue.Forget(obj)
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
// Return true, don't take a break
return
}
}
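So how does data get into the queue? It is enqueued by the EventHandler registered on the Informer. Recall src.Start(c.ctx, evthdler, c.Queue, prct...): it registers an internal.EventHandler on the informer. internal.EventHandler implements OnAdd, OnUpdate, OnDelete, and so on. Taking OnAdd as an example, the method ends by calling EventHandler.Create.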
type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
}
// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}
// Invoke create handler
e.EventHandler.Create(c, e.Queue)
}
EventHandler is an interface with four implementations: EnqueueRequestForObject, Funcs, EnqueueRequestForOwner, and enqueueRequestsFromMapFunc. Taking EnqueueRequestForObject as an example, its Create method is:
// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
So the objects that Reconcile works on are in fact enqueued by the EventHandler registered on the Informer.
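The path from an informer event to a queued request (predicates first, then the handler, then the queue) can be modelled in a few lines of standard-library Go. Everything here is a hypothetical simplification: object, predicate, eventHandler, and request are invented for the sketch, and a buffered channel replaces the work queue.

```go
package main

import "fmt"

// request mirrors the shape of reconcile.Request: only namespace and name, no object body.
type request struct{ Namespace, Name string }

// object is a hypothetical minimal resource.
type object struct {
	Namespace, Name string
	Labels          map[string]string
}

// predicate decides whether a create event should reach the handler.
type predicate func(obj object) bool

// eventHandler is a toy version of internal.EventHandler.
type eventHandler struct {
	predicates []predicate
	queue      chan request
}

// OnAdd filters the event through every predicate and, if all of them accept it,
// enqueues a request that carries only the object's key.
func (e eventHandler) OnAdd(obj object) {
	for _, p := range e.predicates {
		if !p(obj) {
			return // filtered out, nothing is enqueued
		}
	}
	e.queue <- request{Namespace: obj.Namespace, Name: obj.Name}
}

func main() {
	managed := func(obj object) bool { return obj.Labels["managed"] == "true" }
	h := eventHandler{predicates: []predicate{managed}, queue: make(chan request, 4)}

	h.OnAdd(object{Namespace: "default", Name: "web", Labels: map[string]string{"managed": "true"}})
	h.OnAdd(object{Namespace: "default", Name: "db"}) // no label, dropped by the predicate

	close(h.queue)
	for req := range h.queue {
		fmt.Printf("reconcile %s/%s\n", req.Namespace, req.Name)
	}
}
```

Only the "web" object reaches the queue in this run, which mirrors how a predicate returning false in OnAdd stops the event before Create is called.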
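Dissecting scaffolding frameworks such as kubebuilder
Frameworks such as kubebuilder and operator-sdk can quickly generate controller code for a resource object. The following walks through the controller code logic, using kubebuilder as the example.
A complete controller startup consists of the following steps:
1) The startup function in main.go defines a controllerManager object.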
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "9a82ee0d.my.domain",
CertDir: "dir",
...
})
2) Each resource's controller is registered with the controllerManager object through SetupWithManager().
if err = (&controllers.AppServiceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AppService")
os.Exit(1)
}
3) The controllerManager is started, which in turn starts the controllers of the corresponding resources.
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
The main logic lives in two methods: SetupWithManager() and mgr.Start().
// SetupWithManager sets up the controller with the Manager.
func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&appexamplecomv1.AppService{}).
Complete(r)
}
Builder
ctrl.NewControllerManagedBy(mgr) returns a Builder object.
NewControllerManagedBy = builder.ControllerManagedBy
func ControllerManagedBy(m manager.Manager) *Builder {
return &Builder{mgr: m}
}
Builder is the constructor for a controller. Its structure is defined as:
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
}
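ctrlOptions holds the configuration used to build the controller, chiefly the Reconciler. forInput specifies the reconciled object itself and is set through Builder.For(). ownsInput specifies the child resources watched on behalf of the reconciled object and is set through Builder.Owns(). watchesInput allows a custom EventHandler and is set through Builder.Watches(). Because the generated SetupWithManager only calls For(), a controller generated by kubebuilder by default reconciles only in response to events on the reconciled object itself.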
type WatchesInput struct {
src source.Source
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
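Builder.Complete() calls Builder.Build() to do the construction. Build() involves two important methods, doController() and doWatch().
doController()
doController derives the Controller's name from the resource object's GVK and then instantiates the Controller through the newController function.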
controllerName, err := blder.getControllerName(gvk, hasGVK)
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
newController is an alias of controller.New, which is defined as:
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
if err != nil {
return nil, err
}
// Add the controller as a Manager components
return c, mgr.Add(c)
}
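c, err := NewUnmanaged(name, mgr, options) initializes the Controller instance. Once the Controller is instantiated, mgr.Add(c) hands it to the Manager for management. The controllerManager's Add function takes a Runnable. Runnable is an interface representing a component that can be started, and Controller implements that interface's Start function, which is why a Controller instance can be passed to Add.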
doWatch()
doWatch is fairly simple: it calls controller.Watch() to register the EventHandlers. For forInput resources the default EventHandler is EnqueueRequestForObject; for ownsInput resources it is EnqueueRequestForOwner. Both handlers were mentioned above, and both implement Create(), Update(), Delete(), and so on, which enqueue the resource object to be reconciled.
func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// Watches the managed types
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
// Do the watch requests
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}
}
return nil
}
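Resources registered through watchesInput need their own EventHandler, which can be implemented along the lines shown below. As concluded earlier, the objects a controller reconciles come from the queue, and the queue is filled by the EventHandler's Create, Update, Delete, and other callbacks. The processing order is therefore: logic defined in the EventHandler -> enqueue -> Reconcile.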
func (r *AppServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.Infof("starting Reconcile logic")
...
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Named("appServiceController").
Watches(
&source.Kind{
Type: &appexamplecomv1.AppService{},
},
handler.Funcs{
CreateFunc: func(createEvent event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
klog.Infof("createFunc")
limitingInterface.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: createEvent.Object.GetName(),
Namespace: createEvent.Object.GetNamespace(),
}})
},
UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
klog.Infof("updateFunc")
},
DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
klog.Infof("deleteFunc")
},
}).
Complete(r)
}
The code above enqueues only on Create, so the Reconcile logic runs only when a resource is created.
Manager.Start()
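After the controllers are registered with the manager, mgr.Start(ctrl.SetupSignalHandler()) starts the manager. As noted earlier, when a Controller is registered, the mgr.Add() call reached from doController adds it to the Manager as a Runnable. Manager.Start() calls the Start method of cm.runnables, that is, controller.Start(), to start the controllers.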
func (cm *controllerManager) Start(ctx context.Context) (err error) {
...
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// Start the non-leaderelection Runnables after the cache has synced.
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
...
}