Summary
For pod add/update/delete events, kubelet runs syncPod; once a sync succeeds, the pod is added back to the workQueue, which drives the periodic syncPod.
For the periodic volume reconcile:
After the previous sync pod finishes, the podWorker adds the pod back to the workQueue; kubelet periodically takes it off the workQueue, runs sync pod, and marks the pod's processedPods entry as false.
The dswp (desired state of world populator) periodically checks each pod; if its processedPods entry is false and its volumes are all mounted, it calls MarkRemountRequired, which asks each volume plugin whether the volume needs a remount and, if so, marks the pod as requiring remount.
The volume manager's reconcile loop periodically calls PodExistsInVolume; if the pod exists in mountedPods, its volumeMountStateForPod is not VolumeMountUncertain, and remountRequired is true, a remount error is returned, so reconcile runs mountAttachedVolumes, which eventually calls the secret/configmap plugin's SetUpAt, which in turn calls GetSecret/GetConfigMap.
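A minimal, self-contained toy model of this chain may help; all types and functions below are illustrative stand-ins, not kubelet's real ones:

package main

import "fmt"

// podState stands in for dswp's processedPods entry plus the actual state of
// world's remountRequired flag for one pod.
type podState struct {
	processed       bool
	remountRequired bool
}

// dswpLoop models the populator: an unprocessed pod whose volumes are mounted
// gets marked remount-required (the secret/configmap plugins always answer
// true to RequiresRemount), then the pod is marked processed.
func dswpLoop(p *podState) {
	if !p.processed {
		p.remountRequired = true
		p.processed = true
	}
}

// reconcileLoop models the reconciler: a remount-required pod fails
// PodExistsInVolume with a remount error, so the volume is set up again,
// which re-reads the secret/configmap.
func reconcileLoop(p *podState, setUpAt func()) {
	if p.remountRequired {
		setUpAt()
		p.remountRequired = false
	}
}

func main() {
	p := &podState{}
	for i := 0; i < 2; i++ {
		p.processed = false // syncPod finished; ReprocessPod reset the flag
		dswpLoop(p)
		reconcileLoop(p, func() { fmt.Println("SetUpAt -> GetSecret") })
	}
	// Prints "SetUpAt -> GetSecret" once per sync cycle.
}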
Depending on the kubelet's ConfigMapAndSecretChangeDetectionStrategy setting, the secret/configmap strategies behave as follows.
For Get, taking secrets as the example:
GetSecret fetches from the apiserver every time.
UnregisterPod is a no-op.
RegisterPod is a no-op.
For Cache, taking secrets as the example:
GetSecret serves from the cache while the entry is within its TTL, otherwise it fetches from the apiserver (the TTL comes from the node annotation node.alpha.kubernetes.io/ttl and defaults to 1 minute; see the sketch after this list).
UnregisterPod decrements the object store's reference count and deletes the cache entry when the count reaches 0.
RegisterPod adds a cache entry or increments the reference count.
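A self-contained sketch of deriving the TTL from the node annotation; the upstream GetObjectTTLFromNodeFunc in pkg/kubelet/util/manager/cache_based_manager.go follows the same idea (the function and variable names here are illustrative):

package main

import (
	"fmt"
	"strconv"
	"time"
)

// ttlAnnotation is the node annotation maintained by the ttlController; its
// value is an integer number of seconds.
const ttlAnnotation = "node.alpha.kubernetes.io/ttl"

// objectTTLFromAnnotations returns the cache TTL encoded in the node's
// annotations, or the supplied default when the annotation is absent or
// unparsable.
func objectTTLFromAnnotations(annotations map[string]string, defaultTTL time.Duration) time.Duration {
	if value, ok := annotations[ttlAnnotation]; ok {
		if seconds, err := strconv.Atoi(value); err == nil {
			return time.Duration(seconds) * time.Second
		}
	}
	return defaultTTL
}

func main() {
	annotations := map[string]string{ttlAnnotation: "1800"} // 30 minutes
	fmt.Println(objectTTLFromAnnotations(annotations, time.Minute)) // 30m0s
}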
For Watch, taking secrets as the example:
GetSecret serves from the cache; if the object is immutable, the reflector is stopped.
UnregisterPod decrements the reference count; when it reaches 0, the cache entry is deleted and its reflector is stopped.
RegisterPod adds a cache entry and starts a reflector, or increments the reference count.
Because every periodic remount ends in a GetSecret/GetConfigMap call, the chosen strategy determines kubelet's configmap/secret request volume. Ways to reduce it (example below):
Set ConfigMapAndSecretChangeDetectionStrategy to Cache (this requires disabling the ttlController: add -ttl to the controllers configuration in kube-controller.yaml, e.g. --controllers=-ttl,*) and set node.alpha.kubernetes.io/ttl to a longer duration; the annotation value is an integer number of seconds, so 30 minutes is 1800.
Or set ConfigMapAndSecretChangeDetectionStrategy to Watch and mark the secrets/configmaps as immutable.
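For example (configMapAndSecretChangeDetectionStrategy and immutable are real API fields; the node and secret names are placeholders):

# KubeletConfiguration, e.g. /var/lib/kubelet/config.yaml
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
configMapAndSecretChangeDetectionStrategy: Cache   # or Watch / Get
---
# For the Watch strategy: an immutable Secret stops its reflector after the
# first sync, so it generates no further watch traffic.
apiVersion: v1
kind: Secret
metadata:
  name: my-secret
immutable: true
stringData:
  password: example

With the ttlController disabled, the annotation can be set by hand (the value is in seconds):

kubectl annotate node my-node node.alpha.kubernetes.io/ttl=1800 --overwrite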
Related code
kubelet initializes the secretManager/configMapManager at startup, choosing the secret/configmap strategy according to the ConfigMapAndSecretChangeDetectionStrategy configuration.
In pkg/kubelet/kubelet.go:
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	...
	if klet.kubeClient != nil {
		switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
		case kubeletconfiginternal.WatchChangeDetectionStrategy:
			secretManager = secret.NewWatchingSecretManager(klet.kubeClient, klet.resyncInterval)
			configMapManager = configmap.NewWatchingConfigMapManager(klet.kubeClient, klet.resyncInterval)
		case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
			secretManager = secret.NewCachingSecretManager(
				klet.kubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
			configMapManager = configmap.NewCachingConfigMapManager(
				klet.kubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
		case kubeletconfiginternal.GetChangeDetectionStrategy:
			secretManager = secret.NewSimpleSecretManager(klet.kubeClient)
			configMapManager = configmap.NewSimpleConfigMapManager(klet.kubeClient)
		default:
			return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
		}
		klet.secretManager = secretManager
		klet.configMapManager = configMapManager
	}
	...
}
Each SyncPod also waits for the pod's volumes; WaitForAttachAndMount (in pkg/kubelet/volumemanager/volume_manager.go) marks the pod for reprocessing by the populator:
func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
	...
	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
	...
}
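ReprocessPod simply resets the pod's processedPods flag; in pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go it looks approximately like this:

func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
	podName volumetypes.UniquePodName) {
	dswp.markPodProcessingFailed(podName)
}

// markPodProcessingFailed marks the pod as not fully processed, so the next
// populator loop runs processPodVolumes (and thus MarkRemountRequired) again.
func (dswp *desiredStateOfWorldPopulator) markPodProcessingFailed(
	podName volumetypes.UniquePodName) {
	dswp.pods.Lock()
	dswp.pods.processedPods[podName] = false
	dswp.pods.Unlock()
}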
When kubelet syncs a pod whose termination has not been requested, it registers the pod with the managers; once the pod has terminated, it unregisters it.
In pkg/kubelet/kubelet.go:
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
	...
	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
		if kl.secretManager != nil {
			kl.secretManager.RegisterPod(pod)
		}
		if kl.configMapManager != nil {
			kl.configMapManager.RegisterPod(pod)
		}
	}
	...
	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
		if !wait.Interrupted(err) {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
			klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
		}
		return false, err
	}
	...
}
In pkg/kubelet/kubelet.go:
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	...
	if kl.secretManager != nil {
		kl.secretManager.UnregisterPod(pod)
	}
	if kl.configMapManager != nil {
		kl.configMapManager.UnregisterPod(pod)
	}
	...
}
Subsequent secret/configmap reads go through the secretManager/configMapManager, e.g. fetching image pull secrets (with the Get strategy this means one apiserver GET per pull secret per sync).
In pkg/kubelet/kubelet_pods.go:
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
	...
	secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
	if err != nil {
		klog.InfoS("Unable to retrieve pull secret, the image pull may not succeed.", "pod", klog.KObj(pod), "secret", klog.KObj(secret), "err", err)
		failedPullSecrets = append(failedPullSecrets, secretRef.Name)
		continue
	}
	...
}
For GetChangeDetectionStrategy, taking secrets as the example:
GetSecret fetches from the apiserver every time.
UnregisterPod is a no-op.
RegisterPod is a no-op.
In pkg/kubelet/secret/secret_manager.go:
func (s *simpleSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
	return s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
For TTLCacheChangeDetectionStrategy, taking secrets as the example:
GetSecret serves from the cache while the entry is within its TTL, otherwise it fetches from the apiserver (the TTL comes from the node annotation node.alpha.kubernetes.io/ttl, defaulting to 1 minute).
UnregisterPod decrements the object store's reference count and deletes the cache entry when it reaches 0.
RegisterPod adds a cache entry or increments the reference count.
In pkg/kubelet/secret/secret_manager.go:
func (s *secretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
	object, err := s.manager.GetObject(namespace, name)
	if err != nil {
		return nil, err
	}
	if secret, ok := object.(*v1.Secret); ok {
		return secret, nil
	}
	return nil, fmt.Errorf("unexpected object type: %v", object)
}

func (s *secretManager) RegisterPod(pod *v1.Pod) {
	s.manager.RegisterPod(pod)
}

func (s *secretManager) UnregisterPod(pod *v1.Pod) {
	s.manager.UnregisterPod(pod)
}
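The caching manager is wired up in the same file: NewCachingSecretManager wraps a TTL-based objectStore around a direct apiserver getter. Quoted approximately (defaultTTL is one minute, matching the default mentioned above):

const defaultTTL = time.Minute

func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager {
	getSecret := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
		return kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, opts)
	}
	secretStore := manager.NewObjectStore(getSecret, clock.RealClock{}, getTTL, defaultTTL)
	return &secretManager{
		manager: manager.NewCacheBasedManager(secretStore, getSecretNames),
	}
}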
In pkg/kubelet/util/manager/cache_based_manager.go:
func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
	return c.objectStore.Get(namespace, name)
}

func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
	...
	c.objectStore.AddReference(pod.Namespace, name, pod.UID)
	...
}

func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
	...
	c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
	...
}
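RegisterPod/UnregisterPod resolve which objects a pod references via the getReferencedObjects function passed to NewCacheBasedManager; for secrets this is getSecretNames, implemented upstream with podutil.VisitPodSecretNames. A simplified, self-contained sketch of what gets collected (the function name and the omission of some reference sites, e.g. projected volumes and init containers, are simplifications):

package podrefs

import v1 "k8s.io/api/core/v1"

// secretNames gathers the secrets a pod references: image pull secrets,
// env/envFrom references, and secret volumes.
func secretNames(pod *v1.Pod) map[string]bool {
	names := map[string]bool{}
	for _, ref := range pod.Spec.ImagePullSecrets {
		names[ref.Name] = true
	}
	for _, c := range pod.Spec.Containers {
		for _, src := range c.EnvFrom {
			if src.SecretRef != nil {
				names[src.SecretRef.Name] = true
			}
		}
		for _, env := range c.Env {
			if env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil {
				names[env.ValueFrom.SecretKeyRef.Name] = true
			}
		}
	}
	for _, vol := range pod.Spec.Volumes {
		if vol.Secret != nil {
			names[vol.Secret.SecretName] = true
		}
	}
	return names
}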
In pkg/kubelet/util/manager/cache_based_manager.go:
func (s *objectStore) AddReference(namespace, name string, _ types.UID) {
	key := objectKey{namespace: namespace, name: name}

	// AddReference is called from RegisterPod, thus it needs to be efficient.
	// Thus Add() is only increasing refCount and generation of a given object.
	// Then Get() is responsible for fetching if needed.
	s.lock.Lock()
	defer s.lock.Unlock()
	item, exists := s.items[key]
	if !exists {
		item = &objectStoreItem{
			refCount: 0,
			data:     &objectData{},
		}
		s.items[key] = item
	}

	item.refCount++
	// This will trigger fetch on the next Get() operation.
	item.data = nil
}

func (s *objectStore) DeleteReference(namespace, name string, _ types.UID) {
	key := objectKey{namespace: namespace, name: name}

	s.lock.Lock()
	defer s.lock.Unlock()
	if item, ok := s.items[key]; ok {
		item.refCount--
		if item.refCount == 0 {
			delete(s.items, key)
		}
	}
}
func (s *objectStore) Get(namespace, name string) (runtime.Object, error) {
	...
	data := func() *objectData {
		s.lock.Lock()
		defer s.lock.Unlock()
		item, exists := s.items[key]
		if !exists {
			return nil
		}
		if item.data == nil {
			item.data = &objectData{}
		}
		return item.data
	}()
	...
	if data.err != nil || !s.isObjectFresh(data) {
		// Stale or failed entry: the elided code fetches a fresh object
		// from the apiserver and updates lastUpdateTime.
		...
	}
	...
}
func (s *objectStore) isObjectFresh(data *objectData) bool {
	objectTTL := s.defaultTTL
	if ttl, ok := s.getTTL(); ok {
		objectTTL = ttl
	}
	return s.clock.Now().Before(data.lastUpdateTime.Add(objectTTL))
}
For WatchChangeDetectionStrategy:
GetSecret serves from the cache; if the object is immutable, the reflector is stopped.
UnregisterPod decrements the reference count; when it reaches 0, the cache entry is deleted and its reflector is stopped.
RegisterPod adds a cache entry and starts a reflector, or increments the reference count.
The secretManager wrapper is identical to the TTL cache case; the difference is the underlying manager, which (via NewWatchBasedManager in pkg/kubelet/util/manager/watch_based_manager.go) wraps the same cacheBasedManager around an objectCache of per-object reflectors.
In pkg/kubelet/secret/secret_manager.go:
func (s *secretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
	object, err := s.manager.GetObject(namespace, name)
	if err != nil {
		return nil, err
	}
	if secret, ok := object.(*v1.Secret); ok {
		return secret, nil
	}
	return nil, fmt.Errorf("unexpected object type: %v", object)
}

func (s *secretManager) RegisterPod(pod *v1.Pod) {
	s.manager.RegisterPod(pod)
}

func (s *secretManager) UnregisterPod(pod *v1.Pod) {
	s.manager.UnregisterPod(pod)
}
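The immutability check is supplied as a closure by NewWatchingSecretManager in the same file; quoted approximately:

isImmutable := func(object runtime.Object) bool {
	if secret, ok := object.(*v1.Secret); ok {
		return secret.Immutable != nil && *secret.Immutable
	}
	return false
}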
In pkg/kubelet/util/manager/cache_based_manager.go (reused by the watch-based manager, just with a different store):
func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
	return c.objectStore.Get(namespace, name)
}

func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
	...
	c.objectStore.AddReference(pod.Namespace, name, pod.UID)
	...
}

func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
	...
	c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
	...
}
In pkg/kubelet/util/manager/watch_based_manager.go:
func (c *objectCache) AddReference(namespace, name string, referencedFrom types.UID) {
	key := objectKey{namespace: namespace, name: name}

	c.lock.Lock()
	defer c.lock.Unlock()
	item, exists := c.items[key]
	if !exists {
		item = c.newReflectorLocked(namespace, name)
		c.items[key] = item
	}
	item.refMap[referencedFrom]++
}

func (c *objectCache) DeleteReference(namespace, name string, referencedFrom types.UID) {
	key := objectKey{namespace: namespace, name: name}

	c.lock.Lock()
	defer c.lock.Unlock()
	if item, ok := c.items[key]; ok {
		item.refMap[referencedFrom]--
		if item.refMap[referencedFrom] == 0 {
			delete(item.refMap, referencedFrom)
		}
		if len(item.refMap) == 0 {
			// Stop the underlying reflector.
			item.stop()
			delete(c.items, key)
		}
	}
}
func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
	key := objectKey{namespace: namespace, name: name}

	c.lock.RLock()
	item, exists := c.items[key]
	c.lock.RUnlock()
	...
	if c.isImmutable(object) {
		item.setImmutable()
		if item.stop() {
			klog.V(4).InfoS("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name))
		}
	}
	...
}
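newReflectorLocked (called from AddReference above) builds one reflector per object, scoped with a metadata.name field selector so each watch covers exactly one secret. A simplified, self-contained client-go sketch of that idea (the function name and structure are illustrative, not the upstream code):

package watchdemo

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// startSingleObjectReflector watches exactly one secret by pinning
// metadata.name with a field selector, so each referenced object costs one
// long-lived watch instead of repeated GETs.
func startSingleObjectReflector(client kubernetes.Interface, namespace, name string, stopCh <-chan struct{}) cache.Store {
	fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return client.CoreV1().Secrets(namespace).List(context.TODO(), options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return client.CoreV1().Secrets(namespace).Watch(context.TODO(), options)
		},
	}
	// The reflector keeps the store in sync with the single watched object.
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	go cache.NewReflector(lw, &v1.Secret{}, store, 0).Run(stopCh)
	return store
}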
Back to the periodic volume reconcile:
After the previous sync pod finishes, the podWorker adds the pod back to the workQueue; kubelet periodically takes it off the workQueue, runs sync pod, and marks its processedPods entry as false.
The dswp periodically checks each pod; if its processedPods entry is false and its volumes are all mounted, it calls MarkRemountRequired, which checks whether each volume plugin requires a remount and, if so, marks the pod as requiring remount.
The volume manager's reconcile loop periodically calls PodExistsInVolume; on a remount error it runs mountAttachedVolumes, which eventually calls the secret/configmap plugin's SetUpAt, which in turn calls GetSecret/GetConfigMap.
In pkg/kubelet/volume_host.go:
func NewInitializedVolumePluginMgr(
	...
	kvh := &kubeletVolumeHost{
		kubelet:          kubelet,
		volumePluginMgr:  volume.VolumePluginMgr{},
		secretManager:    secretManager,
		configMapManager: configMapManager,
		tokenManager:     tokenManager,
		informerFactory:  informerFactory,
		csiDriverLister:  csiDriverLister,
		csiDriversSynced: csiDriversSynced,
		exec:             utilexec.New(),
	}
	...
	if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
		return nil, fmt.Errorf(
			"could not initialize volume plugins for KubeletVolumePluginMgr: %v",
			err)
	}
	...
}
In pkg/volume/plugins.go:
func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPluginProber, host VolumeHost) error {
	...
	for _, plugin := range plugins {
		...
		err := plugin.Init(host)
		...
	}
	...
}
In pkg/kubelet/volumemanager/volume_manager.go:
...
reconcilerLoopSleepPeriod = 100 * time.Millisecond
...
desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond
...
func NewVolumeManager(
	...
	vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
		kubeClient,
		desiredStateOfWorldPopulatorLoopSleepPeriod,
		podManager,
		podStateProvider,
		vm.desiredStateOfWorld,
		vm.actualStateOfWorld,
		kubeContainerRuntime,
		keepTerminatedPodVolumes,
		csiMigratedPluginManager,
		intreeToCSITranslator,
		volumePluginMgr)
	...
	vm.reconciler = reconciler.NewReconciler(
		kubeClient,
		controllerAttachDetachEnabled,
		reconcilerLoopSleepPeriod,
		waitForAttachTimeout,
		nodeName,
		vm.desiredStateOfWorld,
		vm.actualStateOfWorld,
		vm.desiredStateOfWorldPopulator.HasAddedPods,
		vm.operationExecutor,
		mounter,
		hostutil,
		volumePluginMgr,
		kubeletPodsDir)
	...
}
In pkg/kubelet/volumemanager/reconciler/reconciler.go:
func (rc *reconciler) runOld(stopCh <-chan struct{}) {
	wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}

func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {
		rc.reconcile()
		// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
		// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
		// desired state of world does not contain a complete list of pods.
		if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
			klog.InfoS("Reconciler: start to sync state")
			rc.sync()
		}
	}
}

func (rc *reconciler) reconcile() {
	...
	rc.mountOrAttachVolumes()
	...
}
In pkg/kubelet/volumemanager/reconciler/reconciler_common.go:
func NewReconciler(
	kubeClient clientset.Interface,
	controllerAttachDetachEnabled bool,
	loopSleepDuration time.Duration,
	waitForAttachTimeout time.Duration,
	nodeName types.NodeName,
	desiredStateOfWorld cache.DesiredStateOfWorld,
	actualStateOfWorld cache.ActualStateOfWorld,
	populatorHasAddedPods func() bool,
	operationExecutor operationexecutor.OperationExecutor,
	mounter mount.Interface,
	hostutil hostutil.HostUtils,
	volumePluginMgr *volumepkg.VolumePluginMgr,
	kubeletPodsDir string) Reconciler {
	return &reconciler{
		kubeClient:                      kubeClient,
		controllerAttachDetachEnabled:   controllerAttachDetachEnabled,
		loopSleepDuration:               loopSleepDuration,
		waitForAttachTimeout:            waitForAttachTimeout,
		nodeName:                        nodeName,
		desiredStateOfWorld:             desiredStateOfWorld,
		actualStateOfWorld:              actualStateOfWorld,
		populatorHasAddedPods:           populatorHasAddedPods,
		operationExecutor:               operationExecutor,
		mounter:                         mounter,
		hostutil:                        hostutil,
		skippedDuringReconstruction:     map[v1.UniqueVolumeName]*globalVolumeInfo{},
		volumePluginMgr:                 volumePluginMgr,
		kubeletPodsDir:                  kubeletPodsDir,
		timeOfLastSync:                  time.Time{},
		volumesFailedReconstruction:     make([]podVolume, 0),
		volumesNeedUpdateFromNodeStatus: make([]v1.UniqueVolumeName, 0),
		volumesNeedReportedInUse:        make([]v1.UniqueVolumeName, 0),
	}
}

func (rc *reconciler) Run(stopCh <-chan struct{}) {
	if utilfeature.DefaultFeatureGate.Enabled(features.NewVolumeManagerReconstruction) {
		rc.runNew(stopCh)
		return
	}
	rc.runOld(stopCh)
}
func (rc *reconciler) mountOrAttachVolumes() {
	...
	volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
	...
	} else if !volMounted || cache.IsRemountRequiredError(err) {
		rc.mountAttachedVolumes(volumeToMount, err)
	}
	...
}

func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
	...
	err := rc.operationExecutor.MountVolume(
		rc.waitForAttachTimeout,
		volumeToMount.VolumeToMount,
		rc.actualStateOfWorld,
		isRemount)
	...
}
In pkg/volume/util/operationexecutor/operation_executor.go:
func (oe *operationExecutor) MountVolume(
	...
	generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(...)
	...
}
In pkg/volume/util/operationexecutor/operation_generator.go:
func (og *operationGenerator) GenerateMountVolumeFunc(
	...
	volumeMounter, newMounterErr := volumePlugin.NewMounter(...)
	...
	mountErr := volumeMounter.SetUp(...)
	...
}
In pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go:
func NewDesiredStateOfWorldPopulator(
	kubeClient clientset.Interface,
	loopSleepDuration time.Duration,
	podManager PodManager,
	podStateProvider PodStateProvider,
	desiredStateOfWorld cache.DesiredStateOfWorld,
	actualStateOfWorld cache.ActualStateOfWorld,
	kubeContainerRuntime kubecontainer.Runtime,
	keepTerminatedPodVolumes bool,
	csiMigratedPluginManager csimigration.PluginManager,
	intreeToCSITranslator csimigration.InTreeToCSITranslator,
	volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorldPopulator {
	return &desiredStateOfWorldPopulator{
		kubeClient:          kubeClient,
		loopSleepDuration:   loopSleepDuration,
		podManager:          podManager,
		podStateProvider:    podStateProvider,
		desiredStateOfWorld: desiredStateOfWorld,
		actualStateOfWorld:  actualStateOfWorld,
		pods: processedPods{
			processedPods: make(map[volumetypes.UniquePodName]bool)},
		kubeContainerRuntime:     kubeContainerRuntime,
		keepTerminatedPodVolumes: keepTerminatedPodVolumes,
		hasAddedPods:             false,
		hasAddedPodsLock:         sync.RWMutex{},
		csiMigratedPluginManager: csiMigratedPluginManager,
		intreeToCSITranslator:    intreeToCSITranslator,
		volumePluginMgr:          volumePluginMgr,
	}
}
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
	klog.InfoS("Desired state populator starts to run")
	wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
		done := sourcesReady.AllReady()
		dswp.populatorLoop()
		return done, nil
	}, stopCh)
	...
}

func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
	dswp.findAndAddNewPods()
	dswp.findAndRemoveDeletedPods()
}

func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
	...
	for _, pod := range dswp.podManager.GetPods() {
		...
		dswp.processPodVolumes(pod, mountedVolumesForPod)
		...
	}
	...
}
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
	...
	if dswp.podPreviouslyProcessed(uniquePodName) {
		return
	}
	...
	dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
	...
}
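For completeness, the processedPods flag helpers in the same file, quoted approximately; markPodProcessed runs at the end of processPodVolumes, and ReprocessPod (shown earlier) resets the flag:

func (dswp *desiredStateOfWorldPopulator) podPreviouslyProcessed(
	podName volumetypes.UniquePodName) bool {
	dswp.pods.RLock()
	defer dswp.pods.RUnlock()
	return dswp.pods.processedPods[podName]
}

// markPodProcessed records that the pod's volumes have been processed, so the
// populator skips the pod until the flag is reset.
func (dswp *desiredStateOfWorldPopulator) markPodProcessed(
	podName volumetypes.UniquePodName) {
	dswp.pods.Lock()
	defer dswp.pods.Unlock()
	dswp.pods.processedPods[podName] = true
}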
In pkg/kubelet/volumemanager/cache/actual_state_of_world.go:
func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) {
	...
	if podObj.remountRequired {
		return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
	}
	...
}

func (asw *actualStateOfWorld) MarkRemountRequired(
	...
	if volumePlugin.RequiresRemount(podObj.volumeSpec) {
		podObj.remountRequired = true
		asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
	}
	...
}
In pkg/volume/secret/secret.go:
func (plugin *secretPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
	return &secretVolumeMounter{
		secretVolume: &secretVolume{
			spec.Name(),
			pod.UID,
			plugin,
			plugin.host.GetMounter(plugin.GetPluginName()),
			volume.NewCachedMetrics(volume.NewMetricsDu(getPath(pod.UID, spec.Name(), plugin.host))),
		},
		source:    *spec.Volume.Secret,
		pod:       *pod,
		opts:      &opts,
		getSecret: plugin.getSecret,
	}, nil
}

func (plugin *secretPlugin) Init(host volume.VolumeHost) error {
	plugin.host = host
	plugin.getSecret = host.GetSecretFunc()
	return nil
}
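host.GetSecretFunc() is what ties the plugin back to the secretManager; in pkg/kubelet/volume_host.go it is, approximately, just:

func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
	return kvh.secretManager.GetSecret
}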
func (plugin *secretPlugin) RequiresRemount(spec *volume.Spec) bool {
	return true
}

func (b *secretVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
	return b.SetUpAt(b.GetPath(), mounterArgs)
}

func (b *secretVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
	...
	secret, err := b.getSecret(b.pod.Namespace, b.source.SecretName)
	...
}