成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Kubernetes CSI 實現舉例,你學會了嗎?

云計算 云原生
通過 s3fs,用戶可以像訪問本地文件系統一樣訪問 Amazon S3 存儲桶中的對象。這使得開發人員可以直接在應用程序中使用 S3 存儲桶,而無需使用 Amazon S3 的 API 進行手動操作。

前言

接下去我們簡單來看下一個 CSI Plugin 開源項目的實現,github 地址是:https://github.com/alibaba/open-object。

這個 CSI Plugin 實現了將 Minio 作為后端存儲。以這個項目為例子,主要的考慮是:(1)它的實現不復雜,整體上的代碼量就 2k 行左右,便于閱讀和了解;(2)項目實現了將 Minio 作為后端存儲,在這個項目基礎之上,我們可以進一步擴展將阿里云 OSS 作為后端存儲的實現,便于進一步的實操。(3)Minio 是對象存儲,了解它的實現可以解決我心中的一個疑惑,就是我們如何將對一個文件的讀寫訪問,轉換為對 bucket 的讀寫請求。通過后續的了解,我們可以知道這主要是依賴了像 s3f3 這樣的程序,s3f3 通過 FUSE 將一個 bucket 表現為一個文件。

從部署開始

CSIDriver 資源對象

CSIDriver 包含了部署的 CSI Plugin 有關的信息。Kubernetes AD Controller 通過該對象來決定是否需要 attach。Kubelet 通過該對象決定 mount 時是否需要傳遞 Pod 信息。CSIDriver 資源對象未劃分命名空間。

可以了解到以下幾個點:

  • OpenObject 這個 CSI 插件不需要 attach 步驟,并且在 mount 的時候需要傳遞 Pod 信息。
apiVersion: storage.k8s.io/v1beta1
kind: CSIDriver
metadata:
  name: object.csi.aliyun.com
spec:
  attachRequired: false
  podInfoOnMount: true
  volumeLifecycleModes:
  - Persistent

CSI Node Plugin

可以看到 Daemonset 中包含了 3 個容器:

  • init 容器,啟動一個 connector,這個 connector 運行在宿主機上,監聽 /etc/open-object/connector.sock。通過這個 socket 文件,connector 會收到相應的執行命令,并執行這些命令。總體上的作用相當于宿主機和容器之間的連接器。
  • driver-registrar 容器,調用 csi-plugin 的 NodeGetInfo 的接口,將 CSI Node Plugin 的信息通過 kubelet 的插件注冊機制在對應節點的 kubelet 上進行注冊,同時把 CSI Node Plugin 的通信地址同步給 kubelet。
  • CSI-Plugin 容器,這個容器里面運行的就是 CSI Node Plugin,也是我們實現的內容。
kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: open-object
  namespace: {{ .Values.namespace }}
spec:
  selector:
    matchLabels:
      app: open-object
  template:
    metadata:
      labels:
        app: open-object
    spec:
      tolerations:
      - operator: Exists
      serviceAccount: open-object
      hostNetwork: true
      hostPID: true
      dnsPolicy: ClusterFirstWithHostNet
      initContainers:
      - name: run-connector
        image: {{ .Values.images.object.image }}:{{ .Values.images.object.tag }}
        securityContext:
          privileged: true
          capabilities:
            add: ["SYS_ADMIN"]
          allowPrivilegeEscalation: true
        command:
        - /run-connector.sh
        volumeMounts:
        - name: host-systemd-config
          mountPath: /host/usr/lib/systemd/system
        - name: host-etc
          mountPath: /host/etc/open-object
      containers:
      - name: driver-registrar
        image: {{ .Values.images.registrar.image }}:{{ .Values.images.registrar.tag }}
        args:
        - "--kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)"
        - "--v=4"
        - "--csi-address=$(ADDRESS)"
        env:
        - name: ADDRESS
          value: /csi/csi.sock
        - name: DRIVER_REG_SOCK_PATH
          value: /var/lib/kubelet/plugins/object.csi.aliyun.com/csi.sock
        - name: KUBE_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        ...
        volumeMounts:
        - name: plugin-dir
          mountPath: /csi
        - name: registration-dir
          mountPath: /registration/
      - name: csi-plugin
        securityContext:
          privileged: true
          capabilities:
            add: ["SYS_ADMIN"]
          allowPrivilegeEscalation: true
        image: {{ .Values.images.object.image }}:{{ .Values.images.object.tag }}
        args:
        - "csi"
        - "--endpoint=$(CSI_ENDPOINT)"
        - "--nodeID=$(NODE_ID)"
        - "--driver=object.csi.aliyun.com"
        env:
        - name: CSI_ENDPOINT
          value: unix:///csi/csi.sock
        - name: NODE_ID
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: TZ
          value: Asia/Shanghai
        ...
        volumeMounts:
        - name: plugin-dir
          mountPath: /csi
        - name: pods-mount-dir
          mountPath: /var/lib/kubelet/pods
          mountPropagation: "Bidirectional"
        - name: fuse-device
          mountPath: /dev/fuse
        - name: host-etc
          mountPath: /host/etc/open-object
        - name: host-etc-os
          mountPath: /host/etc/os-release
      volumes:
      - name: registration-dir
        hostPath:
          path: /var/lib/kubelet/plugins_registry/
          type: DirectoryOrCreate
      - name: plugin-dir
        hostPath:
          path: /var/lib/kubelet/plugins/object.csi.aliyun.com
          type: DirectoryOrCreate
      - name: pods-mount-dir
        hostPath:
          path: /var/lib/kubelet/pods
          type: Directory
      - name: fuse-device
        hostPath:
          path: /dev/fuse
      - name: host-etc
        hostPath:
          path: /etc/open-object
          type: DirectoryOrCreate
      - name: host-etc-os
        hostPath:
          path: /etc/os-release
          type: File
      - name: host-systemd-config
        hostPath:
          path: /usr/lib/systemd/system
          type: DirectoryOrCreate

CSI Controller Plugin

下面這個 Deployment 部署的是 external-provisioner 組件,指定了通過哪個 socket 文件跟 CSI Controller Plugin 進行通信。但是,我們在這個 Deployment 中并沒有看到 CSI Controller Plugin 組件相關的容器。這是因為對于 OpenObject 這個項目來說,它將 CSI Node Plugin 和 CSI Controller Plugin 的代碼都實現在了上述提到的 csi-plugin 容器里。由于,csi-plugin 是通過 Daemonset 方式部署的,每個節點上都有,所以 external-provisioner 組件可以通過指定的 socket 文件跟 csi-plugin 容器通信。

kind: Deployment
apiVersion: apps/v1
metadata:
  name: open-object-csi-provisioner
  namespace: {{ .Values.namespace }}
  labels:
    app: open-object
    component: open-object-csi-provisioner
spec:
  selector:
    matchLabels:
      app: open-object
      component: open-object-csi-provisioner
  replicas: 1
  template:
    metadata:
      labels:
        app: open-object
        component: open-object-csi-provisioner
    spec:
      tolerations:
      - operator: Exists
        effect: NoSchedule
        key: node-role.kubernetes.io/master
      priorityClassName: system-cluster-critical
      serviceAccount: open-object
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
        - name: csi-provisioner
          image: {{ .Values.images.provisioner.image }}:{{ .Values.images.provisioner.tag }}
          args:
            - --csi-address=$(ADDRESS)
            - --volume-name-prefix=fuse
            - --extra-create-metadata=true
            - --timeout=10m
          env:
            - name: ADDRESS
              value: /var/lib/kubelet/plugins/object.csi.aliyun.com/csi.sock
            - name: TZ
              value: Asia/Shanghai
          ...
          volumeMounts:
            - name: socket-dir
              mountPath: /var/lib/kubelet/plugins/object.csi.aliyun.com
      volumes:
        - name: socket-dir
          hostPath:
            path: /var/lib/kubelet/plugins/object.csi.aliyun.com
            type: DirectoryOrCreate

根據上述的情況,我們總結下:

  • CSI Node Plugin 和 CSI Controller Plugin 都實現在同一個程序里。
  • 由于這個程序通過 Daemonset 部署到了每個節點上,因此相當于每個節點都運行著 CSI Node Plugin 和 CSI Controller Plugin 這兩個插件,并且都可以通過 csi.sock 文件實現跟 CSI Node Plugin 和 CSI Controller Plugin 的通信。所以,雖然 external-provisioner 是通過 Deployment 部署的,但是它仍舊可以通過 csi.sock 文件實現跟 CSI Controller Plugin 的通信。

進入源碼

代碼實現上,我們重點關注 CSI 接口的實現。

初始化

首先是整個 CSI Plugin 的初始化:

  • NewFuseDriver 函數初始化了一個 FuseDriver 結構體,這個結構體包含了 endpoint 也就是 CSI Plugin 監聽的 socket 文件地址。
  • ids、cs、ns 分別是 CSI Identity、CSI Controller 和 CSI Node 三類接口的具體實現。
func NewFuseDriver(nodeID, endpoint, driverName string, kubeClient *kubernetes.Clientset) (*FuseDriver, error) {
 driver := csicommon.NewCSIDriver(driverName, version.Version, nodeID)
 if driver == nil {
  klog.Fatalln("Failed to initialize CSI Driver.")
 }

 s3Driver := &FuseDriver{
  endpoint: endpoint,
  driver:   driver,
  ids:      newIdentityServer(driver),
  cs:       newControllerServer(driver),
  ns:       newNodeServer(driver),
 }
 return s3Driver, nil
}

func (s3 *FuseDriver) Run() {
 // Initialize default library driver
 s3.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
  csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
  csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
 })
 s3.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
  csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})

 s := csicommon.NewNonBlockingGRPCServer()
 s.Start(s3.endpoint, s3.ids, s3.cs, s3.ns)
 s.Wait()
}

CSI Controller 類接口的實現

先看下 CSI Controller 類接口的具體實現,主要實現的是 CreateVolume、DeleteVolume 和 ControllerExpandVolume 這三個接口。

  • CreateVolume 接口的核心實現就是在 minio 上創建一個 bucket。
  • DeleteVolume 接口的核心實現就是將 bucket 刪除。
  • ControllerExpandVolume 接口的核心實現就是將 bucket 的 quota 上限提高,調整 bucket 的 metadata。
type controllerServer struct {
 kubeClinet *kubernetes.Clientset
 *csicommon.DefaultControllerServer
}

func newControllerServer(d *csicommon.CSIDriver) *controllerServer {
 cfg, err := clientcmd.BuildConfigFromFlags("", "")
 if err != nil {
  klog.Fatalf("Error building kubeconfig: %s", err.Error())
 }
 kubeClient, err := kubernetes.NewForConfig(cfg)
 if err != nil {
  klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
 }
 return &controllerServer{
  DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
  kubeClinet:              kubeClient,
 }
}

func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 ...

 // create volume
 return driver.CreateVolume(ctx, req)
}

func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 ...

 return driver.DeleteVolume(ctx, req)
}

func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
 ...

 // expand volume
 return driver.ControllerExpandVolume(ctx, req)
}

func (driver *MinIODriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 ...
 if err := driver.minioClient.CreateBucket(bucketName, capacity); err != nil {
  return &csi.CreateVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
  ...
}

func (driver *MinIODriver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 ...
 if err := driver.minioClient.DeleteBucket(bucketName); err != nil {
  return &csi.DeleteVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 ...
}

func (driver *MinIODriver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
 ...
 capacity := req.GetCapacityRange().RequiredBytes
 if DefaultFeatureGate.Enabled(Quota) {
  if err := driver.minioClient.SetBucketQuota(bucketName, capacity, madmin.HardQuota); err != nil {
   return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
  }
 }

 bucketMap, err := driver.minioClient.GetBucketMetadata(bucketName)
 if err != nil {
  return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 bucketMap[MetaDataCapacity] = strconv.FormatInt(capacity, 10)
 if err = driver.minioClient.SetBucketMetadata(bucketName, bucketMap); err != nil {
  return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }

 return &csi.ControllerExpandVolumeResponse{CapacityBytes: capacity, NodeExpansionRequired: false}, nil
}

CSI Node 類接口的實現

接下去看一下 CSI Node 類接口的具體實現,主要實現的是 NodePublishVolume、NodeUnpublishVolume 這兩個接口。NodeStageVolume 和 NodeUnstageVolume 這兩個接口并沒有相應的邏輯,也就是說不存在多個 Pod 共享一個 volume 的情況。

  • NodePublishVolume 接口會調用 MinIODriver 的 NodePublishVolume 接口。在這個接口中會封裝好的 s3fs 執行命令,并將執行命令發送給 connector,connector 在宿主機上執行相應的命令。這個命令的主要作用是將 minio 的 bucket 掛載到 target path 上,也就是 /var/lib/kubelet/pods/${pod uid}/volumes/kubernetes.io****~${CSI Plugin Name}/${PV name} 上。
  • NodeUnpublishVolume 接口會調用 FuseUmount 方法 unmount 掉 target path 上的掛載。代碼繼續跟蹤下去,它是在容器內部就執行了 unmount 方法。為什么在容器內部執行 unmount 方法就可以了呢?這是因為在部署這個容器的時候,把宿主機的 /var/lib/kubelet/pods/ 目錄就掛載到了容器的 /var/lib/kubelet/pods/ 上。
type nodeServer struct {
 *csicommon.DefaultNodeServer
}

func newNodeServer(d *csicommon.CSIDriver) *nodeServer {
 return &nodeServer{
  DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
 }
}

func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 volumeID := req.GetVolumeId()
 targetPath := req.GetTargetPath()
  ...
  
 return driver.NodePublishVolume(ctx, req)
}

func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
 volumeID := req.GetVolumeId()
 targetPath := req.GetTargetPath()
  ...
  
 if err := common.FuseUmount(targetPath); err != nil {
  return &csi.NodeUnpublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 klog.Infof("s3: mountpoint %s has been unmounted.", targetPath)

 return &csi.NodeUnpublishVolumeResponse{}, nil
}

func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
 return &csi.NodeStageVolumeResponse{}, nil
}

func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
 return &csi.NodeUnstageVolumeResponse{}, nil
}

// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
 ...
}

func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
 return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}

// NodeGetVolumeStats used for csi metrics
func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
 return nil, nil
}

// NodePublishVolume 中的 driver.NodePublishVolume
func (driver *MinIODriver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 pv, err := driver.kubeClinet.CoreV1().PersistentVolumes().Get(ctx, req.GetVolumeId(), metav1.GetOptions{})
 if err != nil {
  return &csi.NodePublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 bucketName := pv.Spec.CSI.VolumeAttributes[ParamBucketNameTag]
 targetPath := req.GetTargetPath()

 notMnt, err := checkMount(targetPath)
 if err != nil {
  return &csi.NodePublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
 }
 if !notMnt {
  return &csi.NodePublishVolumeResponse{}, nil
 }

 if err := S3FSMount(driver.Endpoint, bucketName, targetPath, driver.AK, driver.SK); err != nil {
  return &csi.NodePublishVolumeResponse{}, err
 }

 klog.Infof("s3: bucket %s successfully mounted to %s", bucketName, targetPath)

 return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume 中的 common.FuseUmount
func FuseUmount(path string) error {
 if err := mount.New("").Unmount(path); err != nil {
  return err
 }
 // as fuse quits immediately, we will try to wait until the process is done
 process, err := findFuseMountProcess(path)
 if err != nil {
  klog.Errorf("Error getting PID of fuse mount: %s", err)
  return nil
 }
 if process == nil {
  klog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
  return nil
 }
 klog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
 return waitForProcess(process, 1)
}

擴展認識

上文提到了 s3f3,s3fs 是一個開源的用戶態文件系統,它允許將 Amazon S3 bucket 掛載到 Linux 系統中,使其表現為本地文件系統。

通過 s3fs,用戶可以像訪問本地文件系統一樣訪問 Amazon S3 存儲桶中的對象。這使得開發人員可以直接在應用程序中使用 S3 存儲桶,而無需使用 Amazon S3 的 API 進行手動操作。s3fs 提供了對 Amazon S3 存儲桶的標準文件系統操作,例如讀取、寫入、復制、移動和刪除文件。它還支持文件權限、目錄結構和符號鏈接等常見的文件系統功能。由于 Amazon S3 和 minio 的 API 是兼容的,因此也可以用于 minio。

s3f3 github 地址:https://github.com/s3fs-fuse/s3fs-fuse

責任編輯:武曉燕 來源: 多選參數
相關推薦

2022-06-16 07:50:35

數據結構鏈表

2022-07-26 08:03:27

Kubernetes節點磁盤

2024-01-30 18:29:29

微服務架構Ingress

2023-08-01 12:51:18

WebGPT機器學習模型

2024-01-02 12:05:26

Java并發編程

2024-01-19 08:25:38

死鎖Java通信

2023-01-10 08:43:15

定義DDD架構

2024-02-04 00:00:00

Effect數據組件

2023-07-26 13:11:21

ChatGPT平臺工具

2023-12-07 12:29:49

Nginx負載均衡策略

2024-03-12 08:37:32

asyncawaitJavaScript

2024-08-12 08:12:38

2024-01-26 06:05:16

KuberneteseBPF網絡

2024-03-06 08:28:16

設計模式Java

2022-12-06 07:53:33

MySQL索引B+樹

2023-01-31 08:02:18

2023-10-06 14:49:21

SentinelHystrixtimeout

2022-07-13 08:16:49

RocketMQRPC日志

2023-05-05 06:54:07

MySQL數據查詢

2023-03-26 22:31:29

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品3 | 日韩精品一区二区三区视频播放 | 91精品国产91久久综合桃花 | 国产精品亚洲精品 | 久久aⅴ乱码一区二区三区 亚洲欧美综合精品另类天天更新 | 午夜在线小视频 | 国产日韩一区二区三区 | 国产午夜精品一区二区三区四区 | 国产98色在线 | 日韩 | 亚洲精品中文字幕在线观看 | 国产精品欧美大片 | 国产一区二区三区四区五区3d | 特级毛片| 日本久久黄色 | 青青草视频免费观看 | 日日干日日操 | 精品丝袜在线 | 在线视频中文字幕 | 久久久久国产一区二区三区不卡 | 中文字幕一区二区三区四区 | 成人免费小视频 | 奇米久久久 | 色网站在线| 免费看色| 日韩视频中文字幕 | 国产精品高潮呻吟久久aⅴ码 | 午夜影院网站 | 亚洲电影一区二区三区 | 国产成人精品午夜视频免费 | 亚洲精品一区在线观看 | 久久久人成影片免费观看 | 成年人黄色一级片 | 午夜影院| 日韩在线免费 | 日韩一区二区三区av | 亚洲成人午夜电影 | 国产sm主人调教女m视频 | 欧美综合一区二区三区 | 黄色片网站在线观看 | 国产乱码精品一区二区三区五月婷 | 日韩精品三区 |