Kubernetes API Server handler 注冊過程分析
前言
K8s提供 Aggregated APIServer? 的擴展方式,編寫 Aggregated APIServer? 本質上和K8s構建方式類似,理解 APiServer 資源的加載方式,能更好好的理解如何開發Aggregated APIServer。本文以內置資源的 handler 注冊過程為線索介紹了 APiServer 的啟動過程和 handler 注冊過程。使用k8s代碼commit id為c6970e64528ba78b74bf77b86f9b78b7b61bd0cd
APIServer啟動過程介紹
圖1 APIServer啟動流程
圖1給出了 ApiServer 的初始化流程,首先通過 CreateServerChain 構造出3個APIServer:
- AggregatorServer:攔截Aggregated APIServer? 中定義的資源對象請求,并轉發給相關的Aggregated APIServer 處理。
- KubeAPIServer:用于處理 k8s 的內建資源,如:Deployment,ConfigMap 等。
- APIExtensionServer:負責處理用戶自定義資源。
它們之間的處理順序為如下圖所示,當用戶請求進來,先判斷 AggregatorServer? 能否處理,否則代理給 kubeApiServer? ,如果 kubeApiServer? 不能處代理給 ApiExtensionServer 處理,如果都不能處理則交給 notFoundHandler 處理。
圖2 三種 APIServer 請求順序
限于篇幅原因,本文主要分析 kubeapiserver 的啟動過程。
CreateApiServerConfig? 通過調用 buildGenericConfig? 構建 genericapiserver.Config。genericapiserver.Config? 中包含了啟動Genericapiserver? 所需要的配置信息,比如:RequestTimeout? 定義了請求的超時時間,AdmissionControl? 對象進行準入控制。buildGenericConfig? 中需要注意的是 BuildHandlerChainFunc?,請求在路由給資源對象的handler前先經過的BuildHandlerChainFunc? 中定義的 Filter? 。參考圖1,通過深入 buildGenericConfig? 可以發現 BuildHandlerChainFunc? 傳入的是 DefaultBuildHandlerChain? ,其中 Filter 先定義的后調用。
// k8s.io/apiserver/pkg/server/config.go
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
// 構造權限檢查filter
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
...
// 構造認證filter
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
...
// 構造請求超時filter, LongRunningFunc會判斷該請求是否是需要LongRunning的,比如watch的請求,如果是,該filter不會對這類請求生效
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
...
// 初始化RequestInfo的filter并將其放入context中,后續的處理邏輯可以從context直接獲取RequestInfo
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
....
return handler
}
CreateKubeAPIServer? 中調用了kubeAPIServerConfig.Complete().New?構造出了 kubeAPIServer? 的 GenericServer。kubeAPIServerConfig.Complete().New?中通過調用 m.InstallLegacyAPI? 初始化核心資源并添加進路由中,對應的是以 api 開頭的資源,如:Pod,ConfigMap 等。調用 m.InstallAPI 初始化以 apis 開頭的內置資源如:Deployment。
handler的注冊過程
從圖1可以看出 InstallAPI? 與 InstallLegacyAPI? 的創建過程基本類似,本文主要介紹 InstallAPI 的初始化過程。
在調用 InstallAPI? 之前kubeAPIServerConfig.Complete().New?會先創建內置資源對象的RESTStorageProvider? 作為 InstallAPI 的入參
//pkg/controlplane/instance.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
...
// 構造內置資源的RESTStorageProvider
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
...
}
RESTStorageProvider? 是一個接口,通過其 NewRESTStorage? 構造出 APIGroupInfo? ,APIGroupInfo? 包含注冊資源所需的基本信息比如編解碼器,組下所有資源的 Storage 對象VersionedResourcesStorageMap。
//k8s.io/apiserver/pkg/server/genericapiserver.go
// Info about an API group.
type APIGroupInfo struct {
PrioritizedVersions []schema.GroupVersion
// Info about the resources in this group. It's a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage
...
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
...
}
VersionedResourcesStorageMap? 需要重點注意,編寫 Aggregated APIServer?主要邏輯是通過 NewDefaultAPIGroupInfo? 初始化 APIGroupInfo? 以后設置 VersionedResourcesStorageMap? 屬性。VersionedResourcesStorageMap?的簽名是 map[string]map[string]rest.Storage?。第一個key是版本號,第二個key是資源名稱,資源名稱可以是 deployment 這種資源,同時也能是子資源如 pod/status? , pod/log? 等是pod的子資源有單獨的storage。最終構建handler的請求路徑是基于 VersionedResourcesStorageMap? 中提供的版本號和資源名稱確定的 。rest.Storage 用于處理具體的請求,其聲明如下:
// k8s.io/apiserver/pkg/registry/rest/rest.go
// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
// Destroy cleans up its resources on shutdown.
// Destroy has to be implemented in thread-safe way and be prepared
// for being called more than once.
Destroy()
}
實現 rest.Storage? 的接口最基本的,如果需要支持不同的請求,還需要實現其他的接口,相關定義在 k8s.io/apiserver/pkg/registry/rest/rest.go中,如:
// k8s.io/apiserver/pkg/registry/rest/rest.go
// 資源對象支持POST請求,例入通過kubectl create一個資源對象。
// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
// New returns an empty object that can be used with Create after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
// Create creates a new version of a resource.
Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}
// 資源對象支持GET請求,例如通過kubectl get 一個資源對象。
// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
// Get finds a resource in the storage by name and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
// 支持對資源對象進行watch操作 例如通過kubectl get 資源對象 -w。
type Watcher interface {
// 'label' selects on labels; 'field' selects on the object's fields. Not all fields
// are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
// particular version.
Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
}
后續的處理中會依據 Creater? ,Getter? 和 Watcher? 等接口生成對應請求的handler,后文會進行具體的分析。k8s的內置資源存儲都使用 etcd,因此內置資源的 Storage 是通過 Store? 構建。Store? 定義在 /k8s.io/apiserver/pkg/registry/generic/registry/store.go?文件中,已經實現 Creater? , Getter?, Watcher?等接口,其他的資源只需在初始化 Store 時傳入一些必須的參數即可,無需編寫存儲層的交互代碼。下面給出了構造 deployment 的 store 的過程,其他內置資源大同小異。
// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
// 創建一個deployments的genericregistry.Store
store := &genericregistry.Store{
// 初始化一個空資源對象,這里使用的是internal的版本,下面定義的各種strategy操作的對象也是internal版本,這樣就不用為每一種版本編寫一個strategy策略
NewFunc: func() runtime.Object { return &apps.Deployment{} },
// 初始化一個空資源對象列表
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },
DefaultQualifiedResource: apps.Resource("deployments"),
// 創建更新刪除策略 主要是做校驗及控制那些字段不能被用戶覆蓋用
CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,
ResetFieldsStrategy: deployment.Strategy,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
// 繼續完成store其他屬性的初始化,比如初始化store.Storage屬性。Storage主要用于和底層存儲層交互
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}
statusStore := *store
// deployment的status子資源也是使用store, 區別是更新策略不一樣, 即在update時會用舊對象的spec和lable覆蓋新對象的,防止非status字段被用戶意外覆蓋
statusStore.UpdateStrategy = deployment.StatusStrategy
statusStore.ResetFieldsStrategy = deployment.StatusStrategy
return &REST{store}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}
InstallAPIs? 調用鏈條比較深。參考圖1,最終會來到k8s.io/apiserver/pkg/endpoints/groupversion.go?的 InstallREST? 方法。InstallREST? 方法構造出 handler 的前綴,創建APIInstaller?,然后調用installer.Install()方法繼續handler的注冊
// k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// 從InstallAPI調用鏈下來這里的g.Root為/apis,這樣就可以確定handler的前綴為/apis/{goup}/{version}
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
installer.Install()? 方法會調用registerResourceHandlers? 方法,真正開始創建和注冊處理請求的 handler,需要說明的是a.group.Storage? 是上文提到的VersionedResourcesStorageMap? 傳入版本號后獲得的 map。讀者可以自行參考圖1的調用鏈進行分析。a.registerResourceHandlers? 就是為每一種Storage注冊handlers
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
ws := a.newWebService()
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
// a.goup.Storage的簽名是 map[string]Storage, for循環的path是map的key,即資源名稱
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
...
}
registerResourceHandlers? 會依據rest.Storage實現的接口生成相關的action。最終根據action生成handler并注冊到rest容器中。
// k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
...
// 初始化rest容器,根目錄是APIInstaller的prefix屬性,從InstallAPI調用鏈下來值為/apis/{goup}/{version}
ws := a.newWebService()
...
// 進行類型轉換判斷當前的storage支持哪些類型的操作
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
// Get the list of actions for the given scope.
switch {
case !namespaceScoped:
// 構造有無namespace資源的action
// Handle non-namespace scoped resources like nodes.
...
default:
// 構造有namespace資源的action
// 構造handler的注冊路徑
namespaceParamName := "namespaces"
// Handler for standard REST verbs (GET, PUT, POST and DELETE).
namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
namespacedPath := namespaceParamName + "/{namespace}/" + resource
namespaceParams := []*restful.Parameter{namespaceParam}
//resourcePath的值為 /namespaces/{namespace}/{resource}
resourcePath := namespacedPath
resourceParams := namespaceParams
// itemPath: /namespaces/{namespace}/{resource}/{name}
// name是請求資源對象的名字
itemPath := namespacedPath + "/{name}"
nameParams := append(namespaceParams, nameParam)
proxyParams := append(nameParams, pathParam)
itemPathSuffix := ""
if isSubresource {
itemPathSuffix = "/" + subresource
// 有子資源等情況下 resourcePath被定義為:/namespaces/{namespace}/{resource}/{name}/{subResource}
itemPath = itemPath + itemPathSuffix
// itemPath與resourcePath的值一樣
resourcePath = itemPath
resourceParams = nameParams
}
apiResource.Name = path
apiResource.Namespaced = true
apiResource.Kind = resourceKind
namer := handlers.ContextBasedNaming{
Namer: a.group.Namer,
ClusterScoped: false,
}
// 根據storage實現的接口添加添加相關的action
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
// list or post across namespace.
// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
if !isSubresource {
actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
}
}
...
for _, action := range actions {
...
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
// 構造get請求的handler
// restfulGetResourceWithOptions和restfulGetResource將handlers.GetResource函數轉換成restful.RouteFunction,即handler的函數簽名
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
...
// 將handler注冊到rest容器中
// action.Path是上面定義的itemPath或resourcePath,對于GET來說是itemPath
// 當前注冊的handler的路徑是ws的根路徑加上ation.Path. 完整的路徑為:/apis/{goup}/{version}/namespaces/{namespace}/{resource}/{name}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
if isGetterWithOptions {
if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)
}
case "LIST": // List all resources of a kind.
...
case "PUT": // Update a resource.
...
case "PATCH": // Partially update a resource
...
case "POST": // Create a resource.
...
case "DELETE": // Delete a resource.
....
}
...
}
registerResourceHandlers? 中創建的handler并不是直接調用Creater? ,Updater?等接口定義的方法,而是在外面包了一層代碼進行一些額外的處理,例如對象的編解碼,admission control 的處理邏輯,針對 watch 這種長鏈接需要進行協議的處理等,相關的定義在k8s.io/apiserver/pkg/endpoints/handlers包下。文本以Get和Create例,分析請求的處理邏輯。
Get請求的處理過程比較簡單,通過請求的查詢串構造出metav1.GetOptions ,然后交給 Getter 接口處理,最后在將查詢結果進行轉換發回給請求者。
// k8s.io/apiserver/pkg/endpoints/handlers/get.go
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// check for export
options := metav1.GetOptions{}
// 獲取查詢串
if values := req.URL.Query(); len(values) > 0 {
...
// 將查詢串解碼成metav1.GetOptions
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
}
if trace != nil {
trace.Step("About to Get from storage")
}
// 交給Getter接口處理
return r.Get(ctx, name, &options)
})
}
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
namespace, name, err := scope.Namer.Name(req)
...
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
...
result, err := getter(ctx, name, req, trace)
...
// 對處理結果進行轉化為用戶期望的格式并寫入到response中返回給用戶
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
}
}
Create的處理邏輯在 createHandler 中,代碼較長,主要做以下幾件事情:
- 對查詢串進行解碼生成 metav1.CreateOptions 。
- 對請求的body體中的數據進行解碼,生成資源對象。解碼的對象版本是 internal 版本,internal 版本是該資源對象所有版本字段的全集。針對不同版本的對象內部可以使用相同的代碼進行處理。
- 對對象進行修改的準入控制,判斷是否修需要修改對象。
- 交給creater接口創建資源對象。
- 將數據轉換為期望的格式寫入 response 中,調用 creater 接口返回的結果仍然是 internal 版本,編碼時,會編碼成用戶請求的版本返回給用戶。
// k8s.io/apiserver/pkg/endpoints/handlers/create.go
// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
// 從request中取出請求body
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
...
// 對查詢傳進行解碼生成metav1.CreateOptions
options := &metav1.CreateOptions{}
values := req.URL.Query()
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
...
}
// 將請求body解碼成資源對象, defaultGVK是用戶請求的版本,這里decoder解碼出來的對象是internal版本的對象
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
...
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
// 構建調用create方法的函數
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
...
// 執行mutation的admission操作,即在創建時對象進行修改操作。
// admin在buildGenericConfig中初始化,通過config傳遞給genericsever,然后傳遞到此處
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// Dedup owner references again after mutating admission happens
dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
// 調用創建方法
result, err := requestFunc()
...
return result, err
})
...
// resutl也是internal版本的對象,transformResponseObject會轉換為用戶請求的版本并輸出
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}
Create請求的流程可以總結為下圖
圖3 create請求處理流程
總結
本文介紹了 K8s內置資源的注冊過程,對APIServer的訪問會先經過 filter,再路由給具體的 handler。filter 在 DefaultBuildHandlerChain? 中定義,主要對請求做超時處理,認證,鑒權等操作。handler 的注冊則是初始化 APIGoupInfo? 并設置其 VersionedResourcesStorageMap? 后作為入參,調用 GenericAPIServer.InstallAPIGroups?即可完成 handler 的注冊。k8s.io/apiserver/pkg/endpoints/handlers?包中的代碼則是對用戶請求做編解碼,對象版本轉換,協議處理等操作,最后在交給rest.Storage 具體實現的接口進行處理。
參考
? https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_apiserver.html#kube-apiserver-處理流程[1]
? https://hackerain.me/2020/10/05/kubernetes/kube-apiserver-genericapiserver.html
? https://hackerain.me/2020/09/19/kubernetes/kube-apiserver-storage-overview.html
? https://github.com/gosoon/source-code-reading-notes/blob/master/kubernetes/kube_apiserver.md
? https://time.geekbang.org/column/article/41876