美文网首页k8s
k8s go client Informer 结合源码分析

k8s go client Informer 结合源码分析

作者: shoyu666 | 来源:发表于2022-03-11 00:19 被阅读0次

    背景

    k8s apiService 通过 http 对外暴露服务, go client 是 k8s 提供的一套 go语言实现的client lib,封装了通用的流程。
    go client中比较重要的就是 Informer机制。本文就结合源码分析下go client informer的流程。

    informer是什么

    go client的本质就是通过http请求 apiService,但是实际需要考虑很多情况,比如
    1:直接请求apiService,会导致apiService压力太大。
    2:请求一个资源,要考虑监听资源的变动事件,客户端消费事件的性能。
    3:重复开发等等
    go client的目的就是提供通用的请求apiService的lib,考虑了 请求 k8s资源,监听k8s资源,性能优化等共性问题。
    Informer机制解决了上面的一些问题。
    informer机制如下


    图1

    该图反应了informer的工作机制,下面我们结合代码来解释下这张图。

    分析

    分析之前先说说怎么使用 go client
    看一段代码,然后对代码段做说明。

    package main
    
    import (
        "flag"
        "fmt"
        "k8s.io/apimachinery/pkg/util/wait"
        "path/filepath"
        "time"
    
        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )
    
    func main() {
    
        var namespace, configFile string
    
        flag.StringVar(&namespace, "n", "memall", "Your Namespace ")
        //这里是设置配置文件为本地的    .kube/config 该文件是k8s服务器的相关连接配置
        flag.StringVar(&configFile, "config", filepath.Join("C:\\Users\\n\\", ".kube", "config"), "Config file ")
        flag.Parse()
    
        fmt.Printf("Running with kube config file %v\n", configFile)
    
        config, err := clientcmd.BuildConfigFromFlags("", configFile)
        if err != nil {
            panic(err.Error())
        }
    
        //创建 clientset, set意思就是集合,client集合,client go为不同的资源准备了不同的http client
        clientset, err := kubernetes.NewForConfig(config)
    
        //创建 SharedInformerFactory,用于创建 SharedIndexInformer
        informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*1)
            //创建 PodInformer:Pod的SharedIndexInformer通过PodInformer 创建
        podInformer := informerFactory.Core().V1().Pods()
           //设置监听响应函数
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    
            AddFunc: func(obj interface{}) {
                //pod添加回调
                newPod := obj.(*v1.Pod)
                fmt.Printf("new pod added %v\n", newPod.GetName())
            },
    
            UpdateFunc: func(old, new interface{}) {
                //pod更新回调
                newPod := new.(*v1.Pod)
                fmt.Printf("new pod update %v\n", newPod.GetName())
            },
    
            DeleteFunc: func(new interface{}) {
                 //pod删除回调
                newPod := new.(*v1.Pod)
                fmt.Printf("new pod deleted %v\n", newPod.GetName())
            },
        })
    
        stopper := make(chan struct{})
        defer close(stopper)
            //启动informer
        informerFactory.Start(stopper)
            //等待缓存同步
        informerFactory.WaitForCacheSync(wait.NeverStop)
        <-stopper
    }
    
    

    以上就是使用的过程
    第一步设置k8s集群的连接配置(config)
    第二步基于该配置创建SharedInformerFactory 和 podInformer
    第三步通过 podInformer.Informer() 方法获得 Pod的 SharedIndexInformer
    第四步为SharedIndexInformer设置监听
    第五步 informerFactory.Start(stopper) 启动 informer
    当启动informer是就进入了图1的流程
    下面我们来具体看下。

    创建 SharedInformerFactory,SharedInformerFactory用于创建 SharedIndexInformer

        informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*1)
    

    这段代码创建了SharedInformerFactory,SharedInformerFactory 的作用如同它的名字 SharedIndexInformer 工厂,用来创建和管理 各种k8s资源的SharedIndexInformer,比如 专门用于获取和监听 Pod的资源的 SharedIndexInformer


    图2

    NewSharedInformerFactory 内部创建的sharedInformerFactory

      
        factory := &sharedInformerFactory{
            client:           client,
            namespace:        v1.NamespaceAll,
            defaultResync:    defaultResync,
                   //这里就是图2表示的,informers管理了各种类型的SharedIndexInformer 
            informers:        make(map[reflect.Type]cache.SharedIndexInformer),
            startedInformers: make(map[reflect.Type]bool),
            customResync:     make(map[reflect.Type]time.Duration),
        }
    

    每个SharedIndexInformer都会进入各自的informer流程。这里用 用于Pod 的 SharedIndexInformer 做分析
    用于 Pod 的 SharedIndexInformer 和 用于 Deployment 的 SharedIndexInformer 都是 SharedIndexInformer ,但他们请求的api资源是不一样的,所以代码上肯定有区别。
    实际上sharedInformerFactory 创建 Pod的SharedIndexInformer 是委托给 PodInformer 的。创建 Deployment 的SharedIndexInformer 是委托给 DeploymentInformer

            //PodInformer 
        podInformer := informerFactory.Core().V1().Pods()
            //DeploymentInformer
           //deploymentInformer := informerFactory.Apps().V1().Deployments()
           //通过podInformer 的 Informer()方法创建SharedIndexInformer 
           podInformer.Informer().AddEventHandler(.....
    

    Pod 的 SharedIndexInformer 就是通过调用podInformer Informer() 方法获取的。

    //PodInformer创建SharedIndexInformer 
    func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
        return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
    }
    //Informer()方法实际是又回到了sharedInformerFactory,主要是获取已经存在的缓存,
    //如果缓存没有会执行 defaultInformer,也就是上面的函数去创建一个SharedIndexInformer 
    func (f *podInformer) Informer() cache.SharedIndexInformer {
        return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
    }
    

    接着NewFilteredPodInformer方法创建Pod的SharedIndexInformer

    func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
        return cache.NewSharedIndexInformer(
            &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                                   //这里专用于请求Pods资源,可以看出 不同 SharedIndexInformer 的区别
                    return client.CoreV1().Pods(namespace).List(context.TODO(), options)
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
                },
            },
            &corev1.Pod{},
            resyncPeriod,
            indexers,
        )
    }
    

    对比下 创建DeploymentI的SharedIndexInformer

    func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
        return cache.NewSharedIndexInformer(
            &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                                //这里专用于请求Deploymens资源,可以看出 不同于 Pod SharedIndexInformer
                    return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
                },
            },
            &appsv1.Deployment{},
            resyncPeriod,
            indexers,
        )
    }
    

    现在我们有了Pod的SharedIndexInformer,现在可以把它启动了。

    informerFactory.Start(wait.NeverStop)
    

    informerFactory的Start内部其实就是循环调用各个SharedIndexInformer.Run方法(启动)

    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()
    
        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                           //启动 SharedIndexInformer
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }
    

    可以看到前面主要是创建并初始化 Pod的SharedIndexInformer,然后调用 run()方法进入Informer机制。
    在开始讲run()方法之前,先讲下 创建的SharedIndexInformer 都有哪些成员变量。


    图3

    图3中以SharedIndexInformer为中心,在SharedIndexInformer创建和run()运行过程中会初始化的主要对象。run方法的流程围绕这些对象展开。
    先上图然后分析


    图4

    我们以中间的虚线分割,分上下2个部分分析。

    SharedIndexInformer的Run方法会调用Controller 的run

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L443
    s.controller.Run(stopCh)
    

    //controller run方法会调用 Reflector 的 Run 方法和 自身的 processLoop 方法

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/controller.go#L153
    wg.StartWithChannel(stopCh, r.Run)
    wait.Until(c.processLoop, time.Second, stopCh)
    

    //processLoop 方法会走图4虚线下面的逻辑
    //Reflector 的run方法 会调用 自身的 ListAndWatch

    https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/reflector.go#L221
    if err := r.ListAndWatch(stopCh); err != nil {...}
    

    ListViewAndWatch内部很长,作用见方法的描述。这里主要看Watch和Watch到事件后的回调 (也是图1中的 1List & Watch逻辑 和 2 Add Object)

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/reflector.go#L415
            w, err := r.listerWatcher.Watch(options) //Watch Pod资源
    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/reflector.go#L429
            if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {...}//事件回调
    

    listerWatcher.Watch方法会去请求 apiService的pod资源 (Watch是通过http 1.1 持久连接特性)
    watchHandler 会将事件添加到 DeltaFIFO (队列)中。

    上面事件添加到 DeltaFIFO中,下面就是去消费了(也是图1中 3Pop Object)。
    processLoop 方法会走图4虚线下面的逻辑。
    processLoop 看名字就是循环,循环消费 DeltaFIFO。

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/controller.go#L184
        func (c *controller) processLoop() {
        for {
    
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    

    实际消费的处理函数是 PopProcessFunc 该函数指向 SharedIndexInformer 的 HandleDeltas函数。

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L566
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()
    
        if deltas, ok := obj.(Deltas); ok {
            return processDeltas(s, s.indexer, s.transform, deltas)
        }
        return errors.New("object given as Process argument is not Deltas")
    }
    

    processDeltas会将事件添加到Indexer。

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/controller.go#L438
            case Sync, Replaced, Added, Updated:
                           //clientState.Get(obj);就是图1  9 get Object for Key的操作,通过key获取到old
                if old, exists, err := clientState.Get(obj); err == nil && exists {
                    if err := clientState.Update(obj); err != nil {
                        return err
                    }
                    handler.OnUpdate(old, obj)
                } else {
                    if err := clientState.Add(obj); err != nil {
                        return err
                    }
                    handler.OnAdd(obj)
                }
    

    hanlder.OnAdd(obj) 实际调用 sharedIndexInformer 的 OnAdd。

    
    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L577
    // Conforms to ResourceEventHandler
    func (s *sharedIndexInformer) OnAdd(obj interface{}) {
        // Invocation of this function is locked under s.blockDeltas, so it is
        // save to distribute the notification
        s.cacheMutationDetector.AddObject(obj)
            //最后到processor 的 distribute,distribute 方法将事件发送到processorListener的 addCh(Chan)
        s.processor.distribute(addNotification{newObj: obj}, false)
    }
    

    distribute 方法将事件发送到processorListener的 addCh(Chan)。 用于下面processorListener 去接收
    这里补下前提:processorListener 在启动的时候就会启动每个 Listener(processorListener),主要是 processorListener 的run方法和pop方法,用于接收 addCh 的数据和消费数据。

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L664
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    

    Pop方法是接收事件(从addCh接收),run方法是消费事件。
    processorListener 是对 AddEventHandler 调用参数 Listener的封装 。

    //https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L545
        listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
    

    所以 processorListener 的run方法消费事件后会将事件回调给 客户端,比如最开头的使用例子

           //设置监听响应函数
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    
            AddFunc: func(obj interface{}) {
                //pod添加回调(processorListener 的run方法消费事件后会将事件回调)
                newPod := obj.(*v1.Pod)
                fmt.Printf("new pod added %v\n", newPod.GetName())
            },
    
    

    最后在提下 processorListener pop方法
    case nextCh <- notification: //case 1
    case notificationToAdd, ok := <-p.addCh:// case 2
    这里2个case
    事件接收比消费快时,事件会进入pendingNotifications(环形可增长的buff).
    当 pendingNotifications没有数据时 case 1会停止(nextCh = nil // Disable this select case)
    当addCh 来数据后,会重新开启case1 ( nextCh = p.nextCh)

    func (p *processorListener) pop() {
        defer utilruntime.HandleCrash()
        defer close(p.nextCh) // Tell .run() to stop
    
        var nextCh chan<- interface{}
        var notification interface{}
        for {
            select {
            case nextCh <- notification:
                // Notification dispatched
                fmt.Printf(" nextCh in")
                var ok bool
                fmt.Printf(" ReadOne \n")
                notification, ok = p.pendingNotifications.ReadOne()
                if !ok { // Nothing to pop
                    fmt.Printf(" nextCh nil \n ")
                    nextCh = nil // Disable this select case
                }
            case notificationToAdd, ok := <-p.addCh:
                if !ok {
                    return
                }
                if notification == nil { // No notification to pop (and pendingNotifications is empty)
                    // Optimize the case - skip adding to pendingNotifications
                    notification = notificationToAdd
                    fmt.Printf(" nextCh change \n")
                    nextCh = p.nextCh
                } else { // There is already a notification waiting to be dispatched
                    fmt.Printf(" WriteOne \n")
                    p.pendingNotifications.WriteOne(notificationToAdd)
                }
            }
        }
    }
    

    以上通过 Pod特例简单分析了下informer机制。

    相关文章

      网友评论

        本文标题:k8s go client Informer 结合源码分析

        本文链接:https://www.haomeiwen.com/subject/mdrqdrtx.html