美文网首页
从源码理解InPlacePodVerticalScaling如何

从源码理解InPlacePodVerticalScaling如何

作者: wwq2020 | 来源:发表于2023-12-29 12:19 被阅读0次

背景

k8s 1.27前无法原地对pod进行垂直扩容(需要重建容器),因为kubelet根据pod中container的hash(包括resource)是否变化决定是否重建container
1.27后 根据pod.spec.containers.resizePolicy.restartPolicy是否需要重启

NotRequired不重启
RestartContainer重启

pod.status.resize会显示扩缩容处理情况

Proposed 已接受
InProgress 处理中
Deferred 当前无法扩缩容,后续重试
Infeasible 超过node可用资源

源码

pkg/kubelet/kuberuntime/kuberuntime_manager.go中
pod crud主流程

func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
...
    podContainerChanges := m.computePodActions(ctx, pod, podStatus)

...
    if isInPlacePodVerticalScalingAllowed(pod) {
        if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
            m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
        }
    }
...
}

计算pod动作
func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
...
        } else if isInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) {
            // computePodResizeAction updates 'changes' if resize policy requires restarting this container
            continue
...
}

计算扩缩容动作(哪些容器需要更新,容器是否需要重启)
func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool {
...
    determineContainerResize := func(rName v1.ResourceName, specValue, statusValue int64) (resize, restart bool) {
        if specValue == statusValue {
            return false, false
        }
        if resizePolicy[rName] == v1.RestartContainer {
            return true, true
        }
        return true, false
    }
...
    markContainerForUpdate := func(rName v1.ResourceName, specValue, statusValue int64) {
        cUpdateInfo := containerToUpdateInfo{
            apiContainerIdx:           containerIdx,
            kubeContainerID:           kubeContainerStatus.ID,
            desiredContainerResources: desiredResources,
            currentContainerResources: &currentResources,
        }
        // Order the container updates such that resource decreases are applied before increases
        switch {
        case specValue > statusValue: // append
            changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], cUpdateInfo)
        case specValue < statusValue: // prepend
            changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], containerToUpdateInfo{})
            copy(changes.ContainersToUpdate[rName][1:], changes.ContainersToUpdate[rName])
            changes.ContainersToUpdate[rName][0] = cUpdateInfo
        }
    }
...
    resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
    resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
    resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
    if restartCPULim || restartCPUReq || restartMemLim {
        // resize policy requires this container to restart
        changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{
            name:      kubeContainerStatus.Name,
            container: &pod.Spec.Containers[containerIdx],
            message:   fmt.Sprintf("Container %s resize requires restart", container.Name),
        }
        changes.ContainersToStart = append(changes.ContainersToStart, containerIdx)
        changes.UpdatePodResources = true
        return false
    } else {
        if resizeMemLim {
            markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
        }
        if resizeCPULim {
            markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
        } else if resizeCPUReq {
            markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
        }
    }
...
}


pod扩缩容
func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podContainerChanges podActions, result kubecontainer.PodSyncResult) {
...
    resizeContainers := func(rName v1.ResourceName, currPodCgLimValue, newPodCgLimValue, currPodCgReqValue, newPodCgReqValue int64) error {
        var err error
        if newPodCgLimValue > currPodCgLimValue {
            if err = setPodCgroupConfig(rName, true); err != nil {
                return err
            }
        }
        if newPodCgReqValue > currPodCgReqValue {
            if err = setPodCgroupConfig(rName, false); err != nil {
                return err
            }
        }
        if len(podContainerChanges.ContainersToUpdate[rName]) > 0 {
            if err = m.updatePodContainerResources(pod, rName, podContainerChanges.ContainersToUpdate[rName]); err != nil {
                klog.ErrorS(err, "updatePodContainerResources failed", "pod", format.Pod(pod), "resource", rName)
                return err
            }
        }
        if newPodCgLimValue < currPodCgLimValue {
            err = setPodCgroupConfig(rName, true)
        }
        if newPodCgReqValue < currPodCgReqValue {
            if err = setPodCgroupConfig(rName, false); err != nil {
                return err
            }
        }
        return err
    }
...
    if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources {
...
        if errResize := resizeContainers(v1.ResourceMemory, int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil {
            result.Fail(errResize)
            return
        }
    }
    if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources {
...
        if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota,
            int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil {
            result.Fail(errResize)
            return
        }
    }
}

更新pod容器资源
func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, resourceName v1.ResourceName, containersToUpdate []containerToUpdateInfo) error {
...
        if err := m.updateContainerResources(pod, container, cInfo.kubeContainerID); err != nil {
...
}


pkg/kubelet/kuberuntime/kuberuntime_container.go中
更新容器资源

func (m *kubeGenericRuntimeManager) updateContainerResources(pod *v1.Pod, container *v1.Container, containerID kubecontainer.ContainerID) error {
...
    err := m.runtimeService.UpdateContainerResources(ctx, containerID.ID, containerResources)

...
}

pkg/kubelet/cm/pod_container_manager_linux.go中
设置pod cgroup配置

func (m *podContainerManagerImpl) SetPodCgroupConfig(pod *v1.Pod, resource v1.ResourceName, resourceConfig *ResourceConfig) error {
    podCgroupName, _ := m.GetPodContainerName(pod)
    return m.cgroupManager.SetCgroupConfig(podCgroupName, resource, resourceConfig)
}

相关文章

网友评论

      本文标题:从源码理解InPlacePodVerticalScaling如何

      本文链接:https://www.haomeiwen.com/subject/kfojndtx.html