编写一个自定义K8s Controller

作者: packyzbq | 来源:发表于2020-03-06 16:23 被阅读0次

    在 K8s 中当我们需要监控某个资源的变化并作一系列操作时,使用 K8s 提供的 controller 机制来实现,同时 K8s 官方提供了一个通用库 client-go,通过它可以很容易实现自定义controller.

    Client-go & controller 架构

    在编写controller 之前,我们需要了解 client-go 对资源监控的整个架构和流程,并需要知道我们所需要自定义的是哪部分组件。

    在 client-go 中包含了编写自定义 controller 可以使用的各种机制,这些机制在库的k8s.io/client-go/tools k8s.io/client-go/util中定义。

    image

    如图所示,图中包含了 client-go 和 controller 整个交互流程。

    client-go

    在client-go 中主要包含了以下组件:

    • Reflector:通过调用 LISTWATCH 接口与 ApiServer 通信,监听特定资源(此处监听的资源需要我们指定),并把资源的更新动态存入 Delta FIFO 队列
    • Informer:从Delta FIFO队列拿出对象,完成此操作的函数是processLoop。
    • Indexer: 提供线程级别安全来存储对象和key。

    custom controller

    • Informer reference: Informer对象引用
    • Indexer reference: Indexer对象引用
    • Resource Event Handlers: 被Informer调用的回调函数,这些函数的作用通常是获取对象的key,并把key放入Work queue,以进一步做处理。
    • Work queue: 工作队列,用于将对象的交付与其处理分离,编写Resource event handler functions以提取传递的对象的key并将其添加到工作队列。此处可以过滤掉我们不关心的信息。
    • Process Item: 用于处理Work queue中的对象,可以有一个或多个其他函数一起处理;这些函数通常使用Indexer reference或Listing wrapper来检索与该键对应的对象。这里就是我们需要自定义的业务逻辑

    Sample Controller

    这里编写一个简易的 Controller, 用于监听 pod 创建、删除信息,并将信息打印出来。

    Controller 逻辑

    首先我们需要定义一个这样的 Controller 结构体

    type Controller struct {
        indexer  cache.Indexer  // Indexer 的引用
        queue    workqueue.RateLimitingInterface  //workqueue 的引用
        informer cache.Controller // Informer 的引用
    }
    

    定义 Controller 的工作流

    func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
        defer runtime.HandleCrash()
    
        defer c.queue.ShutDown()
    
        klog.Info("Starting pod controller")
    
        go c.informer.Run(stopCh)   // 启动 informer
    
        if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
            runtime.HandleError(fmt.Errorf("Time out waitng for caches to sync"))
            return
        }
    
        // 启动多个 worker 处理 workqueue 中的对象
        for i := 0; i < threadiness; i++ {
            go wait.Until(c.runWorker, time.Second, stopCh)
        }
    
        <-stopCh
        klog.Info("Stopping Pod controller")
    }
    

    具体处理 worker queue 中对象的流程

    func (c *Controller) runWorker() {
        // 启动无限循环,接收并处理消息
        for c.processNextItem() {
    
        }
    }
    // 从 workqueue 中获取对象,并打印信息。
    func (c *Controller) processNextItem() bool {
        key, shutdown := c.queue.Get()
        // 退出
        if shutdown {
            return false
        }
    
        // 标记此key已经处理
        defer c.queue.Done(key)
    
        // 将key对应的 object 的信息进行打印
        err := c.syncToStdout(key.(string))
    
        c.handleError(err, key)
        return true
    }
    
    // 获取 key 对应的 object,并打印相关信息
    func (c *Controller) syncToStdout(key string) error {
        obj, exists, err := c.indexer.GetByKey(key)
        if err != nil {
            klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
            return err
        }
        if !exists {
            fmt.Printf("Pod %s does not exist")
        } else {
            fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*core_v1.Pod).GetName())
        }
        return nil
    }
    

    Main 函数逻辑

    func main() {
        var kubeconfig string
        var master string
        // 从外部获取集群信息(kube.config)
        flag.StringVar(&kubeconfig, "kubeconfig", "", "kubeconfig file")
        // 获取集群master 的url
        flag.StringVar(&master, "master", "", "master url")
    
        // 读取构建 config
        config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
        if err != nil {
            klog.Fatal(err)
        }
    
        // 创建 k8s client
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            klog.Fatal(err)
        }
    
        // 指定 ListWatcher 在所有namespace下监听 pod 资源
        podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
    
        // 创建 workqueue
        queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    
        // 创建 indexer 和 informer
        indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
            // 当有 pod 创建时,根据 Delta queue 弹出的 object 生成对应的Key,并加入到 workqueue中。此处可以根据Object的一些属性,进行过滤
            AddFunc: func(obj interface{}) {
                key, err := cache.MetaNamespaceKeyFunc(new)
                if err == nil {
                    queue.Add(key)
                }
            },
            // pod 删除操作
            DeleteFunc: func(obj interface{}) {
                // DeletionHandlingMetaNamespaceKeyFunc 会在生成key 之前检查。因为资源删除后有可能会进行重建等操作,监听时错过了删除信息,从而导致该条记录是陈旧的。
                key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
                if err == nil {
                    queue.Add(key)
                }
            },
        }, cache.Indexers{})
    
        controller := pkg.NewController(queue, indexer, informer)
    
        stop := make(chan struct{})
    
        defer close(stop)
        // 启动 controller
        go controller.Run(1, stop)
    
        select {}
    
    }
    

    后记

    Controller 的整体流程综上,如果我们使用 CRD ,Controller 流程也不会变化很多,更多的是需要对CRD 的监控,并根据变化创建对应的 Deployment 或 StatefulSet。同时 CRD 也需要增加对应的 Validation 和 Spec 的解析。这一部分我们下一篇继续。

    To Be Continue...

    如果喜欢,请关注我的公众号,或者查看我的博客 http://packyzbq.gitee.io. 我会不定时的发送我自己的学习记录,大家互相学习交流哈~

    weixin

    相关文章

      网友评论

        本文标题:编写一个自定义K8s Controller

        本文链接:https://www.haomeiwen.com/subject/tqqwrhtx.html