美文网首页
K8s自定义controller-manager笔记

K8s自定义controller-manager笔记

作者: Teddy_b | 来源:发表于2023-06-27 11:10 被阅读0次

背景

kube-apiserver的主要职责是完成对etcd的增删改查,至于增删改查操作完成后,要达到什么效果,就依赖控制器controller-manager来完成

kube-controller-manager顾名思义是用来管理控制器的,它由一系列的控制器组成,由它统一的管理(启动、停止等)

那怎么自定义一个controller-manager呢?在这里做个学习笔记。

初始化

karmada-controller-manager的初始化流程为例,他在初始化的时候会创建一些runnable的分组,需要加入到controller-manager中的控制器都会加入到这个分组的runnable中去

LeaderElection这个分组为例,就是所有需要选主的控制器都应该加入到这个分组里,这样只有在controller-manager成为leader的时候,这个组内的控制器才会运行;

其它的三个分组就是不成为leader也需要运行的控制器

func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
    return &runnables{
        Webhooks:       newRunnableGroup(baseContext, errChan),
        Caches:         newRunnableGroup(baseContext, errChan),
        LeaderElection: newRunnableGroup(baseContext, errChan),
        Others:         newRunnableGroup(baseContext, errChan),
    }
}

func (r *runnables) Add(fn Runnable) error {
    switch runnable := fn.(type) {
    case hasCache:
        return r.Caches.Add(fn, func(ctx context.Context) bool {
            return runnable.GetCache().WaitForCacheSync(ctx)
        })
    case *webhook.Server:
        return r.Webhooks.Add(fn, nil)
    case LeaderElectionRunnable:
        if !runnable.NeedLeaderElection() {
            return r.Others.Add(fn, nil)
        }
        return r.LeaderElection.Add(fn, nil)
    default:
        return r.LeaderElection.Add(fn, nil)
    }
}

控制器加入到controller-manager

如何将一个控制器加入到controller-manager中,一般有两种形式,但这两种形式本质上都是同一种方式,只是编码的区别

方式一--直接加入

type Controller struct {
    client.Client      // used to operate Cluster resources.
    EventRecorder      record.EventRecorder
    ...
}


func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
    c.clusterHealthMap = newClusterHealthMap()
    return utilerrors.NewAggregate([]error{
        mgr.Add(c),
    })
}

这种方式下,直接通过controller-manager的Add方法添加一个Controller,我们直接定义一个Controller,并且实现Runnable接口,也就是实现它的Start()方法,这样再controller-manager启动的时候,就会调用这个ControllerStart()方法,进而完成你的控制逻辑

方式二--间接加入

type Controller struct {
    client.Client      // used to operate Cluster resources.
    EventRecorder      record.EventRecorder
    ...
}

func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
    c.clusterHealthMap = newClusterHealthMap()
    return utilerrors.NewAggregate([]error{
        controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).Complete(c)
    })
}

这种方式下,也是需要定义一个Controller,并且实现Reconciler接口,也就是实现它的Reconcile()方法,然后主要依赖k8s提供的依赖包完成控制器的加入,我们再具体看下它是怎么加入的

func (blder *Builder) Complete(r reconcile.Reconciler) error {
    _, err := blder.Build(r)
    return err
}

// Build builds the Application Controller and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
    ...
    // Set the ControllerManagedBy
    if err := blder.doController(r); err != nil {
        return nil, err
    }

    // Set the Watch
    if err := blder.doWatch(); err != nil {
        return nil, err
    }

    return blder.ctrl, nil
}

Complete()的时候,doController也会创建一个内部的Controller,然后doWatch完成对我们指定对象&clusterv1alpha1.Cluster{}的Informer操作,我们先看下doController方法

func New(name string, mgr manager.Manager, options Options) (Controller, error) {
    c, err := NewUnmanaged(name, mgr, options)
    ...

    // Add the controller as a Manager components
    return c, mgr.Add(c)
}

可以看到也是创建了一个Controller,然后调用controller-manager的Add方法添加这个Controller

再看下doWatch方法的实现逻辑

func (blder *Builder) doWatch() error {
    // Reconcile type
    if blder.forInput.object != nil {
        typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
        if err != nil {
            return err
        }
        src := &source.Kind{Type: typeForSrc}
        hdler := &handler.EnqueueRequestForObject{}
        allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
                // 调用刚刚创建的内部Controller的Watch方法
        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
            return err
        }
    }

    ...
    return nil
}

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
    ...

    // 内部Controller还没启动的时候,只是将其添加到一个数组里
        // 再controller-manager启动的时候,会去启动每一个Controller, 然后在执行真正的Start操作
    if !c.Started {
        c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
        return nil
    }

    c.LogConstructor(nil).Info("Starting EventSource", "source", src)
    return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
    prct ...predicate.Predicate) error {
    ...
    go func() {
        var (
            i       cache.Informer
            lastErr error
        )

        // 为我们指定的对象&clusterv1alpha1.Cluster{}创建Informer
        if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
            // Lookup the Informer from the Cache and add an EventHandler which populates the Queue
            i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
            ...
            return true, nil
        });

                // 为这个Informer添加事件处理回调,在事件发生时进行回调
        _, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
        ...
        if !ks.cache.WaitForCacheSync(ctx) {
            // Would be great to return something more informative here
            ks.started <- errors.New("cache did not sync")
        }
        close(ks.started)
    }()

    return nil
}

总结一下这种方式的流程:

  • 通过Build创建一个内部的Controller,并将这个Controller加入到controller-manager
  • 由于此时controller-manager还没启动,因此会构建一个待watch的对象加入到数组中,等待controller-manager启动的时候去执行真正的watch
  • controller-manager在后面启动的时候,会创建对象&clusterv1alpha1.Cluster{}的Informer,并注册事件回调

因为往controller-manager里加入的是这个内部Controller,因此controller-manager启动时还是调用的这个ControllerStart()方法,我们再来看一下

func (c *Controller) Start(ctx context.Context) error {
    ...

    wg := &sync.WaitGroup{}
    err := func() error {
        defer c.mu.Unlock()

        // TODO(pwittrock): Reconsider HandleCrash
        defer utilruntime.HandleCrash()

        // controller-manager还没启动的时候,都是将对象加入到这个数组中
               // 现在启动了,因此需要调用每一个对象的Start()方法,也就是上面的Informer的创建和注册事件回调
        for _, watch := range c.startWatches {
            c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

            if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                return err
            }
        }

        ...
        wg.Add(c.MaxConcurrentReconciles)
               // 启动多个携程来处理任务
        for i := 0; i < c.MaxConcurrentReconciles; i++ {
            go func() {
                defer wg.Done()
                // Run a worker thread that just dequeues items, processes them, and marks them done.
                // It enforces that the reconcileHandler is never invoked concurrently with the same object.
                for c.processNextWorkItem(ctx) {
                }
            }()
        }

        c.Started = true
        return nil
    }()
    ...
}

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    obj, shutdown := c.Queue.Get()
    ...
    defer c.Queue.Done(obj)
        ...

    c.reconcileHandler(ctx, obj)
    return true
}

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
    ...

        // Do 记录的就是我们自定义的Controller对象,在这里回调了自定义Controller中的Reconcile()方法
    return c.Do.Reconcile(ctx, req)
}

启动流程在总结一下:

  • 创建我们指定对象&clusterv1alpha1.Cluster{}的Informer,并注册事件回调,等待Informer同步完成;下面是我们注册的Informer事件回调,它会在收到事件后往队列中存放
  func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
    ...
    q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
        Name:      evt.Object.GetName(),
        Namespace: evt.Object.GetNamespace(),
    }})
  }
  • 阻塞的从队列中获取元素,这里会一直阻塞,直到Informer那边往队列里写入元素
  • 内部Controller从队列中获取到元素后,回调自定义Controller的Reconcile()方法,进而实现调谐的过程

对比

两种加入Controller的方式对于开发者来说要做的事情都不多,更主要的问题在于我们的Start()方法 或者 Reconcile()怎么实现,来完成控制器逻辑

  • 第一种方式,由于是controller-manager直接调用的,因此入参只有一个context上下文,所以这里比较适合做一些定时任务,比如定时的检测集群的状态,然后给集群添加污点
func (c *Controller) Start(ctx context.Context) error {
    klog.Infof("Starting cluster health monitor")
    defer klog.Infof("Shutting cluster health monitor")

    // Incorporate the results of cluster health signal pushed from cluster-status-controller to master.
    go wait.UntilWithContext(ctx, func(ctx context.Context) {
        if err := c.monitorClusterHealth(ctx); err != nil {
            klog.Errorf("Error monitoring cluster health: %v", err)
        }
        // 默认值 5s,集群健康状态的检测周期
    }, c.ClusterMonitorPeriod)
    <-ctx.Done()

    return nil
}
  • 第二中方式,主要是通过Informer实现,在收到事件后就会回调Reconcile()方法,因此入参除了context上下文,还有watch事件里新产生的对象,所以这里比较适合做一些实时的控制,即发生某个事件后就需要进行相关的处理,比如集群创建后,就需要给这个集群创建一个命名空间,并且添加污点、添加Finalizers
func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
    klog.V(4).Infof("Reconciling cluster %s", req.NamespacedName.Name)

    cluster := &clusterv1alpha1.Cluster{}
    if err := c.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
        // The resource may no longer exist, in which case we stop processing.
        if apierrors.IsNotFound(err) {
            return controllerruntime.Result{}, nil
        }

        return controllerruntime.Result{Requeue: true}, err
    }

    // cluster对象存在 deletionTimestamp,表明需要删除
    if !cluster.DeletionTimestamp.IsZero() {
        return c.removeCluster(ctx, cluster)
    }

    return c.syncCluster(ctx, cluster)
}

func (c *Controller) syncCluster(ctx context.Context, cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
    // create execution space
    // 检查 karmada-es-member1 命名空间是否存在,不存在则创建
    err := c.createExecutionSpace(cluster)
    if err != nil {
        c.EventRecorder.Event(cluster, corev1.EventTypeWarning, events.EventReasonCreateExecutionSpaceFailed, err.Error())
        return controllerruntime.Result{Requeue: true}, err
    }

    // taint cluster by condition
    // 根据集群的Condition给Cluster添加污点
    err = c.taintClusterByCondition(ctx, cluster)
    if err != nil {
        c.EventRecorder.Event(cluster, corev1.EventTypeWarning, events.EventReasonTaintClusterByConditionFailed, err.Error())
        return controllerruntime.Result{Requeue: true}, err
    }

    // ensure finalizer
    // 给集群添加 karmada.io/cluster-controller 这个Finalizer
    return c.ensureFinalizer(cluster)
}

相关文章

网友评论

      本文标题:K8s自定义controller-manager笔记

      本文链接:https://www.haomeiwen.com/subject/pitnydtx.html