Kubernetes Client-go Informer Source Code


Author: 阿里云云栖号 (Alibaba Cloud Yunqi) | Published 2019-01-22 17:44

    Almost every controller manager and CRD controller uses client-go's Informer functions, so that the corresponding objects can be obtained through Watch or Get/List. Below we look at the client-go Informer mechanism from a source-code point of view.

    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }
    
    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    
    controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
    
    // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
    // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
    kubeInformerFactory.Start(stopCh)
    

    This example is excerpted from https://github.com/kubernetes/sample-controller/blob/master/main.go and uses the default Kubernetes Deployment Informer. As you can see, using a client-go Informer directly is quite simple. Setting aside for now what the NewController function does, let's follow the code and see what kubeInformerFactory.Start actually does.

    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()
    
        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }
    

    You can see that Start iterates over f.informers. Let's take a look at the data structure in which informers is defined:

    type sharedInformerFactory struct {
        client           kubernetes.Interface
        namespace        string
        tweakListOptions internalinterfaces.TweakListOptionsFunc
        lock             sync.Mutex
        defaultResync    time.Duration
        customResync     map[reflect.Type]time.Duration
    
        informers map[reflect.Type]cache.SharedIndexInformer
        // startedInformers is used for tracking which informers have been started.
        // This allows Start() to be called multiple times safely.
        startedInformers map[reflect.Type]bool
    }
    

    In our example, at runtime f.informers contains the following:

    type *v1.Deployment informer &{0xc000379fa0 <nil> 0xc00038ccb0 {} 0xc000379f80 0xc00033bb00 30000000000 30000000000 0x28e5ec8 false false {0 0} {0 0}}
    

    In other words, every Kubernetes type has its own Informer function. Next, let's see where this function is registered, using the Deployment Informer as the example.

    First, go back to the code that initialized kubeClient at the beginning:

    controller := NewController(kubeClient, exampleClient,
            kubeInformerFactory.Apps().V1().Deployments(),
            exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
            
            
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: controller.handleObject,
            UpdateFunc: func(old, new interface{}) {
                newDepl := new.(*appsv1.Deployment)
                oldDepl := old.(*appsv1.Deployment)
                if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                    // Periodic resync will send update events for all known Deployments.
                    // Two different versions of the same Deployment will always have different RVs.
                    return
                }
                controller.handleObject(new)
            },
            DeleteFunc: controller.handleObject,
        })
    

    Note the argument kubeInformerFactory.Apps().V1().Deployments(): this expression creates an informer that cares only about Deployments.
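
    The same pattern applies to every built-in group/version/kind: the factory exposes a generated, typed accessor per type, each returning both a shared informer and a lister. A quick sketch of a few other accessors (assuming the same kubeInformerFactory as above):

    // Each call lazily registers a SharedIndexInformer for that type in the factory's map.
    kubeInformerFactory.Core().V1().Pods()       // Pod informer + lister
    kubeInformerFactory.Core().V1().Services()   // Service informer + lister
    kubeInformerFactory.Apps().V1().DaemonSets() // DaemonSet informer + lister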

    controller := &Controller{
            kubeclientset:     kubeclientset,
            sampleclientset:   sampleclientset,
            deploymentsLister: deploymentInformer.Lister(),
            deploymentsSynced: deploymentInformer.Informer().HasSynced,
            foosLister:        fooInformer.Lister(),
            foosSynced:        fooInformer.Informer().HasSynced,
            workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
            recorder:          recorder,
        }
    

    deploymentInformer.Lister() initializes a Deployment Lister. Let's see what the Lister function does:

    // NewFilteredDeploymentInformer constructs a new informer for Deployment type.
    // Always prefer using an informer factory to get a shared informer instead of getting an independent
    // one. This reduces memory footprint and number of connections to the server.
    func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
        return cache.NewSharedIndexInformer(
            &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.AppsV1().Deployments(namespace).List(options)
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.AppsV1().Deployments(namespace).Watch(options)
                },
            },
            &appsv1.Deployment{},
            resyncPeriod,
            indexers,
        )
    }
    
    func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
        return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
    }
    
    func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
        return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
    }
    
    func (f *deploymentInformer) Lister() v1.DeploymentLister {
        return v1.NewDeploymentLister(f.Informer().GetIndexer())
    }
    
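
    Before following that call chain, a quick note on how a controller actually consumes the Lister: reads are served from the informer's local index, not from the API server. A minimal sketch (the namespace "default" and the name "my-deployment" are placeholders, and c.deploymentsLister is the field stored in the controller above):

    // Inside a controller method: read a Deployment from the informer's local cache.
    dep, err := c.deploymentsLister.Deployments("default").Get("my-deployment")
    if err != nil {
        // apierrors.IsNotFound(err) distinguishes "not in the cache" from other failures.
        return err
    }
    fmt.Println(dep.Name, dep.Status.AvailableReplicas)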

    Back in the generated code above: note that the Lister function calls Informer, which triggers f.factory.InformerFor. That ultimately calls the sharedInformerFactory's InformerFor function:

    // InternalInformerFor returns the SharedIndexInformer for obj using an internal
    // client.
    func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()
    
        informerType := reflect.TypeOf(obj)
        informer, exists := f.informers[informerType]
        if exists {
            return informer
        }
    
        resyncPeriod, exists := f.customResync[informerType]
        if !exists {
            resyncPeriod = f.defaultResync
        }
    
        informer = newFunc(f.client, resyncPeriod)
        f.informers[informerType] = informer
    
        return informer
    }
    

    Here you can see that the line informer = newFunc(f.client, resyncPeriod) is what finally creates the informer and registers it in the factory's informers map, keyed by the object's type. That answers the question we raised earlier about where each type's informer comes from.
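
    A side effect worth calling out: because InformerFor caches by reflect.Type, every caller that asks the same factory for the same type gets the same shared informer instance. A tiny sketch to illustrate (assuming the kubeClient and the kubeinformers import from earlier):

    factory := kubeinformers.NewSharedInformerFactory(kubeClient, 30*time.Second)
    a := factory.Apps().V1().Deployments().Informer()
    b := factory.Apps().V1().Deployments().Informer()
    fmt.Println(a == b) // true: both are the same cache.SharedIndexInformer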

    Now let's return to the informer Start:

    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()
    
        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }
    

    As shown here, it iterates over all the informers and launches each informer's Run method in its own goroutine. Let's take a global look at the Run method:

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
    
        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    
        cfg := &Config{
            Queue:            fifo,
            ListerWatcher:    s.listerWatcher,
            ObjectType:       s.objectType,
            FullResyncPeriod: s.resyncCheckPeriod,
            RetryOnError:     false,
            ShouldResync:     s.processor.shouldResync,
    
            Process: s.HandleDeltas,
        }
    
        func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
    
            s.controller = New(cfg)
            s.controller.(*controller).clock = s.clock
            s.started = true
        }()
    
        // Separate stop channel because Processor should be stopped strictly after controller
        processorStopCh := make(chan struct{})
        var wg wait.Group
        defer wg.Wait()              // Wait for Processor to stop
        defer close(processorStopCh) // Tell Processor to stop
        wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
        wg.StartWithChannel(processorStopCh, s.processor.run)
    
        defer func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
            s.stopped = true // Don't want any new listeners
        }()
        s.controller.Run(stopCh)
    }
    

    First, Run creates a DeltaFIFO queue from the key function (MetaNamespaceKeyFunc) and the store's indexer. This first-in, first-out queue is used mainly to hold the various events for each object.

    func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
        f := &DeltaFIFO{
            items:        map[string]Deltas{},
            queue:        []string{},
            keyFunc:      keyFunc,
            knownObjects: knownObjects,
        }
        f.cond.L = &f.lock
        return f
    }
    

    As you can see, constructing this queue is fairly simple: a map holds the deltas and a string slice holds the queue of keys.
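
    To get a feel for DeltaFIFO in isolation, here is a rough standalone sketch (not the real informer wiring; the Pop callback signature matches the client-go vintage quoted in this article):

    // Queue deltas for one object, then drain them the way processLoop does further below.
    fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

    dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}
    _ = fifo.Add(dep)    // records an Added delta under the key "default/demo"
    _ = fifo.Update(dep) // appends an Updated delta to the same key

    // Pop hands all accumulated Deltas for one key to the processing function,
    // just as processLoop hands them to HandleDeltas.
    _, _ = fifo.Pop(func(obj interface{}) error {
        for _, d := range obj.(cache.Deltas) {
            fmt.Println(d.Type, d.Object.(*appsv1.Deployment).Name)
        }
        return nil
    })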

    Next, a Config is assembled from the List and Watch functions built on the client, plus the queue, and a controller is initialized from this config. This is client-go's cache controller, whose main job is to maintain the cache of objects obtained from the API server and keep them up to date.

    Below we focus on this function call:

    wg.StartWithChannel(processorStopCh, s.processor.run)
    

    This is where the listeners are actually started.

    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
        func() {
            p.listenersLock.RLock()
            defer p.listenersLock.RUnlock()
            for _, listener := range p.listeners {
                p.wg.Start(listener.run)
                p.wg.Start(listener.pop)
            }
            p.listenersStarted = true
        }()
        <-stopCh
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
        }
        p.wg.Wait() // Wait for all .pop() and .run() to stop
    }
    

    Focus on the run method. Remember that custom handler functions were registered earlier for Add, Update, and Delete? This is where those handlers get triggered:

    func (p *processorListener) run() {
        // this call blocks until the channel is closed.  When a panic happens during the notification
        // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
        // the next notification will be attempted.  This is usually better than the alternative of never
        // delivering again.
        stopCh := make(chan struct{})
        wait.Until(func() {
            // this gives us a few quick retries before a long pause and then a few more quick retries
            err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
                for next := range p.nextCh {
                    switch notification := next.(type) {
                    case updateNotification:
                        p.handler.OnUpdate(notification.oldObj, notification.newObj)
                    case addNotification:
                        p.handler.OnAdd(notification.newObj)
                    case deleteNotification:
                        p.handler.OnDelete(notification.oldObj)
                    default:
                        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                    }
                }
                // the only way to get here is if the p.nextCh is empty and closed
                return true, nil
            })
    
            // the only way to get here is if the p.nextCh is empty and closed
            if err == nil {
                close(stopCh)
            }
        }, 1*time.Minute, stopCh)
    }
    

    You can see that when an object arrives on the p.nextCh channel, the corresponding user-registered function is invoked depending on the notification type. So who feeds this channel?

    func (p *processorListener) pop() {
        defer utilruntime.HandleCrash()
        defer close(p.nextCh) // Tell .run() to stop
    
        var nextCh chan<- interface{}
        var notification interface{}
        for {
            select {
            case nextCh <- notification:
                // Notification dispatched
                var ok bool
                notification, ok = p.pendingNotifications.ReadOne()
                if !ok { // Nothing to pop
                    nextCh = nil // Disable this select case
                }
            case notificationToAdd, ok := <-p.addCh:
                if !ok {
                    return
                }
                if notification == nil { // No notification to pop (and pendingNotifications is empty)
                    // Optimize the case - skip adding to pendingNotifications
                    notification = notificationToAdd
                    nextCh = p.nextCh
                } else { // There is already a notification waiting to be dispatched
                    p.pendingNotifications.WriteOne(notificationToAdd)
                }
            }
        }
    }
    

    The answer is this pop function: it reads incoming notifications from p.addCh, forwards them to p.nextCh, and guarantees that each notification is delivered exactly once.
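
    The interesting design point is that pop acts as an unbounded buffer between the distributor and the handlers: a slow event handler reading from nextCh never blocks whoever writes to addCh. Stripped of the client-go specifics, the pattern looks roughly like this (a generic sketch, not the real code; pendingNotifications is actually a growing ring buffer):

    func pump(addCh <-chan interface{}, nextCh chan<- interface{}) {
        defer close(nextCh)
        var pending []interface{}  // stand-in for pendingNotifications
        var out chan<- interface{} // nil disables the send case until there is something to send
        var next interface{}
        for {
            select {
            case out <- next:
                // One notification dispatched; promote the next buffered one, if any.
                if len(pending) > 0 {
                    next, pending = pending[0], pending[1:]
                } else {
                    out = nil
                }
            case v, ok := <-addCh:
                if !ok {
                    return // addCh closed: stop, which also stops the consumer via close(nextCh)
                }
                if out == nil {
                    // Nothing in flight: hand this one straight to the consumer.
                    next, out = v, nextCh
                } else {
                    pending = append(pending, v)
                }
            }
        }
    }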

    Next is the final piece, the controller's Run function. Let's see what it actually does:

    // Run begins processing items, and will continue until a value is sent down stopCh.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *controller) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        go func() {
            <-stopCh
            c.config.Queue.Close()
        }()
        r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )
        r.ShouldResync = c.config.ShouldResync
        r.clock = c.clock
    
        c.reflectorMutex.Lock()
        c.reflector = r
        c.reflectorMutex.Unlock()
    
        var wg wait.Group
        defer wg.Wait()
    
        wg.StartWithChannel(stopCh, r.Run)
    
        wait.Until(c.processLoop, time.Second, stopCh)
    }
    

    The key line here is wg.StartWithChannel(stopCh, r.Run):

    // Run starts a watch and handles watch events. Will restart the watch if it is closed.
    // Run will exit when stopCh is closed.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
        wait.Until(func() {
            if err := r.ListAndWatch(stopCh); err != nil {
                utilruntime.HandleError(err)
            }
        }, r.period, stopCh)
    }
    

    This calls the r.ListAndWatch method. That method is fairly involved, so we take it step by step; its core is the watchHandler shown below, which ListAndWatch calls after the initial List:

    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
        start := r.clock.Now()
        eventCount := 0
    
        // Stopping the watcher should be idempotent and if we return from this function there's no way
        // we're coming back in with the same watch interface.
        defer w.Stop()
        // update metrics
        defer func() {
            r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
            r.metrics.watchDuration.Observe(time.Since(start).Seconds())
        }()
    
    loop:
        for {
            select {
            case <-stopCh:
                return errorStopRequested
            case err := <-errc:
                return err
            case event, ok := <-w.ResultChan():
                if !ok {
                    break loop
                }
                if event.Type == watch.Error {
                    return apierrs.FromObject(event.Object)
                }
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
                meta, err := meta.Accessor(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                    continue
                }
                newResourceVersion := meta.GetResourceVersion()
                switch event.Type {
                case watch.Added:
                    err := r.store.Add(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Modified:
                    err := r.store.Update(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Deleted:
                    // TODO: Will any consumers need access to the "last known
                    // state", which is passed in event.Object? If so, may need
                    // to change this.
                    err := r.store.Delete(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                    }
                default:
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                }
                *resourceVersion = newResourceVersion
                r.setLastSyncResourceVersion(newResourceVersion)
                eventCount++
            }
        }
    
        watchDuration := r.clock.Now().Sub(start)
        if watchDuration < 1*time.Second && eventCount == 0 {
            r.metrics.numberOfShortWatches.Inc()
            return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
        }
        klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
        return nil
    }
    

    This is where the Watch call is really consumed: each watch event that comes back is placed into the FIFO queue created earlier (r.store here is that DeltaFIFO).
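
    For intuition, here is roughly what one ListAndWatch cycle boils down to when written by hand against the clientset (a simplified sketch in the same pre-context API style as the snippets above; the real method also pushes the listed items into the store via Replace and handles resyncs, resourceVersion bookkeeping, and watch expiry):

    func listAndWatchOnce(client kubernetes.Interface, store cache.Store) error {
        // List once to get a consistent snapshot plus the resourceVersion to watch from.
        list, err := client.AppsV1().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{})
        if err != nil {
            return err
        }

        // Watch from that resourceVersion and mirror every event into the local store,
        // which in the informer is the DeltaFIFO drained by processLoop.
        w, err := client.AppsV1().Deployments(metav1.NamespaceAll).Watch(metav1.ListOptions{ResourceVersion: list.ResourceVersion})
        if err != nil {
            return err
        }
        defer w.Stop()
        for event := range w.ResultChan() {
            switch event.Type {
            case watch.Added:
                _ = store.Add(event.Object)
            case watch.Modified:
                _ = store.Update(event.Object)
            case watch.Deleted:
                _ = store.Delete(event.Object)
            }
        }
        return nil
    }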

    Finally, the controller's processLoop is what drains that queue:

    // processLoop drains the work queue.
    // TODO: Consider doing the processing in parallel. This will require a little thought
    // to make sure that we don't end up processing the same object multiple times
    // concurrently.
    //
    // TODO: Plumb through the stopCh here (and down to the queue) so that this can
    // actually exit when the controller is stopped. Or just give up on this stuff
    // ever being stoppable. Converting this whole package to use Context would
    // also be helpful.
    func (c *controller) processLoop() {
        for {
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
            if err != nil {
                if err == FIFOClosedError {
                    return
                }
                if c.config.RetryOnError {
                    // This is the safe way to re-enqueue.
                    c.config.Queue.AddIfNotPresent(obj)
                }
            }
        }
    }
    

    The watch side puts objects into the queue; this goroutine is the consumer. The concrete consumption function is the Process function configured earlier, namely HandleDeltas:

    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()
    
        // from oldest to newest
        for _, d := range obj.(Deltas) {
            switch d.Type {
            case Sync, Added, Updated:
                isSync := d.Type == Sync
                s.cacheMutationDetector.AddObject(d.Object)
                if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                    if err := s.indexer.Update(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
                } else {
                    if err := s.indexer.Add(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(addNotification{newObj: d.Object}, isSync)
                }
            case Deleted:
                if err := s.indexer.Delete(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
            }
        }
        return nil
    }
    

    Based on the incoming obj, this function first looks the object up in its own cache (the indexer). If it already exists, this is treated as an update: the indexer is updated and then the user-registered Update handler is called. If it does not exist, the object is added to the indexer and the user's Add handler is called.

    This concludes the source-code walkthrough of the client-go Informer flow.
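
    To wrap up, here is a minimal, self-contained sketch that ties the pieces together: build a factory, register event handlers, start the informers, wait for the initial cache sync, then read from the lister. The kubeconfig path and the handler bodies are placeholders, and the return values of AddEventHandler (added in newer client-go releases) are simply discarded:

    package main

    import (
        "fmt"
        "time"

        appsv1 "k8s.io/api/apps/v1"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Placeholder kubeconfig path; any rest.Config works here.
        cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
        if err != nil {
            panic(err)
        }
        kubeClient, err := kubernetes.NewForConfig(cfg)
        if err != nil {
            panic(err)
        }

        stopCh := make(chan struct{})
        defer close(stopCh)

        factory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)
        deployments := factory.Apps().V1().Deployments()

        // Register handlers before Start, just as sample-controller does.
        deployments.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { fmt.Println("add:", obj.(*appsv1.Deployment).Name) },
            UpdateFunc: func(old, new interface{}) { fmt.Println("update:", new.(*appsv1.Deployment).Name) },
            DeleteFunc: func(obj interface{}) { fmt.Println("delete") },
        })

        factory.Start(stopCh) // non-blocking: each informer runs in its own goroutine

        // Block until the initial List has populated the local cache.
        if !cache.WaitForCacheSync(stopCh, deployments.Informer().HasSynced) {
            panic("cache failed to sync")
        }

        // Reads are now served from the local indexer, not from the API server.
        all, err := deployments.Lister().List(labels.Everything())
        if err != nil {
            panic(err)
        }
        fmt.Println("deployments in cache:", len(all))
    }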



    Author of this article: xianlubird


    This article is original content from the Alibaba Cloud Yunqi Community (云栖社区) and may not be reproduced without permission.
