美文网首页
kubelet重启导致container重启源码分析

kubelet重启导致container重启源码分析

作者: wwq2020 | 来源:发表于2021-10-09 16:02 被阅读0次

    kubelet重启后,pleg中记录pod status的cache尚未完整更新时,kubelet收到k8s事件,由于查询不到pod status,计算出来的container hash显然和当前spec的hash不一致,故而重启container

    假设kubelet升级后,v1.Container这个struct的字段定义产生了变化,序列化结果不同导致hash变化,同样会触发container重启

    cmd/kubelet/kubelet.go中

    // main is the kubelet entry point: seed the global RNG, build the
    // cobra root command, initialize logging, and execute the command.
    func main() {
        rand.Seed(time.Now().UnixNano())
    
        command := app.NewKubeletCommand()
        logs.InitLogs()
        // Flush any buffered log entries when main returns.
        defer logs.FlushLogs()
    
        // Execute drives the whole kubelet; a non-nil error exits non-zero.
        if err := command.Execute(); err != nil {
            os.Exit(1)
        }
    }
    

    cmd/kubelet/app/server.go中

    // NewKubeletCommand builds the kubelet cobra command whose run function
    // calls Run with the parsed server config and dependencies.
    // (Excerpt: elided lines marked "..." in the original article.)
    func NewKubeletCommand() *cobra.Command {
        ...
        if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
            klog.ErrorS(err, "Failed to run kubelet")
            os.Exit(1)
        }
        ...
    }
    
    // Run performs logging/OS initialization and then delegates to the
    // unexported run for the actual kubelet startup.
    func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
        logOption := &logs.Options{Config: s.Logging}
        logOption.Apply()
        // To help debugging, immediately log version
        klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
        if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
            return fmt.Errorf("failed OS init: %w", err)
        }
        if err := run(ctx, s, kubeDeps, featureGate); err != nil {
            return fmt.Errorf("failed to run Kubelet: %w", err)
        }
        return nil
    }
    
    // run (excerpt) eventually calls RunKubelet, which creates and starts
    // the Kubelet object.
    func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
        ...
        if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
            return err
        }
        ...
    }
    
    // RunKubelet (excerpt) creates the Kubelet via createAndInitKubelet,
    // forwarding the server flags/config, then starts it with startKubelet.
    func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
        ...
        k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
            kubeDeps,
            &kubeServer.ContainerRuntimeOptions,
            kubeServer.ContainerRuntime,
            hostname,
            hostnameOverridden,
            nodeName,
            nodeIPs,
            kubeServer.ProviderID,
            kubeServer.CloudProvider,
            kubeServer.CertDirectory,
            kubeServer.RootDirectory,
            kubeServer.ImageCredentialProviderConfigFile,
            kubeServer.ImageCredentialProviderBinDir,
            kubeServer.RegisterNode,
            kubeServer.RegisterWithTaints,
            kubeServer.AllowedUnsafeSysctls,
            kubeServer.ExperimentalMounterPath,
            kubeServer.KernelMemcgNotification,
            kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
            kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
            kubeServer.MinimumGCAge,
            kubeServer.MaxPerPodContainerCount,
            kubeServer.MaxContainerCount,
            kubeServer.MasterServiceNamespace,
            kubeServer.RegisterSchedulable,
            kubeServer.KeepTerminatedPodVolumes,
            kubeServer.NodeLabels,
            kubeServer.NodeStatusMaxImages,
            // Seccomp default can come from either the flag or the config file.
            kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
        )
        if err != nil {
            return fmt.Errorf("failed to create kubelet: %w", err)
        }
        ...
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
    }
    
    
    // startKubelet launches the kubelet main loop and its optional API
    // servers, each in its own goroutine.
    func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
        // start the kubelet
        go k.Run(podCfg.Updates())
    
        // start the kubelet server
        if enableServer {
            go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
        }
        // Optional read-only HTTP endpoint.
        if kubeCfg.ReadOnlyPort > 0 {
            go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }
        // Pod-resources gRPC endpoint behind a feature gate.
        if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
            go k.ListenAndServePodResources()
        }
    }
    
    // createAndInitKubelet (excerpt) is a thin pass-through: it forwards
    // every argument unchanged to kubelet.NewMainKubelet.
    func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *kubelet.Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        containerRuntime string,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIPs []net.IP,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        imageCredentialProviderConfigFile string,
        imageCredentialProviderBinDir string,
        registerNode bool,
        registerWithTaints []api.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalCheckNodeCapabilitiesBeforeMount bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        masterServiceNamespace string,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        nodeStatusMaxImages int32,
        seccompDefault bool,
    ) (k kubelet.Bootstrap, err error) {
        ...
        k, err = kubelet.NewMainKubelet(kubeCfg,
            kubeDeps,
            crOptions,
            containerRuntime,
            hostname,
            hostnameOverridden,
            nodeName,
            nodeIPs,
            providerID,
            cloudProvider,
            certDirectory,
            rootDirectory,
            imageCredentialProviderConfigFile,
            imageCredentialProviderBinDir,
            registerNode,
            registerWithTaints,
            allowedUnsafeSysctls,
            experimentalMounterPath,
            kernelMemcgNotification,
            experimentalCheckNodeCapabilitiesBeforeMount,
            experimentalNodeAllocatableIgnoreEvictionThreshold,
            minimumGCAge,
            maxPerPodContainerCount,
            maxContainerCount,
            masterServiceNamespace,
            registerSchedulable,
            keepTerminatedPodVolumes,
            nodeLabels,
            nodeStatusMaxImages,
            seccompDefault,
        )
        if err != nil {
            return nil, err
        }
        ...
    }
    

    pkg/kubelet/kubelet.go中

    // NewMainKubelet (excerpt) constructs the Kubelet. The parts relevant
    // to this article: the pod status cache (podCache), the pod workers
    // that consume it, the container runtime manager, and the PLEG that
    // populates the cache.
    func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        containerRuntime string,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIPs []net.IP,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        imageCredentialProviderConfigFile string,
        imageCredentialProviderBinDir string,
        registerNode bool,
        registerWithTaints []api.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalCheckNodeCapabilitiesBeforeMount bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        masterServiceNamespace string,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        nodeStatusMaxImages int32,
        seccompDefault bool,
    ) (*Kubelet, error) {
        ...
        klet := &Kubelet{
            hostname:                                hostname,
            hostnameOverridden:                      hostnameOverridden,
            nodeName:                                nodeName,
            kubeClient:                              kubeDeps.KubeClient,
            heartbeatClient:                         kubeDeps.HeartbeatClient,
            onRepeatedHeartbeatFailure:              kubeDeps.OnHeartbeatFailure,
            rootDirectory:                           rootDirectory,
            resyncInterval:                          kubeCfg.SyncFrequency.Duration,
            sourcesReady:                            config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
            registerNode:                            registerNode,
            registerWithTaints:                      registerWithTaints,
            registerSchedulable:                     registerSchedulable,
            dnsConfigurer:                           dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
            serviceLister:                           serviceLister,
            serviceHasSynced:                        serviceHasSynced,
            nodeLister:                              nodeLister,
            nodeHasSynced:                           nodeHasSynced,
            masterServiceNamespace:                  masterServiceNamespace,
            streamingConnectionIdleTimeout:          kubeCfg.StreamingConnectionIdleTimeout.Duration,
            recorder:                                kubeDeps.Recorder,
            cadvisor:                                kubeDeps.CAdvisorInterface,
            cloud:                                   kubeDeps.Cloud,
            externalCloudProvider:                   cloudprovider.IsExternal(cloudProvider),
            providerID:                              providerID,
            nodeRef:                                 nodeRef,
            nodeLabels:                              nodeLabels,
            nodeStatusUpdateFrequency:               kubeCfg.NodeStatusUpdateFrequency.Duration,
            nodeStatusReportFrequency:               kubeCfg.NodeStatusReportFrequency.Duration,
            os:                                      kubeDeps.OSInterface,
            oomWatcher:                              oomWatcher,
            cgroupsPerQOS:                           kubeCfg.CgroupsPerQOS,
            cgroupRoot:                              kubeCfg.CgroupRoot,
            mounter:                                 kubeDeps.Mounter,
            hostutil:                                kubeDeps.HostUtil,
            subpather:                               kubeDeps.Subpather,
            maxPods:                                 int(kubeCfg.MaxPods),
            podsPerCore:                             int(kubeCfg.PodsPerCore),
            syncLoopMonitor:                         atomic.Value{},
            daemonEndpoints:                         daemonEndpoints,
            containerManager:                        kubeDeps.ContainerManager,
            containerRuntimeName:                    containerRuntime,
            nodeIPs:                                 nodeIPs,
            nodeIPValidator:                         validateNodeIP,
            clock:                                   clock.RealClock{},
            enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
            makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
            iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
            iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
            experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
            keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
            nodeStatusMaxImages:                     nodeStatusMaxImages,
            lastContainerStartedTime:                newTimeCache(),
        }
        ...
        // The pod status cache starts EMPTY on every kubelet (re)start;
        // it is only filled in later by the PLEG relist loop (see below).
        klet.podCache = kubecontainer.NewCache()
        ...
        // Pod workers read pod status out of podCache before each sync.
        klet.podWorkers = newPodWorkers(
            klet.syncPod,
            klet.syncTerminatingPod,
            klet.syncTerminatedPod,
    
            kubeDeps.Recorder,
            klet.workQueue,
            klet.resyncInterval,
            backOffPeriod,
            klet.podCache,
        )
        ...
    
        runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
            kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
            klet.livenessManager,
            klet.readinessManager,
            klet.startupManager,
            rootDirectory,
            machineInfo,
            klet.podWorkers,
            kubeDeps.OSInterface,
            klet,
            httpClient,
            imageBackOff,
            kubeCfg.SerializeImagePulls,
            float32(kubeCfg.RegistryPullQPS),
            int(kubeCfg.RegistryBurst),
            imageCredentialProviderConfigFile,
            imageCredentialProviderBinDir,
            kubeCfg.CPUCFSQuota,
            kubeCfg.CPUCFSQuotaPeriod,
            kubeDeps.RemoteRuntimeService,
            kubeDeps.RemoteImageService,
            kubeDeps.ContainerManager.InternalContainerLifecycle(),
            kubeDeps.dockerLegacyService,
            klet.containerLogManager,
            klet.runtimeClassManager,
            seccompDefault,
            kubeCfg.MemorySwap.SwapBehavior,
            kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
            *kubeCfg.MemoryThrottlingFactor,
        )
        if err != nil {
            return nil, err
        }
        klet.containerRuntime = runtime
        klet.streamingRuntime = runtime
        klet.runner = runtime
        ...
        // PLEG periodically relists containers and writes status into podCache.
        klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
        ...
    }
    
    // Run (excerpt) starts the PLEG relist goroutine and then enters the
    // main sync loop. Note: the sync loop can receive pod events before
    // PLEG has fully populated the status cache.
    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        ...
        kl.pleg.Start()
        kl.syncLoop(updates, kl)
        ...
    }
    
    // syncLoop (excerpt) repeatedly calls syncLoopIteration until it
    // reports failure.
    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        ...
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        ...
    }
    
    // syncLoopIteration (excerpt) dispatches config-channel pod additions
    // to the handler.
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        ...
        handler.HandlePodAdditions(u.Pods)
        ...
    }
    
    // HandlePodAdditions (excerpt) dispatches each added pod as a
    // SyncPodCreate work item.
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        ...
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        ...
    }
    
    // dispatchWork hands the pod to the pod workers for an async sync.
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        // Run the sync in an async worker.
        kl.podWorkers.UpdatePod(UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            StartTime:  start,
        })
        // Note the number of containers for new pods.
        if syncType == kubetypes.SyncPodCreate {
            metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
        }
    }
    

    pkg/kubelet/pod_workers.go中

    
    
    // newPodWorkers wires the three sync callbacks and the shared pod
    // status cache into a podWorkers instance.
    func newPodWorkers(
        syncPodFn syncPodFnType,
        syncTerminatingPodFn syncTerminatingPodFnType,
        syncTerminatedPodFn syncTerminatedPodFnType,
        recorder record.EventRecorder,
        workQueue queue.WorkQueue,
        resyncInterval, backOffPeriod time.Duration,
        podCache kubecontainer.Cache,
    ) PodWorkers {
        return &podWorkers{
            podSyncStatuses:               map[types.UID]*podSyncStatus{},
            podUpdates:                    map[types.UID]chan podWork{},
            lastUndeliveredWorkUpdate:     map[types.UID]podWork{},
            terminatingStaticPodFullnames: map[string]struct{}{},
            syncPodFn:                     syncPodFn,
            syncTerminatingPodFn:          syncTerminatingPodFn,
            syncTerminatedPodFn:           syncTerminatedPodFn,
            recorder:                      recorder,
            workQueue:                     workQueue,
            resyncInterval:                resyncInterval,
            backOffPeriod:                 backOffPeriod,
            podCache:                      podCache,
        }
    }
    
    // UpdatePod (excerpt) routes each pod's updates into managePodLoop.
    func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
        ...
        p.managePodLoop(podUpdates)
        ...
    }
    
    // managePodLoop (excerpt) fetches the pod's status from the cache
    // (blocking until an entry newer than lastSyncTime exists) and then
    // invokes the sync callback with it.
    func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
        ...
        var status *kubecontainer.PodStatus
        ...
        status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime)
        ...
        err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
        ...
    }
    
    // syncPod (excerpt) delegates to the container runtime's SyncPod with
    // the cached pod status.
    func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
        ...
        result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
        ...
    }
    

    pkg/kubelet/kuberuntime/kuberuntime_manager.go中

    // SyncPod (excerpt) first computes what actions (start/kill/restart)
    // are needed by comparing the desired pod spec with the cached status.
    func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
        podContainerChanges := m.computePodActions(pod, podStatus)
        ...
    }
    
    // computePodActions (excerpt) — the key point of this article: if the
    // container's spec hash differs from the hash recorded in the status,
    // the container is restarted regardless of restart policy.
    func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
        ...
        restart := shouldRestartOnFailure(pod)
        if _, _, changed := containerChanged(&container, containerStatus); changed {
                message = fmt.Sprintf("Container %s definition changed", container.Name)
                // Restart regardless of the restart policy because the container
                // spec changed.
        restart = true
        ...
    }
    
    // containerChanged compares the hash of the desired container spec with
    // the hash stored in the container status.
    func containerChanged(container *v1.Container, containerStatus *kubecontainer.Status) (uint64, uint64, bool) {
        expectedHash := kubecontainer.HashContainer(container)
        return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash
    }
    
    

    pkg/kubelet/container/helpers.go中

    // HashContainer computes an FNV-32a hash of the JSON-serialized
    // container spec. Because the hash is over the serialized struct, a
    // change to the v1.Container definition across kubelet versions can
    // change the hash and therefore trigger restarts.
    func HashContainer(container *v1.Container) uint64 {
        hash := fnv.New32a()
        // Omit nil or empty field when calculating hash value
        // Please see https://github.com/kubernetes/kubernetes/issues/53644
        containerJSON, _ := json.Marshal(container)
        hashutil.DeepHashObject(hash, containerJSON)
        return uint64(hash.Sum32())
    }
    

    pkg/kubelet/container/cache.go中

    // NewCache returns an empty pod status cache — note there is no
    // persistence: a restarted kubelet starts from an empty cache.
    func NewCache() Cache {
        return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}}
    }
    
    // GetNewerThan blocks until the cache holds an entry for id newer than
    // minTime, then returns its status and error.
    func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
        ch := c.subscribe(id, minTime)
        d := <-ch
        return d.status, d.err
    }
    
    // subscribe returns a buffered channel that receives the cached data
    // either immediately (if fresh enough) or when it is later Set.
    func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
        ch := make(chan *data, 1)
        c.lock.Lock()
        defer c.lock.Unlock()
        d := c.getIfNewerThan(id, timestamp)
        if d != nil {
            // If the cache entry is ready, send the data and return immediately.
            ch <- d
            return ch
        }
        // Add the subscription record.
        c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
        return ch
    }
    

    pkg/kubelet/pleg/generic.go中

    // NewGenericPLEG builds the generic pod lifecycle event generator that
    // periodically relists containers and updates the shared status cache.
    func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
        relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
        return &GenericPLEG{
            relistPeriod: relistPeriod,
            runtime:      runtime,
            eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
            podRecords:   make(podRecords),
            cache:        cache,
            clock:        clock,
        }
    }
    
    // Start runs relist forever at relistPeriod in a background goroutine.
    func (g *GenericPLEG) Start() {
        go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
    }
    
    // relist (excerpt) walks the pods reported by the runtime and refreshes
    // each one's cache entry.
    func (g *GenericPLEG) relist() {
        ...
        if err := g.updateCache(pod, pid); err != nil {
        ...
    }
    
    // updateCache (excerpt) writes the pod's current status (including the
    // container hashes) into the cache with a timestamp.
    func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
        ...
        g.cache.Set(pod.ID, status, err, timestamp)
        ...
    }
    

    相关文章

      网友评论

          本文标题:kubelet重启导致container重启源码分析

          本文链接:https://www.haomeiwen.com/subject/jtbqoltx.html