美文网首页
configmap更新后volume更新问题

configmap更新后volume更新问题

作者: wwq2020 | 来源:发表于2022-01-05 17:04 被阅读0次

    简单总结

    kubelet对于成功sync的pod会重新添加回工作队列(延迟为SyncFrequency+SyncFrequency*随机小数(0-1)) (SyncFrequency默认1分钟)
    kubelet每隔1秒会获取需要同步的pod,分发到podworker
    desiredStateOfWorldPopulator会定期(每隔0.1秒)获取之前没有处理过的pod进行处理
    podworker在sync pod时(经由WaitForAttachAndMount调用ReprocessPod)会把desiredStateOfWorldPopulator中的该pod重新标记为未处理过

    也就是说,configmap更新后,volume内容的刷新最长约需等待SyncFrequency+SyncFrequency*随机小数(0-1)+1秒(syncLoop tick)+0.1秒(populator周期)

    相关代码

    pkg/kubelet/apis/config/v1beta1/zz_generated.defaults.go中

    // RegisterDefaults wires the v1beta1 KubeletConfiguration defaulting
    // function into the given scheme so scheme.Default() applies it.
    func RegisterDefaults(scheme *runtime.Scheme) error {
        defaulter := func(obj interface{}) {
            SetObjectDefaults_KubeletConfiguration(obj.(*v1beta1.KubeletConfiguration))
        }
        scheme.AddTypeDefaultingFunc(&v1beta1.KubeletConfiguration{}, defaulter)
        return nil
    }
    
    // SetObjectDefaults_KubeletConfiguration applies the hand-written defaults
    // and then defaults the resource list of every ReservedMemory entry.
    func SetObjectDefaults_KubeletConfiguration(in *v1beta1.KubeletConfiguration) {
        SetDefaults_KubeletConfiguration(in)
        for i := 0; i < len(in.ReservedMemory); i++ {
            v1.SetDefaults_ResourceList(&in.ReservedMemory[i].Limits)
        }
    }
    
    

    pkg/kubelet/apis/config/v1beta1/defaults.go中

    // addDefaultingFuncs registers the generated defaulting functions
    // (RegisterDefaults) with the scheme.
    func addDefaultingFuncs(scheme *kruntime.Scheme) error {
        return RegisterDefaults(scheme)
    }
    
    // SetDefaults_KubeletConfiguration fills every unset field of a v1beta1
    // KubeletConfiguration with its default value. "Unset" means a nil pointer,
    // a zero duration, an empty string, or a zero number, depending on the
    // field's type. Note that SyncFrequency defaults to 1 minute; per
    // NewMainKubelet below, it becomes the pod workers' resyncInterval and thus
    // drives how often a configmap-backed volume is refreshed.
    func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfiguration) {
        if obj.EnableServer == nil {
            obj.EnableServer = utilpointer.BoolPtr(true)
        }
        // Default resync period; see resyncInterval in NewMainKubelet.
        if obj.SyncFrequency == zeroDuration {
            obj.SyncFrequency = metav1.Duration{Duration: 1 * time.Minute}
        }
        if obj.FileCheckFrequency == zeroDuration {
            obj.FileCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
        }
        if obj.HTTPCheckFrequency == zeroDuration {
            obj.HTTPCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
        }
        if obj.Address == "" {
            obj.Address = "0.0.0.0"
        }
        if obj.Port == 0 {
            obj.Port = ports.KubeletPort
        }
        if obj.Authentication.Anonymous.Enabled == nil {
            obj.Authentication.Anonymous.Enabled = utilpointer.BoolPtr(false)
        }
        if obj.Authentication.Webhook.Enabled == nil {
            obj.Authentication.Webhook.Enabled = utilpointer.BoolPtr(true)
        }
        if obj.Authentication.Webhook.CacheTTL == zeroDuration {
            obj.Authentication.Webhook.CacheTTL = metav1.Duration{Duration: 2 * time.Minute}
        }
        if obj.Authorization.Mode == "" {
            obj.Authorization.Mode = kubeletconfigv1beta1.KubeletAuthorizationModeWebhook
        }
        if obj.Authorization.Webhook.CacheAuthorizedTTL == zeroDuration {
            obj.Authorization.Webhook.CacheAuthorizedTTL = metav1.Duration{Duration: 5 * time.Minute}
        }
        if obj.Authorization.Webhook.CacheUnauthorizedTTL == zeroDuration {
            obj.Authorization.Webhook.CacheUnauthorizedTTL = metav1.Duration{Duration: 30 * time.Second}
        }
        if obj.RegistryPullQPS == nil {
            obj.RegistryPullQPS = utilpointer.Int32Ptr(5)
        }
        if obj.RegistryBurst == 0 {
            obj.RegistryBurst = 10
        }
        if obj.EventRecordQPS == nil {
            obj.EventRecordQPS = utilpointer.Int32Ptr(5)
        }
        if obj.EventBurst == 0 {
            obj.EventBurst = 10
        }
        if obj.EnableDebuggingHandlers == nil {
            obj.EnableDebuggingHandlers = utilpointer.BoolPtr(true)
        }
        if obj.HealthzPort == nil {
            obj.HealthzPort = utilpointer.Int32Ptr(10248)
        }
        if obj.HealthzBindAddress == "" {
            obj.HealthzBindAddress = "127.0.0.1"
        }
        if obj.OOMScoreAdj == nil {
            obj.OOMScoreAdj = utilpointer.Int32Ptr(int32(qos.KubeletOOMScoreAdj))
        }
        if obj.StreamingConnectionIdleTimeout == zeroDuration {
            obj.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 4 * time.Hour}
        }
        // Order matters: this must run before NodeStatusUpdateFrequency is
        // defaulted, so an explicitly set NodeStatusUpdateFrequency can be
        // observed here.
        if obj.NodeStatusReportFrequency == zeroDuration {
            // For backward compatibility, NodeStatusReportFrequency's default value is
            // set to NodeStatusUpdateFrequency if NodeStatusUpdateFrequency is set
            // explicitly.
            if obj.NodeStatusUpdateFrequency == zeroDuration {
                obj.NodeStatusReportFrequency = metav1.Duration{Duration: 5 * time.Minute}
            } else {
                obj.NodeStatusReportFrequency = obj.NodeStatusUpdateFrequency
            }
        }
        if obj.NodeStatusUpdateFrequency == zeroDuration {
            obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
        }
        if obj.NodeLeaseDurationSeconds == 0 {
            obj.NodeLeaseDurationSeconds = 40
        }
        if obj.ImageMinimumGCAge == zeroDuration {
            obj.ImageMinimumGCAge = metav1.Duration{Duration: 2 * time.Minute}
        }
        if obj.ImageGCHighThresholdPercent == nil {
            // default is below docker's default dm.min_free_space of 90%
            obj.ImageGCHighThresholdPercent = utilpointer.Int32Ptr(85)
        }
        if obj.ImageGCLowThresholdPercent == nil {
            obj.ImageGCLowThresholdPercent = utilpointer.Int32Ptr(80)
        }
        if obj.VolumeStatsAggPeriod == zeroDuration {
            obj.VolumeStatsAggPeriod = metav1.Duration{Duration: time.Minute}
        }
        if obj.CgroupsPerQOS == nil {
            obj.CgroupsPerQOS = utilpointer.BoolPtr(true)
        }
        if obj.CgroupDriver == "" {
            obj.CgroupDriver = "cgroupfs"
        }
        if obj.CPUManagerPolicy == "" {
            obj.CPUManagerPolicy = "none"
        }
        if obj.CPUManagerReconcilePeriod == zeroDuration {
            // Keep the same as default NodeStatusUpdateFrequency
            obj.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second}
        }
        if obj.MemoryManagerPolicy == "" {
            obj.MemoryManagerPolicy = kubeletconfigv1beta1.NoneMemoryManagerPolicy
        }
        if obj.TopologyManagerPolicy == "" {
            obj.TopologyManagerPolicy = kubeletconfigv1beta1.NoneTopologyManagerPolicy
        }
        if obj.TopologyManagerScope == "" {
            obj.TopologyManagerScope = kubeletconfigv1beta1.ContainerTopologyManagerScope
        }
        if obj.RuntimeRequestTimeout == zeroDuration {
            obj.RuntimeRequestTimeout = metav1.Duration{Duration: 2 * time.Minute}
        }
        if obj.HairpinMode == "" {
            obj.HairpinMode = kubeletconfigv1beta1.PromiscuousBridge
        }
        if obj.MaxPods == 0 {
            obj.MaxPods = 110
        }
        // default nil or negative value to -1 (implies node allocatable pid limit)
        if obj.PodPidsLimit == nil || *obj.PodPidsLimit < int64(0) {
            obj.PodPidsLimit = utilpointer.Int64(-1)
        }

        if obj.ResolverConfig == nil {
            obj.ResolverConfig = utilpointer.String(kubetypes.ResolvConfDefault)
        }
        if obj.CPUCFSQuota == nil {
            obj.CPUCFSQuota = utilpointer.BoolPtr(true)
        }
        if obj.CPUCFSQuotaPeriod == nil {
            obj.CPUCFSQuotaPeriod = &metav1.Duration{Duration: 100 * time.Millisecond}
        }
        if obj.NodeStatusMaxImages == nil {
            obj.NodeStatusMaxImages = utilpointer.Int32Ptr(50)
        }
        if obj.MaxOpenFiles == 0 {
            obj.MaxOpenFiles = 1000000
        }
        if obj.ContentType == "" {
            obj.ContentType = "application/vnd.kubernetes.protobuf"
        }
        if obj.KubeAPIQPS == nil {
            obj.KubeAPIQPS = utilpointer.Int32Ptr(5)
        }
        if obj.KubeAPIBurst == 0 {
            obj.KubeAPIBurst = 10
        }
        if obj.SerializeImagePulls == nil {
            obj.SerializeImagePulls = utilpointer.BoolPtr(true)
        }
        if obj.EvictionHard == nil {
            obj.EvictionHard = DefaultEvictionHard
        }
        if obj.EvictionPressureTransitionPeriod == zeroDuration {
            obj.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 5 * time.Minute}
        }
        if obj.EnableControllerAttachDetach == nil {
            obj.EnableControllerAttachDetach = utilpointer.BoolPtr(true)
        }
        if obj.MakeIPTablesUtilChains == nil {
            obj.MakeIPTablesUtilChains = utilpointer.BoolPtr(true)
        }
        if obj.IPTablesMasqueradeBit == nil {
            obj.IPTablesMasqueradeBit = utilpointer.Int32Ptr(DefaultIPTablesMasqueradeBit)
        }
        if obj.IPTablesDropBit == nil {
            obj.IPTablesDropBit = utilpointer.Int32Ptr(DefaultIPTablesDropBit)
        }
        if obj.FailSwapOn == nil {
            obj.FailSwapOn = utilpointer.BoolPtr(true)
        }
        if obj.ContainerLogMaxSize == "" {
            obj.ContainerLogMaxSize = "10Mi"
        }
        if obj.ContainerLogMaxFiles == nil {
            obj.ContainerLogMaxFiles = utilpointer.Int32Ptr(5)
        }
        if obj.ConfigMapAndSecretChangeDetectionStrategy == "" {
            obj.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfigv1beta1.WatchChangeDetectionStrategy
        }
        if obj.EnforceNodeAllocatable == nil {
            obj.EnforceNodeAllocatable = DefaultNodeAllocatableEnforcement
        }
        if obj.VolumePluginDir == "" {
            obj.VolumePluginDir = DefaultVolumePluginDir
        }
        // Use the Default LoggingConfiguration option
        componentbaseconfigv1alpha1.RecommendedLoggingConfiguration(&obj.Logging)
        if obj.EnableSystemLogHandler == nil {
            obj.EnableSystemLogHandler = utilpointer.BoolPtr(true)
        }
        if obj.EnableProfilingHandler == nil {
            obj.EnableProfilingHandler = utilpointer.BoolPtr(true)
        }
        if obj.EnableDebugFlagsHandler == nil {
            obj.EnableDebugFlagsHandler = utilpointer.BoolPtr(true)
        }
        if obj.SeccompDefault == nil {
            obj.SeccompDefault = utilpointer.BoolPtr(false)
        }
        if obj.MemoryThrottlingFactor == nil {
            obj.MemoryThrottlingFactor = utilpointer.Float64Ptr(DefaultMemoryThrottlingFactor)
        }
    }
    

    pkg/kubelet/apis/config/v1beta1/register.go中

    // init hooks the package's defaulting functions into the local scheme
    // builder, so any scheme built from it gets them.
    func init() {
        localSchemeBuilder.Register(addDefaultingFuncs)
    }
    

    pkg/kubelet/apis/config/scheme/scheme.go中

    // NewSchemeAndCodecs builds a runtime.Scheme that knows both the internal
    // kubelet config API and its v1beta1 version, plus a matching codec factory.
    func NewSchemeAndCodecs(mutators ...serializer.CodecFactoryOptionsMutator) (*runtime.Scheme, *serializer.CodecFactory, error) {
        s := runtime.NewScheme()
        for _, add := range []func(*runtime.Scheme) error{
            kubeletconfig.AddToScheme,
            kubeletconfigv1beta1.AddToScheme,
        } {
            if err := add(s); err != nil {
                return nil, nil, err
            }
        }
        cf := serializer.NewCodecFactory(s, mutators...)
        return s, &cf, nil
    }
    

    cmd/kubelet/app/options/options.go中

    // AddKubeletConfigFlags binds KubeletConfiguration fields to command-line
    // flags; only the sync-frequency flag is shown in this excerpt.
    // NOTE(review): the excerpt registers on `fs` while the parameter is
    // `mainfs`; presumably fs is derived from mainfs in the elided code — confirm.
    func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfiguration) {
        ...
        fs.DurationVar(&c.SyncFrequency.Duration, "sync-frequency", c.SyncFrequency.Duration, "Max period between synchronizing running containers and config")
        ...
    }
    
    // NewKubeletConfiguration returns an internal KubeletConfiguration with all
    // scheme defaults applied (via the versioned type) plus legacy defaults.
    func NewKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
        scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
        if err != nil {
            return nil, err
        }
        // Default the external (v1beta1) object, then convert to internal.
        external := &v1beta1.KubeletConfiguration{}
        scheme.Default(external)
        internal := &kubeletconfig.KubeletConfiguration{}
        if err := scheme.Convert(external, internal, nil); err != nil {
            return nil, err
        }
        applyLegacyDefaults(internal)
        return internal, nil
    }
    

    cmd/kubelet/app/server.go中

    // NewKubeletCommand builds the kubelet cobra command (excerpt): it creates
    // the defaulted configuration, eventually calls Run, and registers the
    // config flags on the flag set. (Statement order here reflects the
    // excerpt's elisions, not the real call sequence.)
    func NewKubeletCommand() *cobra.Command {
        ...
        kubeletConfig, err := options.NewKubeletConfiguration()
        ...
        if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
            ...
        }
        ...
        options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
    }
    
    // Run applies the logging options, logs the kubelet version, performs
    // OS-specific initialization, and then enters the main run() path.
    // Failures are wrapped with context and returned.
    func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
        logOption := &logs.Options{Config: s.Logging}
        logOption.Apply()
        // To help debugging, immediately log version
        klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
        if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
            return fmt.Errorf("failed OS init: %w", err)
        }
        if err := run(ctx, s, kubeDeps, featureGate); err != nil {
            return fmt.Errorf("failed to run Kubelet: %w", err)
        }
        return nil
    }
    
    // run (excerpt) performs the bulk of kubelet startup; the part relevant
    // here is that it calls RunKubelet with the server options.
    func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {

        ...
        if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
            return err
        }
        ...
    }
    
    // RunKubelet (excerpt) unpacks the KubeletServer options and hands them to
    // createAndInitKubelet; note the KubeletConfiguration (which carries
    // SyncFrequency) is passed through by pointer.
    func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
        ...
        k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
            kubeDeps,
            &kubeServer.ContainerRuntimeOptions,
            kubeServer.ContainerRuntime,
            hostname,
            hostnameOverridden,
            nodeName,
            nodeIPs,
            kubeServer.ProviderID,
            kubeServer.CloudProvider,
            kubeServer.CertDirectory,
            kubeServer.RootDirectory,
            kubeServer.ImageCredentialProviderConfigFile,
            kubeServer.ImageCredentialProviderBinDir,
            kubeServer.RegisterNode,
            kubeServer.RegisterWithTaints,
            kubeServer.AllowedUnsafeSysctls,
            kubeServer.ExperimentalMounterPath,
            kubeServer.KernelMemcgNotification,
            kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
            kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
            kubeServer.MinimumGCAge,
            kubeServer.MaxPerPodContainerCount,
            kubeServer.MaxContainerCount,
            kubeServer.MasterServiceNamespace,
            kubeServer.RegisterSchedulable,
            kubeServer.KeepTerminatedPodVolumes,
            kubeServer.NodeLabels,
            kubeServer.NodeStatusMaxImages,
            kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
        )
        ...
    }
    // createAndInitKubelet (excerpt) is a thin pass-through: it forwards every
    // argument, in the same order, to kubelet.NewMainKubelet.
    func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *kubelet.Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        containerRuntime string,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIPs []net.IP,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        imageCredentialProviderConfigFile string,
        imageCredentialProviderBinDir string,
        registerNode bool,
        registerWithTaints []api.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalCheckNodeCapabilitiesBeforeMount bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        masterServiceNamespace string,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        nodeStatusMaxImages int32,
        seccompDefault bool,
    ) (k kubelet.Bootstrap, err error) {
        ...
        k, err = kubelet.NewMainKubelet(kubeCfg,
            kubeDeps,
            crOptions,
            containerRuntime,
            hostname,
            hostnameOverridden,
            nodeName,
            nodeIPs,
            providerID,
            cloudProvider,
            certDirectory,
            rootDirectory,
            imageCredentialProviderConfigFile,
            imageCredentialProviderBinDir,
            registerNode,
            registerWithTaints,
            allowedUnsafeSysctls,
            experimentalMounterPath,
            kernelMemcgNotification,
            experimentalCheckNodeCapabilitiesBeforeMount,
            experimentalNodeAllocatableIgnoreEvictionThreshold,
            minimumGCAge,
            maxPerPodContainerCount,
            maxContainerCount,
            masterServiceNamespace,
            registerSchedulable,
            keepTerminatedPodVolumes,
            nodeLabels,
            nodeStatusMaxImages,
            seccompDefault,
        )
        ...
    }
    

    pkg/kubelet/kubelet.go中

    // NewMainKubelet (excerpt) shows the three wirings relevant to the article:
    //   1. kubeCfg.SyncFrequency becomes the Kubelet's resyncInterval;
    //   2. resyncInterval is handed to newPodWorkers (used to re-enqueue pods
    //      after a successful sync, see completeWork);
    //   3. the volume manager is built with klet.podWorkers as its
    //      podStateProvider.
    func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
        kubeDeps *Dependencies,
        crOptions *config.ContainerRuntimeOptions,
        containerRuntime string,
        hostname string,
        hostnameOverridden bool,
        nodeName types.NodeName,
        nodeIPs []net.IP,
        providerID string,
        cloudProvider string,
        certDirectory string,
        rootDirectory string,
        imageCredentialProviderConfigFile string,
        imageCredentialProviderBinDir string,
        registerNode bool,
        registerWithTaints []api.Taint,
        allowedUnsafeSysctls []string,
        experimentalMounterPath string,
        kernelMemcgNotification bool,
        experimentalCheckNodeCapabilitiesBeforeMount bool,
        experimentalNodeAllocatableIgnoreEvictionThreshold bool,
        minimumGCAge metav1.Duration,
        maxPerPodContainerCount int32,
        maxContainerCount int32,
        masterServiceNamespace string,
        registerSchedulable bool,
        keepTerminatedPodVolumes bool,
        nodeLabels map[string]string,
        nodeStatusMaxImages int32,
        seccompDefault bool,
    ) (*Kubelet, error) {
        ...
        klet := &Kubelet{
            ...
            // SyncFrequency (default 1 minute) drives pod resync.
            resyncInterval:                          kubeCfg.SyncFrequency.Duration,
            ...
        }
        ...
        activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
        if err != nil {
            return nil, err
        }
        klet.AddPodSyncLoopHandler(activeDeadlineHandler)
        ...
        klet.podWorkers = newPodWorkers(
            klet.syncPod,
            klet.syncTerminatingPod,
            klet.syncTerminatedPod,

            kubeDeps.Recorder,
            klet.workQueue,
            klet.resyncInterval,
            backOffPeriod,
            klet.podCache,
        )
        ...
        klet.volumeManager = volumemanager.NewVolumeManager(
            kubeCfg.EnableControllerAttachDetach,
            nodeName,
            klet.podManager,
            klet.podWorkers,
            klet.kubeClient,
            klet.volumePluginMgr,
            klet.containerRuntime,
            kubeDeps.Mounter,
            kubeDeps.HostUtil,
            klet.getPodsDir(),
            kubeDeps.Recorder,
            experimentalCheckNodeCapabilitiesBeforeMount,
            keepTerminatedPodVolumes,
            volumepathhandler.NewBlockVolumePathHandler())
    }
    
    // syncPod (excerpt): each pod sync waits for the pod's volumes via
    // WaitForAttachAndMount, which (see volume_manager.go) also marks the pod
    // for reprocessing so atomically-updated volumes get refreshed.
    func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
        ...
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            ...
        }
        ...
    }
    
    // Run (excerpt) starts the volume manager in its own goroutine and then
    // blocks in the main sync loop.
    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        ...
        go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
        ...
        kl.syncLoop(updates, kl)
    }
    
    // syncLoop (excerpt) drives syncLoopIteration forever; the 1-second
    // syncTicker is what bounds the "+1s" component of the configmap refresh
    // delay discussed in this article.
    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        ...
        syncTicker := time.NewTicker(time.Second)
        ...
        for {
            ...
            if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                break
            }
            ...
        }
    }
    
    // syncLoopIteration (excerpt; only the syncCh case is shown, the other
    // select cases and the return are elided): on each 1-second tick it
    // collects the pods whose work-queue entries are ready and dispatches them.
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        case <-syncCh:
            podsToSync := kl.getPodsToSync()
            if len(podsToSync) == 0 {
                break
            }
            handler.HandlePodSyncs(podsToSync)
        }
    }
    
    // HandlePodSyncs dispatches a SyncPodSync work item (with its mirror pod,
    // if any) for every pod in pods, all stamped with the same start time.
    func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
        now := kl.clock.Now()
        for i := range pods {
            p := pods[i]
            mirror, _ := kl.podManager.GetMirrorPodByPod(p)
            kl.dispatchWork(p, kubetypes.SyncPodSync, mirror, now)
        }
    }
    
    // dispatchWork hands the pod to the pod workers for an async sync and, for
    // newly created pods only, records the containers-per-pod metric.
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        opts := UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            StartTime:  start,
        }
        // Run the sync in an async worker.
        kl.podWorkers.UpdatePod(opts)
        if syncType != kubetypes.SyncPodCreate {
            return
        }
        // Note the number of containers for new pods.
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
    
    // getPodsToSync returns every known pod whose work-queue entry has become
    // ready, plus any pod a registered PodSyncLoopHandler asks to sync.
    func (kl *Kubelet) getPodsToSync() []*v1.Pod {
        allPods := kl.podManager.GetPods()
        ready := sets.NewString()
        for _, uid := range kl.workQueue.GetWork() {
            ready.Insert(string(uid))
        }
        var result []*v1.Pod
        for _, pod := range allPods {
            if ready.Has(string(pod.UID)) {
                // The work of the pod is ready
                result = append(result, pod)
                continue
            }
            for _, h := range kl.PodSyncLoopHandlers {
                if h.ShouldSync(pod) {
                    result = append(result, pod)
                    break
                }
            }
        }
        return result
    }
    

    pkg/kubelet/active_deadline.go中

    // newActiveDeadlineHandler builds an activeDeadlineHandler. All three
    // dependencies are required; a nil one is rejected with an error.
    func newActiveDeadlineHandler(
        podStatusProvider status.PodStatusProvider,
        recorder record.EventRecorder,
        clock clock.Clock,
    ) (*activeDeadlineHandler, error) {
        // check for all required fields
        if clock == nil || podStatusProvider == nil || recorder == nil {
            return nil, fmt.Errorf("required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
        }
        h := &activeDeadlineHandler{}
        h.clock = clock
        h.podStatusProvider = podStatusProvider
        h.recorder = recorder
        return h, nil
    }
    
    // ShouldSync implements the PodSyncLoopHandler interface: a pod needs a
    // sync whenever it is past its active deadline.
    func (m *activeDeadlineHandler) ShouldSync(pod *v1.Pod) bool {
        return m.pastActiveDeadline(pod)
    }
    
    // pastActiveDeadline reports whether the pod has been running for longer
    // than its spec.activeDeadlineSeconds. Pods with no deadline, or with no
    // recorded start time yet, are never considered past it.
    func (m *activeDeadlineHandler) pastActiveDeadline(pod *v1.Pod) bool {
        deadline := pod.Spec.ActiveDeadlineSeconds
        if deadline == nil {
            // no active deadline was specified
            return false
        }
        // get the latest status to determine if it was started
        ps, ok := m.podStatusProvider.GetPodStatus(pod.UID)
        if !ok {
            ps = pod.Status
        }
        if ps.StartTime.IsZero() {
            // we have no start time so just return
            return false
        }
        // determine if the deadline was exceeded
        elapsed := m.clock.Since(ps.StartTime.Time)
        limit := time.Duration(*deadline) * time.Second
        return elapsed >= limit
    }
    
    // UpdatePod (excerpt) routes the update to the pod's worker loop.
    // NOTE(review): in the elided code managePodLoop is presumably started as a
    // per-pod goroutine fed by podUpdates, not called synchronously — confirm.
    func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
        ...
        p.managePodLoop(podUpdates)
        ...
    }
    
    // managePodLoop (excerpt) runs the configured sync function for each update
    // and then calls completeWork, which re-enqueues the pod for a future
    // resync.
    func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
        ...
        err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
        ...
        p.completeWork(pod, err)
        ...
    }
    
    // completeWork (excerpt): after a sync the pod is put back on the work
    // queue with a jittered delay of resyncInterval (SyncFrequency) plus up to
    // resyncInterval*workerResyncIntervalJitterFactor — the dominant term in
    // the configmap refresh latency.
    func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
        ...
        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
        ...
    }
    

    pkg/kubelet/volumemanager/volume_manager.go中

    const(
        ...
        // Sleep between populator loop iterations: the populator re-examines
        // pods every 100ms (the "+0.1s" in the article's delay estimate).
        desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond
        ...
    )
    
    // NewVolumeManager (excerpt) builds the volume manager; the relevant part
    // is that the desired-state-of-world populator is created with the 100ms
    // loop sleep period defined above.
    func NewVolumeManager(
        controllerAttachDetachEnabled bool,
        nodeName k8stypes.NodeName,
        podManager pod.Manager,
        podStateProvider podStateProvider,
        kubeClient clientset.Interface,
        volumePluginMgr *volume.VolumePluginMgr,
        kubeContainerRuntime container.Runtime,
        mounter mount.Interface,
        hostutil hostutil.HostUtils,
        kubeletPodsDir string,
        recorder record.EventRecorder,
        checkNodeCapabilitiesBeforeMount bool,
        keepTerminatedPodVolumes bool,
        blockVolumePathHandler volumepathhandler.BlockVolumePathHandler) VolumeManager {
        ...
        vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
            kubeClient,
            desiredStateOfWorldPopulatorLoopSleepPeriod,
            desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
            podManager,
            podStateProvider,
            vm.desiredStateOfWorld,
            vm.actualStateOfWorld,
            kubeContainerRuntime,
            keepTerminatedPodVolumes,
            csiMigratedPluginManager,
            intreeToCSITranslator,
            volumePluginMgr)
        ...
    }
    
    // Run (excerpt) starts the desired-state-of-world populator loop in its own
    // goroutine.
    func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        ...
        go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
        ...
    }
    
    // WaitForAttachAndMount blocks until all of the pod's expected volumes are
    // attached and mounted, or the poll times out. It first marks the pod for
    // reprocessing so atomically-updated volumes (configmap, secret, downward
    // API) get their contents refreshed on every sync.
    func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
        if pod == nil {
            return nil
        }

        expectedVolumes := getExpectedVolumes(pod)
        if len(expectedVolumes) == 0 {
            // No volumes to verify
            return nil
        }

        klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
        uniquePodName := util.GetUniquePodName(pod)

        // Some pods expect to have Setup called over and over again to update.
        // Remount plugins for which this is true. (Atomically updating volumes,
        // like Downward API, depend on this to update the contents of the volume).
        vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)

        err := wait.PollImmediate(
            podAttachAndMountRetryInterval,
            podAttachAndMountTimeout,
            vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))

        if err != nil {
            unmountedVolumes :=
                vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
            // Also get unattached volumes for error message
            unattachedVolumes :=
                vm.getUnattachedVolumes(expectedVolumes)

            // If everything mounted between the failed poll and this re-check,
            // treat the wait as successful rather than surfacing a stale error.
            if len(unmountedVolumes) == 0 {
                return nil
            }

            return fmt.Errorf(
                "unmounted volumes=%v, unattached volumes=%v: %s",
                unmountedVolumes,
                unattachedVolumes,
                err)
        }

        klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
        return nil
    }
    
    // verifyVolumesMountedFunc returns a wait.ConditionFunc that is done once
    // every expected volume is mounted for podName, or terminates early with
    // the pod's accumulated desired-state errors joined into one error.
    func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
        return func() (bool, error) {
            errs := vm.desiredStateOfWorld.PopPodErrors(podName)
            if len(errs) > 0 {
                return true, errors.New(strings.Join(errs, "; "))
            }
            remaining := vm.getUnmountedVolumes(podName, expectedVolumes)
            return len(remaining) == 0, nil
        }
    }
    
    // getUnmountedVolumes returns the entries of expectedVolumes that the
    // actual state of world does not yet report as mounted for podName.
    func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
        mounted := sets.NewString()
        for _, v := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
            mounted.Insert(v.OuterVolumeSpecName)
        }
        return filterUnmountedVolumes(mounted, expectedVolumes)
    }
    

    pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go中

    // Run (excerpt) repeats populatorLoop every loopSleepDuration (100ms, see
    // the const above) until stopCh closes.
    func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        ...
        wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
    }
    // populatorLoop (excerpt) is one iteration of the populator: it scans for
    // pods whose volumes need (re)processing.
    func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
        ...
        dswp.findAndAddNewPods()
        ...
    }
    
    // findAndAddNewPods (excerpt) processes each candidate pod's volumes via
    // processPodVolumes.
    func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
        ...
        dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
        ...
    }
    
    // processPodVolumes (excerpt): a pod already marked as processed is skipped
    // entirely — this is why ReprocessPod (which clears the mark) is needed to
    // pick up configmap volume updates.
    func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
        pod *v1.Pod,
        mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
        processedVolumesForFSResize sets.String) {
        ...
        if dswp.podPreviouslyProcessed(uniquePodName) {
            return
        }
        ...
    }
    
    // podPreviouslyProcessed reports whether the populator has already handled
    // this pod since it was last marked for reprocessing.
    func (dswp *desiredStateOfWorldPopulator) podPreviouslyProcessed(
        podName volumetypes.UniquePodName) bool {
        dswp.pods.RLock()
        processed := dswp.pods.processedPods[podName]
        dswp.pods.RUnlock()
        return processed
    }
    
    // markPodProcessingFailed flags the pod as unprocessed so the next
    // populator loop iteration picks it up again.
    func (dswp *desiredStateOfWorldPopulator) markPodProcessingFailed(
        podName volumetypes.UniquePodName) {
        dswp.pods.Lock()
        defer dswp.pods.Unlock()
        dswp.pods.processedPods[podName] = false
    }
    
    // ReprocessPod marks the pod as not yet processed so that the next
    // populator loop re-evaluates its volumes — the mechanism by which a
    // configmap-backed volume's contents get refreshed during pod sync.
    func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
        podName volumetypes.UniquePodName) {
        dswp.markPodProcessingFailed(podName)
    }
    

    相关文章

      网友评论

          本文标题:configmap更新后volume更新问题

          本文链接:https://www.haomeiwen.com/subject/erhkcrtx.html