美文网首页
[k8s源码分析][controller-manager] ku

[k8s源码分析][controller-manager] ku

作者: nicktming | 来源:发表于2019-11-07 22:50 被阅读0次

    1. 前言

    转载请说明原文出处, 尊重他人劳动成果!

    源码位置: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/controller/replicaset
    分支: tming-v1.13 (基于v1.13版本)

    本文将分析controller-manager的启动过程.

    2. 启动

    kubernetes/cmd/kube-controller-manager/controller-manager.go中启动.

    // kubernetes/cmd/kube-controller-manager/controller-manager.go
    // main is the entry point of kube-controller-manager as quoted by the
    // article. It seeds math/rand from the current time, builds the command via
    // app.NewControllerManagerCommand and executes it. If Execute returns an
    // error, the error is printed to stderr and the process exits with status 1.
    // The "..." line marks code the article elided.
    func main() {
        rand.Seed(time.Now().UnixNano())
        command := app.NewControllerManagerCommand()
        ...
        if err := command.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    }
    

    进入到NewControllerManagerCommand方法:

    // NewControllerManagerCommand builds the *cobra.Command named
    // "kube-controller-manager". The command's Run callback turns the options
    // object s into a config and hands the completed config to Run; if Run
    // returns an error it is printed to stderr and the process exits with
    // status 1. Lines shown as "..." were elided by the article (including the
    // handling of the err values).
    func NewControllerManagerCommand() *cobra.Command {
        // Options object that the Run callback below converts into a Config.
        s, err := options.NewKubeControllerManagerOptions()
        ...
        cmd := &cobra.Command{
            Use: "kube-controller-manager",
            ...
            Run: func(cmd *cobra.Command, args []string) {
                ...
                // Build the Config from the options, passing the list of known
                // controller names and the list of controllers disabled by default.
                c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
                ...
                // wait.NeverStop is passed as the stop channel.
                if err := Run(c.Complete(), wait.NeverStop); err != nil {
                    fmt.Fprintf(os.Stderr, "%v\n", err)
                    os.Exit(1)
                }
            },
        }
        ...
        return cmd
    }
    

    这里需要注意三个地方:
    1. s, err := options.NewKubeControllerManagerOptions()创建一个默认的KubeControllerManagerOptions对象s.
    2. c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())通过传入的参数将s变成一个Config对象.
    3. Run(c.Complete(), wait.NeverStop)启动程序.

    接下来将从这三个部分来继续分析.

    2.1 创建KubeControllerManagerOptions对象

    // cmd/kube-controller-manager/app/options/options.go
    // NewKubeControllerManagerOptions creates a *KubeControllerManagerOptions.
    // The visible part builds the default component config from
    // ports.InsecureKubeControllerManagerPort and returns (nil, err) if that
    // fails; the rest of the function was elided by the article ("...").
    func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
        componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
        if err != nil {
            return nil, err
        }
    ...
    }
    

    这里主要根据系统的一些默认值生成了一个KubeControllerManagerOptions对象.

    2.2 生成Config对象

    ===> kubernetes/cmd/kube-controller-manager/app/controllermanager.go
    // KnownControllers returns the names of all known controllers: every key of
    // the map returned by NewControllerInitializers(IncludeCloudLoops), plus
    // saTokenControllerName, which is inserted separately because it is not a
    // key of that map in the code shown here.
    func KnownControllers() []string {
        ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
        ret.Insert(
            saTokenControllerName,
        )
        return ret.List()
    }
    // NewControllerInitializers returns a map from controller name to the
    // InitFunc that starts that controller. The "service" and "route" entries
    // are only registered when loopMode == IncludeCloudLoops. Entries shown as
    // "..." were elided by the article.
    func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
        controllers := map[string]InitFunc{}
        controllers["endpoint"] = startEndpointController
        ...
        controllers["deployment"] = startDeploymentController
        ...
        controllers["nodeipam"] = startNodeIpamController
        // Cloud-provider loops are registered only in IncludeCloudLoops mode.
        if loopMode == IncludeCloudLoops {
            controllers["service"] = startServiceController
            controllers["route"] = startRouteController
        }
        ...
        controllers["root-ca-cert-publisher"] = startRootCACertPublisher
        return controllers
    }
    
    // ControllersDisabledByDefault is the set of controller names that are
    // disabled by default ("bootstrapsigner" and "tokencleaner"). Its List() is
    // passed to Config as disabledByDefaultControllers.
    var ControllersDisabledByDefault = sets.NewString(
        "bootstrapsigner",
        "tokencleaner",
    )
    

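    可以看到KnownControllers返回的是所有已知的controller的名字(NewControllerInitializers中的全部key再加上saTokenControllerName), 每个名字对应的InitFunc就是启动其对应controller的方法. ControllersDisabledByDefault中列出的controller默认不启动.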

    // Config converts the options s into a *kubecontrollerconfig.Config.
    //
    // Steps, in order:
    //   1. Validate the options against allControllers and
    //      disabledByDefaultControllers.
    //   2. Default the secure-serving certificates via
    //      MaybeDefaultWithSelfSignedCerts for "localhost" / 127.0.0.1.
    //   3. Build the REST kubeconfig from s.Master and s.Kubeconfig and copy the
    //      content type, QPS and burst from the generic client-connection options.
    //   4. Create the main client, a separate leader-election client and an
    //      event recorder.
    //   5. Apply the remaining options to the config with s.ApplyTo.
    //
    // Any error from these steps is returned with a nil config.
    func (s KubeControllerManagerOptions) Config(allControllers []string, disabledByDefaultControllers []string) (*kubecontrollerconfig.Config, error) {
        if err := s.Validate(allControllers, disabledByDefaultControllers); err != nil {
            return nil, err
        }
    
        if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
            return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
        }
        
        kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
        if err != nil {
            return nil, err
        }
        kubeconfig.ContentConfig.ContentType = s.Generic.ClientConnection.ContentType
        kubeconfig.QPS = s.Generic.ClientConnection.QPS
        kubeconfig.Burst = int(s.Generic.ClientConnection.Burst)
        
        // Build the clientset used to talk to the API server.
        client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, KubeControllerManagerUserAgent))
        if err != nil {
            return nil, err
        }
    
        // shallow copy, do not modify the kubeconfig.Timeout.
        // The copy gets the leader-election renew deadline as its timeout and is
        // used only for the dedicated leader-election client.
        config := *kubeconfig
        config.Timeout = s.Generic.LeaderElection.RenewDeadline.Duration
        // NOTE(review): unlike NewForConfig above, NewForConfigOrDie returns no
        // error — presumably it panics on failure; confirm against client-go.
        leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))
    
        eventRecorder := createRecorder(client, KubeControllerManagerUserAgent)
        
        c := &kubecontrollerconfig.Config{
            Client:               client,
            Kubeconfig:           kubeconfig,
            EventRecorder:        eventRecorder,
            LeaderElectionClient: leaderElectionClient,
        }
        if err := s.ApplyTo(c); err != nil {
            return nil, err
        }
        return c, nil
    }
    

    可以看到Config中生成四个变量:

    client 用于与api-server进行交流.
    kubeconfig 连接api-server所用的配置(用于生成client).
    eventRecorder 用于记录事件(event).
    LeaderElectionClient 高可用的时候用到

    2.3 Run

    ===>cmd/kube-controller-manager/app/controllermanager.go
    // Run starts the controller manager from the completed config c.
    //
    // It defines a local closure run that builds the controller context, starts
    // all controllers, starts the shared informers and then blocks forever on
    // an empty select. If leader election is disabled, run is called directly.
    // Otherwise a resource lock is created in "kube-system" under the name
    // "kube-controller-manager" and leaderelection.RunOrDie invokes run once
    // this process becomes the leader.
    //
    // The only error returned in the visible code is the one from os.Hostname.
    // Lines shown as "..." were elided by the article; clientBuilder,
    // rootClientBuilder, unsecuredMux and electionChecker are declared in those
    // elided parts.
    func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
        ...
        run := func(ctx context.Context) {
            ...
            if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
                if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
                    // Article note: this is why --service-account-private-key-file
                    // should be given at startup; without it only this warning is logged.
                    klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
                }
                // Client builder based on an anonymous copy of the kubeconfig,
                // using the "kube-system" namespace.
                clientBuilder = controller.SAControllerClientBuilder{
                    ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
                    CoreClient:           c.Client.CoreV1(),
                    AuthenticationClient: c.Client.AuthenticationV1(),
                    Namespace:            "kube-system",
                }
            } else {
                clientBuilder = rootClientBuilder
            }
            controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
            if err != nil {
                klog.Fatalf("error building controller context: %v", err)
            }
            // The service-account token controller always uses rootClientBuilder.
            saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
    
            // Start all controllers.
            if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
                klog.Fatalf("error starting controllers: %v", err)
            }
            // Start all registered informers, then close InformersStarted to
            // signal that they have been started.
            controllerContext.InformerFactory.Start(controllerContext.Stop)
            close(controllerContext.InformersStarted)
    
            // Block forever; run never returns.
            select {}
        }
    
        // Without leader election, run directly. run never returns, so the panic
        // below only guards against that invariant being broken.
        if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
            run(context.TODO())
            panic("unreachable")
        }
    
        id, err := os.Hostname()
        if err != nil {
            return err
        }
    
        // add a uniquifier so that two processes on the same host don't accidentally both become active
        id = id + "_" + string(uuid.NewUUID())
        rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
            "kube-system",
            "kube-controller-manager",
            c.LeaderElectionClient.CoreV1(),
            resourcelock.ResourceLockConfig{
                Identity:      id,
                EventRecorder: c.EventRecorder,
            })
        if err != nil {
            klog.Fatalf("error creating lock: %v", err)
        }
    
        // High availability: run is invoked via OnStartedLeading; when leadership
        // is lost, OnStoppedLeading calls klog.Fatalf.
        leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
            Lock:          rl,
            LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
            RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
            RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
            Callbacks: leaderelection.LeaderCallbacks{
                OnStartedLeading: run,
                OnStoppedLeading: func() {
                    klog.Fatalf("leaderelection lost")
                },
            },
            WatchDog: electionChecker,
            Name:     "kube-controller-manager",
        })
        panic("unreachable")
    }
    

    这里需要注意几点:

    1. 当使用--use-service-account-credentials时, 需要在启动的时候指定--service-account-private-key-file文件(否则只会打印一条警告), 在 k8s源码编译以及二进制安装(用于源码开发调试版) 中已经遇到过了, 该问题会在别的文章分析.
    2. controllerContext属性中有InformerFactory, 并且所有的controller用的都是这个InformerFactory, 因此所有用到同一种informer的controller拿到的都是同一个informer, 比如所有用到podInformer的controller用的都是同一个对象.
    3. StartControllers方法中启动所有的controller, 每个controller都是以goroutine的方式启动.

    ===>cmd/kube-controller-manager/app/controllermanager.go
    // StartControllers iterates over the controllers map and calls each
    // controller's InitFunc with ctx. Controllers for which
    // ctx.IsControllerEnabled returns false are skipped with a warning. The
    // visible code returns nil after the loop; handling of debugHandler,
    // started and err, and the use of startSATokenController and unsecuredMux,
    // are in the parts elided by the article ("...").
    func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
        ...
        for controllerName, initFn := range controllers {
            if !ctx.IsControllerEnabled(controllerName) {
                klog.Warningf("%q is disabled", controllerName)
                continue
            }
            ...
            // Article note: every controller is started as a goroutine.
            // NOTE(review): initFn itself is called synchronously here; the
            // goroutine is presumably spawned inside each init function — confirm.
            debugHandler, started, err := initFn(ctx)
            ...
        }
        return nil
    }
    
    4. 高可用, 使用的其实也是endpoint资源类型, 这里不多说了, 可以参考 [k8s源码分析][client-go] k8s选举leaderelection (分布式资源锁实现) 和 [k8s源码分析][kube-scheduler]scheduler之高可用及原理 .
    [root@master kubectl]# ./kubectl get endpoints -n kube-system
    NAME                      ENDPOINTS   AGE
    kube-controller-manager   <none>      22d
    kube-scheduler            <none>      22d
    [root@master kubectl]# ./kubectl get endpoints kube-controller-manager -o yaml -n kube-system
    apiVersion: v1
    kind: Endpoints
    metadata:
      annotations:
        control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"master_6efbea00-0168-11ea-b452-525400d54f7e","leaseDurationSeconds":15,"acquireTime":"2019-11-07T14:11:21Z","renewTime":"2019-11-07T14:12:01Z","leaderTransitions":15}'
      creationTimestamp: "2019-10-15T14:57:16Z"
      name: kube-controller-manager
      namespace: kube-system
      resourceVersion: "131420"
      selfLink: /api/v1/namespaces/kube-system/endpoints/kube-controller-manager
      uid: 13f00d33-ef5c-11e9-af01-525400d54f7e
    [root@master kubectl]# 
    

    3. 总结

    conclusion.png

    相关文章

      网友评论

          本文标题:[k8s源码分析][controller-manager] ku

          本文链接:https://www.haomeiwen.com/subject/gpqivctx.html