美文网首页
k8s源码阅读(3) - kube-controller-man

k8s源码阅读(3) - kube-controller-man

作者: 拿着滋水枪的消防员 | 来源:发表于2022-02-14 19:13 被阅读0次

    启动流程

    文件cmd/kube-controller-manager/app/controllermanager.go

    入口方法

    • func NewControllerManagerCommand() *cobra.Command
    • func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error
    • func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks)
      获取锁,获取到了之后有callback,开始StartController
    • func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
      unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error
    
        var controllerChecks []healthz.HealthChecker
        // 逐个启动controller, 生成健康检查
        for controllerName, initFn := range controllers {
            if !controllerCtx.IsControllerEnabled(controllerName) {
                klog.Warningf("%q is disabled", controllerName)
                continue
            }
    
            time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
    
            klog.V(1).Infof("Starting %q", controllerName)
            ctrl, started, err := initFn(ctx, controllerCtx)
            if err != nil {
                klog.Errorf("Error starting %q", controllerName)
                return err
            }
            if !started {
                klog.Warningf("Skipping %q", controllerName)
                continue
            }
            check := controllerhealthz.NamedPingChecker(controllerName)
            if ctrl != nil {
                // check if the controller supports and requests a debugHandler
                // and it needs the unsecuredMux to mount the handler onto.
                if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
                    if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
                        basePath := "/debug/controllers/" + controllerName
                        unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
                        unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
                    }
                }
                if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
                    if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
                        check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
                    }
                }
            }
            controllerChecks = append(controllerChecks, check)
    
            klog.Infof("Started %q", controllerName)
        }
    
        healthzHandler.AddHealthChecker(controllerChecks...)
    
    • func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
    // 这一段定义了加载启动的controller列表
        controllers := map[string]InitFunc{}
        controllers["endpoint"] = startEndpointController
        controllers["endpointslice"] = startEndpointSliceController
        controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
        controllers["replicationcontroller"] = startReplicationController
        controllers["podgc"] = startPodGCController
        controllers["resourcequota"] = startResourceQuotaController
        controllers["namespace"] = startNamespaceController
        controllers["serviceaccount"] = startServiceAccountController
        controllers["garbagecollector"] = startGarbageCollectorController
        controllers["daemonset"] = startDaemonSetController
        controllers["job"] = startJobController
        controllers["deployment"] = startDeploymentController
        controllers["replicaset"] = startReplicaSetController
        controllers["horizontalpodautoscaling"] = startHPAController
        controllers["disruption"] = startDisruptionController
        controllers["statefulset"] = startStatefulSetController
        controllers["cronjob"] = startCronJobController
        controllers["csrsigning"] = startCSRSigningController
        controllers["csrapproving"] = startCSRApprovingController
        controllers["csrcleaner"] = startCSRCleanerController
        controllers["ttl"] = startTTLController
        controllers["bootstrapsigner"] = startBootstrapSignerController
        controllers["tokencleaner"] = startTokenCleanerController
        controllers["nodeipam"] = startNodeIpamController
        controllers["nodelifecycle"] = startNodeLifecycleController
        if loopMode == IncludeCloudLoops {
            controllers["service"] = startServiceController
            controllers["route"] = startRouteController
            controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
            // TODO: volume controller into the IncludeCloudLoops only set.
        }
        controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
        controllers["attachdetach"] = startAttachDetachController
        controllers["persistentvolume-expander"] = startVolumeExpandController
        controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
        controllers["pvc-protection"] = startPVCProtectionController
        controllers["pv-protection"] = startPVProtectionController
        controllers["ttl-after-finished"] = startTTLAfterFinishedController
        controllers["root-ca-cert-publisher"] = startRootCACertPublisher
        controllers["ephemeral-volume"] = startEphemeralVolumeController
        if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
            utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
            controllers["storage-version-gc"] = startStorageVersionGCController
        }
    

    NodeLifecycle控制器

    文件cmd/kube-controller-manager/app/core.go

    启动控制器

    • func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error)

    文件pkg/controller/nodelifecycle/node_lifecycle_controller.go

    新建控制器

    • func NewNodeLifecycleController(
      ctx context.Context,
      leaseInformer coordinformers.LeaseInformer,
      podInformer coreinformers.PodInformer,
      nodeInformer coreinformers.NodeInformer,
      daemonSetInformer appsv1informers.DaemonSetInformer,
      kubeClient clientset.Interface,
      nodeMonitorPeriod time.Duration,
      nodeStartupGracePeriod time.Duration,
      nodeMonitorGracePeriod time.Duration,
      podEvictionTimeout time.Duration,
      evictionLimiterQPS float32,
      secondaryEvictionLimiterQPS float32,
      largeClusterThreshold int32,
      unhealthyZoneThreshold float32,
      runTaintManager bool,
      ) (*Controller, error)
    // 这里是初始化controller,增加一些client-go的消费事件以及出发函数、缓存等
    

    运行NodeLifecycleController

    • func (nc *Controller) Run(ctx context.Context)
    // 这里开始执行消费pod,node事件的处理协程
        ...
        if nc.runTaintManager {
            go nc.taintManager.Run(ctx)
        }
    
        // Close node update queue to cleanup go routine.
        defer nc.nodeUpdateQueue.ShutDown()
        defer nc.podUpdateQueue.ShutDown()
    
        // Start workers to reconcile labels and/or update NoSchedule taint for nodes.
        for i := 0; i < scheduler.UpdateWorkerSize; i++ {
            // Thanks to "workqueue", each worker just need to get item from queue, because
            // the item is flagged when got from queue: if new event come, the new item will
            // be re-queued until "Done", so no more than one worker handle the same item and
            // no event missed.
            go wait.UntilWithContext(ctx, nc.doNodeProcessingPassWorker, time.Second)
        }
    
        for i := 0; i < podUpdateWorkerSize; i++ {
            go wait.UntilWithContext(ctx, nc.doPodProcessingWorker, time.Second)
        }
    
        if nc.runTaintManager {
            // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
            // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
            go wait.UntilWithContext(ctx, nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod)
        } else {
            // Managing eviction of nodes:
            // When we delete pods off a node, if the node was not empty at the time we then
            // queue an eviction watcher. If we hit an error, retry deletion.
            go wait.UntilWithContext(ctx, nc.doEvictionPass, scheduler.NodeEvictionPeriod)
        }
    
        // Incorporate the results of node health signal pushed from kubelet to master.
        go wait.UntilWithContext(ctx, func(ctx context.Context) {
            if err := nc.monitorNodeHealth(ctx); err != nil {
                klog.Errorf("Error monitoring node health: %v", err)
            }
        }, nc.nodeMonitorPeriod)
    
        <-ctx.Done()
    
    • func (nc *Controller) doNodeProcessingPassWorker(ctx context.Context)
      node生命周期控制器的一个worker, 从nodeUpdateQueue里消费事件进行处理
      Tips: 队列workqueue经常使用AddRateLimited()这个方法,主要用处是将失败的任务重新放回队列并按照配置的重试次数进行重试
    1. doNoScheduleTaintingPass
    - 这里根据node的状态转换对应的污点,如果包含了不能调度的状态,则打上一个NoSchedule的污点,以下就是会被打禁止调度污点的状态匹配规则,匹配到了则打污点
        nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
            v1.NodeReady: {
                v1.ConditionFalse:   v1.TaintNodeNotReady,
                v1.ConditionUnknown: v1.TaintNodeUnreachable,
            },
            v1.NodeMemoryPressure: {
                v1.ConditionTrue: v1.TaintNodeMemoryPressure,
            },
            v1.NodeDiskPressure: {
                v1.ConditionTrue: v1.TaintNodeDiskPressure,
            },
            v1.NodeNetworkUnavailable: {
                v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
            },
            v1.NodePIDPressure: {
                v1.ConditionTrue: v1.TaintNodePIDPressure,
            },
        }
    
    - node.Spec.Unschedulable字段被改为true(cordon一个node)    也会打上一个污点
    - 对分析出应打的污点做一个分析出需要增加跟删除的污点,操作api进行更新
    2. reconcileNodeLabels 
    自动补os跟arch标签的,自动将kubernetes.io/os跟beta.kubernetes.io/os的标签互相补全,应该是为了兼容不同版本的kubelet还有调度器吧,并无其他作用.
    
    • func (nc *Controller) doPodProcessingWorker(ctx context.Context)
    • func (nc *Controller) processPod(ctx context.Context, podItem podUpdateItem)
      消费pod事件
    控制器的nodeHealthData属性里缓存了node的健康状态,结构体对应的nodeHealthMap
    - 消费pod事件,获取pod实例跟node的实例以及node的健康状态的deepcopy
    - 如果开启了taint manager则会根据node的状态去平滑的执行evictPod,平滑的删除pod信息
    - 根据node的状态,如果状态异常,将pod的状态标记为notready(更新pod的status),并产生event事件
    
    • func (nc *Controller) doNoExecuteTaintingPass(ctx context.Context)
      开启了taint manager
    详细逻辑自行阅读代码
    
    • func (nc *Controller) doEvictionPass(ctx context.Context)
      未开启taint manager
    详细逻辑自行阅读代码
    
    

    以上内容以node生命周期控制器为例,讲解了整个kube-controller-manager的启动流程,协助大家根据自己的具体需求,快速定位修改源码.
    文章未完结,看到哪补充到哪

    相关文章

      网友评论

          本文标题:k8s源码阅读(3) - kube-controller-man

          本文链接:https://www.haomeiwen.com/subject/hvoakrtx.html