美文网首页
K8s自定义controller-manager笔记

K8s自定义controller-manager笔记

作者: Teddy_b | 来源:发表于2023-06-27 11:10 被阅读0次

    背景

    kube-apiserver的主要职责是完成对etcd的增删改查,至于增删改查操作完成后,要达到什么效果,就依赖控制器controller-manager来完成

    kube-controller-manager顾名思义是用来管理控制器的,它由一系列的控制器组成,由它统一的管理(启动、停止等)

    那怎么自定义一个controller-manager呢?在这里做个学习笔记。

    初始化

    karmada-controller-manager的初始化流程为例,他在初始化的时候会创建一些runnable的分组,需要加入到controller-manager中的控制器都会加入到这个分组的runnable中去

    LeaderElection这个分组为例,就是所有需要选主的控制器都应该加入到这个分组里,这样只有在controller-manager成为leader的时候,这个组内的控制器才会运行;

    其它的三个分组就是不成为leader也需要运行的控制器

    func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
        return &runnables{
            Webhooks:       newRunnableGroup(baseContext, errChan),
            Caches:         newRunnableGroup(baseContext, errChan),
            LeaderElection: newRunnableGroup(baseContext, errChan),
            Others:         newRunnableGroup(baseContext, errChan),
        }
    }
    
    func (r *runnables) Add(fn Runnable) error {
        switch runnable := fn.(type) {
        case hasCache:
            return r.Caches.Add(fn, func(ctx context.Context) bool {
                return runnable.GetCache().WaitForCacheSync(ctx)
            })
        case *webhook.Server:
            return r.Webhooks.Add(fn, nil)
        case LeaderElectionRunnable:
            if !runnable.NeedLeaderElection() {
                return r.Others.Add(fn, nil)
            }
            return r.LeaderElection.Add(fn, nil)
        default:
            return r.LeaderElection.Add(fn, nil)
        }
    }
    

    控制器加入到controller-manager

    如何将一个控制器加入到controller-manager中,一般有两种形式,但这两种形式本质上都是同一种方式,只是编码的区别

    方式一--直接加入

    type Controller struct {
        client.Client      // used to operate Cluster resources.
        EventRecorder      record.EventRecorder
        ...
    }
    
    
    func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
        c.clusterHealthMap = newClusterHealthMap()
        return utilerrors.NewAggregate([]error{
            mgr.Add(c),
        })
    }
    

    这种方式下,直接通过controller-manager的Add方法添加一个Controller,我们直接定义一个Controller,并且实现Runnable接口,也就是实现它的Start()方法,这样再controller-manager启动的时候,就会调用这个ControllerStart()方法,进而完成你的控制逻辑

    方式二--间接加入

    type Controller struct {
        client.Client      // used to operate Cluster resources.
        EventRecorder      record.EventRecorder
        ...
    }
    
    func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
        c.clusterHealthMap = newClusterHealthMap()
        return utilerrors.NewAggregate([]error{
            controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).Complete(c)
        })
    }
    
    

    这种方式下,也是需要定义一个Controller,并且实现Reconciler接口,也就是实现它的Reconcile()方法,然后主要依赖k8s提供的依赖包完成控制器的加入,我们再具体看下它是怎么加入的

    func (blder *Builder) Complete(r reconcile.Reconciler) error {
        _, err := blder.Build(r)
        return err
    }
    
    // Build builds the Application Controller and returns the Controller it created.
    func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
        ...
        // Set the ControllerManagedBy
        if err := blder.doController(r); err != nil {
            return nil, err
        }
    
        // Set the Watch
        if err := blder.doWatch(); err != nil {
            return nil, err
        }
    
        return blder.ctrl, nil
    }
    

    Complete()的时候,doController也会创建一个内部的Controller,然后doWatch完成对我们指定对象&clusterv1alpha1.Cluster{}的Informer操作,我们先看下doController方法

    func New(name string, mgr manager.Manager, options Options) (Controller, error) {
        c, err := NewUnmanaged(name, mgr, options)
        ...
    
        // Add the controller as a Manager components
        return c, mgr.Add(c)
    }
    

    可以看到也是创建了一个Controller,然后调用controller-manager的Add方法添加这个Controller

    再看下doWatch方法的实现逻辑

    func (blder *Builder) doWatch() error {
        // Reconcile type
        if blder.forInput.object != nil {
            typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
            if err != nil {
                return err
            }
            src := &source.Kind{Type: typeForSrc}
            hdler := &handler.EnqueueRequestForObject{}
            allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
                    // 调用刚刚创建的内部Controller的Watch方法
            if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
                return err
            }
        }
    
        ...
        return nil
    }
    
    func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
        ...
    
        // 内部Controller还没启动的时候,只是将其添加到一个数组里
            // 再controller-manager启动的时候,会去启动每一个Controller, 然后在执行真正的Start操作
        if !c.Started {
            c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
            return nil
        }
    
        c.LogConstructor(nil).Info("Starting EventSource", "source", src)
        return src.Start(c.ctx, evthdler, c.Queue, prct...)
    }
    
    func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
        prct ...predicate.Predicate) error {
        ...
        go func() {
            var (
                i       cache.Informer
                lastErr error
            )
    
            // 为我们指定的对象&clusterv1alpha1.Cluster{}创建Informer
            if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
                // Lookup the Informer from the Cache and add an EventHandler which populates the Queue
                i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
                ...
                return true, nil
            });
    
                    // 为这个Informer添加事件处理回调,在事件发生时进行回调
            _, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
            ...
            if !ks.cache.WaitForCacheSync(ctx) {
                // Would be great to return something more informative here
                ks.started <- errors.New("cache did not sync")
            }
            close(ks.started)
        }()
    
        return nil
    }
    

    总结一下这种方式的流程:

    • 通过Build创建一个内部的Controller,并将这个Controller加入到controller-manager
    • 由于此时controller-manager还没启动,因此会构建一个待watch的对象加入到数组中,等待controller-manager启动的时候去执行真正的watch
    • controller-manager在后面启动的时候,会创建对象&clusterv1alpha1.Cluster{}的Informer,并注册事件回调

    因为往controller-manager里加入的是这个内部Controller,因此controller-manager启动时还是调用的这个ControllerStart()方法,我们再来看一下

    func (c *Controller) Start(ctx context.Context) error {
        ...
    
        wg := &sync.WaitGroup{}
        err := func() error {
            defer c.mu.Unlock()
    
            // TODO(pwittrock): Reconsider HandleCrash
            defer utilruntime.HandleCrash()
    
            // controller-manager还没启动的时候,都是将对象加入到这个数组中
                   // 现在启动了,因此需要调用每一个对象的Start()方法,也就是上面的Informer的创建和注册事件回调
            for _, watch := range c.startWatches {
                c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
    
                if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                    return err
                }
            }
    
            ...
            wg.Add(c.MaxConcurrentReconciles)
                   // 启动多个携程来处理任务
            for i := 0; i < c.MaxConcurrentReconciles; i++ {
                go func() {
                    defer wg.Done()
                    // Run a worker thread that just dequeues items, processes them, and marks them done.
                    // It enforces that the reconcileHandler is never invoked concurrently with the same object.
                    for c.processNextWorkItem(ctx) {
                    }
                }()
            }
    
            c.Started = true
            return nil
        }()
        ...
    }
    
    func (c *Controller) processNextWorkItem(ctx context.Context) bool {
        obj, shutdown := c.Queue.Get()
        ...
        defer c.Queue.Done(obj)
            ...
    
        c.reconcileHandler(ctx, obj)
        return true
    }
    
    func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
        ...
    
            // Do 记录的就是我们自定义的Controller对象,在这里回调了自定义Controller中的Reconcile()方法
        return c.Do.Reconcile(ctx, req)
    }
    

    启动流程在总结一下:

    • 创建我们指定对象&clusterv1alpha1.Cluster{}的Informer,并注册事件回调,等待Informer同步完成;下面是我们注册的Informer事件回调,它会在收到事件后往队列中存放
      func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
        ...
        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
            Name:      evt.Object.GetName(),
            Namespace: evt.Object.GetNamespace(),
        }})
      }
    
    • 阻塞的从队列中获取元素,这里会一直阻塞,直到Informer那边往队列里写入元素
    • 内部Controller从队列中获取到元素后,回调自定义Controller的Reconcile()方法,进而实现调谐的过程

    对比

    两种加入Controller的方式对于开发者来说要做的事情都不多,更主要的问题在于我们的Start()方法 或者 Reconcile()怎么实现,来完成控制器逻辑

    • 第一种方式,由于是controller-manager直接调用的,因此入参只有一个context上下文,所以这里比较适合做一些定时任务,比如定时的检测集群的状态,然后给集群添加污点
    func (c *Controller) Start(ctx context.Context) error {
        klog.Infof("Starting cluster health monitor")
        defer klog.Infof("Shutting cluster health monitor")
    
        // Incorporate the results of cluster health signal pushed from cluster-status-controller to master.
        go wait.UntilWithContext(ctx, func(ctx context.Context) {
            if err := c.monitorClusterHealth(ctx); err != nil {
                klog.Errorf("Error monitoring cluster health: %v", err)
            }
            // 默认值 5s,集群健康状态的检测周期
        }, c.ClusterMonitorPeriod)
        <-ctx.Done()
    
        return nil
    }
    
    • 第二中方式,主要是通过Informer实现,在收到事件后就会回调Reconcile()方法,因此入参除了context上下文,还有watch事件里新产生的对象,所以这里比较适合做一些实时的控制,即发生某个事件后就需要进行相关的处理,比如集群创建后,就需要给这个集群创建一个命名空间,并且添加污点、添加Finalizers
    func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
        klog.V(4).Infof("Reconciling cluster %s", req.NamespacedName.Name)
    
        cluster := &clusterv1alpha1.Cluster{}
        if err := c.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
            // The resource may no longer exist, in which case we stop processing.
            if apierrors.IsNotFound(err) {
                return controllerruntime.Result{}, nil
            }
    
            return controllerruntime.Result{Requeue: true}, err
        }
    
        // cluster对象存在 deletionTimestamp,表明需要删除
        if !cluster.DeletionTimestamp.IsZero() {
            return c.removeCluster(ctx, cluster)
        }
    
        return c.syncCluster(ctx, cluster)
    }
    
    func (c *Controller) syncCluster(ctx context.Context, cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
        // create execution space
        // 检查 karmada-es-member1 命名空间是否存在,不存在则创建
        err := c.createExecutionSpace(cluster)
        if err != nil {
            c.EventRecorder.Event(cluster, corev1.EventTypeWarning, events.EventReasonCreateExecutionSpaceFailed, err.Error())
            return controllerruntime.Result{Requeue: true}, err
        }
    
        // taint cluster by condition
        // 根据集群的Condition给Cluster添加污点
        err = c.taintClusterByCondition(ctx, cluster)
        if err != nil {
            c.EventRecorder.Event(cluster, corev1.EventTypeWarning, events.EventReasonTaintClusterByConditionFailed, err.Error())
            return controllerruntime.Result{Requeue: true}, err
        }
    
        // ensure finalizer
        // 给集群添加 karmada.io/cluster-controller 这个Finalizer
        return c.ensureFinalizer(cluster)
    }
    

    相关文章

      网友评论

          本文标题:K8s自定义controller-manager笔记

          本文链接:https://www.haomeiwen.com/subject/pitnydtx.html