美文网首页Docker容器k8s那点事儿Kubernetes
[k8s源码分析][kubelet] devicemanager

[k8s源码分析][kubelet] devicemanager

作者: nicktming | 来源:发表于2019-10-30 22:50 被阅读0次

    1. 前言

    转载请说明原文出处, 尊重他人劳动成果!

    源码位置: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/kubelet/cm/devicemanager
    分支: tming-v1.13 (基于v1.13版本)

    device manager and device plugin
    1. [k8s源码分析][kubelet] devicemanager 之 pod_devices 和 checkpoint
    2. [k8s源码分析][kubelet] devicemanager 之 使用device-plugin(模拟gpu)
    3. [k8s源码分析][kubelet] devicemanager 之 device-plugin向kubelet注册
    4. [k8s源码分析][kubelet] devicemanager 之 kubelet申请资源
    5. [k8s源码分析][kubelet] devicemanager 之 重启kubelet和device-plugin

    本文将分析devicemanager中会用到的podDevices. 该类主要用于记录pod使用了哪些resource.
    在持久化时会生成kubelet_internal_checkpoint保存到宿主机中, kubelet重启的时候会加载里面的数据.

    2. kubelet_internal_checkpoint

    {
      "Data": {
        "PodDeviceEntries": [
          {
            "PodUID": "pod1",
            "ContainerName": "con1",
            "ResourceName": "domain1.com/resource1",
            "DeviceIDs": [
              "dev1",
              "dev2"
            ],
            "AllocResp": "Eh0KDC9ob21lL3IxbGliMRILL3Vzci9yMWxpYjEYARofCgsvZGV2L3IxZGV2MRILL2Rldi9yMWRldjEaA21ydxofCgsvZGV2L3IxZGV2MhILL2Rldi9yMWRldjIaA21ydw=="
          },
          {
            "PodUID": "pod2",
            "ContainerName": "con1",
            "ResourceName": "domain1.com/resource1",
            "DeviceIDs": [
              "dev4"
            ],
            "AllocResp": "Eh0KDC9ob21lL3IxbGliMRILL3Vzci9yMWxpYjEYARofCgsvZGV2L3IxZGV2NBILL2Rldi9yMWRldjQaA21ydw=="
          }
        ],
        "RegisteredDevices": {
          "domain1.com/resource1": [
            "dev5",
            "dev1",
            "dev2",
            "dev3",
            "dev4"
          ],
          "domain2.com/resource2": [
            "dev1",
            "dev2"
          ]
        }
      },
      "Checksum": 3854436589
    }
    

    这是一个kubelet_internal_checkpoint例子, 可以看到有两个比较重要的部分:
    RegisteredDevices: 当前向kubelet注册的资源以及这些资源所拥有的设备. 需要指出的是, 只有healthy的设备才会出现在这里, unhealthy的设备不会被持久化到kubelet_internal_checkpoint中.

    PodDeviceEntries: 这个就是本文要涉及的内容. 可以看到PodDeviceEntries中的每一项有五个属性, 分别为PodUID, ContainerName, ResourceName, DeviceIDs以及AllocResp. 以第一项为例, 它表明该pod(pod1)中的容器(con1)使用了资源(domain1.com/resource1)的设备(dev1, dev2).

    另外, AllocResp保存的是device plugin在Allocate时返回的env, mount等信息, 这些才是真正指示docker在启动容器时需要做哪些操作的内容. 比如NVIDIA gpu-device-plugin在分配了gpu dev1和dev2之后, 会把它们放到环境变量NVIDIA_VISIBLE_DEVICES=dev1,dev2中, 这样nvidia docker在启动容器的时候才会把相应的gpu设备映射到容器中.

    该文件对应的数据结构如下:

    // pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go
    type (
        // PodDevicesEntry is one persisted allocation: the devices of resource ResourceName
        // assigned to container ContainerName of pod PodUID, plus the serialized allocation
        // response (encoding/json writes the []byte as base64, as seen in the sample file).
        PodDevicesEntry struct {
            PodUID        string
            ContainerName string
            ResourceName  string
            DeviceIDs     []string
            AllocResp     []byte
        }
        // checkpointData is the payload written under the "Data" key of the checkpoint file.
        // RegisteredDevices maps a resource name to its device IDs.
        checkpointData struct {
            PodDeviceEntries  []PodDevicesEntry
            RegisteredDevices map[string][]string
        }
        // Data is the top-level checkpoint document: the payload plus a checksum over it.
        // Field order is significant because it fixes the JSON layout of the file.
        Data struct {
            Data     checkpointData
            Checksum checksum.Checksum
        }
    )
    

    另外对于该文件的操作

    // pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go
    // DeviceManagerCheckpoint is the checkpoint type used by the device manager. It embeds
    // checkpointmanager.Checkpoint (marshal/unmarshal/verify, implemented by *Data below) and
    // adds GetData, which returns the pod device entries and the registered devices per resource.
    type DeviceManagerCheckpoint interface {
        checkpointmanager.Checkpoint
        GetData() ([]PodDevicesEntry, map[string][]string)
    }
    // MarshalCheckpoint stamps cp with a fresh checksum of cp.Data and returns the JSON
    // encoding of the whole checkpoint (payload and checksum). Note that it mutates cp.Checksum.
    func (cp *Data) MarshalCheckpoint() ([]byte, error) {
        sum := checksum.New(cp.Data)
        cp.Checksum = sum
        snapshot := *cp
        return json.Marshal(snapshot)
    }
    // UnmarshalCheckpoint decodes a JSON blob into cp. It performs no checksum validation;
    // that is the job of the separate VerifyChecksum method.
    func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
        if err := json.Unmarshal(blob, cp); err != nil {
            return err
        }
        return nil
    }
    // VerifyChecksum checks the stored Checksum against the current cp.Data and returns
    // whatever error checksum.Checksum.Verify reports (presumably nil on a match).
    func (cp *Data) VerifyChecksum() error {
        stored := cp.Checksum
        return stored.Verify(cp.Data)
    }
    // GetData returns the persisted pod device entries and the registered devices keyed by
    // resource name. The returned slice and map share storage with cp.
    func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
        payload := cp.Data
        return payload.PodDeviceEntries, payload.RegisteredDevices
    }
    

    关于真正的文件操作这里就不多说了, 上面的主要做一些转化工作.

    3. 结构

    type (
        // deviceAllocateInfo is the cached state for one <podUID, containerName, resourceName>.
        deviceAllocateInfo struct {
            // deviceIds contains device Ids allocated to this container for the given resourceName.
            deviceIds sets.String
            // allocResp contains cached rpc AllocateResponse.
            allocResp *pluginapi.ContainerAllocateResponse
        }
        // resourceAllocateInfo is keyed by resourceName.
        resourceAllocateInfo map[string]deviceAllocateInfo
        // containerDevices is keyed by containerName.
        containerDevices map[string]resourceAllocateInfo
        // podDevices is keyed by podUID.
        podDevices map[string]containerDevices
    )
    

    看了上面的kubelet_internal_checkpoint中的PodDeviceEntries, 再看这个结构很容易明白.

    也就是说, podUID, containerName和resourceName三者共同构成一个唯一的键, 这个键对应的值就是deviceAllocateInfo.

    example.png

    看图片可能会更清晰点.

    4. 方法

    insert, delete, pods, get

    // insert records devices and resp for <podUID, contName, resource>, creating the nested
    // maps on demand. An existing entry for the same triple is overwritten.
    func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
        containers, ok := pdev[podUID]
        if !ok {
            containers = make(containerDevices)
            pdev[podUID] = containers
        }
        resources, ok := containers[contName]
        if !ok {
            resources = make(resourceAllocateInfo)
            containers[contName] = resources
        }
        resources[resource] = deviceAllocateInfo{deviceIds: devices, allocResp: resp}
    }
    // delete drops every allocation recorded for the given pod UIDs; unknown UIDs are ignored.
    func (pdev podDevices) delete(pods []string) {
        for i := range pods {
            delete(pdev, pods[i])
        }
    }
    
    

    都是对map的常规操作. 注意insert时如果同一个podUID-containerName-resourceName已经存在, 会直接覆盖旧值.

    get操作

    // containerDevices returns the device IDs allocated to container contName of pod podUID
    // for resource, or nil when no <podUID, contName, resource> entry is cached.
    func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String {
        // Indexing a missing key of a map (or a nil map) yields the zero value, so any missing
        // level produces a zero deviceAllocateInfo whose deviceIds is nil.
        return pdev[podUID][contName][resource].deviceIds
    }
    // pods returns the UIDs of every pod that has allocations recorded (an empty, non-nil
    // set when there are none).
    func (pdev podDevices) pods() sets.String {
        uids := make([]string, 0, len(pdev))
        for uid := range pdev {
            uids = append(uids, uid)
        }
        return sets.NewString(uids...)
    }
    // devices returns every allocated device ID keyed by resourceName, merged across all pods
    // and containers. A resource gets an entry (possibly an empty set) as soon as any container
    // has a record for it, but device IDs are only counted for records that carry an allocation
    // response, mirroring the nil-allocResp skip in toCheckpointData.
    func (pdev podDevices) devices() map[string]sets.String {
        ret := make(map[string]sets.String)
        // "containers" rather than "containerDevices" to avoid shadowing the type of that name.
        for _, containers := range pdev {
            for _, resources := range containers {
                for resource, devices := range resources {
                    set, exists := ret[resource]
                    if !exists {
                        set = sets.NewString()
                        ret[resource] = set
                    }
                    if devices.allocResp == nil {
                        continue
                    }
                    // Insert in place: Union would allocate a copy of the accumulated set for
                    // every container, which is quadratic in the number of allocated devices.
                    for id := range devices.deviceIds {
                        set.Insert(id)
                    }
                }
            }
        }
        return ret
    }
    // NOTE: the listing originally repeated containerDevices here verbatim. Go rejects a second
    // declaration of the same method ("method podDevices.containerDevices already declared"),
    // so only the single definition listed earlier in this section is kept.
    

    map中根据各种规则来获取元素.

    toCheckpointData 和 fromCheckpointData

    // toCheckpointData flattens podDevices into the PodDevicesEntry list persisted in
    // kubelet_internal_checkpoint. Records whose allocation response is missing or fails to
    // marshal are logged and left out of the result.
    func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
        var entries []checkpoint.PodDevicesEntry
        for podUID, containers := range pdev {
            for conName, resources := range containers {
                for resource, info := range resources {
                    if info.allocResp == nil {
                        klog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource)
                        continue
                    }
                    // Serialize the cached response so it can be stored as []byte.
                    raw, err := info.allocResp.Marshal()
                    if err != nil {
                        klog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
                        continue
                    }
                    entries = append(entries, checkpoint.PodDevicesEntry{
                        PodUID:        podUID,
                        ContainerName: conName,
                        ResourceName:  resource,
                        DeviceIDs:     info.deviceIds.UnsortedList(),
                        AllocResp:     raw,
                    })
                }
            }
        }
        return entries
    }
    
    // fromCheckpointData repopulates podDevices from entries loaded out of the checkpoint.
    // Each entry is re-inserted through insert; entries whose AllocResp bytes cannot be
    // unmarshaled are logged and skipped.
    func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
        for _, entry := range data {
            klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
                entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
            resp := new(pluginapi.ContainerAllocateResponse)
            if err := resp.Unmarshal(entry.AllocResp); err != nil {
                klog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err)
                continue
            }
            pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, sets.NewString(entry.DeviceIDs...), resp)
        }
    }
    

    toCheckpointData: 把podDevices转成PodDevicesEntry数组, PodDevicesEntry就是kubelet_internal_checkpoint中PodDeviceEntries的内容, 很清晰.
    fromCheckpointData: 把PodDevicesEntry数组中的每个元素重新插入到podDevices中.

    就是两种数据结构之间的转换.

    deviceRunContainerOptions

    // deviceRunContainerOptions combines the cached allocation responses of every resource used
    // by container contName of pod podUID into one DeviceRunContainerOptions: the Envs of all
    // resources are merged together, and likewise the Mounts, Devices and Annotations.
    // It returns nil when nothing is cached for the pod or for the container.
    func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
        containers, exists := pdev[podUID]
        if !exists {
            return nil
        }
        resources, exists := containers[contName]
        if !exists {
            return nil
        }
        opts := &DeviceRunContainerOptions{}
        // Maps to detect duplicate settings.
        devsMap := make(map[string]string)
        mountsMap := make(map[string]string)
        envsMap := make(map[string]string)
        annotationsMap := make(map[string]string)
        // Loops through AllocationResponses of all cached device resources.
        // resources is a map, so when two resources set the same key, which one is seen first
        // (and therefore kept) is not deterministic.
        for _, devices := range resources {
            resp := devices.allocResp
            // insert accepts a nil response, and devices() / toCheckpointData() both skip records
            // without one; reading resp.Envs through a nil pointer would panic here.
            if resp == nil {
                continue
            }
            // The first value seen for an env key wins; a different later value is logged as a
            // conflict and dropped.
            for k, v := range resp.Envs {
                if e, ok := envsMap[k]; ok {
                    klog.V(4).Infof("Skip existing env %s %s", k, v)
                    if e != v {
                        klog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v)
                    }
                    continue
                }
                klog.V(4).Infof("Add env %s %s", k, v)
                envsMap[k] = v
                opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
            }
            // Merging of Devices, Mounts and Annotations (using devsMap, mountsMap and
            // annotationsMap) is elided in this excerpt.
            ...
        }
        return opts
    }
    

    将某个pod中的某个容器的所有资源信息组合到一起
    因为某个pod中的某个容器很可能使用了很多资源, 每个资源的allocResp中包含了env, mount等等信息
    该方法就是将这个容器的每个资源的env全部放一起, mount全部放一起, Annotations全部放一起等等
    组成一个新的结构DeviceRunContainerOptions.

    // pkg/kubelet/cm/devicemanager/types.go
    // DeviceRunContainerOptions is the merged result of all device-plugin allocation responses
    // for one container, as produced by podDevices.deviceRunContainerOptions.
    type DeviceRunContainerOptions struct {
        // The environment variables list.
        Envs []kubecontainer.EnvVar
        // The mounts for the container.
        Mounts []kubecontainer.Mount
        // The host devices mapped into the container.
        Devices []kubecontainer.DeviceInfo
        // The Annotations for the container
        Annotations []kubecontainer.Annotation
    }
    

    相关文章

      网友评论

        本文标题:[k8s源码分析][kubelet] devicemanager

        本文链接:https://www.haomeiwen.com/subject/uxyavctx.html