Kubelet Status Manager Code Walkthrough


    Author: 昔召阆梦 | Published 2020-03-03 21:00

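    The kubelet status manager is responsible for syncing pod.status to the apiserver. Unlike a controller, it does not actively watch for pod state changes; instead, it exposes interfaces for other kubelet components to call, chiefly the pod manager and the prober manager.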

    1. Initialization

    The Kubelet struct has a statusManager field, so the statusManager object is created during initialization. This happens in NewMainKubelet():

    // pkg/kubelet/kubelet.go: 671
    
    func NewMainKubelet(...){
        ...
        klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
        ...
    }
    

    NewManager() is simple: it just assigns fields. Let's first look at the status manager's struct definition.

    2. Struct definition

    // pkg/kubelet/status/status_manager.go: 62
    type manager struct {
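        // Client connected to kube-apiserver
        kubeClient clientset.Interface
        // Kubelet component that maintains the in-memory pod data
        podManager kubepod.Manager
        // The status manager's cache: maps pod UID to its versioned status
        podStatuses      map[types.UID]versionedPodStatus
        // Lock guarding podStatuses
        podStatusesLock  sync.RWMutex
        // Channel of pending status updates; each message triggers one sync
        podStatusChannel chan podStatusSyncRequest
        // Status version last sent to the apiserver, keyed by (mirror) pod UID; only accessed from the sync goroutine
        apiStatusVersions map[kubetypes.MirrorPodUID]uint64
        // Checks whether a terminated pod's resources are reclaimed so the pod can be safely deleted
        podDeletionSafety PodDeletionSafetyProvider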
    }
    

    The struct has relatively few fields, and it is clear at a glance that the status manager is essentially a utility component.
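NewManager() really is just allocation plus assignment. As a rough sketch (the types below are simplified stand-ins, not the real Kubernetes types), constructing the manager amounts to allocating the two maps and a buffered channel; nothing runs until Start() is called. The channel capacity is a parameter here, whereas upstream uses a fixed buffer of 1000.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the real Kubernetes types (assumptions for this sketch).
type UID string

type PodStatus struct {
	Phase string
}

type versionedPodStatus struct {
	status  PodStatus
	version uint64
}

type podStatusSyncRequest struct {
	podUID UID
	status versionedPodStatus
}

type manager struct {
	podStatuses       map[UID]versionedPodStatus
	podStatusesLock   sync.RWMutex
	podStatusChannel  chan podStatusSyncRequest
	apiStatusVersions map[UID]uint64
}

// newManager only allocates the maps and a buffered channel; no goroutine is
// started until Start() is called.
func newManager(channelSize int) *manager {
	return &manager{
		podStatuses:       make(map[UID]versionedPodStatus),
		podStatusChannel:  make(chan podStatusSyncRequest, channelSize),
		apiStatusVersions: make(map[UID]uint64),
	}
}

func main() {
	m := newManager(1000)
	fmt.Println(len(m.podStatuses), cap(m.podStatusChannel))
}
```

The buffer matters later: producers send to it without blocking, so its size bounds how many single-pod syncs can be queued before updates fall back to the periodic batch sync.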

    3. Public interface

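    As usual, once the object is defined, it implements a set of interfaces or methods that expose its functionality. The Manager interface collects everything it offers to callers: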

    // pkg/kubelet/status/status_manager.go: 91
    type Manager interface {
        // Also an interface: returns the cached PodStatus for a pod UID
        PodStatusProvider
    
        // Starts the status manager's sync loop
        Start()
    
        // Caches the pod's status and triggers a sync
        SetPodStatus(pod *v1.Pod, status v1.PodStatus)
    
        // Updates a container's readiness in the cached status and triggers a sync
        SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
    
        // Updates whether a container has finished starting up and triggers a sync
        SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
    
        // Sets the statuses of the pod's containers and init containers to terminated
        TerminatePod(pod *v1.Pod)
    
        // Removes cached statuses of pods not listed in podUIDs
        RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
    }
    

    All of the interfaces above operate on the status field. Here is what a real pod.status looks like:

    status:
      conditions:
      - lastProbeTime: null
        lastTransitionTime: "2020-02-22T05:54:27Z"
        status: "True"
        type: Initialized
      - lastProbeTime: null
        lastTransitionTime: "2020-02-22T05:54:37Z"
        status: "True"
        type: Ready
      - lastProbeTime: null
        lastTransitionTime: "2020-02-22T05:54:37Z"
        status: "True"
        type: ContainersReady
      - lastProbeTime: null
        lastTransitionTime: "2020-02-22T05:54:27Z"
        status: "True"
        type: PodScheduled
      containerStatuses:
      - containerID: docker://6bc05dd746f0cdc176efcc85a530f5936662d04909077c382491e8d5d38a7cdc
        image: 132.65.110.170:20202/op_svc_pom/webhook-arm_64:1.0.50
        imageID: docker://sha256:8623c55d61494ab0463c6febc3cab977df37563d467dab785e917cb7e22682b3
        lastState: {}
        name: webhook
        ready: true
        restartCount: 0
        state:
          running:
            startedAt: "2020-02-22T05:52:28Z"
      hostIP: 132.65.110.133
      initContainerStatuses:
      - containerID: docker://778b25dd389ffb59c1518b929c50ed37cdbe894590f7930f560eb54c351c491f
        image: 132.65.110.170:20202/op_svc_pom/webhook-arm_64:1.0.50
        imageID: docker://sha256:8623c55d61494ab0463c6febc3cab977df37563d467dab785e917cb7e22682b3
        lastState: {}
        name: webhook-init
        ready: true
        restartCount: 0
        state:
          terminated:
            containerID: docker://778b25dd389ffb59c1518b929c50ed37cdbe894590f7930f560eb54c351c491f
            exitCode: 0
            finishedAt: "2020-02-22T05:52:26Z"
            reason: Completed
            startedAt: "2020-02-22T05:52:26Z"
      managementIP: 132.65.110.133
      phase: Running
      podIP: 132.65.110.133
      podNetworks:
      - iP:
        - ""
        name: eth0
        network: fst-manage
      qosClass: Burstable
      startTime: "2020-02-22T05:54:27Z"
    

    3.1 Start()

    First, Start(). The kubelet's Run() method is what starts the statusManager:

    // pkg/kubelet/kubelet.go:1442
    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        ...
        kl.statusManager.Start()
        ...
    }
    

    Start() looks like this:

    // pkg/kubelet/status/status_manager.go: 149
    func (m *manager) Start() {
        if m.kubeClient == nil {
            klog.Infof("Kubernetes client is nil, not starting status manager.")
            return
        }
    
        klog.Info("Starting to sync pod status with apiserver")
        // 1. Ticker that fires every syncPeriod (10s) to trigger a batch sync
        syncTicker := time.Tick(syncPeriod)
        // 2. Goroutine that handles pods sent via podStatusChannel and the periodic batch sync
        go wait.Forever(func() {
            select {
            case syncRequest := <-m.podStatusChannel:
                klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                    syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                m.syncPod(syncRequest.podUID, syncRequest.status)
            case <-syncTicker:
                m.syncBatch()
            }
        }, 0)
    }
    

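    Start() does only two things: as a consumer, it waits for notifications on the channel and syncs the single pod each one names; and every 10 seconds it runs a batch sync.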
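The loop in Start() can be modeled in a few lines. This is a minimal sketch, not the kubelet's code: pod UIDs are plain strings, syncPod and syncBatch are caller-supplied callbacks, and the stop channel exists only so the loop can exit. The tick channel is passed in so the demo can drive it by hand; a real caller would pass something like time.NewTicker(10 * time.Second).C.

```go
package main

import (
	"fmt"
	"time"
)

// runSyncLoop has the same shape as the loop in manager.Start(): a single
// goroutine either syncs one pod received on requests, or runs a batch sync
// whenever ticks fires. stop is an addition for this sketch so it can exit.
func runSyncLoop(requests <-chan string, ticks <-chan time.Time, stop <-chan struct{},
	syncPod func(uid string), syncBatch func()) {
	for {
		select {
		case uid := <-requests:
			syncPod(uid)
		case <-ticks:
			syncBatch()
		case <-stop:
			return
		}
	}
}

// demo drives the loop by hand: one pod request, then one tick.
func demo() (synced []string, batches int) {
	requests := make(chan string) // unbuffered: a send returns once the loop has received it
	ticks := make(chan time.Time) // stands in for a ticker's channel
	stop := make(chan struct{})
	done := make(chan struct{})

	go func() {
		runSyncLoop(requests, ticks, stop,
			func(uid string) { synced = append(synced, uid) },
			func() { batches++ })
		close(done)
	}()

	requests <- "pod-a"
	ticks <- time.Now()
	close(stop)
	<-done // the loop goroutine has finished writing synced and batches
	return synced, batches
}

func main() {
	synced, batches := demo()
	fmt.Println(synced, batches) // [pod-a] 1
}
```

Because a single goroutine handles both cases, a single-pod sync and a batch sync never run concurrently, which is what allows apiStatusVersions to be used without a lock of its own.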

    m.syncPod()

    // pkg/kubelet/status/status_manager.go: 516
    func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
        // 1. Return early if no update is needed
        if !m.needsUpdate(uid, status) {
            klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
            return
        }
    
        // 2. Fetch the pod's current object from the apiserver
        pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
        if errors.IsNotFound(err) {
            klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
            // If the Pod is deleted the status will be cleared in
            // RemoveOrphanedStatuses, so we just ignore the update here.
            return
        }
        if err != nil {
            klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
            return
        }
    
        // 3. Resolve the pod's real UID: for a mirror pod this is the static pod's UID; for other pods it is only a type conversion
        translatedUID := m.podManager.TranslatePodUID(pod.UID)
        // 4. If the pod was deleted and recreated (its UID changed), drop the cached status and return
        if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
            klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
            m.deletePodStatus(uid)
            return
        }
    
        // 5. All checks passed: patch the pod status on the apiserver
        oldStatus := pod.Status.DeepCopy()
        newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
        klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
        if err != nil {
            klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
            return
        }
        pod = newPod
    
        klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
        m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
    
        // 6. If the pod can now be deleted, delete it from the apiserver and drop its cached status
        if m.canBeDeleted(pod, status.status) {
            deleteOptions := metav1.NewDeleteOptions(0)
            // Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
            deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
            err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
            if err != nil {
                klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
                return
            }
            klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
            m.deletePodStatus(uid)
        }
    }
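To make the order of checks in syncPod() concrete, here is a toy version against a fake in-memory "apiserver" (a map from pod name to pod). It is a hypothetical simplification: it keeps steps 1, 2, 4 and 5, treats UIDs as strings, has no mirror pods, and leaves out both the deletion step and the canBeDeleted() half of needsUpdate().

```go
package main

import "fmt"

// apiPod is a toy stand-in for the pod object stored in the apiserver.
type apiPod struct {
	UID    string
	Status string
}

type versionedStatus struct {
	status  string
	version uint64
	podName string
}

// toyManager keeps the cache and the "last version sent" map, like the real manager.
type toyManager struct {
	api               map[string]*apiPod // pod name -> pod, a fake apiserver
	podStatuses       map[string]versionedStatus
	apiStatusVersions map[string]uint64
}

// syncPod follows steps 1, 2, 4 and 5 of the real syncPod.
func (m *toyManager) syncPod(uid string, s versionedStatus) string {
	// 1. Skip if the apiserver already has this version or a newer one.
	if latest, ok := m.apiStatusVersions[uid]; ok && latest >= s.version {
		return "up-to-date"
	}
	// 2. Fetch the current pod; a missing pod is simply ignored.
	pod, ok := m.api[s.podName]
	if !ok {
		return "not-found"
	}
	// 4. Same name but a different UID: the pod was recreated, drop the stale entry.
	if pod.UID != uid {
		delete(m.podStatuses, uid)
		return "recreated"
	}
	// 5. "Patch" the status and remember which version was sent.
	pod.Status = s.status
	m.apiStatusVersions[uid] = s.version
	return "patched"
}

func main() {
	m := &toyManager{
		api:               map[string]*apiPod{"web": {UID: "uid-2", Status: "Pending"}},
		podStatuses:       map[string]versionedStatus{},
		apiStatusVersions: map[string]uint64{},
	}
	fmt.Println(m.syncPod("uid-2", versionedStatus{"Running", 1, "web"})) // patched
	fmt.Println(m.syncPod("uid-2", versionedStatus{"Running", 1, "web"})) // up-to-date
	fmt.Println(m.syncPod("uid-1", versionedStatus{"Running", 3, "web"})) // recreated
}
```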
    

    The comments above describe the main flow of syncPod(). A few details are worth a closer look:

    needsUpdate()

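    An update is needed whenever the apiserver has not yet received the cached status version. Otherwise, the only remaining reason to sync is to finish deleting the pod: the pod must still be known to the pod manager, must have a deletion timestamp, must not be a mirror pod, and its resources must be fully reclaimed, as checked by PodResourcesAreReclaimed().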

    // pkg/kubelet/status/status_manager.go: 572
    func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
        latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
        // No version has been sent for this UID yet, or the sent version is older: update
        if !ok || latest < status.version {
            return true
        }
        // The pod manager no longer knows the pod: no update
        pod, ok := m.podManager.GetPodByUID(uid)
        if !ok {
            return false
        }
        return m.canBeDeleted(pod, status.status)
    }
    
    func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool {
        // Not being deleted, or a mirror pod: the status manager does not delete it
        if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
            return false
        }
        return m.podDeletionSafety.PodResourcesAreReclaimed(pod, status)
    }
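The needsUpdate()/canBeDeleted() decision can be condensed into a pure function. In this sketch the podInfo fields are hypothetical flags standing in for the pod manager lookup, DeletionTimestamp, the mirror-pod check and PodResourcesAreReclaimed().

```go
package main

import "fmt"

// podInfo captures only what needsUpdate/canBeDeleted look at (simplified).
type podInfo struct {
	known              bool // pod is still present in the pod manager
	deleting           bool // DeletionTimestamp != nil
	mirror             bool // the pod is a mirror pod
	resourcesReclaimed bool // result of PodResourcesAreReclaimed()
}

func canBeDeleted(p podInfo) bool {
	if !p.deleting || p.mirror {
		return false
	}
	return p.resourcesReclaimed
}

// needsUpdate mirrors the two-stage decision of the real method.
func needsUpdate(sentVersion uint64, hasSent bool, cachedVersion uint64, p podInfo) bool {
	if !hasSent || sentVersion < cachedVersion {
		return true // the apiserver has not seen this version yet
	}
	if !p.known {
		return false
	}
	return canBeDeleted(p) // only remaining reason: finish deleting the pod
}

func main() {
	live := podInfo{known: true}
	done := podInfo{known: true, deleting: true, resourcesReclaimed: true}
	fmt.Println(needsUpdate(0, false, 1, live)) // true: never sent
	fmt.Println(needsUpdate(2, true, 2, live))  // false: already in sync
	fmt.Println(needsUpdate(2, true, 2, done))  // true: ready to be deleted
}
```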
    

    PodResourcesAreReclaimed()

    PodResourcesAreReclaimed() decides whether a pod's resources have been fully reclaimed. It checks the following:

    • None of the pod's containers are in the Running state
    • The kubelet's podCache shows no leftover containers for the pod
    • The pod's volumes have been cleaned up
    • The pod's cgroup has been cleaned up

    Only if all of these checks pass are the resources considered reclaimed, so that the update can proceed.

    // pkg/kubelet/kubelet_pods.go: 912
    func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
        // 1. Some containers are still running: not reclaimed
        if !notRunning(status.ContainerStatuses) {
            // We shouldn't delete pods that still have running containers
            klog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
            return false
        }
        // 2. Look up the pod in the kubelet's podCache; a lookup error or leftover containers means not reclaimed
        runtimeStatus, err := kl.podCache.Get(pod.UID)
        if err != nil {
            klog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
            return false
        }
        if len(runtimeStatus.ContainerStatuses) > 0 {
            var statusStr string
            for _, status := range runtimeStatus.ContainerStatuses {
                statusStr += fmt.Sprintf("%+v ", *status)
            }
            klog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr)
            return false
        }
        // 3. Volumes still exist (and are not meant to be kept): not reclaimed
        if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
            klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
            return false
        }
        // 4. The pod cgroup still exists: not reclaimed
        if kl.kubeletConfiguration.CgroupsPerQOS {
            pcm := kl.containerManager.NewPodContainerManager()
            if pcm.Exists(pod) {
                klog.V(3).Infof("Pod %q is terminated, but pod cgroup sandbox has not been cleaned up", format.Pod(pod))
                return false
            }
        }
        return true
    }
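The four checks in PodResourcesAreReclaimed() are evaluated in order and the first failure wins. A sketch with a hypothetical snapshot struct in place of the kubelet's podCache, volume manager and cgroup manager (the podCache lookup-error case is omitted):

```go
package main

import "fmt"

// podResources is a hypothetical snapshot of what the kubelet would look up
// for a terminated pod; the field names are illustrative only.
type podResources struct {
	runningContainers int  // containers still Running in the reported status
	cachedContainers  int  // containers still present in the runtime pod cache
	volumesExist      bool // pod volumes still present on the node
	keepVolumes       bool // keepTerminatedPodVolumes
	cgroupsPerQOS     bool // kubelet configured with CgroupsPerQOS
	podCgroupExists   bool // the pod-level cgroup still exists
}

// reclaimBlocker returns "" when every check passes, otherwise the first reason
// the resources are not yet reclaimed, in the same order as the checks above.
func reclaimBlocker(r podResources) string {
	switch {
	case r.runningContainers > 0:
		return "containers still running"
	case r.cachedContainers > 0:
		return "containers not cleaned up"
	case r.volumesExist && !r.keepVolumes:
		return "volumes not cleaned up"
	case r.cgroupsPerQOS && r.podCgroupExists:
		return "pod cgroup not cleaned up"
	}
	return ""
}

func main() {
	fmt.Printf("%q\n", reclaimBlocker(podResources{volumesExist: true}))                    // "volumes not cleaned up"
	fmt.Printf("%q\n", reclaimBlocker(podResources{volumesExist: true, keepVolumes: true})) // ""
}
```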
    

    m.syncBatch()

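    Before reading m.syncBatch(), we can guess what it does: since it is a full sync, it presumably walks the cache maintained by the status manager and calls syncPod() on each entry.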

    // pkg/kubelet/status/status_manager.go: 471
    func (m *manager) syncBatch() {
        var updatedStatuses []podStatusSyncRequest
        podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
        // The core logic is wrapped in an anonymous function so the read lock is released via defer
        func() { // Critical section
            m.podStatusesLock.RLock()
            defer m.podStatusesLock.RUnlock()
    
            // Drop stale apiStatusVersions entries whose UID is neither cached nor a known mirror pod
            for uid := range m.apiStatusVersions {
                _, hasPod := m.podStatuses[types.UID(uid)]
                _, hasMirror := mirrorToPod[uid]
                if !hasPod && !hasMirror {
                    delete(m.apiStatusVersions, uid)
                }
            }
    
            // Walk the cached pod statuses
            for uid, status := range m.podStatuses {
                syncedUID := kubetypes.MirrorPodUID(uid)
                // Static pod: sync against its mirror pod's UID, or skip it if there is no mirror pod yet
                if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
                    if mirrorUID == "" {
                        klog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
                        continue
                    }
                    syncedUID = mirrorUID
                }
                if m.needsUpdate(types.UID(syncedUID), status) {
                    // Needs an update: add it to the list
                    updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
                } else if m.needsReconcile(uid, status.status) {
                    // The cached status differs from the pod manager's copy: reset the sent version and resync
                    delete(m.apiStatusVersions, syncedUID)
                    updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
                }
            }
        }()
    
        // Sync the selected pods one by one
        for _, update := range updatedStatuses {
            klog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
            m.syncPod(update.podUID, update.status)
        }
    }
    

    The code confirms the guess. In summary, syncBatch() does the following:

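    • Gets the current static pod and mirror pod UID mappings (in both directions) and deletes stale entries from apiStatusVersions, i.e. UIDs that are neither in the podStatuses cache nor a known mirror pod.
    • Walks the podStatuses cache and selects the pod UIDs whose status still needs to be sent (needsUpdate()) or has drifted from the pod manager's copy (needsReconcile()), then calls syncPod() on each of them in turn.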

    needsReconcile()

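    needsReconcile() checks whether the given cached status is consistent with the pod object held by the pod manager (the kubelet's copy of what the apiserver has). The core of the check is pod.status.conditions: only the conditions owned by the kubelet are compared, while the rest of the status is compared as a whole. The kubelet-owned conditions are Unschedulable, PodScheduled, Initialized, ContainersReady, and Ready. The code: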

    // pkg/kubelet/status/status_manager.go: 690
    func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
        // 1. Get the pod from the pod manager (the kubelet's in-memory copy; no apiserver request is made)
        pod, ok := m.podManager.GetPodByUID(uid)
        if !ok {
            klog.V(4).Infof("Pod %q has been deleted, no need to reconcile", string(uid))
            return false
        }
        // 2. For a static pod, compare against its mirror pod
        if kubetypes.IsStaticPod(pod) {
            mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
            if !ok {
                klog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod))
                return false
            }
            pod = mirrorPod
        }
    
        podStatus := pod.Status.DeepCopy()
        // 3. Lower the precision of the timestamps in podStatus from RFC3339Nano to RFC3339
        normalizeStatus(pod, podStatus)
    
        // 4. Compare the statuses, looking only at the conditions owned by the kubelet
        if isPodStatusByKubeletEqual(podStatus, &status) {
            // If the status from the source is the same with the cached status,
            // reconcile is not needed. Just return.
            return false
        }
        klog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %s", format.Pod(pod),
            diff.ObjectDiff(podStatus, &status))
    
        return true
    }
    

    3.2 SetPodStatus()

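    SetPodStatus() sets the status of a given pod and is called from the kubelet's core sync logic, syncPod(). The method is simple: apart from logging conditions the kubelet does not own and deep-copying the status, it just calls updateStatusInternal() to trigger the update.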

    // pkg/kubelet/status/status_manager.go: 181
    func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
        m.podStatusesLock.Lock()
        defer m.podStatusesLock.Unlock()
    
        for _, c := range pod.Status.Conditions {
            if !kubetypes.PodConditionByKubelet(c.Type) {
                klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
                    "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
            }
        }
        // Make sure we're caching a deep copy.
        status = *status.DeepCopy()
    
        m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
    }
    

    m.updateStatusInternal()

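    This is where the real work happens. All the preparation builds the new versioned status (newStatus); the function then sends a request on podStatusChannel, which triggers the status manager's syncPod().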

    // pkg/kubelet/status/status_manager.go: 364
    func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
        // 1. Get the old status: from the cache, else from the mirror pod, else from the pod itself
        var oldStatus v1.PodStatus
        cachedStatus, isCached := m.podStatuses[pod.UID]
        if isCached {
            oldStatus = cachedStatus.status
        } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
            oldStatus = mirrorPod.Status
        } else {
            oldStatus = pod.Status
        }
    
        // 2. Reject containers or init containers moving from terminated back to non-terminated, unless the restart policy allows it
        if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
            klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
            return false
        }
        if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
            klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
            return false
        }
    
        // 3. Update lastTransitionTime of the kubelet-owned conditions
        updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)
        updateLastTransitionTime(&status, &oldStatus, v1.PodReady)
        updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)
        updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
    
        // 4. Keep the existing status.startTime, or set it to now if there is none
        if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
            status.StartTime = oldStatus.StartTime
        } else if status.StartTime.IsZero() {
            // if the status has no start time, we need to set an initial time
            now := metav1.Now()
            status.StartTime = &now
        }
    
        // 5. Normalize timestamps (reduce them to RFC3339 precision)
        normalizeStatus(pod, &status)
        // 6. Skip the update if the status is unchanged, unless it is forced
        if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
            klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
            return false // No new status.
        }
    
        // Build the new versioned status, bumping the version
        newStatus := versionedPodStatus{
            status:       status,
            version:      cachedStatus.version + 1,
            podName:      pod.Name,
            podNamespace: pod.Namespace,
        }
        m.podStatuses[pod.UID] = newStatus
    
        select {
            // 7. Non-blocking send to trigger syncPod; if the channel is full, syncBatch picks the update up later
        case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
            klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
                pod.UID, newStatus.version, newStatus.status)
            return true
        default:
            klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
                format.Pod(pod), status)
            return false
        }
    }
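The transition check in step 2 is more nuanced than "terminated stays terminated": the restart policy decides. Below is a sketch of the rule that checkContainerStateTransition appears to apply, with simplified stand-in types (containerState and the restartPolicy constants replace the v1 types).

```go
package main

import "fmt"

type restartPolicy string

const (
	restartAlways    restartPolicy = "Always"
	restartOnFailure restartPolicy = "OnFailure"
	restartNever     restartPolicy = "Never"
)

// containerState is a simplified container status: terminated plus exit code.
type containerState struct {
	name       string
	terminated bool
	exitCode   int
}

// checkTransition: a terminated container may not become non-terminated again,
// unless the restart policy allows a restart (Always, or OnFailure after a
// non-zero exit code).
func checkTransition(oldStatuses, newStatuses []containerState, policy restartPolicy) error {
	if policy == restartAlways {
		return nil
	}
	for _, o := range oldStatuses {
		if !o.terminated {
			continue
		}
		if o.exitCode != 0 && policy == restartOnFailure {
			continue
		}
		for _, n := range newStatuses {
			if n.name == o.name && !n.terminated {
				return fmt.Errorf("terminated container %s attempted illegal transition to non-terminated state", n.name)
			}
		}
	}
	return nil
}

func main() {
	done := []containerState{{name: "init", terminated: true, exitCode: 0}}
	running := []containerState{{name: "init"}}
	fmt.Println(checkTransition(done, running, restartNever) != nil)  // true: rejected
	fmt.Println(checkTransition(done, running, restartAlways) == nil) // true: allowed
}
```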
    

    The remaining interfaces follow essentially the same flow, so I will not go through each of them here.

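    • SetContainerStartup() and SetContainerReadiness() are used by the prober manager: once a probe completes, they update the container's status. (In kubectl's READY column, a container that has started but is not yet ready shows as 0/1, and a ready one as 1/1.)
    • TerminatePod() is called by the kubelet in dispatchWork(): before dispatching an update, the kubelet checks whether the pod is pending deletion with all of its containers terminated, and if so calls TerminatePod() instead.
    • RemoveOrphanedStatuses() is called during the kubelet's housekeeping: while cleaning up pods, the kubelet also removes stale entries from the status manager's cache.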


    Article link: https://www.haomeiwen.com/subject/kbsllhtx.html