美文网首页Kubernetes
Kubernetes PLEG 原理

Kubernetes PLEG 原理

作者: 王勇1024 | 来源:发表于2022-06-13 16:34 被阅读0次

PLEG执行原理

PLEG 执行原理
  1. 在 NewMainKubelet() 方法中调用 NewGenericPLEG() 方法创建 GenericPLEG 对象

  2. 在 kubelet.Run() 方法中调用 pleg.Start() 方法启动 PLEG 的 relist 逻辑

  3. relist是PLEG的核心,它从container runtime中查询属于kubelet管理的containers/sandboxes的信息,并与自身维护的 pods cache 信息进行对比,生成对应的 PodLifecycleEvent,然后输出到 eventChannel 中

  4. syncLoop() 方法会定期通过 runtimeState 调用 Healthy() 方法判断 relist 是否正常,从而判断 node 状态是否正常

  5. syncLoop() 方法会从 eventChannel 中获取 PLEG 事件,并通过 syncPod() 方法更新相关 Pod 信息

GenericPLEG

GenericPLEG 通过 relist 机制发现容器相关事件。relist 周期默认是1s,即上次 relist 完成后,等待1s再开始下次relist,relist的执行时间可能超过1s

type GenericPLEG struct {
    relistPeriod time.Duration             // relist 周期,默认是1s,即上次 relist 完成后,等待1s再开始下次relist,relist的执行时间可能超过1s
    runtime kubecontainer.Runtime          // 容器运行时
    eventChannel chan *PodLifecycleEvent   // PLEG 事件通道
    podRecords podRecords                  // pod 和 container 信息的内部缓存
    relistTime atomic.Value                // 上次 relist 的时间
    cache kubecontainer.Cache              // 缓存所有 pod 的运行状态
    clock clock.Clock
    podsToReinspect map[types.UID]*kubecontainer.Pod // 在此次 relist 中获取状态失败的 pod,将会在下一次 relist时重试
}

NewGenericPLEG

通过调用 NewGenericPLEG() 方法创建一个新的 GenericPLEG 实例,该方法在 NewMainKubelet() 方法中被调用。

func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
    relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
    return &GenericPLEG{
        relistPeriod: relistPeriod,
        runtime:      runtime,
        eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
        podRecords:   make(podRecords),
        cache:        cache,
        clock:        clock,
    }
}

Run()

Run() 方法的执行逻辑很简单,即周期性地执行 relist 操作。

func (g *GenericPLEG) Start() {
    go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

relist

relist是PLEG的核心,它从container runtime中查询属于kubelet管理的containers/sandboxes的信息,并与自身维护的 pods cache 信息进行对比,生成对应的 PodLifecycleEvent,然后输出到 eventChannel 中,通过 eventChannel 发送到 kubelet syncLoop 进行消费,然后由 kubelet syncPod 来触发 pod 同步处理过程,最终达到用户的期望状态。

relist 的执行流程为:

  1. 从 CNI 获取所有的 Pod 信息

  2. 更新处于 running 状态的 pod (running_pods)和 container (running_containers)的统计指标

  3. 更新 podRecords 缓存,其中 pod id 为key

  4. 比较 podRecords 中 pod 的新旧版本中所有 container 的状态变化,并生成 PLEG 事件

    1. 从 pod 的新旧版本中取出所有的 container

    2. 遍历 container,比较 container 状态变化,并生成 PLEG 事件

    3. 记录 PLEG 事件

  5. 遍历上一步中生成的 PLEG 事件,如果有和某一 pod 相关的 PLEG 事件,我们需要更新 podCache

    1. 如果 pod == nil,则从 podCache 中删除该 pod

    2. 调用 docker inspect 接口获取 pod 中所有容器的状态,并更新 podCache

    3. 如果更新失败,将该 pod 记录到 podsToReinspect 中

    4. 如果更新成功,如果该 pod 在 podsToReinspect 中,从中移除

  6. 更新 podRecords 缓存,并发送 PLEG 事件到 eventChannel

  7. 重试 podsToReinspect 中在上次 relist 中更新失败的 pod

代码实现如下:

func (g *GenericPLEG) relist() {
    // 1.从 CNI 获取所有的 Pod 信息
    podList, err := g.runtime.GetPods(true)
    pods := kubecontainer.Pods(podList)
    // 2.更新处于 running 状态的 pod 和 container 的统计指标
    updateRunningPodAndContainerMetrics(pods)
  // 3.更新 podRecords 缓存
    g.podRecords.setCurrent(pods)
    // 4.比较 podRecords 中 pod 的新旧版本中所有 container 的状态变化,并生成事件
    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
    for pid := range g.podRecords {
        oldPod := g.podRecords.getOld(pid)
        pod := g.podRecords.getCurrent(pid)
        // 从 pod 的新旧版本中取出所有的 container
        allContainers := getContainersFromPods(oldPod, pod)
    // 遍历 container,比较 container 状态变化,并生成事件
        for _, container := range allContainers {
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e) // 记录事件
            }
        }
    }

    var needsReinspection map[types.UID]*kubecontainer.Pod
    if g.cacheEnabled() {
        needsReinspection = make(map[types.UID]*kubecontainer.Pod)
    }
    // 5.如果有和某一 pod 相关的 PLEG 事件,我们需要更新 podCache
    for pid, events := range eventsByPodID {
        pod := g.podRecords.getCurrent(pid)
        if g.cacheEnabled() {
            // 调用 docker inspect 接口获取 pod 中所有容器的状态,并更新 podCache
            if err := g.updateCache(pod, pid); err != nil {
        // 记录更新失败的 pod
                needsReinspection[pid] = pod
                continue
            } else {
        // 如果该 pod 在 podsToReinspect 中,从中移除
                delete(g.podsToReinspect, pid)
            }
        }
    // 6.更新 podRecords 缓存,并发送 PLEG 事件到 eventChannel
        g.podRecords.update(pid)
        for i := range events {
            // Filter out events that are not reliable and no other components use yet.
            if events[i].Type == ContainerChanged {
                continue
            }
            select {
            case g.eventChannel <- events[i]: // 发送 PLEG 事件到 eventChannel
            default:
                metrics.PLEGDiscardEvents.Inc()
                klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
            }
        }
    }

    if g.cacheEnabled() {
        // 7.重试 podsToReinspect 中在上次 relist 中更新失败的 pod
        if len(g.podsToReinspect) > 0 {
            for pid, pod := range g.podsToReinspect {
                if err := g.updateCache(pod, pid); err != nil {
                    needsReinspection[pid] = pod
                }
            }
        }
    }
    g.podsToReinspect = needsReinspection
}

生成 PLEG 事件

  1. 如果 container 状态无变化,则不生成事件

  2. 如果 container 状态变成 running,则生成 ContainerStarted 事件

  3. 如果 container 状态变成 exited,则生成 ContainerDied 事件

  4. 如果 container 状态变成 unknown,则生成 ContainerChanged 事件

  5. 如果 container 状态变成 non-existent,且 pod 的旧状态为 exited,则生成 ContainerRemoved 事件,否则生成 ContainerDied 事件

func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
    if newState == oldState {
        return nil
    }

    klog.V(4).InfoS("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
    switch newState {
    case plegContainerRunning:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
    case plegContainerExited:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
    case plegContainerUnknown:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
    case plegContainerNonExistent:
        switch oldState {
        case plegContainerExited:
            // We already reported that the container died before.
            return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
        default:
            return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
        }
    default:
        panic(fmt.Sprintf("unrecognized container state: %v", newState))
    }
}

Watch

Watch 方法用于获取 eventChannel,syncLoop() 会通过 Watch 方法获取 eventChannel,并从中获取有状态更新的 pod,然后由 kubelet syncPod 来触发 pod 同步处理过程,最终达到用户的期望状态。

func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
    return g.eventChannel
}

Healthy

Kubelet中的NodeStatus机制会定期检查集群节点状况,并把节点状况同步到API Server。而NodeStatus判断节点就绪状况的一个主要依据,就是PLEG。

Healthy 方法用于检查 PLEG relist运行是否正常,如果超过 3分钟没有执行 relist,则认为该节点状态异常。

Healthy() 函数会以 “PLEG” 的形式添加到 runtimeState 中,Kubelet 在一个同步循环(syncLoop() 函数)中会定期(默认是 10s)调用 Healthy() 函数。Healthy() 函数会检查 relist 进程(PLEG 的关键任务)是否在 3 分钟内完成。如果 relist 进程的完成时间超过了 3 分钟,就会报告 PLEG is not healthy。当节点状态异常时,syncLoop 将不会继续 pod 状态同步逻辑。

func (g *GenericPLEG) Healthy() (bool, error) {
    relistTime := g.getRelistTime()
    if relistTime.IsZero() {
        return false, fmt.Errorf("pleg has yet to be successful")
    }
    // Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
    metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
    elapsed := g.clock.Since(relistTime)
    if elapsed > relistThreshold {
        return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
    }
    return true, nil
}

相关指标

指标名称 指标含义
pleg_relist_duration_seconds Duration in seconds for relisting pods in PLEG.
pleg_discard_events The number of discard events in PLEG.
pleg_relist_interval_seconds Interval in seconds between relisting in PLEG.
pleg_last_seen_seconds Timestamp in seconds when PLEG was last seen active.

相关文章

网友评论

    本文标题:Kubernetes PLEG 原理

    本文链接:https://www.haomeiwen.com/subject/azbomrtx.html