Kubernetes Source Code Analysis -- controller-manager

Author: 何约什 | Published 2018-12-21 15:18, 98 reads

    The previous chapters finished analyzing the API Server source code; that codebase is large and its architecture fairly complex, so it took quite some time. Today (2018-09-28) I finally begin studying the kube-controller-manager source. A first look at its main program flow shows it is indeed much simpler, roughly similar in framework to the nginx-ingress-controller analyzed earlier, although kube-controller-manager is responsible for many more resources and is therefore considerably more complex overall. This chapter starts from the easy entry point and analyzes the startup flow of kube-controller-manager.

    Program entry point

    Like the API Server, the entry point is built on the cobra framework. The kube-controller-manager configuration structs look like this:

    type CMServer struct {
        cmoptions.ControllerManagerServer
    }
    
    type ControllerManagerServer struct {
        componentconfig.KubeControllerManagerConfiguration
    
        Master     string
        Kubeconfig string
    }
    

    The main configuration lives in the componentconfig.KubeControllerManagerConfiguration struct. Master and Kubeconfig are optional; they specify the kube-apiserver for the controller-manager to connect to. When neither is set, the program starts in InCluster mode, which lets it run as a pod inside Kubernetes and read the connection parameters from its environment; see the clientcmd.BuildConfigFromFlags function. The parameters come from:
    1) the KUBERNETES_SERVICE_HOST / KUBERNETES_SERVICE_PORT environment variables, e.g. KUBERNETES_PORT=tcp://10.96.0.1:443
    2) the token and ca.crt files under the /var/run/secrets/kubernetes.io/serviceaccount directory

    In practice we usually specify the Master or Kubeconfig parameter for kube-controller-manager directly; for example, pointing Master at 127.0.0.1:6443 lets kube-controller-manager run on the same machine as kube-apiserver.

    • Startup script specifying the master address

    kube-controller-manager --master=127.0.0.1:8080 --root-ca-file=/data/herry2038/ssl/ca.pem --service-account-private-key-file=/data/herry2038/ssl/kubernetes-key.pem --leader-elect=true --logtostderr=false --log-dir=/data/herry2038/log/controller --v=2 --cluster-signing-cert-file=/data/herry2038/ssl/ca.pem --cluster-signing-key-file=/data/herry2038/ssl/ca-key.pem --feature-gates=RotateKubeletServerCertificate=true

    • Startup script specifying a kubeconfig

    kube-controller-manager --use-service-account-credentials=true --kubeconfig=/etc/kubernetes/controller-manager.conf --service-account-private-key-file=/etc/kubernetes/pki/sa.key --cluster-signing-key-file=/etc/kubernetes/pki/ca.key --address=127.0.0.1 --leader-elect=true --controllers=*,bootstrapsigner,tokencleaner --root-ca-file=/etc/kubernetes/pki/ca.crt --cluster-signing-cert-file=/etc/kubernetes/pki/ca.crt --allocate-node-cidrs=true --cluster-cidr=10.244.0.0/16 --node-cidr-mask-size=24

    Main run flow

    The main run flow is fairly clear; the code is in k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go. Note that kube-controller-manager supports high availability: we can start several processes at the same time, they elect a leader among themselves, and only the leader performs the actual work. When the current leader fails, a new leader is elected to take over.
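    The real election uses client-go's leaderelection package with a shared resource lock, as shown in the Run function below. The "exactly one active leader" idea can be sketched in plain Go, using an atomic compare-and-swap as a stand-in for acquiring the resource lock (a simplification; the real lock lives in the API Server and leases are renewed and can expire):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runCandidates starts n candidate "processes"; each tries to grab the
// lock (a CAS from 0 to its own id), and only the winner runs the payload.
func runCandidates(n int) int32 {
	var lock int32 // 0 = unheld; nonzero = id of the holder
	var leaders int32
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(id int32) {
			defer wg.Done()
			if atomic.CompareAndSwapInt32(&lock, 0, id) {
				// OnStartedLeading: only the leader executes run()
				atomic.AddInt32(&leaders, 1)
			}
			// followers keep retrying in the real implementation
		}(int32(i))
	}
	wg.Wait()
	return leaders
}

func main() {
	fmt.Println("leaders:", runCandidates(5)) // always exactly 1
}
```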

    func Run(s *options.CMServer) error {
        // To help debugging, immediately log version
        glog.Infof("Version: %+v", version.Get())
        if err := s.Validate(KnownControllers(), ControllersDisabledByDefault.List()); err != nil {
            return err
        }
        // store the configuration in the global configz registry; configz can be
        // mounted on the HTTP server to expose the config over HTTP
        if c, err := configz.New("componentconfig"); err == nil {
            c.Set(s.KubeControllerManagerConfiguration)
        } else {
            glog.Errorf("unable to register configz: %s", err)
        }
        // from the config, create the clients to the API Server and for leader election
        kubeClient, leaderElectionClient, kubeconfig, err := createClients(s)
        if err != nil {
            return err
        }
        // start the HTTP server
        if s.Port >= 0 {
            go startHTTP(s) // pprof, configz, healthz, metrics
        }
        // create the event recorder; events are stored in the API Server
        recorder := createRecorder(kubeClient)
    
        // note: run is a closure because it should only execute after winning the
        // leader election; followers do not run it
        run := func(stop <-chan struct{}) {
            // ControllerClientBuilder builds the various connections to the API Server.
            // A controller-manager holds many such connections, so they are created
            // through a builder, which gives each client a distinct agent name so the
            // connections can be told apart.
            rootClientBuilder := controller.SimpleControllerClientBuilder{
                ClientConfig: kubeconfig,
            }
            var clientBuilder controller.ControllerClientBuilder
            if s.UseServiceAccountCredentials {
                if len(s.ServiceAccountKeyFile) == 0 {
                    // It's possible another controller process is creating the tokens for us.
                    // If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
                    glog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
                }
                clientBuilder = controller.SAControllerClientBuilder{
                    ClientConfig:         restclient.AnonymousClientConfig(kubeconfig),
                    CoreClient:           kubeClient.CoreV1(),
                    AuthenticationClient: kubeClient.AuthenticationV1(),
                    Namespace:            "kube-system",
                }
            } else {
                clientBuilder = rootClientBuilder
            }
            // rootClientBuilder is used for the sharedInformers and the token controller
            ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
            if err != nil {
                glog.Fatalf("error building controller context: %v", err)
            }
            // the service account token controller is special: it must start first so it
            // can mint credentials for the other controllers, and it can only use the
            // root client builder, not the regular one, so it is excluded from the
            // normal initialization and started ahead of everything else.
            saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
    
            // start the controllers
            if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
                glog.Fatalf("error starting controllers: %v", err)
            }
    
            // start the informer factory; the informers for each resource type begin
            // streaming resource change events from here on
            ctx.InformerFactory.Start(ctx.Stop)
            close(ctx.InformersStarted) // signals that informers can safely be started, although they were in fact already started inside InformerFactory.Start above
    
            select {}
        }
    
        if !s.LeaderElection.LeaderElect {
            run(wait.NeverStop)
            panic("unreachable")
        }
    
        id, err := os.Hostname()
        if err != nil {
            return err
        }
        // add a uniquifier so that two processes on the same host don't accidentally both become active
        id = id + "_" + string(uuid.NewUUID())
        // create the resource lock; two kinds are supported: configmaps and endpoints
        rl, err := resourcelock.New(s.LeaderElection.ResourceLock,
            "kube-system",
            "kube-controller-manager",
            leaderElectionClient.CoreV1(),
            resourcelock.ResourceLockConfig{
                Identity:      id,
                EventRecorder: recorder,
            })
        if err != nil {
            glog.Fatalf("error creating lock: %v", err)
        }
        // start the leader election loop
        leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
            Lock:          rl,
            LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
            RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
            RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
            Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,  // executed after becoming leader
                OnStoppedLeading: func() {
                    glog.Fatalf("leaderelection lost")
                },
            },
        })
        panic("unreachable")
    }
    

    The logic is fairly clear, so there is not much more to explain. The main job is to start each controller; the ServiceAccount token controller must start first, along with the informers for its ServiceAccounts and Secrets resources, and then the controllers for the other resources are started one after another.

    The ServiceAccount controller

    The ServiceAccountToken controller (concrete type: TokensController) is the first controller to start. It is special: it must start first so it can grant credentials to the other controllers, and it cannot use the regular client builder, only the root client builder; for those reasons it is excluded from the normal initialization and started ahead of everything else.

    Main responsibilities of the ServiceAccountToken controller:
    it watches change events on two resources, ServiceAccounts and Secrets; for each event it checks the association between the ServiceAccount and its token, creating the necessary ServiceAccount/Secret associations and removing invalid entries from the cache.

    Below is the code that starts the ServiceAccountToken controller:

    func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (bool, error) {
        if !ctx.IsControllerEnabled(saTokenControllerName) {
            glog.Warningf("%q is disabled", saTokenControllerName)
            return false, nil
        }
    
        if len(ctx.Options.ServiceAccountKeyFile) == 0 {
            glog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
            return false, nil
        }
        privateKey, err := certutil.PrivateKeyFromFile(ctx.Options.ServiceAccountKeyFile)
        if err != nil {
            return true, fmt.Errorf("error reading key for service account token controller: %v", err)
        }
    
        var rootCA []byte
        if ctx.Options.RootCAFile != "" {
            rootCA, err = ioutil.ReadFile(ctx.Options.RootCAFile)
            if err != nil {
                return true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
            }
            if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
                return true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
            }
        } else {
            rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
        }
        // a TokensController instance is created here
        controller, err := serviceaccountcontroller.NewTokensController(
            ctx.InformerFactory.Core().V1().ServiceAccounts(),  // note the InformerFactory still uses the regular client builder
            ctx.InformerFactory.Core().V1().Secrets(),
            c.rootClientBuilder.ClientOrDie("tokens-controller"),
            serviceaccountcontroller.TokensControllerOptions{
                TokenGenerator: serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey),
                RootCA:         rootCA,
            },
        )
        if err != nil {
            return true, fmt.Errorf("error creating Tokens controller: %v", err)
        }
        // start the controller
        go controller.Run(int(ctx.Options.ConcurrentSATokenSyncs), ctx.Stop)
    
        // start the first set of informers now so that other controllers can start
        // start the ServiceAccounts and Secrets informers ahead of the rest
        ctx.InformerFactory.Start(ctx.Stop)
    
        return true, nil
    }
    

    The comments in the code above largely speak for themselves. The core is creating two SharedIndexInformers from the InformerFactory, one for ServiceAccounts and one for Secrets; the event handlers place events onto two separate queues. When the controller starts, it launches a configurable number of worker goroutines to process the items in those queues.
    Because there are two kinds of data, there are two work queues, syncServiceAccountQueue and syncSecretQueue, processed by TokensController.syncServiceAccount and TokensController.syncSecret respectively.

    The logic of syncSecret is fairly simple; it does two things:

    • for a secret that no longer exists, remove its association with the ServiceAccount
    • for a secret that does exist, ensure the token (Secret) is referenced by its ServiceAccount; if not, delete the Secret

    The more interesting logic is in the handler that syncs ServiceAccounts:

    func (e *TokensController) syncServiceAccount() {
        key, quit := e.syncServiceAccountQueue.Get()
        if quit {
            return
        }
        defer e.syncServiceAccountQueue.Done(key)
    
        retry := false
        defer func() {
            e.retryOrForget(e.syncServiceAccountQueue, key, retry)
        }()
    
        saInfo, err := parseServiceAccountKey(key)
        if err != nil {
            glog.Error(err)
            return
        }
    
        sa, err := e.getServiceAccount(saInfo.namespace, saInfo.name, saInfo.uid, false)
        switch {
        case err != nil:
            glog.Error(err)
            retry = true
        case sa == nil:
            // service account no longer exists, so delete related tokens
            glog.V(4).Infof("syncServiceAccount(%s/%s), service account deleted, removing tokens", saInfo.namespace, saInfo.name)
            sa = &v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: saInfo.namespace, Name: saInfo.name, UID: saInfo.uid}}
            retry, err = e.deleteTokens(sa)
            if err != nil {
                glog.Errorf("error deleting serviceaccount tokens for %s/%s: %v", saInfo.namespace, saInfo.name, err)
            }
        default:
            // ensure a token exists and is referenced by this service account
            retry, err = e.ensureReferencedToken(sa)
            if err != nil {
                glog.Errorf("error synchronizing serviceaccount %s/%s: %v", saInfo.namespace, saInfo.name, err)
            }
        }
    }
    

    The main logic mirrors syncSecret: when the ServiceAccount no longer exists, delete all tokens (Secrets) associated with it; otherwise, make sure the necessary token is associated with it.
    ensureReferencedToken does the core work: it first checks whether a referenced Secret already exists and returns immediately if so; otherwise it creates a new Secret and links it to the account. Note that this operation is ultimately applied to the API Server.
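    A much-simplified, in-memory sketch of that ensure-referenced-token idea (the real implementation reads live Secrets from the cache, generates a JWT via the TokenGenerator, and issues create/update calls against the API Server, all omitted here; the mint function is a hypothetical stand-in):

```go
package main

import "fmt"

type serviceAccount struct {
	name    string
	secrets []string // names of referenced token secrets
}

// ensureReferencedToken makes sure sa references at least one token
// secret, creating one via mint if needed; returns the token name.
func ensureReferencedToken(sa *serviceAccount, mint func(string) string) string {
	if len(sa.secrets) > 0 {
		return sa.secrets[0] // already referenced: nothing to do
	}
	tok := mint(sa.name)
	sa.secrets = append(sa.secrets, tok) // link the new secret to the account
	return tok
}

func main() {
	sa := &serviceAccount{name: "default"}
	mint := func(owner string) string { return owner + "-token-abc12" }
	fmt.Println(ensureReferencedToken(sa, mint)) // creates a token
	fmt.Println(ensureReferencedToken(sa, mint)) // idempotent: same token
}
```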

    Starting the other resource controllers

    All of the controllers are started by this call:
    StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode))
    saTokenControllerInitFunc is special, which is why it is pulled out and run first; the initializer functions for the remaining controllers are produced by NewControllerInitializers:

    func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
        controllers := map[string]InitFunc{}
        controllers["endpoint"] = startEndpointController
        controllers["replicationcontroller"] = startReplicationController
        controllers["podgc"] = startPodGCController
        controllers["resourcequota"] = startResourceQuotaController
        controllers["namespace"] = startNamespaceController
        controllers["serviceaccount"] = startServiceAccountController
        controllers["garbagecollector"] = startGarbageCollectorController
        controllers["daemonset"] = startDaemonSetController
        controllers["job"] = startJobController
        controllers["deployment"] = startDeploymentController
        controllers["replicaset"] = startReplicaSetController
        controllers["horizontalpodautoscaling"] = startHPAController
        controllers["disruption"] = startDisruptionController
        controllers["statefulset"] = startStatefulSetController
        controllers["cronjob"] = startCronJobController
        controllers["csrsigning"] = startCSRSigningController
        controllers["csrapproving"] = startCSRApprovingController
        controllers["csrcleaner"] = startCSRCleanerController
        controllers["ttl"] = startTTLController
        controllers["bootstrapsigner"] = startBootstrapSignerController
        controllers["tokencleaner"] = startTokenCleanerController
        if loopMode == IncludeCloudLoops {
            controllers["service"] = startServiceController
            controllers["nodeipam"] = startNodeIpamController
            controllers["route"] = startRouteController
            // TODO: volume controller into the IncludeCloudLoops only set.
            // TODO: Separate cluster in cloud check from node lifecycle controller.
        }
        controllers["nodelifecycle"] = startNodeLifecycleController
        controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
        controllers["attachdetach"] = startAttachDetachController
        controllers["persistentvolume-expander"] = startVolumeExpandController
        controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
        controllers["pvc-protection"] = startPVCProtectionController
        controllers["pv-protection"] = startPVProtectionController
    
        return controllers
    }
    

    This function builds a map from controller name to initializer function. One extra note on loopMode == IncludeCloudLoops: this mode applies when kube-controller-manager itself is configured with an in-tree cloud provider (that is, --cloud-provider is not set to external). In that case the controllers for the service, nodeipam, and route resources are started here as well; with an external provider they are handled by the corresponding cloud-controller-manager instead.
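    In simplified form, the mode selection can be sketched as follows (the constant names mirror the real ones, but the actual decision goes through the cloud provider option parsing rather than a bare string compare):

```go
package main

import "fmt"

type ControllerLoopMode int

const (
	IncludeCloudLoops ControllerLoopMode = iota // cloud loops run here
	ExternalLoops                               // deferred to cloud-controller-manager
)

// loopModeFor picks the loop mode from the --cloud-provider flag: an
// in-tree provider keeps the cloud loops inside kube-controller-manager,
// while "external" defers them to cloud-controller-manager.
func loopModeFor(cloudProvider string) ControllerLoopMode {
	if cloudProvider == "external" {
		return ExternalLoops
	}
	return IncludeCloudLoops
}

func main() {
	fmt.Println(loopModeFor("aws") == IncludeCloudLoops)
	fmt.Println(loopModeFor("external") == ExternalLoops)
}
```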

    After this initialization, StartControllers calls each start function in turn, bringing up the controllers for all resources.

    func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
        // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
        // If this fails, just return here and fail since other controllers won't be able to get credentials.
        if _, err := startSATokenController(ctx); err != nil {
            return err
        }
    
        // Initialize the cloud provider with a reference to the clientBuilder only after token controller
        // has started in case the cloud provider uses the client builder.
        if ctx.Cloud != nil {
            ctx.Cloud.Initialize(ctx.ClientBuilder)
        }
        // start each resource controller
        for controllerName, initFn := range controllers {
            if !ctx.IsControllerEnabled(controllerName) {
                glog.Warningf("%q is disabled", controllerName)
                continue
            }
    
            time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))
    
            glog.V(1).Infof("Starting %q", controllerName)
            started, err := initFn(ctx)
            if err != nil {
                glog.Errorf("Error starting %q", controllerName)
                return err
            }
            if !started {
                glog.Warningf("Skipping %q", controllerName)
                continue
            }
            glog.Infof("Started %q", controllerName)
        }
    
        return nil
    }
    

    The other resource controllers follow a pattern similar to the ServiceAccount one, so they are not analyzed one by one here; we will dig into the code for the key resource types as we encounter them.


          Permalink: https://www.haomeiwen.com/subject/qyrmoftx.html