A brief analysis of the k8s controller manager source code

Author: wwq2020 | Published 2020-07-11 17:06

Introduction

The controller manager is responsible for watching replication controllers and creating the corresponding pods to reach the desired state. It uses the API to watch for new controllers and to create or delete pods.

Quick summary

The controller manager collects events through informers; the individual controllers then process those events and reconcile the cluster toward the desired state.
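
To make that pattern concrete, here is a minimal, self-contained sketch of the same informer-plus-workqueue loop built directly on client-go. This is not kube-controller-manager code; the kubeconfig path, the queue name "demo", and the resync period are arbitrary choices for illustration.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Assumption: a kubeconfig at the default location (~/.kube/config).
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    stopCh := make(chan struct{})
    defer close(stopCh)

    // Shared informers watch the apiserver and maintain a local cache.
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    deployInformer := factory.Apps().V1().Deployments().Informer()

    // Event handlers reduce each event to a "namespace/name" key and queue it.
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
    deployInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(newObj); err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
    })

    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, deployInformer.HasSynced)

    // A single worker: pop keys and "reconcile" them (here we only print).
    for {
        key, quit := queue.Get()
        if quit {
            return
        }
        fmt.Printf("reconciling %v\n", key)
        queue.Forget(key)
        queue.Done(key)
    }
}

The built-in controllers follow exactly this shape: handlers reduce objects to namespace/name keys, and workers pop keys and run a sync function.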

Relevant source code

In cmd/kube-controller-manager/controller-manager.go, main simply builds the command and executes it:


func main() {
   ...
    command := app.NewControllerManagerCommand()
   ...
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

In cmd/kube-controller-manager/app/controllermanager.go, NewControllerManagerCommand builds the cobra command whose Run callback completes the configuration and calls Run; Run sets up the client builders, creates the controller context, starts every controller and the shared informers, and does all of this under leader election:

func NewControllerManagerCommand() *cobra.Command {
   ...
    cmd := &cobra.Command{
        Use: "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In Kubernetes, a controller is a control loop that watches the shared
state of the cluster through the apiserver and makes changes attempting to move the
current state towards the desired state. Examples of controllers that ship with
Kubernetes today are the replication controller, endpoints controller, namespace
controller, and serviceaccounts controller.`,
        PersistentPreRunE: func(*cobra.Command, []string) error {
            // silence client-go warnings.
            // kube-controller-manager generically watches APIs (including deprecated ones),
            // and CI ensures it works properly against matching kube-apiserver versions.
            restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
            return nil
        },
        Run: func(cmd *cobra.Command, args []string) {
            verflag.PrintAndExitIfRequested()
            cliflag.PrintFlags(cmd.Flags())

            c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
            if err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }

            if err := Run(c.Complete(), wait.NeverStop); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
        Args: func(cmd *cobra.Command, args []string) error {
            for _, arg := range args {
                if len(arg) > 0 {
                    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                }
            }
            return nil
        },
    }
   ...
}

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
   ...
    run := func(ctx context.Context) {
        rootClientBuilder := controller.SimpleControllerClientBuilder{
            ClientConfig: c.Kubeconfig,
        }
        var clientBuilder controller.ControllerClientBuilder
        if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
            if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
                // It's possible another controller process is creating the tokens for us.
                // If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
                klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
            }

            if shouldTurnOnDynamicClient(c.Client) {
                klog.V(1).Infof("using dynamic client builder")
                //Dynamic builder will use TokenRequest feature and refresh service account token periodically
                clientBuilder = controller.NewDynamicClientBuilder(
                    restclient.AnonymousClientConfig(c.Kubeconfig),
                    c.Client.CoreV1(),
                    "kube-system")
            } else {
                klog.V(1).Infof("using legacy client builder")
                clientBuilder = controller.SAControllerClientBuilder{
                    ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
                    CoreClient:           c.Client.CoreV1(),
                    AuthenticationClient: c.Client.AuthenticationV1(),
                    Namespace:            "kube-system",
                }
            }
        } else {
            clientBuilder = rootClientBuilder
        }
        controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
        if err != nil {
            klog.Fatalf("error building controller context: %v", err)
        }
        saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

        if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
            klog.Fatalf("error starting controllers: %v", err)
        }

        controllerContext.InformerFactory.Start(controllerContext.Stop)
        controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
        close(controllerContext.InformersStarted)

        select {}
    }
   ...

    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        },
        WatchDog: electionChecker,
        Name:     "kube-controller-manager",
    })
   ...
}

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
    // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
    // If this fails, just return here and fail since other controllers won't be able to get credentials.
    if _, _, err := startSATokenController(ctx); err != nil {
        return err
    }

    // Initialize the cloud provider with a reference to the clientBuilder only after token controller
    // has started in case the cloud provider uses the client builder.
    if ctx.Cloud != nil {
        ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop)
    }

    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            klog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

        klog.V(1).Infof("Starting %q", controllerName)
        debugHandler, started, err := initFn(ctx)
        if err != nil {
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
        if debugHandler != nil && unsecuredMux != nil {
            basePath := "/debug/controllers/" + controllerName
            unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
            unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
        }
        klog.Infof("Started %q", controllerName)
    }

    return nil
}
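
Each value in the controllers map passed to StartControllers is an InitFunc: it returns an optional debug handler, a bool reporting whether the controller was actually started, and an error. The declaration in the same file looks roughly like this (reproduced from memory of the 1.18-era source, so treat it as approximate):

// Approximate shape of InitFunc in cmd/kube-controller-manager/app/controllermanager.go.
type InitFunc func(ctx ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)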

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["endpointslice"] = startEndpointSliceController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    controllers["nodeipam"] = startNodeIpamController
    controllers["nodelifecycle"] = startNodeLifecycleController
    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
        // TODO: volume controller into the IncludeCloudLoops only set.
    }
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    controllers["root-ca-cert-publisher"] = startRootCACertPublisher

    return controllers
}

In k8s.io/client-go/tools/leaderelection/leaderelection.go, RunOrDie builds a LeaderElector (validating the durations and callbacks) and runs it; once the lock is acquired, Run invokes OnStartedLeading (the run closure above) and keeps renewing the lease:

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}
func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
    if lec.LeaseDuration <= lec.RenewDeadline {
        return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
    }
    if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
        return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
    }
    if lec.LeaseDuration < 1 {
        return nil, fmt.Errorf("leaseDuration must be greater than zero")
    }
    if lec.RenewDeadline < 1 {
        return nil, fmt.Errorf("renewDeadline must be greater than zero")
    }
    if lec.RetryPeriod < 1 {
        return nil, fmt.Errorf("retryPeriod must be greater than zero")
    }
    if lec.Callbacks.OnStartedLeading == nil {
        return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
    }
    if lec.Callbacks.OnStoppedLeading == nil {
        return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
    }

    if lec.Lock == nil {
        return nil, fmt.Errorf("Lock must not be nil.")
    }
    le := LeaderElector{
        config:  lec,
        clock:   clock.RealClock{},
        metrics: globalMetricsFactory.newLeaderMetrics(),
    }
    le.metrics.leaderOff(le.config.Name)
    return &le, nil
}

func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
        le.config.Callbacks.OnStoppedLeading()
    }()

    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}
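
leaderelection is a plain client-go API, so any component can use it the same way kube-controller-manager does. Below is a minimal, hypothetical standalone example; the lock name "demo-leader-election", the identity, and the durations are made up for illustration and are not taken from kube-controller-manager.

package main

import (
    "context"
    "os"
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog/v2"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        klog.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)
    id, _ := os.Hostname()

    // A Lease object in kube-system acts as the lock; the name is made up here.
    lock, err := resourcelock.New(
        resourcelock.LeasesResourceLock,
        "kube-system",
        "demo-leader-election",
        client.CoreV1(),
        client.CoordinationV1(),
        resourcelock.ResourceLockConfig{Identity: id},
    )
    if err != nil {
        klog.Fatal(err)
    }

    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: 15 * time.Second,
        RenewDeadline: 10 * time.Second,
        RetryPeriod:   2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                klog.Info("became leader, starting control loops")
                <-ctx.Done() // the real run closure blocks forever (select {})
            },
            OnStoppedLeading: func() {
                klog.Fatal("lost leadership")
            },
        },
        Name: "demo-leader-election",
    })
}

kube-controller-manager does essentially this: rl is built from its --leader-elect-* flags and the run closure is installed as OnStartedLeading.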

Let's pick one controller, the deployment controller, and follow it.
In cmd/kube-controller-manager/app/apps.go, startDeploymentController builds the controller from the shared informers and starts it in a goroutine:

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

In k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go, NewDeploymentController registers event handlers on the deployment, ReplicaSet, and pod informers, Run waits for the caches to sync and starts the workers, and syncDeployment is the sync handler invoked for each queued key:

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    return dc, nil
}
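
All three sets of handlers ultimately push a "namespace/name" key onto dc.queue. The add path, paraphrased rather than copied verbatim from deployment_controller.go, looks roughly like this:

// Paraphrased sketch of the enqueue path; the real addDeployment also logs,
// and updateDeployment/deleteDeployment handle more cases (resyncs, tombstones).
func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*apps.Deployment)
    dc.enqueueDeployment(d)
}

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
    // The key is "namespace/name", e.g. "default/nginx".
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(deployment)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err))
        return
    }
    dc.queue.Add(key)
}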

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dc.queue.ShutDown()

    klog.Infof("Starting deployment controller")
    defer klog.Infof("Shutting down deployment controller")

    if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }

    <-stopCh
}
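
Run starts the workers, but the glue between the queue and syncDeployment is not shown above. Paraphrased (the real code moves the error handling into a handleErr helper with a retry limit), each worker pops a key, calls syncHandler, and requeues with rate-limited backoff on failure:

// Paraphrased sketch of the worker loop in deployment_controller.go.
func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)

    err := dc.syncHandler(key.(string))
    if err == nil {
        // Success: clear the rate-limit history for this key.
        dc.queue.Forget(key)
        return true
    }
    // Failure: report it and requeue with backoff so the sync is retried.
    utilruntime.HandleError(fmt.Errorf("syncing deployment %q failed: %v", key, err))
    dc.queue.AddRateLimited(key)
    return true
}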

func (dc *DeploymentController) syncDeployment(key string) error {
    startTime := time.Now()
    klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
    defer func() {
        klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(2).Infof("Deployment %v has been deleted", key)
        return nil
    }
    if err != nil {
        return err
    }

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    d := deployment.DeepCopy()

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(d.Spec.Selector, &everything) {
        dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
        if d.Status.ObservedGeneration < d.Generation {
            d.Status.ObservedGeneration = d.Generation
            dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
        }
        return nil
    }

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }
    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
        return err
    }

    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
    }

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }

    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // rollback is not re-entrant in case the underlying replica sets are updated with a new
    // revision so we should ensure that we won't proceed to update replica sets until we
    // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }

    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
