k8s: A Brief Analysis of the kubelet Source Code

Author: wwq2020 | Published 2020-07-16 17:24

    Introduction

    The kubelet watches for changes to its resources and runs pods through the container runtime.
    It also performs health checks on pods and writes their status back to etcd through the apiserver API.
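
    To make this flow concrete before diving into the source, here is a minimal, self-contained sketch (illustrative only, not actual kubelet code; Pod, Runtime and syncLoop are made-up names) of "receive pod updates, let the container runtime converge each pod, report the result back":

    package main

    import "fmt"

    // Pod is a stand-in for v1.Pod in this sketch.
    type Pod struct{ Name string }

    // Runtime is a stand-in for the container runtime interface.
    type Runtime interface {
        SyncPod(p Pod) error
    }

    type fakeRuntime struct{}

    func (fakeRuntime) SyncPod(p Pod) error { return nil }

    // syncLoop receives pod updates, lets the runtime converge each pod,
    // and reports the observed status back (in the real kubelet, via the apiserver).
    func syncLoop(updates <-chan Pod, rt Runtime, report func(Pod, error)) {
        for pod := range updates {
            err := rt.SyncPod(pod)
            report(pod, err)
        }
    }

    func main() {
        updates := make(chan Pod, 1)
        updates <- Pod{Name: "demo"}
        close(updates)
        syncLoop(updates, fakeRuntime{}, func(p Pod, err error) {
            fmt.Printf("status of %s: err=%v\n", p.Name, err)
        })
    }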

    In cmd/kubelet/kubelet.go:

    func main() {
        ...
        command := app.NewKubeletCommand()
        ...
        if err := command.Execute(); err != nil {
            os.Exit(1)
        }
    }
    

    In cmd/kubelet/app/server.go:

    func NewKubeletCommand() *cobra.Command {
        ...
        kubeletServer := &options.KubeletServer{
            KubeletFlags:         *kubeletFlags,
            KubeletConfiguration: *kubeletConfig,
        }
    
        kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
        if err != nil {
            klog.Fatal(err)
        }
        ...
        if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
            klog.Fatal(err)
        }
        ...
    }
    
    func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
        // Initialize the TLS Options
        tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
        if err != nil {
            return nil, err
        }
    
        mounter := mount.New(s.ExperimentalMounterPath)
        subpather := subpath.New(mounter)
        hu := hostutil.NewHostUtil()
        var pluginRunner = exec.New()
    
        var dockerOptions *kubelet.DockerOptions
        if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
            dockerOptions = &kubelet.DockerOptions{
                DockerEndpoint:            s.DockerEndpoint,
                RuntimeRequestTimeout:     s.RuntimeRequestTimeout.Duration,
                ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
            }
        }
    
        plugins, err := ProbeVolumePlugins(featureGate)
        if err != nil {
            return nil, err
        }
        return &kubelet.Dependencies{
            Auth:                nil, // default does not enforce auth[nz]
            CAdvisorInterface:   nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
            Cloud:               nil, // cloud provider might start background processes
            ContainerManager:    nil,
            DockerOptions:       dockerOptions,
            KubeClient:          nil,
            HeartbeatClient:     nil,
            EventClient:         nil,
            HostUtil:            hu,
            Mounter:             mounter,
            Subpather:           subpather,
            OOMAdjuster:         oom.NewOOMAdjuster(),
            OSInterface:         kubecontainer.RealOS{},
            VolumePlugins:       plugins,
            DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
            TLSOptions:          tlsOptions}, nil
    }
    
    func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
        // To help debugging, immediately log version
        klog.Infof("Version: %+v", version.Get())
        if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
            return fmt.Errorf("failed OS init: %v", err)
        }
        if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
            return fmt.Errorf("failed to run Kubelet: %v", err)
        }
        return nil
    }
    
    func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
        ...
        if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
            return err
        }
        ...
    }
    
    func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
        hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
        if err != nil {
            return err
        }
        // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
        nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
        if err != nil {
            return err
        }
        hostnameOverridden := len(kubeServer.HostnameOverride) > 0
        // Setup event recorder if required.
        makeEventRecorder(kubeDeps, nodeName)
    
        capabilities.Initialize(capabilities.Capabilities{
            AllowPrivileged: true,
        })
    
        credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
        klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
    
        if kubeDeps.OSInterface == nil {
            kubeDeps.OSInterface = kubecontainer.RealOS{}
        }
    
        k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
            kubeDeps,
            &kubeServer.ContainerRuntimeOptions,
            kubeServer.ContainerRuntime,
            hostname,
            hostnameOverridden,
            nodeName,
            kubeServer.NodeIP,
            kubeServer.ProviderID,
            kubeServer.CloudProvider,
            kubeServer.CertDirectory,
            kubeServer.RootDirectory,
            kubeServer.RegisterNode,
            kubeServer.RegisterWithTaints,
            kubeServer.AllowedUnsafeSysctls,
            kubeServer.ExperimentalMounterPath,
            kubeServer.KernelMemcgNotification,
            kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
            kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
            kubeServer.MinimumGCAge,
            kubeServer.MaxPerPodContainerCount,
            kubeServer.MaxContainerCount,
            kubeServer.MasterServiceNamespace,
            kubeServer.RegisterSchedulable,
            kubeServer.KeepTerminatedPodVolumes,
            kubeServer.NodeLabels,
            kubeServer.SeccompProfileRoot,
            kubeServer.BootstrapCheckpointPath,
            kubeServer.NodeStatusMaxImages)
        if err != nil {
            return fmt.Errorf("failed to create kubelet: %v", err)
        }
    
        // NewMainKubelet should have set up a pod source config if one didn't exist
        // when the builder was run. This is just a precaution.
        if kubeDeps.PodConfig == nil {
            return fmt.Errorf("failed to create kubelet, pod source config was nil")
        }
        podCfg := kubeDeps.PodConfig
    
        if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
            klog.Errorf("Failed to set rlimit on max file handles: %v", err)
        }
    
        // process pods and exit.
        if runOnce {
            if _, err := k.RunOnce(podCfg.Updates()); err != nil {
                return fmt.Errorf("runonce failed: %v", err)
            }
            klog.Info("Started kubelet as runonce")
        } else {
            startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
            klog.Info("Started kubelet")
        }
        return nil
    }
    
    func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
        // start the kubelet
        go k.Run(podCfg.Updates())
    
        // start the kubelet server
        if enableServer {
            go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
    
        }
        if kubeCfg.ReadOnlyPort > 0 {
            go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
        }
        if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
            go k.ListenAndServePodResources()
        }
    }
    
    
    func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *kubelet.Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        containerRuntime string,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIP string,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        registerNode bool,
        registerWithTaints []api.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalCheckNodeCapabilitiesBeforeMount bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        masterServiceNamespace string,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        seccompProfileRoot string,
        bootstrapCheckpointPath string,
        nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
        // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
        // up into "per source" synchronizations
    
        k, err = kubelet.NewMainKubelet(kubeCfg,
            kubeDeps,
            crOptions,
            containerRuntime,
            hostname,
            hostnameOverridden,
            nodeName,
            nodeIP,
            providerID,
            cloudProvider,
            certDirectory,
            rootDirectory,
            registerNode,
            registerWithTaints,
            allowedUnsafeSysctls,
            experimentalMounterPath,
            kernelMemcgNotification,
            experimentalCheckNodeCapabilitiesBeforeMount,
            experimentalNodeAllocatableIgnoreEvictionThreshold,
            minimumGCAge,
            maxPerPodContainerCount,
            maxContainerCount,
            masterServiceNamespace,
            registerSchedulable,
            keepTerminatedPodVolumes,
            nodeLabels,
            seccompProfileRoot,
            bootstrapCheckpointPath,
            nodeStatusMaxImages)
        if err != nil {
            return nil, err
        }
    
        k.BirthCry()
    
        k.StartGarbageCollection()
    
        return k, nil
    }
    
    

    In pkg/kubelet/kubelet.go:

    func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        containerRuntime string,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIP string,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        registerNode bool,
        registerWithTaints []api.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalCheckNodeCapabilitiesBeforeMount bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        masterServiceNamespace string,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        seccompProfileRoot string,
        bootstrapCheckpointPath string,
        nodeStatusMaxImages int32) (*Kubelet, error) {
        ...
        klet.probeManager = prober.NewManager(
            klet.statusManager,
            klet.livenessManager,
            klet.startupManager,
            klet.runner,
            kubeDeps.Recorder)
        ...
        klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
        ...
    }
    
    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        ...
        kl.probeManager.Start()
        ...
        kl.syncLoop(updates, kl)
    }
    
    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        klog.Info("Starting kubelet main sync loop.")
        // The syncTicker wakes up kubelet to checks if there are any pod workers
        // that need to be sync'd. A one-second period is sufficient because the
        // sync interval is defaulted to 10s.
        syncTicker := time.NewTicker(time.Second)
        defer syncTicker.Stop()
        housekeepingTicker := time.NewTicker(housekeepingPeriod)
        defer housekeepingTicker.Stop()
        plegCh := kl.pleg.Watch()
        const (
            base   = 100 * time.Millisecond
            max    = 5 * time.Second
            factor = 2
        )
        duration := base
        // Responsible for checking limits in resolv.conf
        // The limits do not have anything to do with individual pods
        // Since this is called in syncLoop, we don't need to call it anywhere else
        if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
            kl.dnsConfigurer.CheckLimitsForResolvConf()
        }
    
        for {
            if err := kl.runtimeState.runtimeErrors(); err != nil {
                klog.Errorf("skipping pod synchronization - %v", err)
                // exponential backoff
                time.Sleep(duration)
                duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
                continue
            }
            // reset backoff if we have a success
            duration = base
    
            kl.syncLoopMonitor.Store(kl.clock.Now())
            if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                break
            }
            kl.syncLoopMonitor.Store(kl.clock.Now())
        }
    }
    
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        case u, open := <-configCh:
            // Update from a config source; dispatch it to the right handler
            // callback.
            if !open {
                klog.Errorf("Update channel is closed. Exiting the sync loop.")
                return false
            }
    
            switch u.Op {
            case kubetypes.ADD:
                klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
                // After restarting, kubelet will get all existing pods through
                // ADD as if they are new pods. These pods will then go through the
                // admission process and *may* be rejected. This can be resolved
                // once we have checkpointing.
                handler.HandlePodAdditions(u.Pods)
            case kubetypes.UPDATE:
                klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
                handler.HandlePodUpdates(u.Pods)
            case kubetypes.REMOVE:
                klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
                handler.HandlePodRemoves(u.Pods)
            case kubetypes.RECONCILE:
                klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
                handler.HandlePodReconcile(u.Pods)
            case kubetypes.DELETE:
                klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
                // DELETE is treated as a UPDATE because of graceful deletion.
                handler.HandlePodUpdates(u.Pods)
            case kubetypes.RESTORE:
                klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
                // These are pods restored from the checkpoint. Treat them as new
                // pods.
                handler.HandlePodAdditions(u.Pods)
            case kubetypes.SET:
                // TODO: Do we want to support this?
                klog.Errorf("Kubelet does not support snapshot update")
            }
    
            if u.Op != kubetypes.RESTORE {
                // If the update type is RESTORE, it means that the update is from
                // the pod checkpoints and may be incomplete. Do not mark the
                // source as ready.
    
                // Mark the source ready after receiving at least one update from the
                // source. Once all the sources are marked ready, various cleanup
                // routines will start reclaiming resources. It is important that this
                // takes place only after kubelet calls the update handler to process
                // the update to ensure the internal pod cache is up-to-date.
                kl.sourcesReady.AddSource(u.Source)
            }
        case e := <-plegCh:
            if isSyncPodWorthy(e) {
                // PLEG event for a pod; sync it.
                if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                    klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                    handler.HandlePodSyncs([]*v1.Pod{pod})
                } else {
                    // If the pod no longer exists, ignore the event.
                    klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
                }
            }
    
            if e.Type == pleg.ContainerDied {
                if containerID, ok := e.Data.(string); ok {
                    kl.cleanUpContainersInPod(e.ID, containerID)
                }
            }
        case <-syncCh:
            // Sync pods waiting for sync
            podsToSync := kl.getPodsToSync()
            if len(podsToSync) == 0 {
                break
            }
            klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
            handler.HandlePodSyncs(podsToSync)
        case update := <-kl.livenessManager.Updates():
            if update.Result == proberesults.Failure {
                // The liveness manager detected a failure; sync the pod.
    
                // We should not use the pod from livenessManager, because it is never updated after
                // initialization.
                pod, ok := kl.podManager.GetPodByUID(update.PodUID)
                if !ok {
                    // If the pod no longer exists, ignore the update.
                    klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                    break
                }
                klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
                handler.HandlePodSyncs([]*v1.Pod{pod})
            }
        case <-housekeepingCh:
            if !kl.sourcesReady.AllReady() {
                // If the sources aren't ready or volume manager has not yet synced the states,
                // skip housekeeping, as we may accidentally delete pods from unready sources.
                klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
            } else {
                klog.V(4).Infof("SyncLoop (housekeeping)")
                if err := handler.HandlePodCleanups(); err != nil {
                    klog.Errorf("Failed cleaning pods: %v", err)
                }
            }
        }
        return true
    }
    
    
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        start := kl.clock.Now()
        sort.Sort(sliceutils.PodsByCreationTime(pods))
        for _, pod := range pods {
            existingPods := kl.podManager.GetPods()
            // Always add the pod to the pod manager. Kubelet relies on the pod
            // manager as the source of truth for the desired state. If a pod does
            // not exist in the pod manager, it means that it has been deleted in
            // the apiserver and no action (other than cleanup) is required.
            kl.podManager.AddPod(pod)
    
            if kubetypes.IsMirrorPod(pod) {
                kl.handleMirrorPod(pod, start)
                continue
            }
    
            if !kl.podIsTerminated(pod) {
                // Only go through the admission process if the pod is not
                // terminated.
    
                // We failed pods that we rejected, so activePods include all admitted
                // pods that are alive.
                activePods := kl.filterOutTerminatedPods(existingPods)
    
                // Check if we can admit the pod; if not, reject it.
                if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                    kl.rejectPod(pod, reason, message)
                    continue
                }
            }
            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
            kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
            kl.probeManager.AddPod(pod)
        }
    }
    
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        // check whether we are ready to delete the pod from the API server (all status up to date)
        containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
        if pod.DeletionTimestamp != nil && containersTerminal {
            klog.V(4).Infof("Pod %q has completed execution and should be deleted from the API server: %s", format.Pod(pod), syncType)
            kl.statusManager.TerminatePod(pod)
            return
        }
    
        // optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
        if podWorkerTerminal {
            klog.V(4).Infof("Pod %q has completed, ignoring remaining sync work: %s", format.Pod(pod), syncType)
            return
        }
    
        // Run the sync in an async worker.
        kl.podWorkers.UpdatePod(&UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            OnCompleteFunc: func(err error) {
                if err != nil {
                    metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
                }
            },
        })
        // Note the number of containers for new pods.
        if syncType == kubetypes.SyncPodCreate {
            metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
        }
    }
    
    
    func (kl *Kubelet) syncPod(o syncPodOptions) error {
        // pull out the required options
        pod := o.pod
        mirrorPod := o.mirrorPod
        podStatus := o.podStatus
        updateType := o.updateType
    
        // if we want to kill a pod, do it now!
        if updateType == kubetypes.SyncPodKill {
            killPodOptions := o.killPodOptions
            if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
                return fmt.Errorf("kill pod options are required if update type is kill")
            }
            apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
            kl.statusManager.SetPodStatus(pod, apiPodStatus)
            // we kill the pod with the specified grace period since this is a termination
            if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
                kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
                // there was an error killing the pod, so we return that error directly
                utilruntime.HandleError(err)
                return err
            }
            return nil
        }
    
        // Latency measurements for the main workflow are relative to the
        // first time the pod was seen by the API server.
        var firstSeenTime time.Time
        if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
            firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
        }
    
        // Record pod worker start latency if being created
        // TODO: make pod workers record their own latencies
        if updateType == kubetypes.SyncPodCreate {
            if !firstSeenTime.IsZero() {
                // This is the first time we are syncing the pod. Record the latency
                // since kubelet first saw the pod if firstSeenTime is set.
                metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
            } else {
                klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
            }
        }
    
        // Generate final API pod status with pod and status manager status
        apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
        // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
        // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
        // set pod IP to hostIP directly in runtime.GetPodStatus
        podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
        for _, ipInfo := range apiPodStatus.PodIPs {
            podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
        }
    
        if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
            podStatus.IPs = []string{apiPodStatus.PodIP}
        }
    
        // Record the time it takes for the pod to become running.
        existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
        if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
            !firstSeenTime.IsZero() {
            metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
        }
    
        runnable := kl.canRunPod(pod)
        if !runnable.Admit {
            // Pod is not runnable; update the Pod and Container statuses to why.
            apiPodStatus.Reason = runnable.Reason
            apiPodStatus.Message = runnable.Message
            // Waiting containers are not creating.
            const waitingReason = "Blocked"
            for _, cs := range apiPodStatus.InitContainerStatuses {
                if cs.State.Waiting != nil {
                    cs.State.Waiting.Reason = waitingReason
                }
            }
            for _, cs := range apiPodStatus.ContainerStatuses {
                if cs.State.Waiting != nil {
                    cs.State.Waiting.Reason = waitingReason
                }
            }
        }
    
        // Update status in the status manager
        kl.statusManager.SetPodStatus(pod, apiPodStatus)
    
        // Kill pod if it should not be running
        if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
            var syncErr error
            if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
                kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
                syncErr = fmt.Errorf("error killing pod: %v", err)
                utilruntime.HandleError(syncErr)
            } else {
                if !runnable.Admit {
                    // There was no error killing the pod, but the pod cannot be run.
                    // Return an error to signal that the sync loop should back off.
                    syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
                }
            }
            return syncErr
        }
    
        // If the network plugin is not ready, only start the pod if it uses the host network
        if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
            return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
        }
    
        // Create Cgroups for the pod and apply resource parameters
        // to them if cgroups-per-qos flag is enabled.
        pcm := kl.containerManager.NewPodContainerManager()
        // If pod has already been terminated then we need not create
        // or update the pod's cgroup
        if !kl.podIsTerminated(pod) {
            // When the kubelet is restarted with the cgroups-per-qos
            // flag enabled, all the pod's running containers
            // should be killed intermittently and brought back up
            // under the qos cgroup hierarchy.
            // Check if this is the pod's first sync
            firstSync := true
            for _, containerStatus := range apiPodStatus.ContainerStatuses {
                if containerStatus.State.Running != nil {
                    firstSync = false
                    break
                }
            }
            // Don't kill containers in pod if pod's cgroups already
            // exists or the pod is running for the first time
            podKilled := false
            if !pcm.Exists(pod) && !firstSync {
                if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
                    podKilled = true
                }
            }
            // Create and Update pod's Cgroups
            // Don't create cgroups for run once pod if it was killed above
            // The current policy is not to restart the run once pods when
            // the kubelet is restarted with the new flag as run once pods are
            // expected to run only once and if the kubelet is restarted then
            // they are not expected to run again.
            // We don't create and apply updates to cgroup if its a run once pod and was killed above
            if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
                if !pcm.Exists(pod) {
                    if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
                        klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
                    }
                    if err := pcm.EnsureExists(pod); err != nil {
                        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
                        return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
                    }
                }
            }
        }
    
        // Create Mirror Pod for Static Pod if it doesn't already exist
        if kubetypes.IsStaticPod(pod) {
            podFullName := kubecontainer.GetPodFullName(pod)
            deleted := false
            if mirrorPod != nil {
                if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
                    // The mirror pod is semantically different from the static pod. Remove
                    // it. The mirror pod will get recreated later.
                    klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
                    var err error
                    deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
                    if deleted {
                        klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
                    } else if err != nil {
                        klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
                    }
                }
            }
            if mirrorPod == nil || deleted {
                node, err := kl.GetNode()
                if err != nil || node.DeletionTimestamp != nil {
                    klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
                } else {
                    klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
                    if err := kl.podManager.CreateMirrorPod(pod); err != nil {
                        klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
                    }
                }
            }
        }
    
        // Make data directories for the pod
        if err := kl.makePodDataDirs(pod); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
            klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
            return err
        }
    
        // Volume manager will not mount volumes for terminated pods
        if !kl.podIsTerminated(pod) {
            // Wait for volumes to attach/mount
            if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
                kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
                klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
                return err
            }
        }
    
        // Fetch the pull secrets for the pod
        pullSecrets := kl.getPullSecretsForPod(pod)
    
        // Call the container runtime's SyncPod callback
        result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
        kl.reasonCache.Update(pod.UID, result)
        if err := result.Error(); err != nil {
            // Do not return error if the only failures were pods in backoff
            for _, r := range result.SyncResults {
                if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
                    // Do not record an event here, as we keep all event logging for sync pod failures
                    // local to container runtime so we get better errors
                    return err
                }
            }
    
            return nil
        }
    
        return nil
    }
    
    

    In pkg/kubelet/pod_workers.go:

    func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
        resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
        return &podWorkers{
            podUpdates:                map[types.UID]chan UpdatePodOptions{},
            isWorking:                 map[types.UID]bool{},
            lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
            syncPodFn:                 syncPodFn,
            recorder:                  recorder,
            workQueue:                 workQueue,
            resyncInterval:            resyncInterval,
            backOffPeriod:             backOffPeriod,
            podCache:                  podCache,
        }
    }
    
    func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
        pod := options.Pod
        uid := pod.UID
        var podUpdates chan UpdatePodOptions
        var exists bool
    
        p.podLock.Lock()
        defer p.podLock.Unlock()
        if podUpdates, exists = p.podUpdates[uid]; !exists {
            // We need to have a buffer here, because checkForUpdates() method that
            // puts an update into channel is called from the same goroutine where
            // the channel is consumed. However, it is guaranteed that in such case
            // the channel is empty, so buffer of size 1 is enough.
            podUpdates = make(chan UpdatePodOptions, 1)
            p.podUpdates[uid] = podUpdates
    
            // Creating a new pod worker either means this is a new pod, or that the
            // kubelet just restarted. In either case the kubelet is willing to believe
            // the status of the pod for the first pod worker sync. See corresponding
            // comment in syncPod.
            go func() {
                defer runtime.HandleCrash()
                p.managePodLoop(podUpdates)
            }()
        }
        if !p.isWorking[pod.UID] {
            p.isWorking[pod.UID] = true
            podUpdates <- *options
        } else {
            // if a request to kill a pod is pending, we do not let anything overwrite that request.
            update, found := p.lastUndeliveredWorkUpdate[pod.UID]
            if !found || update.UpdateType != kubetypes.SyncPodKill {
                p.lastUndeliveredWorkUpdate[pod.UID] = *options
            }
        }
    }
    
    func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
        var lastSyncTime time.Time
        for update := range podUpdates {
            err := func() error {
                podUID := update.Pod.UID
                // This is a blocking call that would return only if the cache
                // has an entry for the pod that is newer than minRuntimeCache
                // Time. This ensures the worker doesn't start syncing until
                // after the cache is at least newer than the finished time of
                // the previous sync.
                status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
                if err != nil {
                    // This is the legacy event thrown by manage pod loop
                    // all other events are now dispatched from syncPodFn
                    p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                    return err
                }
                err = p.syncPodFn(syncPodOptions{
                    mirrorPod:      update.MirrorPod,
                    pod:            update.Pod,
                    podStatus:      status,
                    killPodOptions: update.KillPodOptions,
                    updateType:     update.UpdateType,
                })
                lastSyncTime = time.Now()
                return err
            }()
            // notify the call-back function if the operation succeeded or not
            if update.OnCompleteFunc != nil {
                update.OnCompleteFunc(err)
            }
            if err != nil {
                // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
                klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
            }
            p.wrapUp(update.Pod.UID, err)
        }
    }
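
    The pod worker machinery above (UpdatePod plus managePodLoop) boils down to: one long-lived goroutine per pod UID, a size-1 buffered update channel, and a single "last undelivered update" slot that newer updates overwrite. The following is a simplified, self-contained sketch of that pattern under those assumptions (illustrative names, not kubelet code):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type update struct{ seq int }

    // workers keeps one long-lived goroutine per pod UID, a size-1 buffered
    // update channel, and at most one pending ("last undelivered") update.
    type workers struct {
        mu      sync.Mutex
        ch      map[string]chan update
        working map[string]bool
        pending map[string]update
    }

    func newWorkers() *workers {
        return &workers{
            ch:      map[string]chan update{},
            working: map[string]bool{},
            pending: map[string]update{},
        }
    }

    func (w *workers) UpdatePod(uid string, u update) {
        w.mu.Lock()
        defer w.mu.Unlock()
        ch, ok := w.ch[uid]
        if !ok {
            // A buffer of 1 is enough: a pending update is re-queued only
            // from wrapUp, when the channel is guaranteed to be empty.
            ch = make(chan update, 1)
            w.ch[uid] = ch
            go w.loop(uid, ch)
        }
        if !w.working[uid] {
            w.working[uid] = true
            ch <- u
        } else {
            // Keep only the newest undelivered update for this pod.
            // (The real kubelet additionally refuses to overwrite a pending kill.)
            w.pending[uid] = u
        }
    }

    func (w *workers) loop(uid string, ch <-chan update) {
        for u := range ch {
            fmt.Println("sync", uid, "seq", u.seq) // stand-in for syncPodFn
            w.wrapUp(uid)
        }
    }

    func (w *workers) wrapUp(uid string) {
        w.mu.Lock()
        defer w.mu.Unlock()
        if u, ok := w.pending[uid]; ok {
            delete(w.pending, uid)
            w.ch[uid] <- u // channel is empty here, so this never blocks
        } else {
            w.working[uid] = false
        }
    }

    func main() {
        w := newWorkers()
        w.UpdatePod("pod-a", update{seq: 1})
        w.UpdatePod("pod-a", update{seq: 2}) // queued as the pending update
        time.Sleep(50 * time.Millisecond)    // demo only: let the worker drain
    }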
    

    In pkg/kubelet/config/config.go:

    func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
        updates := make(chan kubetypes.PodUpdate, 50)
        storage := newPodStorage(updates, mode, recorder)
        podConfig := &PodConfig{
            pods:    storage,
            mux:     config.NewMux(storage),
            updates: updates,
            sources: sets.String{},
        }
        return podConfig
    }
    
    func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
        return &podStorage{
            pods:        make(map[string]map[types.UID]*v1.Pod),
            mode:        mode,
            updates:     updates,
            sourcesSeen: sets.String{},
            recorder:    recorder,
        }
    }
    
    func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
        return c.updates
    }
    
    func (c *PodConfig) Channel(source string) chan<- interface{} {
        c.sourcesLock.Lock()
        defer c.sourcesLock.Unlock()
        c.sources.Insert(source)
        return c.mux.Channel(source)
    }
    
    func NewMux(merger Merger) *Mux {
        mux := &Mux{
            sources: make(map[string]chan interface{}),
            merger:  merger,
        }
        return mux
    }
    
    func (m *Mux) Channel(source string) chan interface{} {
        if len(source) == 0 {
            panic("Channel given an empty name")
        }
        m.sourceLock.Lock()
        defer m.sourceLock.Unlock()
        channel, exists := m.sources[source]
        if exists {
            return channel
        }
        newChannel := make(chan interface{})
        m.sources[source] = newChannel
        go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
        return newChannel
    }
    
    func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
        for update := range listenChannel {
            m.merger.Merge(source, update)
        }
    }
    
    func (s *podStorage) Merge(source string, change interface{}) error {
        s.updateLock.Lock()
        defer s.updateLock.Unlock()
    
        seenBefore := s.sourcesSeen.Has(source)
        adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
        firstSet := !seenBefore && s.sourcesSeen.Has(source)
    
        // deliver update notifications
        switch s.mode {
        case PodConfigNotificationIncremental:
            if len(removes.Pods) > 0 {
                s.updates <- *removes
            }
            if len(adds.Pods) > 0 {
                s.updates <- *adds
            }
            if len(updates.Pods) > 0 {
                s.updates <- *updates
            }
            if len(deletes.Pods) > 0 {
                s.updates <- *deletes
            }
            if len(restores.Pods) > 0 {
                s.updates <- *restores
            }
            if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
                // Send an empty update when first seeing the source and there are
                // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
                // the source is ready.
                s.updates <- *adds
            }
            // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
            if len(reconciles.Pods) > 0 {
                s.updates <- *reconciles
            }
    
        case PodConfigNotificationSnapshotAndUpdates:
            if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
                s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
            }
            if len(updates.Pods) > 0 {
                s.updates <- *updates
            }
            if len(deletes.Pods) > 0 {
                s.updates <- *deletes
            }
    
        case PodConfigNotificationSnapshot:
            if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
                s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
            }
    
        case PodConfigNotificationUnknown:
            fallthrough
        default:
            panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
        }
    
        return nil
    }
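
    PodConfig and Mux together implement a fan-in: each configuration source (file, http, apiserver) gets its own channel, a per-source goroutine feeds whatever arrives into a shared Merger, and the merger (podStorage) pushes the combined result onto the single updates channel consumed by the sync loop. A simplified sketch of that fan-in, with illustrative names rather than the real types:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // Merger merges updates from several named sources into one stream,
    // standing in for podStorage above.
    type Merger interface {
        Merge(source string, change interface{}) error
    }

    type Mux struct {
        mu      sync.Mutex
        sources map[string]chan interface{}
        merger  Merger
    }

    func NewMux(m Merger) *Mux {
        return &Mux{sources: map[string]chan interface{}{}, merger: m}
    }

    // Channel lazily creates one input channel per source and starts a
    // goroutine that feeds everything sent on it into the shared merger.
    func (m *Mux) Channel(source string) chan<- interface{} {
        m.mu.Lock()
        defer m.mu.Unlock()
        if ch, ok := m.sources[source]; ok {
            return ch
        }
        ch := make(chan interface{})
        m.sources[source] = ch
        go func() {
            for change := range ch {
                m.merger.Merge(source, change)
            }
        }()
        return ch
    }

    type printMerger struct{}

    func (printMerger) Merge(source string, change interface{}) error {
        fmt.Printf("merged %v from source %q\n", change, source)
        return nil
    }

    func main() {
        mux := NewMux(printMerger{})
        mux.Channel("file") <- "pod-from-static-manifest"
        mux.Channel("apiserver") <- "pod-from-apiserver"
        time.Sleep(50 * time.Millisecond) // demo only: let the listeners run
    }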
    
    

    In pkg/kubelet/config/apiserver.go:

    func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
        lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
        newSourceApiserverFromLW(lw, updates)
    }
    
    func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
        send := func(objs []interface{}) {
            var pods []*v1.Pod
            for _, o := range objs {
                pods = append(pods, o.(*v1.Pod))
            }
            updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
        }
        r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
        go r.Run(wait.NeverStop)
    }
    

    In k8s.io/client-go/tools/cache/reflector.go:

    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
    }
    
    // NewNamedReflector same as NewReflector, but with a specified name for logging
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        realClock := &clock.RealClock{}
        r := &Reflector{
            name:          name,
            listerWatcher: lw,
            store:         store,
            // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
            // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
            // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
            backoffManager:    wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
            resyncPeriod:      resyncPeriod,
            clock:             realClock,
            watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
        }
        r.setExpectedType(expectedType)
        return r
    }
    
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
        wait.BackoffUntil(func() {
            if err := r.ListAndWatch(stopCh); err != nil {
                r.watchErrorHandler(r, err)
            }
        }, r.backoffManager, true, stopCh)
        klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    }
    
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
        var resourceVersion string
    
        options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
    
        if err := func() error {
            initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
            defer initTrace.LogIfLong(10 * time.Second)
            var list runtime.Object
            var paginatedResult bool
            var err error
            listCh := make(chan struct{}, 1)
            panicCh := make(chan interface{}, 1)
            go func() {
                defer func() {
                    if r := recover(); r != nil {
                        panicCh <- r
                    }
                }()
                // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
                // list request will return the full response.
                pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                    return r.listerWatcher.List(opts)
                }))
                switch {
                case r.WatchListPageSize != 0:
                    pager.PageSize = r.WatchListPageSize
                case r.paginatedResult:
                    // We got a paginated result initially. Assume this resource and server honor
                    // paging requests (i.e. watch cache is probably disabled) and leave the default
                    // pager size set.
                case options.ResourceVersion != "" && options.ResourceVersion != "0":
                    // User didn't explicitly request pagination.
                    //
                    // With ResourceVersion != "", we have a possibility to list from watch cache,
                    // but we do that (for ResourceVersion != "0") only if Limit is unset.
                    // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                    // switch off pagination to force listing from watch cache (if enabled).
                    // With the existing semantic of RV (result is at least as fresh as provided RV),
                    // this is correct and doesn't lead to going back in time.
                    //
                    // We also don't turn off pagination for ResourceVersion="0", since watch cache
                    // is ignoring Limit in that case anyway, and if watch cache is not enabled
                    // we don't introduce regression.
                    pager.PageSize = 0
                }
    
                list, paginatedResult, err = pager.List(context.Background(), options)
                if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                    r.setIsLastSyncResourceVersionUnavailable(true)
                    // Retry immediately if the resource version used to list is unavailable.
                    // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                    // continuation pages, but the pager might not be enabled, the full list might fail because the
                    // resource version it is listing at is expired or the cache may not yet be synced to the provided
                    // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                    // the reflector makes forward progress.
                    list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
                }
                close(listCh)
            }()
            select {
            case <-stopCh:
                return nil
            case r := <-panicCh:
                panic(r)
            case <-listCh:
            }
            if err != nil {
                return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
            }
    
            // We check if the list was paginated and if so set the paginatedResult based on that.
            // However, we want to do that only for the initial list (which is the only case
            // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
            // situations we may force listing directly from etcd (by setting ResourceVersion="")
            // which will return paginated result, even if watch cache is enabled. However, in
            // that case, we still want to prefer sending requests to watch cache if possible.
            //
            // Paginated result returned for request with ResourceVersion="0" mean that watch
            // cache is disabled and there are a lot of objects of a given type. In such case,
            // there is no need to prefer listing from watch cache.
            if options.ResourceVersion == "0" && paginatedResult {
                r.paginatedResult = true
            }
    
            r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
            initTrace.Step("Objects listed")
            listMetaInterface, err := meta.ListAccessor(list)
            if err != nil {
                return fmt.Errorf("unable to understand list result %#v: %v", list, err)
            }
            resourceVersion = listMetaInterface.GetResourceVersion()
            initTrace.Step("Resource version extracted")
            items, err := meta.ExtractList(list)
            if err != nil {
                return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
            }
            initTrace.Step("Objects extracted")
            if err := r.syncWith(items, resourceVersion); err != nil {
                return fmt.Errorf("unable to sync list result: %v", err)
            }
            initTrace.Step("SyncWith done")
            r.setLastSyncResourceVersion(resourceVersion)
            initTrace.Step("Resource version updated")
            return nil
        }(); err != nil {
            return err
        }
    
        resyncerrc := make(chan error, 1)
        cancelCh := make(chan struct{})
        defer close(cancelCh)
        go func() {
            resyncCh, cleanup := r.resyncChan()
            defer func() {
                cleanup() // Call the last one written into cleanup
            }()
            for {
                select {
                case <-resyncCh:
                case <-stopCh:
                    return
                case <-cancelCh:
                    return
                }
                if r.ShouldResync == nil || r.ShouldResync() {
                    klog.V(4).Infof("%s: forcing resync", r.name)
                    if err := r.store.Resync(); err != nil {
                        resyncerrc <- err
                        return
                    }
                }
                cleanup()
                resyncCh, cleanup = r.resyncChan()
            }
        }()
    
        for {
            // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
            select {
            case <-stopCh:
                return nil
            default:
            }
    
            timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
            options = metav1.ListOptions{
                ResourceVersion: resourceVersion,
                // We want to avoid situations of hanging watchers. Stop any wachers that do not
                // receive any events within the timeout window.
                TimeoutSeconds: &timeoutSeconds,
                // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
                // Reflector doesn't assume bookmarks are returned at all (if the server do not support
                // watch bookmarks, it will ignore this field).
                AllowWatchBookmarks: true,
            }
    
            // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
            start := r.clock.Now()
            w, err := r.listerWatcher.Watch(options)
            if err != nil {
                // If this is "connection refused" error, it means that most likely apiserver is not responsive.
                // It doesn't make sense to re-list all objects because most likely we will be able to restart
                // watch where we ended.
                // If that's the case wait and resend watch request.
                if utilnet.IsConnectionRefused(err) {
                    time.Sleep(time.Second)
                    continue
                }
                return err
            }
    
            if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
                if err != errorStopRequested {
                    switch {
                    case isExpiredError(err):
                        // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                        // has a semantic that it returns data at least as fresh as provided RV.
                        // So first try to LIST with setting RV to resource version of last observed object.
                        klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                    default:
                        klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                    }
                }
                return nil
            }
        }
    }
    

    In pkg/kubelet/prober/prober_manager.go:

    func NewManager(
        statusManager status.Manager,
        livenessManager results.Manager,
        startupManager results.Manager,
        runner kubecontainer.CommandRunner,
        recorder record.EventRecorder) Manager {
    
        prober := newProber(runner, recorder)
        readinessManager := results.NewManager()
        return &manager{
            statusManager:    statusManager,
            prober:           prober,
            readinessManager: readinessManager,
            livenessManager:  livenessManager,
            startupManager:   startupManager,
            workers:          make(map[probeKey]*worker),
        }
    }
    
    func (m *manager) Start() {
        // Start syncing readiness.
        go wait.Forever(m.updateReadiness, 0)
        // Start syncing startup.
        go wait.Forever(m.updateStartup, 0)
    }
    
    func (m *manager) updateReadiness() {
        update := <-m.readinessManager.Updates()
    
        ready := update.Result == results.Success
        m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
    }
    
    func (m *manager) updateStartup() {
        update := <-m.startupManager.Updates()
    
        started := update.Result == results.Success
        m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
    }
    
    func (m *manager) AddPod(pod *v1.Pod) {
        m.workerLock.Lock()
        defer m.workerLock.Unlock()
    
        key := probeKey{podUID: pod.UID}
        for _, c := range pod.Spec.Containers {
            key.containerName = c.Name
    
            if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
                key.probeType = startup
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Startup probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, startup, pod, c)
                m.workers[key] = w
                go w.run()
            }
    
            if c.ReadinessProbe != nil {
                key.probeType = readiness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Readiness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, readiness, pod, c)
                m.workers[key] = w
                go w.run()
            }
    
            if c.LivenessProbe != nil {
                key.probeType = liveness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Liveness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, liveness, pod, c)
                m.workers[key] = w
                go w.run()
            }
        }
    }
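
    AddPod walks every container in the pod spec and starts one worker goroutine per declared probe, keyed by (pod UID, container name, probe type). As a purely illustrative example, a container spec like the one below would cause three workers to be created for the same container. Note that the embedded handler type was v1.Handler in releases contemporary with this code; newer client-go versions renamed it to v1.ProbeHandler:

    package example

    import (
        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
    )

    // exampleContainer is purely illustrative; image, paths and ports are made up.
    func exampleContainer() v1.Container {
        return v1.Container{
            Name:  "app",
            Image: "example.com/app:latest",
            // Startup gates the other probes until the app has started at least once.
            StartupProbe: &v1.Probe{
                Handler:          v1.Handler{HTTPGet: &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)}},
                FailureThreshold: 30,
                PeriodSeconds:    10,
            },
            // Readiness controls whether the pod receives traffic; its initial value is Failure.
            ReadinessProbe: &v1.Probe{
                Handler:       v1.Handler{HTTPGet: &v1.HTTPGetAction{Path: "/ready", Port: intstr.FromInt(8080)}},
                PeriodSeconds: 5,
            },
            // Liveness failures cause a restart; its initial value is Success.
            LivenessProbe: &v1.Probe{
                Handler:             v1.Handler{TCPSocket: &v1.TCPSocketAction{Port: intstr.FromInt(8080)}},
                InitialDelaySeconds: 15,
                PeriodSeconds:       10,
            },
        }
    }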
    

    In pkg/kubelet/status/status_manager.go — the status manager caches the kubelet's view of each pod status and syncs it to the apiserver, either immediately through a buffered channel or via a periodic batch:

    func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
        return &manager{
            kubeClient:        kubeClient,
            podManager:        podManager,
            podStatuses:       make(map[types.UID]versionedPodStatus),
            podStatusChannel:  make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
            apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
            podDeletionSafety: podDeletionSafety,
        }
    }
    func (m *manager) Start() {
        // Don't start the status manager if we don't have a client. This will happen
        // on the master, where the kubelet is responsible for bootstrapping the pods
        // of the master components.
        if m.kubeClient == nil {
            klog.Infof("Kubernetes client is nil, not starting status manager.")
            return
        }
    
        klog.Info("Starting to sync pod status with apiserver")
        //lint:ignore SA1015 Ticker can leak since this is only called once and doesn't handle termination.
        syncTicker := time.Tick(syncPeriod)
        // syncPod and syncBatch share the same go routine to avoid sync races.
        go wait.Forever(func() {
            for {
                select {
                case syncRequest := <-m.podStatusChannel:
                    klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                        syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                    m.syncPod(syncRequest.podUID, syncRequest.status)
                case <-syncTicker:
                    klog.V(5).Infof("Status Manager: syncing batch")
                    // remove any entries in the status channel since the batch will handle them
                    for i := len(m.podStatusChannel); i > 0; i-- {
                        <-m.podStatusChannel
                    }
                    m.syncBatch()
                }
            }
        }, 0)
    }
    func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
        m.podStatusesLock.Lock()
        defer m.podStatusesLock.Unlock()
    
        pod, ok := m.podManager.GetPodByUID(podUID)
        if !ok {
            klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
            return
        }
    
        oldStatus, found := m.podStatuses[pod.UID]
        if !found {
            klog.Warningf("Container startup changed before pod has synced: %q - %q",
                format.Pod(pod), containerID.String())
            return
        }
    
        // Find the container to update.
        containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
        if !ok {
            klog.Warningf("Container startup changed for unknown container: %q - %q",
                format.Pod(pod), containerID.String())
            return
        }
    
        if containerStatus.Started != nil && *containerStatus.Started == started {
            klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
                format.Pod(pod), containerID.String())
            return
        }
    
        // Make sure we're not updating the cached version.
        status := *oldStatus.status.DeepCopy()
        containerStatus, _, _ = findContainerStatus(&status, containerID.String())
        containerStatus.Started = &started
    
        m.updateStatusInternal(pod, status, false)
    }
    
    func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
        var oldStatus v1.PodStatus
        cachedStatus, isCached := m.podStatuses[pod.UID]
        if isCached {
            oldStatus = cachedStatus.status
        } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
            oldStatus = mirrorPod.Status
        } else {
            oldStatus = pod.Status
        }
    
        // Check for illegal state transition in containers
        if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
            klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
            return false
        }
        if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
            klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
            return false
        }
    
        // Set ContainersReadyCondition.LastTransitionTime.
        updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)
    
        // Set ReadyCondition.LastTransitionTime.
        updateLastTransitionTime(&status, &oldStatus, v1.PodReady)
    
        // Set InitializedCondition.LastTransitionTime.
        updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)
    
        // Set PodScheduledCondition.LastTransitionTime.
        updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
    
        // ensure that the start time does not change across updates.
        if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
            status.StartTime = oldStatus.StartTime
        } else if status.StartTime.IsZero() {
            // if the status has no start time, we need to set an initial time
            now := metav1.Now()
            status.StartTime = &now
        }
    
        normalizeStatus(pod, &status)
        // The intent here is to prevent concurrent updates to a pod's status from
        // clobbering each other so the phase of a pod progresses monotonically.
        if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
            klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
            return false // No new status.
        }
    
        newStatus := versionedPodStatus{
            status:       status,
            version:      cachedStatus.version + 1,
            podName:      pod.Name,
            podNamespace: pod.Namespace,
        }
        m.podStatuses[pod.UID] = newStatus
    
        select {
        case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
            klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
                pod.UID, newStatus.version, newStatus.status)
            return true
        default:
            // Let the periodic syncBatch handle the update if the channel is full.
            // We can't block, since we hold the mutex lock.
            klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
                format.Pod(pod), status)
            return false
        }
    }
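
    The select at the end of updateStatusInternal is a non-blocking send: the method holds podStatusesLock, so it must never block on a full podStatusChannel, and anything it fails to enqueue is left for the periodic syncBatch to pick up from the cache. A minimal, generic sketch of that pattern (not kubelet code):

    package main

    import (
        "fmt"
        "time"
    )

    type syncRequest struct{ id, version int }

    func main() {
        ch := make(chan syncRequest, 2) // small buffer to force the fallback path

        enqueue := func(r syncRequest) bool {
            select {
            case ch <- r:
                return true // delivered; the sync goroutine will handle it immediately
            default:
                return false // channel full; the periodic batch will handle it
            }
        }

        for i := 0; i < 5; i++ {
            fmt.Printf("request %d queued immediately: %v\n", i, enqueue(syncRequest{id: i, version: 1}))
        }

        // Periodic batch: drain whatever is still queued and reconcile from the cache instead.
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        <-ticker.C
        for n := len(ch); n > 0; n-- {
            <-ch
        }
        fmt.Println("batch sync would now reconcile all cached statuses")
    }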
    
    func (m *manager) syncBatch() {
        var updatedStatuses []podStatusSyncRequest
        podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
        func() { // Critical section
            m.podStatusesLock.RLock()
            defer m.podStatusesLock.RUnlock()
    
            // Clean up orphaned versions.
            for uid := range m.apiStatusVersions {
                _, hasPod := m.podStatuses[types.UID(uid)]
                _, hasMirror := mirrorToPod[uid]
                if !hasPod && !hasMirror {
                    delete(m.apiStatusVersions, uid)
                }
            }
    
            for uid, status := range m.podStatuses {
                syncedUID := kubetypes.MirrorPodUID(uid)
                if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
                    if mirrorUID == "" {
                        klog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
                        continue
                    }
                    syncedUID = mirrorUID
                }
                if m.needsUpdate(types.UID(syncedUID), status) {
                    updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
                } else if m.needsReconcile(uid, status.status) {
                    // Delete the apiStatusVersions here to force an update on the pod status
                    // In most cases the deleted apiStatusVersions here should be filled
                    // soon after the following syncPod() (if that syncPod() syncs an update
                    // successfully).
                    delete(m.apiStatusVersions, syncedUID)
                    updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
                }
            }
        }()
    
        for _, update := range updatedStatuses {
            klog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
            m.syncPod(update.podUID, update.status)
        }
    }
    
    func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
        if !m.needsUpdate(uid, status) {
            klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
            return
        }
    
        // TODO: make me easier to express from client code
        pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
        if errors.IsNotFound(err) {
            klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
            // If the Pod is deleted the status will be cleared in
            // RemoveOrphanedStatuses, so we just ignore the update here.
            return
        }
        if err != nil {
            klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
            return
        }
    
        translatedUID := m.podManager.TranslatePodUID(pod.UID)
        // Type convert original uid just for the purpose of comparison.
        if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
            klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
            m.deletePodStatus(uid)
            return
        }
    
        oldStatus := pod.Status.DeepCopy()
        newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
        klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
        if err != nil {
            klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
            return
        }
        if unchanged {
            klog.V(3).Infof("Status for pod %q is up-to-date: (%d)", format.Pod(pod), status.version)
        } else {
            klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
            pod = newPod
        }
    
        m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
    
        // We don't handle graceful deletion of mirror pods.
        if m.canBeDeleted(pod, status.status) {
            deleteOptions := metav1.DeleteOptions{
                GracePeriodSeconds: new(int64),
                // Use the pod UID as the precondition for deletion to prevent deleting a
                // newly created pod with the same name and namespace.
                Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
            }
            err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
            if err != nil {
                klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
                return
            }
            klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
            m.deletePodStatus(uid)
        }
    }
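
    syncPod persists the status through statusutil.PatchPodStatus, which patches only the pod's status subresource rather than issuing a full update. A hedged sketch of what that helper does conceptually (illustrative names; the real implementation lives in pkg/util/pod and additionally embeds the pod UID in the patch as a precondition):

    package podstatus

    import (
        "context"
        "encoding/json"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/apimachinery/pkg/util/strategicpatch"
        clientset "k8s.io/client-go/kubernetes"
    )

    // patchPodStatus is an illustrative stand-in for statusutil.PatchPodStatus: build a
    // two-way strategic merge patch from the old status to the new one and send it to
    // the pod's status subresource, so nothing outside .status can be clobbered.
    func patchPodStatus(c clientset.Interface, pod *v1.Pod, newStatus v1.PodStatus) (*v1.Pod, []byte, error) {
        oldData, err := json.Marshal(pod)
        if err != nil {
            return nil, nil, err
        }
        updated := pod.DeepCopy()
        updated.Status = newStatus
        newData, err := json.Marshal(updated)
        if err != nil {
            return nil, nil, err
        }
        patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
        if err != nil {
            return nil, nil, err
        }
        patched, err := c.CoreV1().Pods(pod.Namespace).Patch(
            context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status")
        return patched, patch, err
    }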
    
    

    In pkg/kubelet/prober/worker.go — each worker probes one container on a fixed period, counts consecutive results against the configured thresholds, and publishes state changes through its results manager:

    func newWorker(
        m *manager,
        probeType probeType,
        pod *v1.Pod,
        container v1.Container) *worker {
    
        w := &worker{
            stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
            pod:          pod,
            container:    container,
            probeType:    probeType,
            probeManager: m,
        }
    
        switch probeType {
        case readiness:
            w.spec = container.ReadinessProbe
            w.resultsManager = m.readinessManager
            w.initialValue = results.Failure
        case liveness:
            w.spec = container.LivenessProbe
            w.resultsManager = m.livenessManager
            w.initialValue = results.Success
        case startup:
            w.spec = container.StartupProbe
            w.resultsManager = m.startupManager
            w.initialValue = results.Unknown
        }
    
        basicMetricLabels := metrics.Labels{
            "probe_type": w.probeType.String(),
            "container":  w.container.Name,
            "pod":        w.pod.Name,
            "namespace":  w.pod.Namespace,
            "pod_uid":    string(w.pod.UID),
        }
    
        w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
        w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful
    
        w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
        w.proberResultsFailedMetricLabels["result"] = probeResultFailed
    
        w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
        w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown
    
        return w
    }
    
    func (w *worker) run() {
        probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
    
        // If kubelet restarted the probes could be started in rapid succession.
        // Let the worker wait for a random portion of tickerPeriod before probing.
        time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
    
        probeTicker := time.NewTicker(probeTickerPeriod)
    
        defer func() {
            // Clean up.
            probeTicker.Stop()
            if !w.containerID.IsEmpty() {
                w.resultsManager.Remove(w.containerID)
            }
    
            w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
            ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
            ProberResults.Delete(w.proberResultsFailedMetricLabels)
            ProberResults.Delete(w.proberResultsUnknownMetricLabels)
        }()
    
    probeLoop:
        for w.doProbe() {
            // Wait for next probe tick.
            select {
            case <-w.stopCh:
                break probeLoop
            case <-probeTicker.C:
                // continue
            }
        }
    }
    
    func (w *worker) doProbe() (keepGoing bool) {
        defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
        defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
    
        status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
        if !ok {
            // Either the pod has not been created yet, or it was already deleted.
            klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
            return true
        }
    
        // Worker should terminate if pod is terminated.
        if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
            klog.V(3).Infof("Pod %v %v, exiting probe worker",
                format.Pod(w.pod), status.Phase)
            return false
        }
    
        c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
        if !ok || len(c.ContainerID) == 0 {
            // Either the container has not been created yet, or it was deleted.
            klog.V(3).Infof("Probe target container not found: %v - %v",
                format.Pod(w.pod), w.container.Name)
            return true // Wait for more information.
        }
    
        if w.containerID.String() != c.ContainerID {
            if !w.containerID.IsEmpty() {
                w.resultsManager.Remove(w.containerID)
            }
            w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
            w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
            // We've got a new container; resume probing.
            w.onHold = false
        }
    
        if w.onHold {
            // Worker is on hold until there is a new container.
            return true
        }
    
        if c.State.Running == nil {
            klog.V(3).Infof("Non-running container probed: %v - %v",
                format.Pod(w.pod), w.container.Name)
            if !w.containerID.IsEmpty() {
                w.resultsManager.Set(w.containerID, results.Failure, w.pod)
            }
            // Abort if the container will not be restarted.
            return c.State.Terminated == nil ||
                w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
        }
    
        // Probe disabled for InitialDelaySeconds.
        if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
            return true
        }
    
        if c.Started != nil && *c.Started {
            // Stop probing for startup once container has started.
            if w.probeType == startup {
                return true
            }
        } else {
            // Disable other probes until container has started.
            if w.probeType != startup {
                return true
            }
        }
    
        // TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
        // the full container environment here, OR we must make a call to the CRI in order to get those environment
        // values from the running container.
        result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
        if err != nil {
            // Prober error, throw away the result.
            return true
        }
    
        switch result {
        case results.Success:
            ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
        case results.Failure:
            ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
        default:
            ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
        }
    
        if w.lastResult == result {
            w.resultRun++
        } else {
            w.lastResult = result
            w.resultRun = 1
        }
    
        if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
            (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
            // Success or failure is below threshold - leave the probe state unchanged.
            return true
        }
    
        w.resultsManager.Set(w.containerID, result, w.pod)
    
        if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
            // The container fails a liveness/startup check, it will need to be restarted.
            // Stop probing until we see a new container ID. This is to reduce the
            // chance of hitting #21751, where running `docker exec` when a
            // container is being stopped may lead to corrupted container state.
            w.onHold = true
            w.resultRun = 0
        }
    
        return true
    }
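
    The threshold handling near the end of doProbe deserves a closer look: a new result only becomes the reported state once it has been seen FailureThreshold (or SuccessThreshold) times in a row; below the threshold the previously reported state is kept. A minimal sketch of just that counter, not kubelet code (the API defaults are FailureThreshold=3, SuccessThreshold=1):

    package main

    import "fmt"

    type result int

    const (
        success result = iota
        failure
    )

    func (r result) String() string {
        if r == success {
            return "success"
        }
        return "failure"
    }

    type counter struct {
        last     result
        run      int
        succThr  int
        failThr  int
        reported result
    }

    func (c *counter) observe(r result) {
        if r == c.last {
            c.run++
        } else {
            c.last, c.run = r, 1
        }
        if (r == failure && c.run < c.failThr) || (r == success && c.run < c.succThr) {
            return // below threshold: keep the previously reported state
        }
        c.reported = r
    }

    func main() {
        c := &counter{succThr: 1, failThr: 3, reported: success}
        for _, r := range []result{failure, failure, success, failure, failure, failure} {
            c.observe(r)
            fmt.Printf("observed=%v run=%d reported=%v\n", r, c.run, c.reported)
        }
    }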
    

    In pkg/kubelet/kuberuntime/kuberuntime_manager.go — SyncPod drives the pod toward its desired state: compute sandbox/container changes, kill what must go, (re)create the sandbox if needed, then start ephemeral, init, and regular containers:

    func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
        // Step 1: Compute sandbox and container changes.
        podContainerChanges := m.computePodActions(pod, podStatus)
        klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
        if podContainerChanges.CreateSandbox {
            ref, err := ref.GetReference(legacyscheme.Scheme, pod)
            if err != nil {
                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
            }
            if podContainerChanges.SandboxID != "" {
                m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
            } else {
                klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
            }
        }
    
        // Step 2: Kill the pod if the sandbox has changed.
        if podContainerChanges.KillPod {
            if podContainerChanges.CreateSandbox {
                klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
            } else {
                klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
            }
    
            killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
            result.AddPodSyncResult(killResult)
            if killResult.Error() != nil {
                klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
                return
            }
    
            if podContainerChanges.CreateSandbox {
                m.purgeInitContainers(pod, podStatus)
            }
        } else {
            // Step 3: kill any running containers in this pod which are not to keep.
            for containerID, containerInfo := range podContainerChanges.ContainersToKill {
                klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
                killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
                result.AddSyncResult(killContainerResult)
                if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                    killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                    klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
                    return
                }
            }
        }
    
        // Keep terminated init containers fairly aggressively controlled
        // This is an optimization because container removals are typically handled
        // by container garbage collector.
        m.pruneInitContainersBeforeStart(pod, podStatus)
    
        // We pass the value of the PRIMARY podIP and list of podIPs down to
        // generatePodSandboxConfig and generateContainerConfig, which in turn
        // passes it to various other functions, in order to facilitate functionality
        // that requires this value (hosts file and downward API) and avoid races determining
        // the pod IP in cases where a container requires restart but the
        // podIP isn't in the status manager yet. The list of podIPs is used to
        // generate the hosts file.
        //
        // We default to the IPs in the passed-in pod status, and overwrite them if the
        // sandbox needs to be (re)started.
        var podIPs []string
        if podStatus != nil {
            podIPs = podStatus.IPs
        }
    
        // Step 4: Create a sandbox for the pod if necessary.
        podSandboxID := podContainerChanges.SandboxID
        if podContainerChanges.CreateSandbox {
            var msg string
            var err error
    
            klog.V(4).Infof("Creating PodSandbox for pod %q", format.Pod(pod))
            createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
            result.AddSyncResult(createSandboxResult)
            podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
            if err != nil {
                createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
                klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
                ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
                if referr != nil {
                    klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
                }
                m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
                return
            }
            klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
    
            podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
            if err != nil {
                ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
                if referr != nil {
                    klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
                }
                m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
                klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
                result.Fail(err)
                return
            }
    
            // If we ever allow updating a pod from non-host-network to
            // host-network, we may use a stale IP.
            if !kubecontainer.IsHostNetworkPod(pod) {
                // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
                podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
                klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))
            }
        }
    
        // the start containers routines depend on pod ip(as in primary pod ip)
        // instead of trying to figure out if we have 0 < len(podIPs)
        // everytime, we short circuit it here
        podIP := ""
        if len(podIPs) != 0 {
            podIP = podIPs[0]
        }
    
        // Get podSandboxConfig for containers to start.
        configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
        result.AddSyncResult(configPodSandboxResult)
        podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
        if err != nil {
            message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
            klog.Error(message)
            configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
            return
        }
    
        // Helper containing boilerplate common to starting all types of containers.
        // typeName is a label used to describe this type of container in log messages,
        // currently: "container", "init container" or "ephemeral container"
        start := func(typeName string, spec *startSpec) error {
            startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
            result.AddSyncResult(startContainerResult)
    
            isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
            if isInBackOff {
                startContainerResult.Fail(err, msg)
                klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
                return err
            }
    
            klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
            // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
            if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
                startContainerResult.Fail(err, msg)
                // known errors that are logged in other places are logged at higher levels here to avoid
                // repetitive log spam
                switch {
                case err == images.ErrImagePullBackOff:
                    klog.V(3).Infof("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg)
                default:
                    utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
                }
                return err
            }
    
            return nil
        }
    
        // Step 5: start ephemeral containers
        // These are started "prior" to init containers to allow running ephemeral containers even when there
        // are errors starting an init container. In practice init containers will start first since ephemeral
        // containers cannot be specified on pod creation.
        if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
            for _, idx := range podContainerChanges.EphemeralContainersToStart {
                start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
            }
        }
    
        // Step 6: start the init container.
        if container := podContainerChanges.NextInitContainerToStart; container != nil {
            // Start the next init container.
            if err := start("init container", containerStartSpec(container)); err != nil {
                return
            }
    
            // Successfully started the container; clear the entry in the failure
            klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
        }
    
        // Step 7: start containers in podContainerChanges.ContainersToStart.
        for _, idx := range podContainerChanges.ContainersToStart {
            start("container", containerStartSpec(&pod.Spec.Containers[idx]))
        }
    
        return
    }
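
    SyncPod does not restart a crash-looping container immediately: its start helper calls doBackOff, which consults a flowcontrol.Backoff keyed per pod and container so that repeated failures are delayed exponentially (roughly 10s doubling up to 5 minutes in the kubelet's defaults). A hedged sketch of that pattern with a made-up key:

    package main

    import (
        "fmt"
        "time"

        "k8s.io/client-go/util/flowcontrol"
    )

    func main() {
        // The kubelet uses roughly a 10s initial delay capped at 5 minutes for container restarts.
        backOff := flowcontrol.NewBackOff(10*time.Second, 5*time.Minute)
        key := "default_mypod_app" // hypothetical pod/container key; the real key also includes the UID and a hash
        now := time.Now()

        for attempt := 0; attempt < 4; attempt++ {
            if backOff.IsInBackOffSince(key, now) {
                fmt.Printf("attempt %d: still in backoff, current delay %v\n", attempt, backOff.Get(key))
            } else {
                fmt.Printf("attempt %d: allowed to (re)start the container\n", attempt)
            }
            backOff.Next(key, now) // record another failed attempt; the delay doubles up to the cap
        }
    }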
    
