美文网首页
从生成CRD到编写自定义控制器

从生成CRD到编写自定义控制器

作者: 程序员札记 | 来源:发表于2022-11-04 11:34 被阅读0次

    介绍

    我们可以使用code-generator 以及controller-tools来进行代码自动生成,通过代码自动生成可以帮我们自动生成 CRD 资源对象,以及客户端访问的 ClientSet、Informer、Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器。

    CRD定义

    首先初始化项目:

    $ mkdir operator-crd && cd operator-crd
    $ go mod init operator-crd
    $ mkdir -p pkg/apis/example.com/v1
    
    

    在该文件夹下新建doc.go文件,内容如下所示:

    // +k8s:deepcopy-gen=package
    // +groupName=example.com
    
    package v1
    
    

    根据 CRD 的规范定义,这里我们定义的 group 为 example.com,版本为 v1,在顶部添加了一个代码自动生成的 deepcopy-gen 的 tag,为整个包中的类型生成深拷贝方法。

    然后就是非常重要的资源对象的结构体定义,新建 types.go 文件,types.go内容可以使用type-scaffpld自动生成,具体文件内容如下:

    package v1
    
    import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    
    // BarSpec defines the desired state of Bar
    type BarSpec struct {
       // INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
       DeploymentName string `json:"deploymentName"`
       Image          string `json:"image"`
       Replicas       *int32 `json:"replicas"`
    }
    
    // BarStatus defines the observed state of Bar.
    // It should always be reconstructable from the state of the cluster and/or outside world.
    type BarStatus struct {
       // INSERT ADDITIONAL STATUS FIELDS -- observed state of cluster
    }
    
    // 下面这个一定不能少,少了的话不能生成 lister 和 informer
    // +genclient  
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    
    // Bar is the Schema for the bars API
    // +k8s:openapi-gen=true
    type Bar struct {
       metav1.TypeMeta   `json:",inline"`
       metav1.ObjectMeta `json:"metadata,omitempty"`
    
       Spec   BarSpec   `json:"spec,omitempty"`
       Status BarStatus `json:"status,omitempty"`
    }
    
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    
    // BarList contains a list of Bar
    type BarList struct {
       metav1.TypeMeta `json:",inline"`
       metav1.ListMeta `json:"metadata,omitempty"`
       Items           []Bar `json:"items"`
    }
    
    

    然后可以参考系统内置的资源对象,还需要提供 AddToScheme 与 Resource 两个变量供 client 注册,新建 register.go 文件,内容如下所示:

    package v1
    
    import (
       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
       "k8s.io/apimachinery/pkg/runtime"
       "k8s.io/apimachinery/pkg/runtime/schema"
    )
    
    // SchemeGroupVersion 注册自己的自定义资源
    var SchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
    
    // Kind takes an unqualified kind and returns back a Group qualified GroupKind
    func Kind(kind string) schema.GroupKind {
       return SchemeGroupVersion.WithKind(kind).GroupKind()
    }
    
    // Resource takes an unqualified resource and returns a Group qualified GroupResource
    func Resource(resource string) schema.GroupResource {
       return SchemeGroupVersion.WithResource(resource).GroupResource()
    }
    
    var (
       // SchemeBuilder initializes a scheme builder
       SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
       // AddToScheme is a global function that registers this API group & version to a scheme
       AddToScheme = SchemeBuilder.AddToScheme
    )
    
    // Adds the list of known types to Scheme.
    func addKnownTypes(scheme *runtime.Scheme) error {
       // 添加 Bar 与 BarList这两个资源到 scheme
       scheme.AddKnownTypes(SchemeGroupVersion,
           &Bar{},
           &BarList{},
       )
       metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
       return nil
    }
    
    

    使用controller-gen生成crd:

    $ controller-gen crd paths=./... output:crd:dir=crd
    
    

    生成example.com_bars.yaml文件如下所示:

    ---
    apiVersion: apiextensions.k8s.io/v1
    kind: CustomResourceDefinition
    metadata:
     annotations:
       controller-gen.kubebuilder.io/version: (devel)
     creationTimestamp: null
     name: bars.example.com
    spec:
     group: example.com
     names:
       kind: Bar
       listKind: BarList
       plural: bars
       singular: bar
     scope: Namespaced
     versions:
     - name: v1
       schema:
         openAPIV3Schema:
           description: Bar is the Schema for the bars API
           properties:
             apiVersion:
               description: 'APIVersion defines the versioned schema of this representation
                 of an object. Servers should convert recognized schemas to the latest
                 internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
               type: string
             kind:
               description: 'Kind is a string value representing the REST resource this
                 object represents. Servers may infer this from the endpoint the client
                 submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
               type: string
             metadata:
               type: object
             spec:
               description: BarSpec defines the desired state of Bar
               properties:
                 deploymentName:
                   description: INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
                   type: string
                 image:
                   type: string
                 replicas:
                   format: int32
                   type: integer
               required:
               - deploymentName
               - image
               - replicas
               type: object
             status:
               description: BarStatus defines the observed state of Bar. It should always
                 be reconstructable from the state of the cluster and/or outside world.
               type: object
           type: object
       served: true
       storage: true
    
    

    最终项目结构如下所示:

    $ tree
    .
    ├── crd
    │   └── example.com_bars.yaml
    ├── go.mod
    ├── go.sum
    └── pkg
       └── apis
           └── example.com
               └── v1
                   ├── doc.go
                   ├── register.go
                   └── types.go
    
    5 directories, 6 files
    
    

    生成客户端相关代码

    上面我们准备好资源的 API 资源类型后,就可以使用开始生成 CRD 资源的客户端使用的相关代码了。

    首先创建生成代码的脚本,下面这些脚本均来源于 sample-controller 提供的示例:

    $ mkdir hack && cd hack
    
    

    在该目录下面新建 tools.go 文件,添加 code-generator 依赖,因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包。文件内容如下所示:

    // +build tools
    
    // 建立 tools.go 来依赖 code-generator
    // 因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包.
    package tools
    
    import _ "k8s.io/code-generator"
    
    

    然后新建 update-codegen.sh 脚本,用来配置代码生成的脚本:

    #!/usr/bin/env bash
    
    set -o errexit
    set -o nounset
    set -o pipefail
    
    SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
    
    CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
    
    bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
     operator-crd/pkg/client operator-crd/pkg/apis example.com:v1 \
     --output-base "${SCRIPT_ROOT}"/../ \
     --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
    
    # To use your own boilerplate text append:
    #   --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
    
    

    同样还有 verify-codegen.sh 脚本,用来校验生成的代码是否是最新的:

    #!/usr/bin/env bash
    
    set -o errexit
    set -o nounset
    set -o pipefail
    
    SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
    
    DIFFROOT="${SCRIPT_ROOT}/pkg"
    TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
    _tmp="${SCRIPT_ROOT}/_tmp"
    
    cleanup() {
     rm -rf "${_tmp}"
    }
    trap "cleanup" EXIT SIGINT
    
    cleanup
    
    mkdir -p "${TMP_DIFFROOT}"
    cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}"
    
    "${SCRIPT_ROOT}/hack/update-codegen.sh"
    echo "diffing ${DIFFROOT} against freshly generated codegen"
    ret=0
    diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$?
    cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}"
    if [[ $ret -eq 0 ]]
    then
     echo "${DIFFROOT} up to date."
    else
     echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh"
     exit 1
    fi
    
    

    还有一个为生成的代码文件添加头部内容的 boilerplate.go.txt 文件,内容如下所示,其实就是为每个生成的代码文件头部添加上下面的开源协议声明:

    /*
    Copyright The Kubernetes Authors.
    
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
    
       http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    */
    
    

    接下来我们就可以来执行代码生成的脚本了,首先将依赖包放置到 vendor 目录中去:

    $ go mod vendor
    
    

    然后执行脚本生成代码:

    $ chmod +x ./hack/update-codegen.sh
    $./hack/update-codegen.sh 
    Generating deepcopy funcs
    Generating clientset for example.com:v1 at operator-crd/pkg/client/clientset
    Generating listers for example.com:v1 at operator-crd/pkg/client/listers
    Generating informers for example.com:v1 at operator-crd/pkg/client/informers
    
    

    代码生成后,整个项目的 pkg 包变成了下面的样子:

    $  tree pkg                
    pkg
    ├── apis
    │   └── example.com
    │       └── v1
    │           ├── doc.go
    │           ├── register.go
    │           ├── types.go
    │           └── zz_generated.deepcopy.go
    └── client
       ├── clientset
       │   └── versioned
       │       ├── clientset.go
       │       ├── doc.go
       │       ├── fake
       │       │   ├── clientset_generated.go
       │       │   ├── doc.go
       │       │   └── register.go
       │       ├── scheme
       │       │   ├── doc.go
       │       │   └── register.go
       │       └── typed
       │           └── example.com
       │               └── v1
       │                   ├── bar.go
       │                   ├── doc.go
       │                   ├── example.com_client.go
       │                   ├── fake
       │                   │   ├── doc.go
       │                   │   ├── fake_bar.go
       │                   │   └── fake_example.com_client.go
       │                   └── generated_expansion.go
       ├── informers
       │   └── externalversions
       │       ├── example.com
       │       │   ├── interface.go
       │       │   └── v1
       │       │       ├── bar.go
       │       │       └── interface.go
       │       ├── factory.go
       │       ├── generic.go
       │       └── internalinterfaces
       │           └── factory_interfaces.go
       └── listers
           └── example.com
               └── v1
                   ├── bar.go
                   └── expansion_generated.go
    
    20 directories, 26 files
    
    

    仔细观察可以发现 pkg/apis/example.com/v1 目录下面多了一个zz_generated.deepcopy.go 文件,在 pkg/client 文件夹下生成了 clientset和 informers 和 listers 三个目录,有了这几个自动生成的客户端相关操作包,我们就可以去访问 CRD 资源了,可以和使用内置的资源对象一样去对 Bar 进行 List 和 Watch 操作了。

    编写控制器

    首先要先获取访问资源对象的 ClientSet,在项目根目录下面新建 main.go 文件。

    package main
    
    import (
       "k8s.io/client-go/tools/clientcmd"
           "k8s.io/klog/v2"
       clientset "operator-crd/pkg/client/clientset/versioned"
       "operator-crd/pkg/client/informers/externalversions"
       "time"
    
       "os"
       "os/signal"
       "syscall"
    )
    
    var (
       onlyOneSignalHandler = make(chan struct{})
       shutdownSignals      = []os.Signal{os.Interrupt, syscall.SIGTERM}
    )
    
    // 注册 SIGTERM 和 SIGINT 信号
    // 返回一个 stop channel, 该通道在捕获到第一个信号时被关闭
    // 如果捕获到第二个信号,程序直接退出
    func setupSignalHandler() (stopCh <-chan struct{}) {
       // 当调用两次的时候 panics
       close(onlyOneSignalHandler)
    
       stop := make(chan struct{})
       c := make(chan os.Signal, 2)
    
       // Notify 函数让 signal 包将输入信号转发到c
       // 如果没有列出要传递的信号,会将所有输入信号传递到 c; 否则只会传递列出的输入信号
       signal.Notify(c, shutdownSignals...)
    
       go func() {
           <-c
           close(stop)
           <-c
           os.Exit(1) // 第二个信号直接退出
       }()
    
       return stop
    }
    
    func main() {
       stopCh := setupSignalHandler()
    
       // 获取config
       config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
       if err != nil {
           klog.Fatalln(err)
       }
    
       // 通过config构建clientSet
       // 这里的clientSet 是 Bar 的
       clientSet, err := clientset.NewForConfig(config)
       if err != nil {
           klog.Fatalln(err)
       }
    
       // informerFactory 工厂类, 这里注入我们通过代码生成的 client
       // client 主要用于和 API Server 进行通信,实现 ListAndWatch
       factory := externalversions.NewSharedInformerFactory(clientSet, time.Second*30)
    
       // 实例化自定义控制器
       controller := NewController(factory.Example().V1().Bars())
    
       // 启动 informer,开始list 和 watch
       go factory.Start(stopCh)
    
       // 启动控制器
       if err = controller.Run(2, stopCh); err != nil {
           klog.Fatalf("Error running controller: %s", err.Error())
       }
    }
    
    

    首先初始化一个用于访问 Bar 资源的 ClientSet 对象,然后同样新建一个 Bar 的 InformerFactory 实例,通过这个工厂实例可以去启动 Informer 开始对 Bar 的 List 和 Watch 操作,然后同样我们要自己去封装一个自定义的控制器,在这个控制器里面去实现一个控制循环,不断对 Bar 的状态进行调谐。

    在项目根目录下新建 controller.go 文件,内容如下所示:

    package main
    
    import (
       "fmt"
       "k8s.io/apimachinery/pkg/api/errors"
       "k8s.io/apimachinery/pkg/util/runtime"
       "k8s.io/apimachinery/pkg/util/wait"
       "k8s.io/client-go/tools/cache"
       "k8s.io/client-go/util/workqueue"
       "k8s.io/klog/v2"
       v1 "operator-crd/pkg/apis/example.com/v1"
       "time"
    
       informers "operator-crd/pkg/client/informers/externalversions/example.com/v1"
    )
    
    type Controller struct {
       informer  informers.BarInformer
       workqueue workqueue.RateLimitingInterface
    }
    
    func NewController(informer informers.BarInformer) *Controller {
       controller := &Controller{
           informer: informer,
           // WorkQueue 的实现,负责同步 Informer 和控制循环之间的数据
           workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bar"),
       }
    
       klog.Info("Setting up Bar event handlers")
    
       // informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc)
       // 分别对应 API 对象的“添加”“更新”和“删除”事件。
       // 而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中
       informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
           AddFunc:    controller.addBar,
           UpdateFunc: controller.updateBar,
           DeleteFunc: controller.deleteBar,
       })
       return controller
    }
    
    func (c *Controller) Run(thread int, stopCh <-chan struct{}) error {
       defer runtime.HandleCrash()
       defer c.workqueue.ShuttingDown()
    
       // 记录开始日志
       klog.Info("Starting Bar control loop")
       klog.Info("Waiting for informer caches to sync")
    
       // 等待缓存同步数据
       if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok {
           return fmt.Errorf("failed to wati for caches to sync")
       }
    
       klog.Info("Starting workers")
       for i := 0; i < thread; i++ {
           go wait.Until(c.runWorker, time.Second, stopCh)
       }
    
       klog.Info("Started workers")
       <-stopCh
       klog.Info("Shutting down workers")
       return nil
    }
    
    // runWorker 是一个不断运行的方法,并且一直会调用 c.processNextWorkItem 从 workqueue读取消息
    func (c *Controller) runWorker() {
       for c.processNExtWorkItem() {
       }
    }
    
    // 从workqueue读取和读取消息
    func (c *Controller) processNExtWorkItem() bool {
       // 获取 item
       item, shutdown := c.workqueue.Get()
       if shutdown {
           return false
       }
    
       if err := func(item interface{}) error {
           // 标记以及处理
           defer c.workqueue.Done(item)
           var key string
           var ok bool
           if key, ok = item.(string); !ok {
               // 判读key的类型不是字符串,则直接丢弃
               c.workqueue.Forget(item)
               runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
               return nil
           }
    
           if err := c.syncHandler(key); err != nil {
               return fmt.Errorf("error syncing '%s':%s", item, err.Error())
           }
           c.workqueue.Forget(item)
           return nil
       }(item); err != nil {
           runtime.HandleError(err)
           return false
       }
       return true
    }
    
    // 尝试从 Informer 维护的缓存中拿到了它所对应的 Bar 对象
    func (c *Controller) syncHandler(key string) error {
       namespace, name, err := cache.SplitMetaNamespaceKey(key)
       if err != nil {
           runtime.HandleError(fmt.Errorf("invalid respirce key:%s", key))
           return err
       }
    
       bar, err := c.informer.Lister().Bars(namespace).Get(name)
       if err != nil {
           if errors.IsNotFound(err) {
               // 说明是在删除事件中添加进来的
               return nil
           }
           runtime.HandleError(fmt.Errorf("failed to get bar by: %s/%s", namespace, name))
           return err
       }
       fmt.Printf("[BarCRD] try to process bar:%#v ...", bar)
       // 可以根据bar来做其他的事。
       // todo
       return nil
    }
    
    func (c *Controller) addBar(item interface{}) {
       var key string
       var err error
       if key, err = cache.MetaNamespaceKeyFunc(item); err != nil {
           runtime.HandleError(err)
           return
       }
       c.workqueue.AddRateLimited(key)
    }
    
    func (c *Controller) deleteBar(item interface{}) {
       var key string
       var err error
       if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(item); err != nil {
           runtime.HandleError(err)
           return
       }
       fmt.Println("delete crd")
       c.workqueue.AddRateLimited(key)
    }
    
    func (c *Controller) updateBar(old, new interface{}) {
       oldItem := old.(*v1.Bar)
       newItem := new.(*v1.Bar)
       // 比较两个资源版本,如果相同,则不处理
       if oldItem.ResourceVersion == newItem.ResourceVersion {
           return
       }
       c.workqueue.AddRateLimited(new)
    }
    
    

    我们这里自定义的控制器只封装了一个 Informer 和一个限速队列,我们当然也可以在里面添加一个用于访问本地缓存的 Indexer,但实际上 Informer 中已经包含了 Lister,对于 List 和 Get 操作都会去通过 Indexer 从本地缓存中获取数据,所以只用一个 Informer 也是完全可行的。

    同样在 Informer 中注册了3个事件处理器,将监听的事件获取到后送入 workqueue 队列,然后通过控制器的控制循环不断从队列中消费数据,根据获取的 key 来获取数据判断对象是需要删除还是需要进行其他业务处理,这里我们同样也只是打印出了对应的操作日志,对于实际的项目则进行相应的业务逻辑处理即可。

    到这里一个完整的自定义 API 对象和它所对应的自定义控制器就编写完毕了。

    测试

    接下来我们直接运行我们的main函数:

    I0512 16:51:33.922138   39032 controller.go:29] Setting up Bar event handlers
    I0512 16:51:33.922255   39032 controller.go:47] Starting Bar control loop
    I0512 16:51:33.922258   39032 controller.go:48] Waiting for informer caches to sync
    I0512 16:51:34.023108   39032 controller.go:55] Starting workers
    I0512 16:51:34.023153   39032 controller.go:60] Started workers
    
    

    现在我们创建一个Bar资源对象:

    # bar.yaml
    
    apiVersion: example.com/v1
    kind: Bar
    metadata:
     name: bar-demo
     namespace: default
    spec:
     image: "nginx:1.17.1"
     deploymentName: example-bar
     replicas: 2
    
    

    直接创建上面的对象,注意观察控制器的日志:

    I0512 16:51:33.922138   39032 controller.go:29] Setting up Bar event handlers
    I0512 16:51:33.922255   39032 controller.go:47] Starting Bar control loop
    I0512 16:51:33.922258   39032 controller.go:48] Waiting for informer caches to sync
    I0512 16:51:34.023108   39032 controller.go:55] Starting workers
    I0512 16:51:34.023153   39032 controller.go:60] Started workers
    [BarCRD] try to process bar:"bar-demo" ...
    
    

    可以看到,我们上面创建 bar.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。然后控制器的控制循环从队列里拿到这个对象,并且打印出了正在处理这个 bar 对象的日志信息。

    同样我们删除这个资源的时候,也会有对应的提示。

    这就是开发自定义 CRD 控制器的基本流程,当然我们还可以在事件处理的业务逻辑中去记录一些 Events 信息,这样我们就可以通过 Event 去了解我们资源的状态了。

    相关文章

      网友评论

          本文标题:从生成CRD到编写自定义控制器

          本文链接:https://www.haomeiwen.com/subject/xysptdtx.html