美文网首页
Deployment-Controller源码分析

Deployment-Controller源码分析

作者: periky | 来源:发表于2022-06-19 17:34 被阅读0次

    入口逻辑

    启动和初始化入口 startDeploymentController

    • cmd/kube-controller-manager/app/apps.go:22
    func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
        if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
            return nil, false, nil
        }
        // DeploymentController实例化
        dc, err := deployment.NewDeploymentController(
            // Deployment Informer
            ctx.InformerFactory.Apps().V1().Deployments(),
            // ReplicaSet Informer
            ctx.InformerFactory.Apps().V1().ReplicaSets(),
            // Pod Informer
            ctx.InformerFactory.Core().V1().Pods(),
            // k8s client
            ctx.ClientBuilder.ClientOrDie("deployment-controller"),
        )
        if err != nil {
            return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
        }
        go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
        return nil, true, nil
    }
    

    startDeploymentController()函数中通过NewDeploymentController()方法初始化DeploymentController实例,入参包含Deployment、ReplicaSet、Pod的Informer接口和K8sClient。

    DeploymentController对象

    DeploymentController类型定义

    • pkg/controller/deployment/deployment_controller.go:68
    // DeploymentController is responsible for synchronizing Deployment objects stored
    // in the system with actual running replica sets and pods.
    type DeploymentController struct {
        // rsControl is used for adopting/releasing replica sets.
        rsControl     controller.RSControlInterface
        client        clientset.Interface
        eventRecorder record.EventRecorder
    
        // To allow injection of syncDeployment for testing.
        syncHandler func(dKey string) error
        // used for unit testing
        enqueueDeployment func(deployment *apps.Deployment)
    
        // dLister can list/get deployments from the shared informer's store
        dLister appslisters.DeploymentLister
        // rsLister can list/get replica sets from the shared informer's store
        rsLister appslisters.ReplicaSetLister
        // podLister can list/get pods from the shared informer's store
        podLister corelisters.PodLister
    
        // dListerSynced returns true if the Deployment store has been synced at least once.
        // Added as a member to the struct to allow injection for testing.
        dListerSynced cache.InformerSynced
        // rsListerSynced returns true if the ReplicaSet store has been synced at least once.
        // Added as a member to the struct to allow injection for testing.
        rsListerSynced cache.InformerSynced
        // podListerSynced returns true if the pod store has been synced at least once.
        // Added as a member to the struct to allow injection for testing.
        podListerSynced cache.InformerSynced
    
        // Deployments that need to be synced
        queue workqueue.RateLimitingInterface
    }
    

    DeploymentController初始化

    • pkg/controller/deployment/deployment_controller.go:101
    // NewDeploymentController creates a new DeploymentController.
    func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
        // k8s事件
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartStructuredLogging(0)
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
    
        // 注册prometheus metric `deployment_controller_rate_limiter_use`
        if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
            if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
                return nil, err
            }
        }
        dc := &DeploymentController{
            client:        client,
            eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
            queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
        }
        
        // k8s client, replicaset操作
        dc.rsControl = controller.RealRSControl{
            KubeClient: client,
            Recorder:   dc.eventRecorder,
        }
    
        // deployment informer ResourceEventHandler
        dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addDeployment,     // create deployment的事件处理逻辑
            UpdateFunc: dc.updateDeployment,  // update deployment的事件处理逻辑
            // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
            DeleteFunc: dc.deleteDeployment,  // delete deployment的事件处理逻辑
        })
        // replicaset informer ResourceEventHandler
        rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addReplicaSet,
            UpdateFunc: dc.updateReplicaSet,
            DeleteFunc: dc.deleteReplicaSet,
        })
        // pod ResourceEventHandler
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            DeleteFunc: dc.deletePod,
        })
    
        // deployment处理逻辑,不支持同一个deployment资源的并发处理
        dc.syncHandler = dc.syncDeployment
        dc.enqueueDeployment = dc.enqueue
    
        // resource lister
        dc.dLister = dInformer.Lister()
        dc.rsLister = rsInformer.Lister()
        dc.podLister = podInformer.Lister()
        // 缓存同步完成标记
        dc.dListerSynced = dInformer.Informer().HasSynced
        dc.rsListerSynced = rsInformer.Informer().HasSynced
        dc.podListerSynced = podInformer.Informer().HasSynced
        return dc, nil
    }
    

    DeploymentController启动

    DeploymentController执行逻辑 -- Run()函数

    • pkg/controller/deployment/deployment_controller.go:149
    // Run begins watching and syncing.
    func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        defer dc.queue.ShutDown()
    
        klog.InfoS("Starting controller", "controller", "deployment")
        defer klog.InfoS("Shutting down controller", "controller", "deployment")
    
        // 等待资源[deployment,replicaset,pod]缓存同步完成
        if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
            return
        }
    
        for i := 0; i < workers; i++ {
            go wait.Until(dc.worker, time.Second, stopCh)
        }
    
        <-stopCh
    }
    
    • pkg/controller/deployment/deployment_controller.go:460
    // worker runs a worker thread that just dequeues items, processes them, and marks them done.
    // It enforces that the syncHandler is never invoked concurrently with the same key.
    func (dc *DeploymentController) worker() {
        for dc.processNextWorkItem() {
        }
    }
    
    • pkg/controller/deployment/deployment_controller.go:465
    func (dc *DeploymentController) processNextWorkItem() bool {
        key, quit := dc.queue.Get()
        if quit {
            return false
        }
        defer dc.queue.Done(key)
    
        // syncDeployment,同步当前资源的状态
        err := dc.syncHandler(key.(string))
        dc.handleErr(err, key)
    
        return true
    }
    
    • pkg/controller/deployment/deployment_controller.go:568
    // syncDeployment will sync the deployment with the given key.
    // This function is not meant to be invoked concurrently with the same key.
    func (dc *DeploymentController) syncDeployment(key string) error {
        // 获取当前资源的namespace,name
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
            return err
        }
    
        startTime := time.Now()
        klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
        defer func() {
            klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
        }()
    
        // 根据 namespace 和 name 从 cache 中检索对应 Deployment 对象
        deployment, err := dc.dLister.Deployments(namespace).Get(name)
        if errors.IsNotFound(err) {
            klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
            return nil
        }
        if err != nil {
            return err
        }
    
        // Deep-copy otherwise we are mutating our cache.
        // TODO: Deep-copy only when needed.
        d := deployment.DeepCopy()
    
        everything := metav1.LabelSelector{}
        // 空LabelSelector匹配当前namespace下所有pod, 发送一个Warning Event, 更新 .Status.ObservedGeneration 然后返回
        if reflect.DeepEqual(d.Spec.Selector, &everything) {
            dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
            if d.Status.ObservedGeneration < d.Generation {
                d.Status.ObservedGeneration = d.Generation
                dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
            }
            return nil
        }
    
        // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
        // through adoption/orphaning.
        // 获取当前Deployment拥有的所有ReplicaSet, 同时会更新这些ReplicaSet的ControllerRef
        rsList, err := dc.getReplicaSetsForDeployment(d)
        if err != nil {
            return err
        }
        // List all Pods owned by this Deployment, grouped by their ReplicaSet.
        // Current uses of the podMap are:
        //
        // * check if a Pod is labeled correctly with the pod-template-hash label.
        // * check that no old Pods are running in the middle of Recreate Deployments.
        // map[types.UID][]*v1.Pod 类型,key是rs的UID,value是对应rs管理的所有pod列表
        podMap, err := dc.getPodMapForDeployment(d, rsList)
        if err != nil {
            return err
        }
    
        // 已经标记删除的deployment,仅更新状态
        if d.DeletionTimestamp != nil {
            return dc.syncStatusOnly(d, rsList)
        }
    
        // Update deployment conditions with an Unknown condition when pausing/resuming
        // a deployment. In this way, we can be sure that we won't timeout when a user
        // resumes a Deployment with a set progressDeadlineSeconds.
        // 根据 .Spec.Pause配置看是否更新Deployment的conditions
        if err = dc.checkPausedConditions(d); err != nil {
            return err
        }
    
        // .spec.paused=true,更新所有replicas!=0的replicaset
        if d.Spec.Paused {
            return dc.sync(d, rsList)
        }
    
        // rollback is not re-entrant in case the underlying replica sets are updated with a new
        // revision so we should ensure that we won't proceed to update replica sets until we
        // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
        if getRollbackTo(d) != nil {
            // 老版本deployment版本回滚逻辑兼容
            return dc.rollback(d, rsList)
        }
    
        // 判断当前事件是否为副本数目调整事件
        scalingEvent, err := dc.isScalingEvent(d, rsList)
        if err != nil {
            return err
        }
        if scalingEvent {
            return dc.sync(d, rsList)
        }
    
        switch d.Spec.Strategy.Type {
        case apps.RecreateDeploymentStrategyType:
            // pod重建的发布策略
            return dc.rolloutRecreate(d, rsList, podMap)
        case apps.RollingUpdateDeploymentStrategyType:
            // pod滚动更新的发布策略
            return dc.rolloutRolling(d, rsList)
        }
        return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
    }
    

    相关文章

      网友评论

          本文标题:Deployment-Controller源码分析

          本文链接:https://www.haomeiwen.com/subject/hipjvrtx.html