成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一篇帶給你pod創建源碼分析

運維 系統運維
接觸kubernetes已經4年多了,不過多是停留在能夠使用,對其原理、源碼不是很熟悉。對于平常執行的命令,它背后執行的流程、邏輯也不是很清楚。

[[424936]]

大家好,我是華仔。

接觸kubernetes已經4年多了,不過多是停留在能夠使用,對其原理、源碼不是很熟悉。對于平常執行的命令,它背后執行的流程、邏輯也不是很清楚。所以,最近打算去看看k8s各模塊的源碼。一來是加深對k8s各模塊的理解和認識;二來是方便以后遇到問題好分析問題的根本原因,有理有據,則可以服人;再者后續跳槽也不怕被面試官的技術問題所難到了。那么今天,就來簡單說一說pod創建的源碼吧。文章有錯誤的地方還請指正,輕噴。首先,k8s的源碼在github上即可獲取。本次我看的是1.21.3。另外,很多翻譯都是直譯或翻譯軟件翻譯的。請諒解。

正文

1、k8s源碼中針對pod的增刪改查是在源碼包/pkg/kubelet/kubelet.go中的syncLoop()進行。如下所示:

  1. // syncLoop is the main loop for processing changes. It watches for changes from 
  2. // three channels (file, apiserver, and http) and creates a union of them. For 
  3. // any new change seen, will run a sync against desired state and running state. If 
  4. // no changes are seen to the configuration, will synchronize the last known desired 
  5. // state every sync-frequency seconds. Never returns
  6. // syncLoop是處理更改的主循環。它感知來自三個channel(file,apiserver,http)的pod的變化,并且聚合它們。有任何的改變發生,將運行狀態同步為期望狀態。反之,則在每個同步周期內同步最后已知的期望狀態。 
  7. func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) { 
  8.  klog.InfoS("Starting kubelet main sync loop"

在syncLoop()中則通過kl.syncLoopIteration()針對pod具體執行具體的操作。

  1. kl.syncLoopMonitor.Store(kl.clock.Now()) 
  2. if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) { 
  3.  break 

2、在syncLoopIteration有幾個重要的參數,如下所示:

  1. // Arguments: 
  2. // 1.  configCh:       a channel to read config events from 
  3. // 2.  handler:        the SyncHandler to dispatch pods to 
  4. // 3.  syncCh:         a channel to read periodic sync events from 
  5. // 4.  housekeepingCh: a channel to read housekeeping events from 
  6. // 5.  plegCh:         a channel to read PLEG updates from 
  7.  
  8. // * configCh: dispatch the pods for the config change to the appropriate 
  9. //             handler callback for the event type 
  10. // * plegCh: update the runtime cache; sync pod 
  11. // * syncCh: sync all pods waiting for sync 
  12. // * housekeepingCh: trigger cleanup of pods 
  13. // * health manager: sync pods that have failed or in which one or more 
  14. //                     containers have failed health checks 
  15.  
  16. func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, 
  17.  syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { 
  18.  select { 
  19.  case u, open := <-configCh: 
  20.   // Update from a config source; dispatch it to the right handler 
  21.   // callback. 
  22.   if !open { 
  23.    klog.ErrorS(nil, "Update channel is closed, exiting the sync loop"
  24.    return false 
  25.   } 
  26.      

SyncHandler是一個interface。包含對pod常見操作的幾個方法。該接口由kubelet來實現。如下所示:

  1. // SyncHandler is an interface implemented by Kubelet, for testability 
  2. # pod創建、更新、 刪除... 
  3. type SyncHandler interface { 
  4.  HandlePodAdditions(pods []*v1.Pod)  
  5.  HandlePodUpdates(pods []*v1.Pod)  
  6.  HandlePodRemoves(pods []*v1.Pod) 
  7.  HandlePodReconcile(pods []*v1.Pod) 
  8.  HandlePodSyncs(pods []*v1.Pod) 
  9.  HandlePodCleanups() error 

3、針對pod可進行的操作如下,每個操作都有對應的方法。比如ADD,就會去執行HandlePodAdditions方法

  1. // These constants identify the PodOperations that can be made on a pod configuration. 
  2. const ( 
  3.  // SET is the current pod configuration. 
  4.  SET PodOperation = iota 
  5.  // ADD signifies pods that are new to this source. 
  6.  ADD 
  7.  // DELETE signifies pods that are gracefully deleted from this source. 
  8.  DELETE 
  9.  // REMOVE signifies pods that have been removed from this source. 
  10.  REMOVE 
  11.  // UPDATE signifies pods have been updated in this source. 
  12.  UPDATE 
  13.  // RECONCILE signifies pods that have unexpected status in this source, 
  14.  // kubelet should reconcile status with this source. 
  15.  RECONCILE 
  16.  
  17.  
  18. switch u.Op { 
  19.   case kubetypes.ADD
  20.    klog.V(2).InfoS("SyncLoop ADD""source", u.Source, "pods", format.Pods(u.Pods)) 
  21.    // After restarting, kubelet will get all existing pods through 
  22.    // ADD as if they are new pods. These pods will then go through the 
  23.    // admission process and *may* be rejected. This can be resolved 
  24.    // once we have checkpointing. 
  25.    handler.HandlePodAdditions(u.Pods) 

4、HandlePodAdditions又是如何去執行創建pod的呢?主要有以下幾個操作:

  1. 1. 根據pod的創建時間進行排序 
  2. sort.Sort(sliceutils.PodsByCreationTime(pods)) 
  3.  
  4. 2. 將pod添加到podmanager中.因為kubelet它會依賴這個pod manager作為期望狀態的一個憑證。 
  5. 如果一個在pod manager中無法查詢,那么就意味著它已經被apiserver刪除了,不再需要其他操作 
  6. // Always add the pod to the pod manager. Kubelet relies on the pod 
  7. // manager as the source of truth for the desired state. If a pod does 
  8. // not exist in the pod manager, it means that it has been deleted in 
  9. // the apiserver and no action (other than cleanup) is required. 
  10. kl.podManager.AddPod(pod) 
  11.  
  12. 3. 判斷pod是不是靜態pod 
  13. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) 
  14.  
  15. 4. 通過dispatchWork分發任務 
  16. kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) 
  17.  
  18. 5. 將pod加入到probe manager,即健康檢查.包括startup probe、liveness probe、readiness probe。  
  19. kl.probeManager.AddPod(pod) 

dispatchWork又做了哪些事情呢?如下:

  1. // Run the sync in an async worker. 在一個異步worker中執行同步 
  2.  kl.podWorkers.UpdatePod(&UpdatePodOptions{ 
  3.   Pod:        pod, 
  4.   MirrorPod:  mirrorPod, 
  5.   UpdateType: syncType, 
  6.   OnCompleteFunc: func(err error) { 
  7.    if err != nil { 
  8.     metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start)) 
  9.    } 
  10.   }, 
  11.  }) 

那么UpdatePod()又做哪些事情呢?

  1. // Creating a new pod worker either means this is a new pod, or that the 
  2. // kubelet just restarted. In either case the kubelet is willing to believe 
  3. // the status of the pod for the first pod worker sync. See corresponding 
  4. // comment in syncPod. 
  5.   // 創建一個新的pod worker,意味著這是一個新的pod 
  6. go func() { 
  7.  defer runtime.HandleCrash() 
  8.  p.managePodLoop(podUpdates) 
  9. }() 

managePodLoop()去執行同步。

  1. for update := range podUpdates { 
  2.   err := func() error { 
  3.    podUID := update.Pod.UID 
  4.    // This is a blocking call that would return only if the cache 
  5.    // has an entry for the pod that is newer than minRuntimeCache 
  6.    // Time. This ensures the worker doesn't start syncing until 
  7.    // after the cache is at least newer than the finished time of 
  8.    // the previous sync. 
  9.    status, err := p.podCache.GetNewerThan(podUID, lastSyncTime) 
  10.    if err != nil { 
  11.     // This is the legacy event thrown by manage pod loop 
  12.     // all other events are now dispatched from syncPodFn 
  13.     p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err) 
  14.     return err 
  15.    } 
  16.       // 這里去做同步 
  17.    err = p.syncPodFn(syncPodOptions{ 
  18.     mirrorPod:      update.MirrorPod, 
  19.     pod:            update.Pod, 
  20.     podStatus:      status, 
  21.     killPodOptions: update.KillPodOptions, 
  22.     updateType:     update.UpdateType, 
  23.    }) 
  24.    lastSyncTime = time.Now() 
  25.    return err 
  26.   }() 

5、最終調用到pkg/kubelet/kuberuntime/kuberuntime_manager.go中SyncPod()進行pod的創建

  1. // SyncPod syncs the running pod into the desired pod by executing following steps: 
  2. // 執行以下的步驟將運行的pod同步到期望的狀態 
  3. //  1. Compute sandbox and container changes. 
  4. // 計算sanbox和container改變 
  5. //  2. Kill pod sandbox if necessary. 
  6. // 如果有必要就刪除pod sandbox 
  7. //  3. Kill any containers that should not be running. 
  8. // 刪除不需要運行的容器 
  9. //  4. Create sandbox if necessary. 
  10. // 需要的情況下創建sandbox 
  11. //  5. Create ephemeral containers. 
  12. // 創建臨時容器 
  13. //  6. Create init containers. 
  14. // 創建初始化容器 
  15. //  7. Create normal containers. 
  16. // 創建普通容器 
  17. func (m *kubeGenericRuntimeManager) SyncPod() 
  18.  
  19. // Step 1: Compute sandbox and container changes. 
  20.  podContainerChanges := m.computePodActions(pod, podStatus) 
  21.  klog.V(3).InfoS("computePodActions got for pod""podActions", podContainerChanges, "pod", klog.KObj(pod)) 
  22.  if podContainerChanges.CreateSandbox { 
  23.   ref, err := ref.GetReference(legacyscheme.Scheme, pod) 
  24.   if err != nil { 
  25.    klog.ErrorS(err, "Couldn't make a ref to pod""pod", klog.KObj(pod)) 
  26.   } 
  27.   if podContainerChanges.SandboxID != "" { 
  28.    m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created."
  29.   } else { 
  30.    klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it""pod", klog.KObj(pod)) 
  31.   } 
  32.  } 
  33.    
  34.   // Step 2: Kill the pod if the sandbox has changed. 
  35.  if podContainerChanges.KillPod { 
  36.    
  37.   // Step 3: kill any running containers in this pod which are not to keep. 
  38.   for containerID, containerInfo := range podContainerChanges.ContainersToKill { 
  39.    klog.V(3).InfoS("Killing unwanted container for pod""containerName", containerInfo.name"containerID", containerID, "pod", klog.KObj(pod)) 
  40.    killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name
  41.    result.AddSyncResult(killContainerResult) 
  42.    if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil { 
  43.     killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) 
  44.     klog.ErrorS(err, "killContainer for pod failed""containerName", containerInfo.name"containerID", containerID, "pod", klog.KObj(pod)) 
  45.     return 
  46.    } 
  47.        
  48.       // Step 4: Create a sandbox for the pod if necessary. 
  49.  podSandboxID := podContainerChanges.SandboxID 
  50.  if podContainerChanges.CreateSandbox { 
  51.   var msg string 
  52.   var err error 
  53.  
  54.   klog.V(4).InfoS("Creating PodSandbox for pod""pod", klog.KObj(pod)) 
  55.   createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod)) 
  56.   result.AddSyncResult(createSandboxResult) 
  57.   podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt) 
  58.      
  59.     // Step 5: start ephemeral containers 
  60.  // These are started "prior" to init containers to allow running ephemeral containers even when there 
  61.  // are errors starting an init container. In practice init containers will start first since ephemeral 
  62.  // containers cannot be specified on pod creation. 
  63.  if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) { 
  64.   for _, idx := range podContainerChanges.EphemeralContainersToStart { 
  65.    start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx])) 
  66.   } 
  67.  } 
  68.    
  69.   // Step 6: start the init container. 
  70.  if container := podContainerChanges.NextInitContainerToStart; container != nil { 
  71.   // Start the next init container. 
  72.   if err := start("init container", containerStartSpec(container)); err != nil { 
  73.    return 
  74.   } 
  75.  
  76.   // Successfully started the container; clear the entry in the failure 
  77.   klog.V(4).InfoS("Completed init container for pod""containerName", container.Name"pod", klog.KObj(pod)) 
  78.  } 
  79.  
  80.  // Step 7: start containers in podContainerChanges.ContainersToStart. 
  81.  for _, idx := range podContainerChanges.ContainersToStart { 
  82.   start("container", containerStartSpec(&pod.Spec.Containers[idx])) 
  83.  } 

6、另外,pod worker還要做以下事情:

  1. # 創建pod數據目錄、volume、獲取image pull secrets。。。 
  2. newPodWorkers(klet.syncPod --->pkg/kubelet/kubelet.go) //通過syncPod 
  3.                                kubetypes.SyncPodKill 
  4.                                 kubetypes.SyncPodCreate 
  5.                                 podStatus.IPs = append(podStatus.IPs, ipInfo.IP) 
  6.                                 runnable.Admit 
  7.                                 kubetypes.IsStaticPod(pod) 
  8.                                kl.makePodDataDirs(pod) 
  9.                                 kl.volumeManager.WaitForAttachAndMount(pod) 
  10.                                 kl.getPullSecretsForPod(pod) 
  11.                                 kl.containerRuntime.SyncPod(pkg/kubelet/container/runtime.go) 

本文轉載自微信公眾號「運維開發故事」

 

責任編輯:姜華 來源: 運維開發故事
相關推薦

2021-07-12 06:11:14

SkyWalking 儀表板UI篇

2022-03-01 07:51:53

IDEAMaven父子工程

2021-06-21 14:36:46

Vite 前端工程化工具

2021-01-28 08:55:48

Elasticsear數據庫數據存儲

2023-03-29 07:45:58

VS編輯區編程工具

2021-04-14 14:16:58

HttpHttp協議網絡協議

2021-04-08 11:00:56

CountDownLaJava進階開發

2022-03-22 09:09:17

HookReact前端

2021-07-21 09:48:20

etcd-wal模塊解析數據庫

2021-04-01 10:51:55

MySQL鎖機制數據庫

2021-03-12 09:21:31

MySQL數據庫邏輯架構

2022-02-17 08:53:38

ElasticSea集群部署

2022-04-29 14:38:49

class文件結構分析

2024-06-13 08:34:48

2022-02-25 15:50:05

OpenHarmonToggle組件鴻蒙

2021-10-28 08:51:53

GPIO軟件框架 Linux

2023-03-13 09:31:04

2021-07-08 07:30:13

Webpack 前端Tree shakin

2021-05-08 08:36:40

ObjectString前端

2021-04-23 08:59:35

ClickHouse集群搭建數據庫
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲一级毛片 | 一区日韩| 日日干天天干 | 成年人网站免费视频 | 亚洲国产高清在线 | 国产清纯白嫩初高生在线播放视频 | 国产精品www | 色综合激情 | 99爱视频 | www.日本国产 | 日韩中文在线观看 | 日韩一级 | av手机免费在线观看 | 国产成人免费 | 亚洲精品在线看 | 天堂综合网 | 一区二区三区欧美 | a亚洲精品 | 日本 欧美 国产 | 国产伦精品一区二区三毛 | 精品在线 | www.yw193.com| 中文字幕一级毛片 | 午夜精品一区二区三区在线观看 | 欧美男人的天堂 | 欧美专区在线 | 精品国产乱码久久久久久蜜退臀 | 国产精品久久久久久久久久免费看 | 亚洲欧美日韩精品久久亚洲区 | 欧美综合国产精品久久丁香 | 久久免费视频在线 | 伊人久久在线观看 | 国产精品视频一区二区三区不卡 | 毛片免费看 | 色综合视频 | 久久99这里只有精品 | 国产999精品久久久影片官网 | 精品亚洲一区二区 | 在线看无码的免费网站 | 国产一级一级毛片 | 成人精品视频99在线观看免费 |