简单总结
kubelet对于成功sync的pod,会在延迟 SyncFrequency+SyncFrequency*抖动系数*随机小数(0-1) 之后把它重新加入工作队列(SyncFrequency默认1分钟;抖动系数即传给wait.Jitter的workerResyncIntervalJitterFactor,上游取值为0.5)
kubelet每隔1秒会获取需要同步的pod,分发到podworker
desiredStateOfWorldPopulator每隔0.1秒运行一次,只处理尚未被标记为"已处理"的pod
podworker会把desiredStateOfWorldPopulator中pod设置为未处理过
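也就是说,configmap更新后,pod内挂载的内容最长需要等待约 SyncFrequency+SyncFrequency*抖动系数(上游为0.5)*随机小数(0-1)+1秒+0.1秒 才会被刷新(另外还要加上kubelet自身感知configmap变化的延迟,这取决于ConfigMapAndSecretChangeDetectionStrategy,默认为Watch)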
相关代码
pkg/kubelet/apis/config/v1beta1/zz_generated.defaults.go中
// RegisterDefaults installs the defaulting function for
// v1beta1.KubeletConfiguration on the given scheme. It always returns nil.
func RegisterDefaults(scheme *runtime.Scheme) error {
	// The scheme hands objects over as interface{}; assert the concrete type
	// before delegating to the typed defaulter.
	applyDefaults := func(obj interface{}) {
		SetObjectDefaults_KubeletConfiguration(obj.(*v1beta1.KubeletConfiguration))
	}
	scheme.AddTypeDefaultingFunc(&v1beta1.KubeletConfiguration{}, applyDefaults)
	return nil
}
// SetObjectDefaults_KubeletConfiguration applies the top-level defaults to in
// and then defaults the Limits resource list of every ReservedMemory entry.
func SetObjectDefaults_KubeletConfiguration(in *v1beta1.KubeletConfiguration) {
	SetDefaults_KubeletConfiguration(in)
	// Index into the slice so the defaulter mutates the stored element rather
	// than a copy of it.
	for idx := range in.ReservedMemory {
		v1.SetDefaults_ResourceList(&in.ReservedMemory[idx].Limits)
	}
}
pkg/kubelet/apis/config/v1beta1/defaults.go中
// addDefaultingFuncs registers the defaulting functions on scheme by
// delegating to RegisterDefaults and returns whatever error it reports.
func addDefaultingFuncs(scheme *kruntime.Scheme) error {
return RegisterDefaults(scheme)
}
// SetDefaults_KubeletConfiguration fills in the default value of every field
// of obj that is still unset. A field counts as unset when it holds its zero
// value (nil pointer, empty string, 0, or zeroDuration); fields that already
// carry a value are left untouched. Two exceptions are visible below:
// PodPidsLimit is also reset when it is negative, and Logging is always passed
// to RecommendedLoggingConfiguration.
func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfiguration) {
if obj.EnableServer == nil {
obj.EnableServer = utilpointer.BoolPtr(true)
}
// SyncFrequency defaults to one minute. NewMainKubelet later copies this
// value into the kubelet's resyncInterval.
if obj.SyncFrequency == zeroDuration {
obj.SyncFrequency = metav1.Duration{Duration: 1 * time.Minute}
}
if obj.FileCheckFrequency == zeroDuration {
obj.FileCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
}
if obj.HTTPCheckFrequency == zeroDuration {
obj.HTTPCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
}
if obj.Address == "" {
obj.Address = "0.0.0.0"
}
if obj.Port == 0 {
obj.Port = ports.KubeletPort
}
if obj.Authentication.Anonymous.Enabled == nil {
obj.Authentication.Anonymous.Enabled = utilpointer.BoolPtr(false)
}
if obj.Authentication.Webhook.Enabled == nil {
obj.Authentication.Webhook.Enabled = utilpointer.BoolPtr(true)
}
if obj.Authentication.Webhook.CacheTTL == zeroDuration {
obj.Authentication.Webhook.CacheTTL = metav1.Duration{Duration: 2 * time.Minute}
}
if obj.Authorization.Mode == "" {
obj.Authorization.Mode = kubeletconfigv1beta1.KubeletAuthorizationModeWebhook
}
if obj.Authorization.Webhook.CacheAuthorizedTTL == zeroDuration {
obj.Authorization.Webhook.CacheAuthorizedTTL = metav1.Duration{Duration: 5 * time.Minute}
}
if obj.Authorization.Webhook.CacheUnauthorizedTTL == zeroDuration {
obj.Authorization.Webhook.CacheUnauthorizedTTL = metav1.Duration{Duration: 30 * time.Second}
}
if obj.RegistryPullQPS == nil {
obj.RegistryPullQPS = utilpointer.Int32Ptr(5)
}
if obj.RegistryBurst == 0 {
obj.RegistryBurst = 10
}
if obj.EventRecordQPS == nil {
obj.EventRecordQPS = utilpointer.Int32Ptr(5)
}
if obj.EventBurst == 0 {
obj.EventBurst = 10
}
if obj.EnableDebuggingHandlers == nil {
obj.EnableDebuggingHandlers = utilpointer.BoolPtr(true)
}
if obj.HealthzPort == nil {
obj.HealthzPort = utilpointer.Int32Ptr(10248)
}
if obj.HealthzBindAddress == "" {
obj.HealthzBindAddress = "127.0.0.1"
}
if obj.OOMScoreAdj == nil {
obj.OOMScoreAdj = utilpointer.Int32Ptr(int32(qos.KubeletOOMScoreAdj))
}
if obj.StreamingConnectionIdleTimeout == zeroDuration {
obj.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 4 * time.Hour}
}
if obj.NodeStatusReportFrequency == zeroDuration {
// For backward compatibility, NodeStatusReportFrequency's default value is
// set to NodeStatusUpdateFrequency if NodeStatusUpdateFrequency is set
// explicitly.
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusReportFrequency = metav1.Duration{Duration: 5 * time.Minute}
} else {
obj.NodeStatusReportFrequency = obj.NodeStatusUpdateFrequency
}
}
// This must stay after the NodeStatusReportFrequency block above, which
// checks whether NodeStatusUpdateFrequency was still unset.
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
}
if obj.NodeLeaseDurationSeconds == 0 {
obj.NodeLeaseDurationSeconds = 40
}
if obj.ImageMinimumGCAge == zeroDuration {
obj.ImageMinimumGCAge = metav1.Duration{Duration: 2 * time.Minute}
}
if obj.ImageGCHighThresholdPercent == nil {
// default is below docker's default dm.min_free_space of 90%
obj.ImageGCHighThresholdPercent = utilpointer.Int32Ptr(85)
}
if obj.ImageGCLowThresholdPercent == nil {
obj.ImageGCLowThresholdPercent = utilpointer.Int32Ptr(80)
}
if obj.VolumeStatsAggPeriod == zeroDuration {
obj.VolumeStatsAggPeriod = metav1.Duration{Duration: time.Minute}
}
if obj.CgroupsPerQOS == nil {
obj.CgroupsPerQOS = utilpointer.BoolPtr(true)
}
if obj.CgroupDriver == "" {
obj.CgroupDriver = "cgroupfs"
}
if obj.CPUManagerPolicy == "" {
obj.CPUManagerPolicy = "none"
}
if obj.CPUManagerReconcilePeriod == zeroDuration {
// Keep the same as default NodeStatusUpdateFrequency
obj.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second}
}
if obj.MemoryManagerPolicy == "" {
obj.MemoryManagerPolicy = kubeletconfigv1beta1.NoneMemoryManagerPolicy
}
if obj.TopologyManagerPolicy == "" {
obj.TopologyManagerPolicy = kubeletconfigv1beta1.NoneTopologyManagerPolicy
}
if obj.TopologyManagerScope == "" {
obj.TopologyManagerScope = kubeletconfigv1beta1.ContainerTopologyManagerScope
}
if obj.RuntimeRequestTimeout == zeroDuration {
obj.RuntimeRequestTimeout = metav1.Duration{Duration: 2 * time.Minute}
}
if obj.HairpinMode == "" {
obj.HairpinMode = kubeletconfigv1beta1.PromiscuousBridge
}
if obj.MaxPods == 0 {
obj.MaxPods = 110
}
// default nil or negative value to -1 (implies node allocatable pid limit)
if obj.PodPidsLimit == nil || *obj.PodPidsLimit < int64(0) {
obj.PodPidsLimit = utilpointer.Int64(-1)
}
if obj.ResolverConfig == nil {
obj.ResolverConfig = utilpointer.String(kubetypes.ResolvConfDefault)
}
if obj.CPUCFSQuota == nil {
obj.CPUCFSQuota = utilpointer.BoolPtr(true)
}
if obj.CPUCFSQuotaPeriod == nil {
obj.CPUCFSQuotaPeriod = &metav1.Duration{Duration: 100 * time.Millisecond}
}
if obj.NodeStatusMaxImages == nil {
obj.NodeStatusMaxImages = utilpointer.Int32Ptr(50)
}
if obj.MaxOpenFiles == 0 {
obj.MaxOpenFiles = 1000000
}
if obj.ContentType == "" {
obj.ContentType = "application/vnd.kubernetes.protobuf"
}
if obj.KubeAPIQPS == nil {
obj.KubeAPIQPS = utilpointer.Int32Ptr(5)
}
if obj.KubeAPIBurst == 0 {
obj.KubeAPIBurst = 10
}
if obj.SerializeImagePulls == nil {
obj.SerializeImagePulls = utilpointer.BoolPtr(true)
}
if obj.EvictionHard == nil {
obj.EvictionHard = DefaultEvictionHard
}
if obj.EvictionPressureTransitionPeriod == zeroDuration {
obj.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 5 * time.Minute}
}
if obj.EnableControllerAttachDetach == nil {
obj.EnableControllerAttachDetach = utilpointer.BoolPtr(true)
}
if obj.MakeIPTablesUtilChains == nil {
obj.MakeIPTablesUtilChains = utilpointer.BoolPtr(true)
}
if obj.IPTablesMasqueradeBit == nil {
obj.IPTablesMasqueradeBit = utilpointer.Int32Ptr(DefaultIPTablesMasqueradeBit)
}
if obj.IPTablesDropBit == nil {
obj.IPTablesDropBit = utilpointer.Int32Ptr(DefaultIPTablesDropBit)
}
if obj.FailSwapOn == nil {
obj.FailSwapOn = utilpointer.BoolPtr(true)
}
if obj.ContainerLogMaxSize == "" {
obj.ContainerLogMaxSize = "10Mi"
}
if obj.ContainerLogMaxFiles == nil {
obj.ContainerLogMaxFiles = utilpointer.Int32Ptr(5)
}
// The default strategy for noticing ConfigMap/Secret changes is Watch.
if obj.ConfigMapAndSecretChangeDetectionStrategy == "" {
obj.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfigv1beta1.WatchChangeDetectionStrategy
}
if obj.EnforceNodeAllocatable == nil {
obj.EnforceNodeAllocatable = DefaultNodeAllocatableEnforcement
}
if obj.VolumePluginDir == "" {
obj.VolumePluginDir = DefaultVolumePluginDir
}
// Use the Default LoggingConfiguration option
componentbaseconfigv1alpha1.RecommendedLoggingConfiguration(&obj.Logging)
if obj.EnableSystemLogHandler == nil {
obj.EnableSystemLogHandler = utilpointer.BoolPtr(true)
}
if obj.EnableProfilingHandler == nil {
obj.EnableProfilingHandler = utilpointer.BoolPtr(true)
}
if obj.EnableDebugFlagsHandler == nil {
obj.EnableDebugFlagsHandler = utilpointer.BoolPtr(true)
}
if obj.SeccompDefault == nil {
obj.SeccompDefault = utilpointer.BoolPtr(false)
}
if obj.MemoryThrottlingFactor == nil {
obj.MemoryThrottlingFactor = utilpointer.Float64Ptr(DefaultMemoryThrottlingFactor)
}
}
pkg/kubelet/apis/config/v1beta1/register.go中
// init registers addDefaultingFuncs with localSchemeBuilder at package load
// time.
func init() {
localSchemeBuilder.Register(addDefaultingFuncs)
}
pkg/kubelet/apis/config/scheme/scheme.go中
// NewSchemeAndCodecs returns a fresh scheme with both the kubeletconfig and
// the kubeletconfigv1beta1 types added to it, together with a codec factory
// built on that scheme using the supplied mutators. If either registration
// fails, the error is returned and both other results are nil.
func NewSchemeAndCodecs(mutators ...serializer.CodecFactoryOptionsMutator) (*runtime.Scheme, *serializer.CodecFactory, error) {
	s := runtime.NewScheme()

	// Registration order matches the original: kubeletconfig first, then
	// v1beta1.
	registrars := []func(*runtime.Scheme) error{
		kubeletconfig.AddToScheme,
		kubeletconfigv1beta1.AddToScheme,
	}
	for _, register := range registrars {
		if err := register(s); err != nil {
			return nil, nil, err
		}
	}

	factory := serializer.NewCodecFactory(s, mutators...)
	return s, &factory, nil
}
cmd/kubelet/app/options/options.go中
// AddKubeletConfigFlags registers command-line flags whose values are written
// into the fields of c. This is an excerpt: "..." marks omitted code, and only
// the --sync-frequency registration is shown.
func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfiguration) {
...
// --sync-frequency writes into c.SyncFrequency.Duration; the value c already
// holds is passed as the flag's default.
fs.DurationVar(&c.SyncFrequency.Duration, "sync-frequency", c.SyncFrequency.Duration, "Max period between synchronizing running containers and config")
...
}
// NewKubeletConfiguration builds an internal KubeletConfiguration populated
// with defaults: it defaults an empty v1beta1 object through the scheme,
// converts it to the internal type, and then applies the legacy defaults.
// Any error from building the scheme or from the conversion is returned.
func NewKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
	s, _, err := kubeletscheme.NewSchemeAndCodecs()
	if err != nil {
		return nil, err
	}

	// Defaulting is registered for the versioned type, so start from there.
	external := new(v1beta1.KubeletConfiguration)
	s.Default(external)

	internal := new(kubeletconfig.KubeletConfiguration)
	err = s.Convert(external, internal, nil)
	if err != nil {
		return nil, err
	}

	applyLegacyDefaults(internal)
	return internal, nil
}
cmd/kubelet/app/server.go中
// NewKubeletCommand builds the kubelet's cobra command. This is an excerpt:
// "..." marks omitted code. The visible steps are: obtain a defaulted
// configuration from options.NewKubeletConfiguration, call Run, and register
// the configuration flags on cleanFlagSet via options.AddKubeletConfigFlags.
// NOTE(review): the Run call presumably sits inside the command's run
// callback, which would explain why it appears before the flag registration
// in source order — confirm against upstream.
func NewKubeletCommand() *cobra.Command {
...
kubeletConfig, err := options.NewKubeletConfiguration()
...
if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
...
}
...
options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
}
// Run applies the logging options taken from s.Logging, logs the kubelet
// version, performs OS-specific initialization, and then calls run. A failure
// of either step is returned wrapped with a short description of the step.
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	logging := &logs.Options{Config: s.Logging}
	logging.Apply()

	// Emit the version up front so it is present in every log.
	klog.InfoS("Kubelet version", "kubeletVersion", version.Get())

	osErr := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass)
	if osErr != nil {
		return fmt.Errorf("failed OS init: %w", osErr)
	}

	runErr := run(ctx, s, kubeDeps, featureGate)
	if runErr != nil {
		return fmt.Errorf("failed to run Kubelet: %w", runErr)
	}
	return nil
}
// run carries out the kubelet start-up work. This is an excerpt: "..." marks
// omitted code. The visible step hands off to RunKubelet, passing s.RunOnce as
// its runOnce argument; an error from RunKubelet is returned unchanged.
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
...
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
...
}
// RunKubelet creates the kubelet via createAndInitKubelet. This is an excerpt:
// "..." marks omitted code. The arguments are taken from kubeServer, except
// for kubeDeps and the hostname, hostnameOverridden, nodeName and nodeIPs
// values, which are computed in the omitted part.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
// The first argument is the KubeletConfiguration that carries SyncFrequency.
k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
kubeDeps,
&kubeServer.ContainerRuntimeOptions,
kubeServer.ContainerRuntime,
hostname,
hostnameOverridden,
nodeName,
nodeIPs,
kubeServer.ProviderID,
kubeServer.CloudProvider,
kubeServer.CertDirectory,
kubeServer.RootDirectory,
kubeServer.ImageCredentialProviderConfigFile,
kubeServer.ImageCredentialProviderBinDir,
kubeServer.RegisterNode,
kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls,
kubeServer.ExperimentalMounterPath,
kubeServer.KernelMemcgNotification,
kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeServer.MinimumGCAge,
kubeServer.MaxPerPodContainerCount,
kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable,
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.NodeStatusMaxImages,
// seccompDefault is true when either the flag or the config field is set.
kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
)
...
}
// createAndInitKubelet constructs the kubelet. This is an excerpt: "..." marks
// omitted code. The visible call forwards every parameter, unchanged and in
// the same order, to kubelet.NewMainKubelet and stores the result in the named
// return values k and err.
func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *kubelet.Dependencies,
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []api.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (k kubelet.Bootstrap, err error) {
...
// Plain pass-through of all parameters.
k, err = kubelet.NewMainKubelet(kubeCfg,
kubeDeps,
crOptions,
containerRuntime,
hostname,
hostnameOverridden,
nodeName,
nodeIPs,
providerID,
cloudProvider,
certDirectory,
rootDirectory,
imageCredentialProviderConfigFile,
imageCredentialProviderBinDir,
registerNode,
registerWithTaints,
allowedUnsafeSysctls,
experimentalMounterPath,
kernelMemcgNotification,
experimentalCheckNodeCapabilitiesBeforeMount,
experimentalNodeAllocatableIgnoreEvictionThreshold,
minimumGCAge,
maxPerPodContainerCount,
maxContainerCount,
masterServiceNamespace,
registerSchedulable,
keepTerminatedPodVolumes,
nodeLabels,
nodeStatusMaxImages,
seccompDefault,
)
...
}
pkg/kubelet/kubelet.go中
// NewMainKubelet builds the Kubelet object. This is an excerpt: "..." marks
// omitted code. The parts shown are the ones relevant to periodic pod resync:
// the resync interval, the active-deadline sync handler, the pod workers and
// the volume manager.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []api.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
...
klet := &Kubelet{
...
// The configured SyncFrequency becomes the kubelet's resync interval.
resyncInterval: kubeCfg.SyncFrequency.Duration,
...
}
...
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
if err != nil {
return nil, err
}
// Registered as a pod sync loop handler; getPodsToSync consults these
// handlers through ShouldSync.
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
...
// The pod workers receive syncPod as their sync function, plus the work queue
// and the resync interval.
klet.podWorkers = newPodWorkers(
klet.syncPod,
klet.syncTerminatingPod,
klet.syncTerminatedPod,
kubeDeps.Recorder,
klet.workQueue,
klet.resyncInterval,
backOffPeriod,
klet.podCache,
)
...
// klet.podWorkers is passed in the position of NewVolumeManager's
// podStateProvider parameter.
klet.volumeManager = volumemanager.NewVolumeManager(
kubeCfg.EnableControllerAttachDetach,
nodeName,
klet.podManager,
klet.podWorkers,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
kubeDeps.Mounter,
kubeDeps.HostUtil,
klet.getPodsDir(),
kubeDeps.Recorder,
experimentalCheckNodeCapabilitiesBeforeMount,
keepTerminatedPodVolumes,
volumepathhandler.NewBlockVolumePathHandler())
}
// syncPod syncs a single pod. This is an excerpt: "..." marks omitted code.
// The visible step calls the volume manager's WaitForAttachAndMount for the
// pod; the handling of its error is omitted here.
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
...
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
...
}
...
}
// Run starts the kubelet. This is an excerpt: "..." marks omitted code. The
// visible steps start the volume manager in its own goroutine with a stop
// channel that never fires (wait.NeverStop) and then enter syncLoop, passing
// the kubelet itself as the sync handler.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
...
kl.syncLoop(updates, kl)
}
// syncLoop repeatedly calls syncLoopIteration until it returns false. This is
// an excerpt: "..." marks omitted code; housekeepingTicker and plegCh are
// created in the omitted part.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
// Ticks once per second; each tick lets syncLoopIteration look for pods that
// are due for a sync.
syncTicker := time.NewTicker(time.Second)
...
for {
...
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
...
}
}
// syncLoopIteration handles one event from its input channels. Only the
// syncCh case is shown in this excerpt; handling of the other channels and the
// function's return statement are not visible here.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case <-syncCh:
// On each sync tick, collect the pods that are due and hand them to the
// handler.
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
// Nothing is due; this break leaves the select.
break
}
handler.HandlePodSyncs(podsToSync)
}
}
// HandlePodSyncs dispatches a SyncPodSync update for every pod in pods. All
// dispatches share one start timestamp, taken once before the loop.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	startedAt := kl.clock.Now()
	for i := range pods {
		p := pods[i]
		// The second result of the mirror pod lookup is deliberately ignored,
		// as in the original.
		mirror, _ := kl.podManager.GetMirrorPodByPod(p)
		kl.dispatchWork(p, kubetypes.SyncPodSync, mirror, startedAt)
	}
}
// dispatchWork hands the pod to the pod workers as an update of the given
// sync type. For SyncPodCreate it additionally records the pod's container
// count in the ContainersPerPodCount metric.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// The actual sync is performed by the pod workers.
	opts := UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	}
	kl.podWorkers.UpdatePod(opts)

	// Only newly created pods contribute to the containers-per-pod metric.
	if syncType != kubetypes.SyncPodCreate {
		return
	}
	containerCount := len(pod.Spec.Containers)
	metrics.ContainersPerPodCount.Observe(float64(containerCount))
}
// getPodsToSync returns the pods that should be synced now. A pod qualifies
// when its UID was returned by the work queue, or otherwise when at least one
// registered pod sync loop handler reports ShouldSync for it. Handlers are
// only consulted for pods the work queue did not return, and only until the
// first one answers true.
func (kl *Kubelet) getPodsToSync() []*v1.Pod {
	allPods := kl.podManager.GetPods()

	// UIDs whose queued work is ready.
	readyUIDs := sets.NewString()
	for _, uid := range kl.workQueue.GetWork() {
		readyUIDs.Insert(string(uid))
	}

	var podsToSync []*v1.Pod
	for _, pod := range allPods {
		due := readyUIDs.Has(string(pod.UID))
		if !due {
			for _, h := range kl.PodSyncLoopHandlers {
				if h.ShouldSync(pod) {
					due = true
					break
				}
			}
		}
		if due {
			podsToSync = append(podsToSync, pod)
		}
	}
	return podsToSync
}
pkg/kubelet/active_deadline.go中
// newActiveDeadlineHandler returns an activeDeadlineHandler built from the
// given dependencies. All three arguments are required; if any of them is nil
// an error is returned and no handler is created.
func newActiveDeadlineHandler(
	podStatusProvider status.PodStatusProvider,
	recorder record.EventRecorder,
	clock clock.Clock,
) (*activeDeadlineHandler, error) {
	allPresent := clock != nil && podStatusProvider != nil && recorder != nil
	if !allPresent {
		return nil, fmt.Errorf("required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
	}

	handler := &activeDeadlineHandler{
		clock:             clock,
		podStatusProvider: podStatusProvider,
		recorder:          recorder,
	}
	return handler, nil
}
// ShouldSync reports whether the pod should be synced, which for this handler
// is exactly the result of pastActiveDeadline.
func (m *activeDeadlineHandler) ShouldSync(pod *v1.Pod) bool {
return m.pastActiveDeadline(pod)
}
// pastActiveDeadline reports whether the pod has been running for at least
// its Spec.ActiveDeadlineSeconds. It returns false when no deadline is set or
// when the pod has no start time yet.
func (m *activeDeadlineHandler) pastActiveDeadline(pod *v1.Pod) bool {
	deadlineSeconds := pod.Spec.ActiveDeadlineSeconds
	if deadlineSeconds == nil {
		// The pod does not declare an active deadline.
		return false
	}

	// Prefer the status known to the provider; fall back to the status
	// stored on the pod object.
	current, found := m.podStatusProvider.GetPodStatus(pod.UID)
	if !found {
		current = pod.Status
	}
	if current.StartTime.IsZero() {
		// Not started yet, so no time has elapsed against the deadline.
		return false
	}

	elapsed := m.clock.Since(current.StartTime.Time)
	limit := time.Duration(*deadlineSeconds) * time.Second
	return elapsed >= limit
}
// UpdatePod accepts an update for a pod. This is an excerpt: "..." marks
// omitted code. The visible step calls managePodLoop with the pod's update
// channel; podUpdates is created in the omitted part.
// NOTE(review): upstream, the podWorkers methods appear to live in
// pkg/kubelet/pod_workers.go rather than in active_deadline.go as the
// preceding file label suggests — confirm.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
...
p.managePodLoop(podUpdates)
...
}
// managePodLoop processes the updates of one pod. This is an excerpt: "..."
// marks omitted code. The visible steps invoke the configured syncPodFn
// (NewMainKubelet passes klet.syncPod as the first argument of newPodWorkers)
// and then call completeWork with the resulting error.
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
...
err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
...
p.completeWork(pod, err)
...
}
// completeWork re-queues the pod after a sync. This is an excerpt: "..." marks
// omitted code, so the conditions under which the visible line runs are not
// shown here.
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
...
// Re-enqueue the pod with a delay of resyncInterval plus jitter; the jitter
// factor passed to wait.Jitter is workerResyncIntervalJitterFactor.
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
...
}
pkg/kubelet/volumemanager/volume_manager.go中
// Volume manager constants (excerpt: "..." marks omitted declarations).
const(
...
// desiredStateOfWorldPopulatorLoopSleepPeriod is 100ms. NewVolumeManager
// passes it to NewDesiredStateOfWorldPopulator.
desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond
...
)
// NewVolumeManager builds a VolumeManager. This is an excerpt: "..." marks
// omitted code; vm, csiMigratedPluginManager and intreeToCSITranslator are
// created in the omitted part. The visible step constructs the
// desired-state-of-world populator.
func NewVolumeManager(
controllerAttachDetachEnabled bool,
nodeName k8stypes.NodeName,
podManager pod.Manager,
podStateProvider podStateProvider,
kubeClient clientset.Interface,
volumePluginMgr *volume.VolumePluginMgr,
kubeContainerRuntime container.Runtime,
mounter mount.Interface,
hostutil hostutil.HostUtils,
kubeletPodsDir string,
recorder record.EventRecorder,
checkNodeCapabilitiesBeforeMount bool,
keepTerminatedPodVolumes bool,
blockVolumePathHandler volumepathhandler.BlockVolumePathHandler) VolumeManager {
...
// The populator is given the 100ms
// desiredStateOfWorldPopulatorLoopSleepPeriod constant as its second argument.
vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
kubeClient,
desiredStateOfWorldPopulatorLoopSleepPeriod,
desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
podManager,
podStateProvider,
vm.desiredStateOfWorld,
vm.actualStateOfWorld,
kubeContainerRuntime,
keepTerminatedPodVolumes,
csiMigratedPluginManager,
intreeToCSITranslator,
volumePluginMgr)
...
}
// Run starts the volume manager. This is an excerpt: "..." marks omitted code.
// The visible step launches the desired-state-of-world populator in its own
// goroutine, passing along sourcesReady and stopCh.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
...
go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
...
}
// WaitForAttachAndMount waits until all volumes the pod expects are mounted.
// It returns nil for a nil pod and for a pod with no expected volumes. Before
// polling it asks the populator to reprocess the pod. If polling ends with an
// error but no expected volume is still unmounted, nil is returned; otherwise
// the error lists the unmounted and unattached volumes.
func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
	if pod == nil {
		return nil
	}

	wanted := getExpectedVolumes(pod)
	if len(wanted) == 0 {
		// The pod has no volumes to verify.
		return nil
	}

	klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
	podKey := util.GetUniquePodName(pod)

	// Some volumes rely on Setup being called repeatedly to refresh their
	// contents (atomically updated volumes such as Downward API), so mark the
	// pod for reprocessing to have those plugins remounted.
	vm.desiredStateOfWorldPopulator.ReprocessPod(podKey)

	pollErr := wait.PollImmediate(
		podAttachAndMountRetryInterval,
		podAttachAndMountTimeout,
		vm.verifyVolumesMountedFunc(podKey, wanted))
	if pollErr == nil {
		klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
		return nil
	}

	stillUnmounted := vm.getUnmountedVolumes(podKey, wanted)
	// The unattached volumes are collected only for the error message.
	stillUnattached := vm.getUnattachedVolumes(wanted)
	if len(stillUnmounted) == 0 {
		// Everything is mounted by now, so the polling error is dropped.
		return nil
	}
	return fmt.Errorf(
		"unmounted volumes=%v, unattached volumes=%v: %s",
		stillUnmounted,
		stillUnattached,
		pollErr)
}
// verifyVolumesMountedFunc returns a poll condition for the given pod. The
// condition stops with an error as soon as the desired state of world holds
// errors for the pod (joined with "; "); otherwise it is done once none of the
// expected volumes is unmounted.
func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
	return func() (bool, error) {
		podErrs := vm.desiredStateOfWorld.PopPodErrors(podName)
		if len(podErrs) != 0 {
			return true, errors.New(strings.Join(podErrs, "; "))
		}
		pending := vm.getUnmountedVolumes(podName, expectedVolumes)
		return len(pending) == 0, nil
	}
}
// getUnmountedVolumes returns the result of filtering expectedVolumes against
// the set of outer volume spec names that the actual state of world reports as
// mounted for the pod.
func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
	mountedNames := sets.NewString()
	mounted := vm.actualStateOfWorld.GetMountedVolumesForPod(podName)
	for _, vol := range mounted {
		mountedNames.Insert(vol.OuterVolumeSpecName)
	}
	return filterUnmountedVolumes(mountedNames, expectedVolumes)
}
pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go中
// Run drives the populator. This is an excerpt: "..." marks omitted code. The
// visible step runs populatorLoop repeatedly via wait.Until, with
// loopSleepDuration as the period, until stopCh is closed.
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
...
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
// populatorLoop is one iteration of the populator. This is an excerpt: "..."
// marks omitted code. The visible step calls findAndAddNewPods.
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
...
dswp.findAndAddNewPods()
...
}
// findAndAddNewPods hands pods to processPodVolumes. This is an excerpt: "..."
// marks omitted code; how pod, mountedVolumesForPod and
// processedVolumesForFSResize are obtained is not shown here.
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
...
dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
...
}
// processPodVolumes processes the volumes of a pod. This is an excerpt: "..."
// marks omitted code; uniquePodName is computed in the omitted part.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
pod *v1.Pod,
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
processedVolumesForFSResize sets.String) {
...
// A pod already marked as processed is skipped. ReprocessPod clears that mark
// so that a later loop iteration handles the pod again.
if dswp.podPreviouslyProcessed(uniquePodName) {
return
}
...
}
// podPreviouslyProcessed reports whether the pod is currently marked as
// processed. The lookup is done under the read lock; a pod with no entry in
// the map yields false.
func (dswp *desiredStateOfWorldPopulator) podPreviouslyProcessed(
	podName volumetypes.UniquePodName) bool {
	dswp.pods.RLock()
	processed := dswp.pods.processedPods[podName]
	dswp.pods.RUnlock()
	return processed
}
// markPodProcessingFailed records the pod as not processed, under the write
// lock, so that podPreviouslyProcessed returns false for it afterwards.
func (dswp *desiredStateOfWorldPopulator) markPodProcessingFailed(
	podName volumetypes.UniquePodName) {
	dswp.pods.Lock()
	defer dswp.pods.Unlock()
	dswp.pods.processedPods[podName] = false
}
// ReprocessPod clears the pod's "processed" mark by delegating to
// markPodProcessingFailed, so podPreviouslyProcessed no longer reports it as
// processed.
func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
podName volumetypes.UniquePodName) {
dswp.markPodProcessingFailed(podName)
}
网友评论