Understanding kubelet PLEG from the Source Code

Author: wwq2020 | Published 2024-01-04 11:08

    PLEG-related code

    In pkg/kubelet/pleg/generic.go:

    Constructing the PLEG
    func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
        relistDuration *RelistDuration, cache kubecontainer.Cache,
        clock clock.Clock) PodLifecycleEventGenerator {
        return &GenericPLEG{
            relistDuration: relistDuration,
            runtime:        runtime,
            eventChannel:   eventChannel,
            podRecords:     make(podRecords),
            cache:          cache,
            clock:          clock,
        }
    }
    
    func (g *GenericPLEG) Start() {
        g.runningMu.Lock()
        defer g.runningMu.Unlock()
        if !g.isRunning {
            g.isRunning = true
            g.stopCh = make(chan struct{})
            go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
        }
    }
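
    Start launches a single goroutine that invokes Relist once per RelistPeriod until stopCh is closed. Below is a minimal, self-contained sketch of that wait.Until pattern (the printed message is illustrative, not actual kubelet output):

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        stopCh := make(chan struct{})
        // Run f immediately, then once per period, until stopCh is closed --
        // the same pattern Start uses to drive Relist.
        go wait.Until(func() { fmt.Println("relist tick") }, time.Second, stopCh)
        time.Sleep(3 * time.Second)
        close(stopCh) // what pleg.Stop() effectively does
    }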
    
    Relist: fetch pod states from the CRI and update the cache
    func (g *GenericPLEG) Relist() {
    ...
    Fetch the full pod list from the CRI (the true argument includes non-running pods)
        podList, err := g.runtime.GetPods(ctx, true)
        if err != nil {
            klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
            return
        }
    ...
    Record the relist timestamp
        g.updateRelistTime(timestamp)
    
    ...
    Compute pod lifecycle events by diffing the old and current pod records
        eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
        for pid := range g.podRecords {
            oldPod := g.podRecords.getOld(pid)
            pod := g.podRecords.getCurrent(pid)
            // Get all containers in the old and the new pod.
            allContainers := getContainersFromPods(oldPod, pod)
            for _, container := range allContainers {
                events := computeEvents(oldPod, pod, &container.ID)
                for _, e := range events {
                    updateEvents(eventsByPodID, e)
                }
            }
        }
    ...
        for pid, events := range eventsByPodID {
    ...
    Update the pod status cache
                    if err, _ := g.updateCache(ctx, pod, pid); err != nil {
    ...
    Send the pod events to the kubelet sync loop (non-blocking; events are dropped if the channel is full)
            for i := range events {
                // Filter out events that are not reliable and no other components use yet.
                if events[i].Type == ContainerChanged {
                    continue
                }
                select {
                case g.eventChannel <- events[i]:
                default:
                    metrics.PLEGDiscardEvents.Inc()
                    klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
                }
    ...
    }
    ...
    
    }
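
    The event computation is a diff: for each container, compare its state in the old record with its state in the current record, and translate the transition into lifecycle events. A self-contained toy version of that idea (the types below are illustrative stand-ins, not the real kubelet types):

    package main

    import "fmt"

    // Toy container states, standing in for the PLEG's internal container
    // state representation (illustrative only).
    type state string

    const (
        running state = "running"
        exited  state = "exited"
        unknown state = "unknown"
    )

    // computeEvents mirrors the diff idea: a container's state transition
    // between two relists maps to zero or more lifecycle events.
    func computeEvents(oldState, newState state) []string {
        if oldState == newState {
            return nil
        }
        switch newState {
        case running:
            return []string{"ContainerStarted"}
        case exited:
            return []string{"ContainerDied"}
        case unknown:
            return []string{"ContainerChanged"}
        }
        return nil
    }

    func main() {
        fmt.Println(computeEvents(running, exited))  // [ContainerDied]
        fmt.Println(computeEvents(unknown, running)) // [ContainerStarted]
    }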
    
    
    updateCache: fetch the pod's status from the runtime and store it in the cache
    func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
    ...
        status, err := g.runtime.GetPodStatus(ctx, pod.ID, pod.Name, pod.Namespace)
    ...
        return err, g.cache.Set(pod.ID, status, err, timestamp)
    
    }
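
    updateCache stores whatever GetPodStatus returned, including the error, keyed by pod UID and stamped with the relist time. A toy model of that cache may help (names here are illustrative; the real interface is kubecontainer.Cache):

    package main

    import (
        "fmt"
        "time"
    )

    // entry is a toy stand-in for what the PLEG cache keeps per pod: the
    // fetched status, the error from fetching it, and when it was fetched.
    type entry struct {
        status    string
        err       error
        timestamp time.Time
    }

    type cache map[string]entry

    // set mimics cache.Set: store the entry and report whether it changed.
    func (c cache) set(uid, status string, err error, ts time.Time) bool {
        old, ok := c[uid]
        if ok && old.status == status && old.err == err {
            return false
        }
        c[uid] = entry{status: status, err: err, timestamp: ts}
        return true
    }

    func main() {
        c := cache{}
        fmt.Println(c.set("pod-1", "Running", nil, time.Now())) // true: new entry
        fmt.Println(c.set("pod-1", "Running", nil, time.Now())) // false: unchanged
    }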
    
    
    Healthy: whether the PLEG is healthy (i.e. whether the time since the last relist has exceeded RelistThreshold, which defaults to 3m)
    func (g *GenericPLEG) Healthy() (bool, error) {
        relistTime := g.getRelistTime()
        if relistTime.IsZero() {
            return false, fmt.Errorf("pleg has yet to be successful")
        }
        // Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
        metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
        elapsed := g.clock.Since(relistTime)
        if elapsed > g.relistDuration.RelistThreshold {
            return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, g.relistDuration.RelistThreshold)
        }
        return true, nil
    }
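
    Healthy therefore fails as soon as the relist goroutine has been stuck for longer than RelistThreshold. A small sketch of that check using the fake clock from k8s.io/utils/clock/testing (the same clock package the PLEG is built against):

    package main

    import (
        "fmt"
        "time"

        testingclock "k8s.io/utils/clock/testing"
    )

    // healthy is a stripped-down version of the threshold check in Healthy().
    func healthy(elapsed, threshold time.Duration) bool {
        return elapsed <= threshold
    }

    func main() {
        clk := testingclock.NewFakeClock(time.Now())
        lastRelist := clk.Now()

        clk.Step(4 * time.Minute) // simulate the relist goroutine being stuck
        elapsed := clk.Since(lastRelist)

        // 4m elapsed > 3m threshold -> the PLEG health check fails.
        fmt.Println(healthy(elapsed, 3*time.Minute)) // false
    }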
    
    
    
    updateRelistTime: record the time of the current relist
    func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
        g.relistTime.Store(timestamp)
    }
    
    

    The rest of the kubelet (pkg/kubelet/kubelet.go)

        plegChannelCapacity = 1000
    
    func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIPs []net.IP,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        imageCredentialProviderConfigFile string,
        imageCredentialProviderBinDir string,
        registerNode bool,
        registerWithTaints []v1.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        nodeStatusMaxImages int32,
        seccompDefault bool,
    ) (*Kubelet, error) {
    ...
        eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)
    
    ...
            genericRelistDuration := &pleg.RelistDuration{
                RelistPeriod:    genericPlegRelistPeriod,
                RelistThreshold: genericPlegRelistThreshold,
            }
            klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
    ...
        klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    
        klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
        
    ...
    }
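
    eventChannel is buffered with plegChannelCapacity (1000). Because Relist sends into it with a non-blocking select, a sync loop that cannot keep up causes events to be dropped and kubelet_pleg_discard_events to be incremented. A minimal demonstration of that send pattern, with the capacity shrunk to 2:

    package main

    import "fmt"

    func main() {
        // Stand-in for the PLEG event channel (capacity 1000 in the kubelet).
        eventCh := make(chan string, 2)

        for _, e := range []string{"started", "died", "removed"} {
            select {
            case eventCh <- e:
                // Delivered; the sync loop will pick it up.
            default:
                // This is the branch that increments kubelet_pleg_discard_events:
                // the consumer is not draining the channel fast enough.
                fmt.Println("channel full, discarding event:", e)
            }
        }
    }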
    
    The sync loop
    func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    ...
        plegCh := kl.pleg.Watch()
    ...
            if err := kl.runtimeState.runtimeErrors(); err != nil {
                klog.ErrorS(err, "Skipping pod synchronization")
                // exponential backoff
                time.Sleep(duration)
                duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
                continue
            }
    ...
            if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                break
            }
    ...
    }
    
    syncLoopIteration: receive events from the various sources and run the corresponding sync operations
    func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    ...
        case e := <-plegCh:
            if isSyncPodWorthy(e) {
                // PLEG event for a pod; sync it.
                if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                    klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                    handler.HandlePodSyncs([]*v1.Pod{pod})
                } else {
                    // If the pod no longer exists, ignore the event.
                    klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
                }
            }
    
            if e.Type == pleg.ContainerDied {
                if containerID, ok := e.Data.(string); ok {
                    kl.cleanUpContainersInPod(e.ID, containerID)
                }
            }
    ...
    }
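
    For reference, the isSyncPodWorthy filter used above is defined in the same file; in current sources it simply drops ContainerRemoved events, since removing an already-dead container does not change pod state:

    // isSyncPodWorthy filters out events that are not worthy of pod syncing
    func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
        // ContainerRemoved doesn't affect pod state
        return event.Type != pleg.ContainerRemoved
    }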
    

    In pkg/kubelet/runtime.go:

    func (s *runtimeState) runtimeErrors() error {
        s.RLock()
        defer s.RUnlock()
        errs := []error{}
        if s.lastBaseRuntimeSync.IsZero() {
            errs = append(errs, errors.New("container runtime status check may not have completed yet"))
        } else if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
            errs = append(errs, errors.New("container runtime is down"))
        }
        for _, hc := range s.healthChecks {
            if ok, err := hc.fn(); !ok {
                errs = append(errs, fmt.Errorf("%s is not healthy: %v", hc.name, err))
            }
        }
        if s.runtimeError != nil {
            errs = append(errs, s.runtimeError)
        }
    
        return utilerrors.NewAggregate(errs)
    }
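
    runtimeErrors folds every failing health check into a single aggregate error; an empty list aggregates to nil, which is what lets the sync loop proceed. A small illustration using the same utilerrors package:

    package main

    import (
        "errors"
        "fmt"

        utilerrors "k8s.io/apimachinery/pkg/util/errors"
    )

    func main() {
        // No failing checks: NewAggregate returns nil and pod sync proceeds.
        fmt.Println(utilerrors.NewAggregate(nil) == nil) // true

        // One failing check (message modeled on the PLEG health check) is
        // enough to make runtimeErrors non-nil and skip pod synchronization.
        errs := []error{
            errors.New("PLEG is not healthy: pleg was last seen active 4m0s ago; threshold is 3m0s"),
        }
        fmt.Println(utilerrors.NewAggregate(errs))
    }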
    
    

    In pkg/kubelet/kubelet_node_status.go:

    An unhealthy PLEG causes the node to go NotReady
    func (kl *Kubelet) defaultNodeStatusFuncs() []func(context.Context, *v1.Node) error {
    ...
        setters = append(setters,
            nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
            nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
                kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent, kl.supportLocalStorageCapacityIsolation()),
            nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
            nodestatus.DaemonEndpoints(kl.daemonEndpoints),
            nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
            nodestatus.GoRuntime(),
        )
    ...
        setters = append(setters,
            ...
            // ReadyCondition takes kl.runtimeState.runtimeErrors as an input; since
            // runtimeErrors includes the "PLEG" health check registered in
            // NewMainKubelet, an unhealthy PLEG makes the node report NotReady.
            // (Remaining arguments elided; the exact signature varies by version.)
            nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, ...),
            ...
        )
    ...
    }
    

    Related metrics

    kubelet_pleg_relist_duration_seconds: how long each PLEG relist takes (seconds)
    kubelet_pleg_discard_events: number of events discarded by the PLEG; this usually means the sync loop is processing events too slowly, or events are produced faster than the channel capacity (1000) can absorb
    kubelet_pleg_relist_interval_seconds: interval between relists (seconds)
    kubelet_pleg_last_seen_seconds: timestamp at which the PLEG was last seen active (Unix seconds); per the code comment in Healthy, you can alert on an expression like time() - kubelet_pleg_last_seen_seconds > 180
    
