美文网首页
从源码理解InPlacePodVerticalScaling如何

从源码理解InPlacePodVerticalScaling如何

作者: wwq2020 | 来源:发表于2023-12-29 12:19 被阅读0次

    背景

    k8s 1.27前无法原地对pod进行垂直扩容(需要重建容器),因为kubelet根据pod中container的hash(包括resource)是否变化决定是否重建container
    1.27后 根据pod.spec.containers.resizePolicy.restartPolicy是否需要重启

    NotRequired不重启
    RestartContainer重启
    

    pod.status.resize会显示扩缩容处理情况

    Proposed 已接受
    InProgress 处理中
    Deferred 当前无法扩缩容,后续重试
    Infeasible 超过node可用资源
    

    源码

    pkg/kubelet/kuberuntime/kuberuntime_manager.go中
    pod crud主流程

    func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    ...
        podContainerChanges := m.computePodActions(ctx, pod, podStatus)
    
    ...
        if isInPlacePodVerticalScalingAllowed(pod) {
            if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
                m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
            }
        }
    ...
    }
    
    计算pod动作
    func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
    ...
            } else if isInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) {
                // computePodResizeAction updates 'changes' if resize policy requires restarting this container
                continue
    ...
    }
    
    计算扩缩容动作(哪些容器需要更新,容器是否需要重启)
    func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool {
    ...
        determineContainerResize := func(rName v1.ResourceName, specValue, statusValue int64) (resize, restart bool) {
            if specValue == statusValue {
                return false, false
            }
            if resizePolicy[rName] == v1.RestartContainer {
                return true, true
            }
            return true, false
        }
    ...
        markContainerForUpdate := func(rName v1.ResourceName, specValue, statusValue int64) {
            cUpdateInfo := containerToUpdateInfo{
                apiContainerIdx:           containerIdx,
                kubeContainerID:           kubeContainerStatus.ID,
                desiredContainerResources: desiredResources,
                currentContainerResources: &currentResources,
            }
            // Order the container updates such that resource decreases are applied before increases
            switch {
            case specValue > statusValue: // append
                changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], cUpdateInfo)
            case specValue < statusValue: // prepend
                changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], containerToUpdateInfo{})
                copy(changes.ContainersToUpdate[rName][1:], changes.ContainersToUpdate[rName])
                changes.ContainersToUpdate[rName][0] = cUpdateInfo
            }
        }
    ...
        resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
        resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
        resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
        if restartCPULim || restartCPUReq || restartMemLim {
            // resize policy requires this container to restart
            changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{
                name:      kubeContainerStatus.Name,
                container: &pod.Spec.Containers[containerIdx],
                message:   fmt.Sprintf("Container %s resize requires restart", container.Name),
            }
            changes.ContainersToStart = append(changes.ContainersToStart, containerIdx)
            changes.UpdatePodResources = true
            return false
        } else {
            if resizeMemLim {
                markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
            }
            if resizeCPULim {
                markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
            } else if resizeCPUReq {
                markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
            }
        }
    ...
    }
    
    
    pod扩缩容
    func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podContainerChanges podActions, result kubecontainer.PodSyncResult) {
    ...
        resizeContainers := func(rName v1.ResourceName, currPodCgLimValue, newPodCgLimValue, currPodCgReqValue, newPodCgReqValue int64) error {
            var err error
            if newPodCgLimValue > currPodCgLimValue {
                if err = setPodCgroupConfig(rName, true); err != nil {
                    return err
                }
            }
            if newPodCgReqValue > currPodCgReqValue {
                if err = setPodCgroupConfig(rName, false); err != nil {
                    return err
                }
            }
            if len(podContainerChanges.ContainersToUpdate[rName]) > 0 {
                if err = m.updatePodContainerResources(pod, rName, podContainerChanges.ContainersToUpdate[rName]); err != nil {
                    klog.ErrorS(err, "updatePodContainerResources failed", "pod", format.Pod(pod), "resource", rName)
                    return err
                }
            }
            if newPodCgLimValue < currPodCgLimValue {
                err = setPodCgroupConfig(rName, true)
            }
            if newPodCgReqValue < currPodCgReqValue {
                if err = setPodCgroupConfig(rName, false); err != nil {
                    return err
                }
            }
            return err
        }
    ...
        if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources {
    ...
            if errResize := resizeContainers(v1.ResourceMemory, int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil {
                result.Fail(errResize)
                return
            }
        }
        if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources {
    ...
            if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota,
                int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil {
                result.Fail(errResize)
                return
            }
        }
    }
    
    更新pod容器资源
    func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, resourceName v1.ResourceName, containersToUpdate []containerToUpdateInfo) error {
    ...
            if err := m.updateContainerResources(pod, container, cInfo.kubeContainerID); err != nil {
    ...
    }
    
    
    

    pkg/kubelet/kuberuntime/kuberuntime_container.go中
    更新容器资源

    func (m *kubeGenericRuntimeManager) updateContainerResources(pod *v1.Pod, container *v1.Container, containerID kubecontainer.ContainerID) error {
    ...
        err := m.runtimeService.UpdateContainerResources(ctx, containerID.ID, containerResources)
    
    ...
    }
    

    pkg/kubelet/cm/pod_container_manager_linux.go中
    设置pod cgroup配置

    func (m *podContainerManagerImpl) SetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName, resourceConfig *ResourceConfig) error {
        podCgroupName, _ := m.GetPodContainerName(pod)
        return m.cgroupManager.SetCgroupConfig(podCgroupName, resource, resourceConfig)
    }
    

    相关文章

      网友评论

          本文标题:从源码理解InPlacePodVerticalScaling如何

          本文链接:https://www.haomeiwen.com/subject/kfojndtx.html