美文网首页
kubelet configmap/secret qps很高如何

kubelet configmap/secret qps很高如何

作者: wwq2020 | 来源:发表于2023-10-23 16:25 被阅读0次

简单总结

kubelet针对增删改会执行syncPod,成功后会添加到workQueue即周期性的syncPod

针对周期性的volume reconcile
上一次sync pod结束后,podworker会把pod添加回workQueue,kubelet定期从workQueue中取出执行sync pod,标记processedPods为false
dswp定期检查pod,如果processedPods为false且volume已全部mounted,则调用MarkRemountRequired,判断volume是否需要remount,如果是则标记该pod需要remount
volumemanager的reconcile会定期检查PodExistsInVolume,如果mountedPods中存在该pod且volumeMountStateForPod非VolumeMountUncertain且remountRequired为true,则返回remount err并执行mountAttachedVolumes,最终调用secret/configmap plugin的SetUpAt,其中会调用GetSecret/GetConfigMap

根据kubelet配置ConfigMapAndSecretChangeDetectionStrategy,有如下secret/configmap策略

针对Get,以secret举例
GetSecret每次都从apiserver获取
UnregisterPod空操作
RegisterPod空操作

针对Cache,以secret举例
GetSecret如果cache没有达到ttl则从cache取,否则从apiserver取(ttl取自node注解node.alpha.kubernetes.io/ttl,默认1分钟)
UnregisterPod从object store减少引用计数,如果达到0则删除cache
RegisterPod添加cache或者添加引用计数

针对Watch,以secret举例
GetSecret从cache取,如果是immutable则停止reflector
UnregisterPod从object store减少引用计数,如果达到0则删除cache停止reflector
RegisterPod添加cache并启动reflector或者添加引用计数

所以减少kubelet的configmap/secret请求数的方式为
设置ConfigMapAndSecretChangeDetectionStrategy为Cache(需要关闭ttlController,kube-controller-manager的controllers配置中添加-ttl,如--controllers=-ttl,*)且node.alpha.kubernetes.io/ttl设置较长的时间比如30m
或设置ConfigMapAndSecretChangeDetectionStrategy为Watch且secret/configmap设置为immutable

相关代码

kubelet 在初始化时会初始化secretManager/configmapManager
根据kubelet配置ConfigMapAndSecretChangeDetectionStrategy,有如下secret/configmap策略
pkg/kubelet/kubelet.go中

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    ...
    if klet.kubeClient != nil {
        switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
        case kubeletconfiginternal.WatchChangeDetectionStrategy:
            secretManager = secret.NewWatchingSecretManager(klet.kubeClient, klet.resyncInterval)
            configMapManager = configmap.NewWatchingConfigMapManager(klet.kubeClient, klet.resyncInterval)
        case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
            secretManager = secret.NewCachingSecretManager(
                klet.kubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
            configMapManager = configmap.NewCachingConfigMapManager(
                klet.kubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
        case kubeletconfiginternal.GetChangeDetectionStrategy:
            secretManager = secret.NewSimpleSecretManager(klet.kubeClient)
            configMapManager = configmap.NewSimpleConfigMapManager(klet.kubeClient)
        default:
            return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
        }

        klet.secretManager = secretManager
        klet.configMapManager = configMapManager
    }
    ...
}
func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
    ...
    vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
    ...
}

kubelet处理pod事件时,如果是非删除pod则会RegisterPod,如果是删除pod则会UnregisterPod
pkg/kubelet/kubelet.go中

func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    ...
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        if kl.secretManager != nil {
            kl.secretManager.RegisterPod(pod)
        }
        if kl.configMapManager != nil {
            kl.configMapManager.RegisterPod(pod)
        }
    }
    ...
    if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
        if !wait.Interrupted(err) {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
            klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
        }
        return false, err
    }
    ...
}

pkg/kubelet/kubelet.go中

func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
    ...
    if kl.secretManager != nil {
        kl.secretManager.UnregisterPod(pod)
    }
    if kl.configMapManager != nil {
        kl.configMapManager.UnregisterPod(pod)
    }
    ...
}

后续secret/configmap获取会从secretManager/configmapManager获取,如获取PullSecrets
pkg/kubelet/kubelet_pods.go中

// getPullSecretsForPod resolves pod.Spec.ImagePullSecrets through the
// configured secretManager, so the chosen strategy (Get/Cache/Watch)
// determines how often this path hits the apiserver.
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
    ...
    secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
    if err != nil {
        // Fix: klog.KOb is not a klog function; the correct helper is klog.KObj.
        klog.InfoS("Unable to retrieve pull secret, the image pull may not succeed.", "pod", klog.KObj(pod), "secret", klog.KObj(secret), "err", err)
        failedPullSecrets = append(failedPullSecrets, secretRef.Name)
        continue
    }
    ...
}

针对GetChangeDetectionStrategy,以secret举例
GetSecret每次都从apiserver获取
UnregisterPod空操作
RegisterPod空操作
pkg/kubelet/secret/secret_manager.go中

// GetSecret implements the Get strategy: every call goes straight to the
// apiserver with no caching, which is what drives kubelet QPS up in this mode.
func (s *simpleSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
    return s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

针对TTLCacheChangeDetectionStrategy,以secret举例
GetSecret如果cache没有达到ttl则从cache取,否则从apiserver取(ttl取自node注解node.alpha.kubernetes.io/ttl,默认1分钟)
UnregisterPod从object store减少引用计数,如果达到0则删除cache
RegisterPod添加cache或者添加引用计数
pkg/kubelet/secret/secret_manager.go中

// GetSecret (TTL-cache strategy) fetches via the cache-based manager and
// type-asserts the generic runtime.Object back to *v1.Secret.
func (s *secretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
    object, err := s.manager.GetObject(namespace, name)
    if err != nil {
        return nil, err
    }
    if secret, ok := object.(*v1.Secret); ok {
        return secret, nil
    }
    // The store returned something that is not a Secret — surface it as an error.
    return nil, fmt.Errorf("unexpected object type: %v", object)
}

// RegisterPod delegates to the cache-based manager, which adds the pod's
// secret references (or bumps their refcount) in the object store.
func (s *secretManager) RegisterPod(pod *v1.Pod) {
    s.manager.RegisterPod(pod)
}

// UnregisterPod delegates to the cache-based manager, which drops the pod's
// secret references; entries are evicted once their refcount reaches zero.
func (s *secretManager) UnregisterPod(pod *v1.Pod) {
    s.manager.UnregisterPod(pod)
}

pkg/kubelet/util/manager/cache_based_manager.go中

// GetObject is a thin pass-through to the underlying object store,
// which decides between serving from cache and refetching (see Get below).
func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
    return c.objectStore.Get(namespace, name)
}

func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
    ...
    c.objectStore.AddReference(pod.Namespace, name, pod.UID)
    ...
}

func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
    ...
    c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
    ...
}

pkg/kubelet/util/manager/cache_based_manager.go中

// AddReference registers one more pod reference to the object identified by
// namespace/name, creating the store entry on first use. The pod UID is
// ignored here: the TTL-cache store counts plain references, unlike the
// watch-based objectCache which tracks a per-pod refMap.
func (s *objectStore) AddReference(namespace, name string, _ types.UID) {
    key := objectKey{namespace: namespace, name: name}

    // AddReference is called from RegisterPod, thus it needs to be efficient.
    // Thus Add() is only increasing refCount and generation of a given object.
    // Then Get() is responsible for fetching if needed.
    s.lock.Lock()
    defer s.lock.Unlock()
    item, exists := s.items[key]
    if !exists {
        item = &objectStoreItem{
            refCount: 0,
            data:     &objectData{},
        }
        s.items[key] = item
    }

    item.refCount++
    // This will trigger fetch on the next Get() operation.
    item.data = nil
}

// DeleteReference drops one pod reference; when the count hits zero the
// cached entry is removed entirely, so the next Get would miss.
func (s *objectStore) DeleteReference(namespace, name string, _ types.UID) {
    key := objectKey{namespace: namespace, name: name}

    s.lock.Lock()
    defer s.lock.Unlock()
    if item, ok := s.items[key]; ok {
        item.refCount--
        if item.refCount == 0 {
            delete(s.items, key)
        }
    }
}

func (s *objectStore) Get(namespace, name string) (runtime.Object, error) {
    ...
    data := func() *objectData {
        s.lock.Lock()
        defer s.lock.Unlock()
        item, exists := s.items[key]
        if !exists {
            return nil
        }
        if item.data == nil {
            item.data = &objectData{}
        }
        return item.data
    }()
    ...
    if data.err != nil || !s.isObjectFresh(data) {
        ...
    }
    ...
}

// isObjectFresh reports whether cached data is still within its TTL.
// getTTL reads the node.alpha.kubernetes.io/ttl annotation when present;
// otherwise defaultTTL (1 minute) applies. Raising the annotation value is
// the lever the article suggests for cutting apiserver QPS in Cache mode.
func (s *objectStore) isObjectFresh(data *objectData) bool {
    objectTTL := s.defaultTTL
    if ttl, ok := s.getTTL(); ok {
        objectTTL = ttl
    }
    return s.clock.Now().Before(data.lastUpdateTime.Add(objectTTL))
}

针对WatchChangeDetectionStrategy
GetSecret从cache取,如果是immutable则停止reflector
UnregisterPod从object store减少引用计数,如果达到0则删除cache停止reflector
RegisterPod添加cache并启动reflector或者添加引用计数
pkg/kubelet/secret/secret_manager.go中

// GetSecret (Watch strategy) reads from the reflector-backed cache via the
// manager and type-asserts the result to *v1.Secret; no direct apiserver GET.
func (s *secretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
    object, err := s.manager.GetObject(namespace, name)
    if err != nil {
        return nil, err
    }
    if secret, ok := object.(*v1.Secret); ok {
        return secret, nil
    }
    // Defensive: the cache handed back a non-Secret object.
    return nil, fmt.Errorf("unexpected object type: %v", object)
}
}

// RegisterPod delegates to the manager; in Watch mode this starts a
// reflector for each newly referenced secret (see objectCache.AddReference).
func (s *secretManager) RegisterPod(pod *v1.Pod) {
    s.manager.RegisterPod(pod)
}

// UnregisterPod delegates to the manager; in Watch mode the reflector is
// stopped once the last referencing pod goes away.
func (s *secretManager) UnregisterPod(pod *v1.Pod) {
    s.manager.UnregisterPod(pod)
}

pkg/kubelet/util/manager/cache_based_manager.go中

// GetObject passes through to the store; for Watch mode the store is an
// objectCache whose Get serves from the reflector's local cache.
func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
    return c.objectStore.Get(namespace, name)
}

func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
    ...
    c.objectStore.AddReference(pod.Namespace, name, pod.UID)
    ...
}

func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
    ...
    c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
    ...
}
// AddReference records that pod referencedFrom uses namespace/name.
// On the first reference a dedicated reflector (watch) is started for the
// object; subsequent references only bump the per-pod counter in refMap.
func (c *objectCache) AddReference(namespace, name string, referencedFrom types.UID) {
    key := objectKey{namespace: namespace, name: name}


    c.lock.Lock()
    defer c.lock.Unlock()
    item, exists := c.items[key]
    if !exists {
        item = c.newReflectorLocked(namespace, name)
        c.items[key] = item
    }
    item.refMap[referencedFrom]++
}

// DeleteReference removes one reference held by pod referencedFrom.
// When no pod references the object anymore, the reflector is stopped and
// the cache entry deleted — this is how Watch mode bounds watch count.
func (c *objectCache) DeleteReference(namespace, name string, referencedFrom types.UID) {
    key := objectKey{namespace: namespace, name: name}

    c.lock.Lock()
    defer c.lock.Unlock()
    if item, ok := c.items[key]; ok {
        item.refMap[referencedFrom]--
        if item.refMap[referencedFrom] == 0 {
            delete(item.refMap, referencedFrom)
        }
        if len(item.refMap) == 0 {
            // Stop the underlying reflector.
            item.stop()
            delete(c.items, key)
        }
    }
}


func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
    c.lock.RLock()
    item, exists := c.items[key]
    c.lock.RUnlock()
    ...
    if c.isImmutable(object) {
        item.setImmutable()
        if item.stop() {
            klog.V(4).InfoS("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name))
        }
    }
    ...
}

针对周期性的volume reconcile
上一次sync pod结束后,podworker会把pod添加回workQueue,kubelet定期从workQueue中取出执行sync pod,标记processedPods为false
dswp定期检查pod的,如果processedPods为false,如果volume all mounted,则调用MarkRemountRequired,判断volume是否需要remount,如果是则设置pod需要remount
volumemanager的reconcile会定期检查PodExistsInVolume,如果返回remount err则执行mountAttachedVolumes,最终调用secret/configmap plugin的SetUpAt,其中会调用GetSecret/GetConfigMap

pkg/kubelet/volume_host.go

func NewInitializedVolumePluginMgr(
    ...
    kvh := &kubeletVolumeHost{
        kubelet:          kubelet,
        volumePluginMgr:  volume.VolumePluginMgr{},
        secretManager:    secretManager,
        configMapManager: configMapManager,
        tokenManager:     tokenManager,
        informerFactory:  informerFactory,
        csiDriverLister:  csiDriverLister,
        csiDriversSynced: csiDriversSynced,
        exec:             utilexec.New(),
    }
    ...
    if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
        return nil, fmt.Errorf(
            "could not initialize volume plugins for KubeletVolumePluginMgr: %v",
            err)
    }
    ...
}

pkg/volume/plugins.go中

func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPluginProber, host VolumeHost) error {
    ...
    for _, plugin := range plugins {
        ...
        err := plugin.Init(host)
        ...
    }
    ...
}

pkg/kubelet/volumemanager/volume_manager.go中

    ...
    reconcilerLoopSleepPeriod = 100 * time.Millisecond
    ...
    desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond
    ...

func NewVolumeManager(
    ...
    vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
        kubeClient,
        desiredStateOfWorldPopulatorLoopSleepPeriod,
        podManager,
        podStateProvider,
        vm.desiredStateOfWorld,
        vm.actualStateOfWorld,
        kubeContainerRuntime,
        keepTerminatedPodVolumes,
        csiMigratedPluginManager,
        intreeToCSITranslator,
        volumePluginMgr)
    ...
    vm.reconciler = reconciler.NewReconciler(
        kubeClient,
        controllerAttachDetachEnabled,
        reconcilerLoopSleepPeriod,
        waitForAttachTimeout,
        nodeName,
        vm.desiredStateOfWorld,
        vm.actualStateOfWorld,
        vm.desiredStateOfWorldPopulator.HasAddedPods,
        vm.operationExecutor,
        mounter,
        hostutil,
        volumePluginMgr,
        kubeletPodsDir)
    ...
}

pkg/kubelet/volumemanager/reconciler/reconciler.go中


// runOld drives the legacy reconcile loop, waking every loopSleepDuration
// (100ms, per the constants above) until stopCh is closed.
func (rc *reconciler) runOld(stopCh <-chan struct{}) {
    wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}

// reconciliationLoopFunc returns the closure executed on each loop tick:
// reconcile() every time, plus a one-shot state sync once the populator has
// seen all pods from all sources.
func (rc *reconciler) reconciliationLoopFunc() func() {
    return func() {
        rc.reconcile()

        // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
        // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
        // desired state of world does not contain a complete list of pods.
        if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
            klog.InfoS("Reconciler: start to sync state")
            rc.sync()
        }
    }
}

func (rc *reconciler) reconcile() {
    ...
    rc.mountOrAttachVolumes()
    ...
}

pkg/kubelet/volumemanager/reconciler/reconciler_common.go中

// NewReconciler builds the volume reconciler that compares desired vs actual
// state of world every loopSleepDuration and mounts/attaches as needed.
// It is this periodic loop that re-triggers SetUp (and hence GetSecret /
// GetConfigMap) whenever a pod is marked as requiring remount.
func NewReconciler(
    kubeClient clientset.Interface,
    controllerAttachDetachEnabled bool,
    loopSleepDuration time.Duration,
    waitForAttachTimeout time.Duration,
    nodeName types.NodeName,
    desiredStateOfWorld cache.DesiredStateOfWorld,
    actualStateOfWorld cache.ActualStateOfWorld,
    populatorHasAddedPods func() bool,
    operationExecutor operationexecutor.OperationExecutor,
    mounter mount.Interface,
    hostutil hostutil.HostUtils,
    volumePluginMgr *volumepkg.VolumePluginMgr,
    kubeletPodsDir string) Reconciler {
    return &reconciler{
        kubeClient:                      kubeClient,
        controllerAttachDetachEnabled:   controllerAttachDetachEnabled,
        loopSleepDuration:               loopSleepDuration,
        waitForAttachTimeout:            waitForAttachTimeout,
        nodeName:                        nodeName,
        desiredStateOfWorld:             desiredStateOfWorld,
        actualStateOfWorld:              actualStateOfWorld,
        populatorHasAddedPods:           populatorHasAddedPods,
        operationExecutor:               operationExecutor,
        mounter:                         mounter,
        hostutil:                        hostutil,
        skippedDuringReconstruction:     map[v1.UniqueVolumeName]*globalVolumeInfo{},
        volumePluginMgr:                 volumePluginMgr,
        kubeletPodsDir:                  kubeletPodsDir,
        timeOfLastSync:                  time.Time{},
        volumesFailedReconstruction:     make([]podVolume, 0),
        volumesNeedUpdateFromNodeStatus: make([]v1.UniqueVolumeName, 0),
        volumesNeedReportedInUse:        make([]v1.UniqueVolumeName, 0),
    }
}

// Run selects the reconstruction implementation by feature gate:
// NewVolumeManagerReconstruction enabled -> runNew, otherwise runOld.
func (rc *reconciler) Run(stopCh <-chan struct{}) {
    if utilfeature.DefaultFeatureGate.Enabled(features.NewVolumeManagerReconstruction) {
        rc.runNew(stopCh)
        return
    }

    rc.runOld(stopCh)
}

func (rc *reconciler) mountOrAttachVolumes() {
    ...
    volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName,volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
    ...
    } else if !volMounted || cache.IsRemountRequiredError(err) {
        rc.mountAttachedVolumes(volumeToMount, err)
}

func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
    ...
    err := rc.operationExecutor.MountVolume(
        rc.waitForAttachTimeout,
        volumeToMount.VolumeToMount,
        rc.actualStateOfWorld,
        isRemount)
    ...
}

pkg/volume/util/operationexecutor/operation_executor.go中

func (oe *operationExecutor) MountVolume(
    ...
    generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc

    ...
}

pkg/volume/util/operationexecutor/operation_generator.go中

func (og *operationGenerator) GenerateMountVolumeFunc(
    ...
    volumeMounter, newMounterErr := volumePlugin.NewMounter
    ...
    mountErr := volumeMounter.SetUp
    ...
}

pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go中

// NewDesiredStateOfWorldPopulator constructs the dswp. Its processedPods map
// (reset to false each time a pod is re-synced) is what gates the periodic
// MarkRemountRequired calls described in this article.
func NewDesiredStateOfWorldPopulator(
    kubeClient clientset.Interface,
    loopSleepDuration time.Duration,
    podManager PodManager,
    podStateProvider PodStateProvider,
    desiredStateOfWorld cache.DesiredStateOfWorld,
    actualStateOfWorld cache.ActualStateOfWorld,
    kubeContainerRuntime kubecontainer.Runtime,
    keepTerminatedPodVolumes bool,
    csiMigratedPluginManager csimigration.PluginManager,
    intreeToCSITranslator csimigration.InTreeToCSITranslator,
    volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorldPopulator {
    return &desiredStateOfWorldPopulator{
        kubeClient:          kubeClient,
        loopSleepDuration:   loopSleepDuration,
        podManager:          podManager,
        podStateProvider:    podStateProvider,
        desiredStateOfWorld: desiredStateOfWorld,
        actualStateOfWorld:  actualStateOfWorld,
        pods: processedPods{
            processedPods: make(map[volumetypes.UniquePodName]bool)},
        kubeContainerRuntime:     kubeContainerRuntime,
        keepTerminatedPodVolumes: keepTerminatedPodVolumes,
        hasAddedPods:             false,
        hasAddedPodsLock:         sync.RWMutex{},
        csiMigratedPluginManager: csiMigratedPluginManager,
        intreeToCSITranslator:    intreeToCSITranslator,
        volumePluginMgr:          volumePluginMgr,
    }
}

func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    // Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
    klog.InfoS("Desired state populator starts to run")
    wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
        done := sourcesReady.AllReady()
        dswp.populatorLoop()
        return done, nil
    }, stopCh)
    ...
}

// populatorLoop is one dswp tick: pick up new/changed pods, then prune
// pods that were deleted.
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
    dswp.findAndAddNewPods()
    dswp.findAndRemoveDeletedPods()
}

func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
    ...
    for _, pod := range dswp.podManager.GetPods() {
    ...
        dswp.processPodVolumes(pod, mountedVolumesForPod)
    ...
    }
    ...
}

func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
    ...
    if dswp.podPreviouslyProcessed(uniquePodName) {
        return
    }
    ...
    dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
    ...
}

pkg/kubelet/volumemanager/cache/actual_state_of_world.go中

func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) {
    ...
    if podObj.remountRequired {
        return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
    }
    ...
}

func (asw *actualStateOfWorld) MarkRemountRequired(
    ...
    if volumePlugin.RequiresRemount(podObj.volumeSpec) {
        podObj.remountRequired = true
        asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
    }
    ...
}

pkg/volume/secret/secret.go中

// NewMounter builds a secretVolumeMounter for one pod+volume pair.
// Note getSecret is wired to plugin.getSecret (set in Init below), so each
// SetUpAt ultimately goes through the configured secret manager strategy.
func (plugin *secretPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
    return &secretVolumeMounter{
        secretVolume: &secretVolume{
            spec.Name(),
            pod.UID,
            plugin,
            plugin.host.GetMounter(plugin.GetPluginName()),
            volume.NewCachedMetrics(volume.NewMetricsDu(getPath(pod.UID, spec.Name(), plugin.host))),
        },
        source:    *spec.Volume.Secret,
        pod:       *pod,
        opts:      &opts,
        getSecret: plugin.getSecret,
    }, nil
}

// Init captures the VolumeHost and its GetSecretFunc; the host's func is
// backed by the kubelet secretManager chosen at startup.
func (plugin *secretPlugin) Init(host volume.VolumeHost) error {
    plugin.host = host
    plugin.getSecret = host.GetSecretFunc()
    return nil
}

// RequiresRemount always returns true for secret volumes: kubelet must
// re-run SetUp periodically to pick up secret content changes. This is the
// root reason secret/configmap fetches recur on every remount cycle.
func (plugin *secretPlugin) RequiresRemount(spec *volume.Spec) bool {
    return true
}


// SetUp mounts the volume at its canonical path by delegating to SetUpAt.
func (b *secretVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
    return b.SetUpAt(b.GetPath(), mounterArgs)
}

func (b *secretVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
    ...
    secret, err := b.getSecret(b.pod.Namespace, b.source.SecretName)
}

相关文章

网友评论

      本文标题:kubelet configmap/secret qps很高如何

      本文链接:https://www.haomeiwen.com/subject/zmljidtx.html