PLEG执行原理
PLEG 执行原理-
在 NewMainKubelet() 方法中调用 NewGenericPLEG() 方法创建 GenericPLEG 对象
-
在 kubelet.Run() 方法中调用 pleg.Start() 方法启动 PLEG 的 relist 逻辑
-
relist是PLEG的核心,它从container runtime中查询属于kubelet管理的containers/sandboxes的信息,并与自身维护的 pods cache 信息进行对比,生成对应的 PodLifecycleEvent,然后输出到 eventChannel 中
-
syncLoop() 方法会定期通过 runtimeState 调用 Healthy() 方法判断 relist 是否正常,从而判断 node 状态是否正常
-
syncLoop() 方法会从 eventChannel 中获取 PLEG 事件,并通过 syncPod() 方法更新相关 Pod 信息
GenericPLEG
GenericPLEG 通过 relist 机制发现容器相关事件。relist 周期默认是1s,即上次 relist 完成后,等待1s再开始下次relist,relist的执行时间可能超过1s
type GenericPLEG struct {
relistPeriod time.Duration // relist 周期,默认是1s,即上次 relist 完成后,等待1s再开始下次relist,relist的执行时间可能超过1s
runtime kubecontainer.Runtime // 容器运行时
eventChannel chan *PodLifecycleEvent // PLEG 事件通道
podRecords podRecords // pod 和 container 信息的内部缓存
relistTime atomic.Value // 上次 relist 的时间
cache kubecontainer.Cache // 缓存所有 pod 的运行状态
clock clock.Clock
podsToReinspect map[types.UID]*kubecontainer.Pod // 在此次 relist 中获取状态失败的 pod,将会在下一次 relist时重试
}
NewGenericPLEG
通过调用 NewGenericPLEG() 方法创建一个新的 GenericPLEG 实例,该方法在 NewMainKubelet() 方法中被调用。
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
return &GenericPLEG{
relistPeriod: relistPeriod,
runtime: runtime,
eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
podRecords: make(podRecords),
cache: cache,
clock: clock,
}
}
Run()
Run() 方法的执行逻辑很简单,即周期性地执行 relist 操作。
func (g *GenericPLEG) Start() {
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
relist
relist是PLEG的核心,它从container runtime中查询属于kubelet管理的containers/sandboxes的信息,并与自身维护的 pods cache 信息进行对比,生成对应的 PodLifecycleEvent,然后输出到 eventChannel 中,通过 eventChannel 发送到 kubelet syncLoop 进行消费,然后由 kubelet syncPod 来触发 pod 同步处理过程,最终达到用户的期望状态。
relist 的执行流程为:
-
从 CNI 获取所有的 Pod 信息
-
更新处于 running 状态的 pod (running_pods)和 container (running_containers)的统计指标
-
更新 podRecords 缓存,其中 pod id 为key
-
比较 podRecords 中 pod 的新旧版本中所有 container 的状态变化,并生成 PLEG 事件
-
从 pod 的新旧版本中取出所有的 container
-
遍历 container,比较 container 状态变化,并生成 PLEG 事件
-
记录 PLEG 事件
-
-
遍历上一步中生成的 PLEG 事件,如果有和某一 pod 相关的 PLEG 事件,我们需要更新 podCache
-
如果 pod == nil,则从 podCache 中删除该 pod
-
调用 docker inspect 接口获取 pod 中所有容器的状态,并更新 podCache
-
如果更新失败,将该 pod 记录到 podsToReinspect 中
-
如果更新成功,如果该 pod 在 podsToReinspect 中,从中移除
-
-
更新 podRecords 缓存,并发送 PLEG 事件到 eventChannel
-
重试 podsToReinspect 中在上次 relist 中更新失败的 pod
代码实现如下:
func (g *GenericPLEG) relist() {
// 1.从 CNI 获取所有的 Pod 信息
podList, err := g.runtime.GetPods(true)
pods := kubecontainer.Pods(podList)
// 2.更新处于 running 状态的 pod 和 container 的统计指标
updateRunningPodAndContainerMetrics(pods)
// 3.更新 podRecords 缓存
g.podRecords.setCurrent(pods)
// 4.比较 podRecords 中 pod 的新旧版本中所有 container 的状态变化,并生成事件
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// 从 pod 的新旧版本中取出所有的 container
allContainers := getContainersFromPods(oldPod, pod)
// 遍历 container,比较 container 状态变化,并生成事件
for _, container := range allContainers {
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e) // 记录事件
}
}
}
var needsReinspection map[types.UID]*kubecontainer.Pod
if g.cacheEnabled() {
needsReinspection = make(map[types.UID]*kubecontainer.Pod)
}
// 5.如果有和某一 pod 相关的 PLEG 事件,我们需要更新 podCache
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
// 调用 docker inspect 接口获取 pod 中所有容器的状态,并更新 podCache
if err := g.updateCache(pod, pid); err != nil {
// 记录更新失败的 pod
needsReinspection[pid] = pod
continue
} else {
// 如果该 pod 在 podsToReinspect 中,从中移除
delete(g.podsToReinspect, pid)
}
}
// 6.更新 podRecords 缓存,并发送 PLEG 事件到 eventChannel
g.podRecords.update(pid)
for i := range events {
// Filter out events that are not reliable and no other components use yet.
if events[i].Type == ContainerChanged {
continue
}
select {
case g.eventChannel <- events[i]: // 发送 PLEG 事件到 eventChannel
default:
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
}
}
}
if g.cacheEnabled() {
// 7.重试 podsToReinspect 中在上次 relist 中更新失败的 pod
if len(g.podsToReinspect) > 0 {
for pid, pod := range g.podsToReinspect {
if err := g.updateCache(pod, pid); err != nil {
needsReinspection[pid] = pod
}
}
}
}
g.podsToReinspect = needsReinspection
}
生成 PLEG 事件
-
如果 container 状态无变化,则不生成事件
-
如果 container 状态变成 running,则生成 ContainerStarted 事件
-
如果 container 状态变成 exited,则生成 ContainerDied 事件
-
如果 container 状态变成 unknown,则生成 ContainerChanged 事件
-
如果 container 状态变成 non-existent,且 pod 的旧状态为 exited,则生成 ContainerRemoved 事件,否则生成 ContainerDied 事件
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
if newState == oldState {
return nil
}
klog.V(4).InfoS("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
switch newState {
case plegContainerRunning:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
case plegContainerExited:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
case plegContainerUnknown:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
case plegContainerNonExistent:
switch oldState {
case plegContainerExited:
// We already reported that the container died before.
return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
default:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
}
default:
panic(fmt.Sprintf("unrecognized container state: %v", newState))
}
}
Watch
Watch 方法用于获取 eventChannel,syncLoop() 会通过 Watch 方法获取 eventChannel,并从中获取有状态更新的 pod,然后由 kubelet syncPod 来触发 pod 同步处理过程,最终达到用户的期望状态。
func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
return g.eventChannel
}
Healthy
Kubelet中的NodeStatus机制会定期检查集群节点状况,并把节点状况同步到API Server。而NodeStatus判断节点就绪状况的一个主要依据,就是PLEG。
Healthy 方法用于检查 PLEG relist运行是否正常,如果超过 3分钟没有执行 relist,则认为该节点状态异常。
Healthy() 函数会以 “PLEG” 的形式添加到 runtimeState 中,Kubelet 在一个同步循环(syncLoop() 函数)中会定期(默认是 10s)调用 Healthy() 函数。Healthy() 函数会检查 relist 进程(PLEG 的关键任务)是否在 3 分钟内完成。如果 relist 进程的完成时间超过了 3 分钟,就会报告 PLEG is not healthy。当节点状态异常时,syncLoop 将不会继续 pod 状态同步逻辑。
func (g *GenericPLEG) Healthy() (bool, error) {
relistTime := g.getRelistTime()
if relistTime.IsZero() {
return false, fmt.Errorf("pleg has yet to be successful")
}
// Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
elapsed := g.clock.Since(relistTime)
if elapsed > relistThreshold {
return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
}
return true, nil
}
相关指标
指标名称 | 指标含义 |
---|---|
pleg_relist_duration_seconds | Duration in seconds for relisting pods in PLEG. |
pleg_discard_events | The number of discard events in PLEG. |
pleg_relist_interval_seconds | Interval in seconds between relisting in PLEG. |
pleg_last_seen_seconds | Timestamp in seconds when PLEG was last seen active. |
网友评论