美文网首页kubernets学习kuberne...
client-go中的SharedInformerFactory

client-go中的SharedInformerFactory

作者: zoux | 来源:发表于2022-08-18 10:18 被阅读0次

Table of Contents

1.章节介绍

本章首先介绍SharedInformerFactory,了解其组成和作用。

然后以Podinformer为例,了解一个资源实例的Informer应该需要实现哪些函数。

本节并没有设计到具体图中的informer机制,只是从大的入口入手,看看SharedInformerFactory到底是什么

image.png

2. SharedInformerFactory

SharedInformerFactory封装了NewSharedIndexInformer方法。字如其名,SharedInformerFactory使用的是工厂模式来生成各类的Informer。无论是k8s控制器,还是自定义控制器, SharedInformerFactory都是非常重要的一环。所以首先分析SharedInformerFactory。这里以一个实例入手分析SharedInformerFactory。

2.1 SharedInformerFactory实例介绍

package main

import (
    "fmt"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "time"
)

func main()  {
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)
    // 生成SharedInformerFactory
    factory := informers.NewSharedInformerFactory(client, 5 * time.Second)
    // 生成PodInformer
    podInformer := factory.Core().V1().Pods()
    // 获得一个cache.SharedIndexInformer 单例模式
    sharedInformer := podInformer.Informer()

    //注册add, update, del处理事件
    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) {fmt.Printf("add: %v\n", obj.(*v1.Pod).Name)},
        UpdateFunc: func(oldObj, newObj interface{}) {fmt.Printf("update: %v\n", newObj.(*v1.Pod).Name)},
        DeleteFunc: func(obj interface{}){fmt.Printf("delete: %v\n", obj.(*v1.Pod).Name)},
    })

    stopCh := make(chan struct{})

    // 第一种方式
    // 可以这样启动  也可以按照下面的方式启动
    // go sharedInformer.Run(stopCh)
    // time.Sleep(2 * time.Second)

    // 第二种方式,这种方式是启动factory下面所有的informer
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())

    for _, p := range pods {
        fmt.Printf("list pods: %v\n", p.Name)
    }
    <- stopCh
}

2.2 sharedInformerFactory结构体

type sharedInformerFactory struct {
  // client客户端
    client           kubernetes.Interface            
    // sharedInformerFactory是没有namespaces限制的。不过可以设置namespaces限制该factory后面的informer都是指定namespaces的
    namespace        string          
  // TweakListOptionsFunc其实就是ListOptions,这个是针对所有Informer List生效的 (WithTweakListOptions可以看出来)
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    // 这个是list默认定期同步的时间间隔
    defaultResync    time.Duration
    // 每种informer还可以自定义
    customResync     map[reflect.Type]time.Duration
  
  // 属于该factory下面的所有的informer
    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    // 判断informer是否已经 Run起来了
    startedInformers map[reflect.Type]bool   
}

2.3 sharedInformerFactory成员函数

定义customResync
// WithCustomResyncConfig sets a custom resync period for the specified informer types.
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption 

定义tweakListOptions
// WithTweakListOptions sets a custom filter on all listers of the configured SharedInformerFactory.
func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption 

定义namespaces
// WithNamespace limits the SharedInformerFactory to the specified namespace.
func WithNamespace(namespace string) SharedInformerOption 

// start所有的informer
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

// WaitForCacheSync让所有的informers同步cache。一般informer.run函数中都有一个这样的语句。先等cache同步。这个的含义就是等list完了的数据,全部转换到cache中去。
    // Wait for all involved caches to be synced, before processing items from the queue is started
    if !cache.WaitForCacheSync(stopCh, ctrl.Informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }
    
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}


// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    // 如果存在同类的,直接返回,不会再new一个。这里的type就是 pod/deploy
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

  // 提供k8s内置资源的定义接口,从这里可以看出来
    Admissionregistration() admissionregistration.Interface 
    Apps() apps.Interface
    Autoscaling() autoscaling.Interface
    Batch() batch.Interface
    Certificates() certificates.Interface
    Coordination() coordination.Interface
    Core() core.Interface
    Events() events.Interface
    Extensions() extensions.Interface
    Networking() networking.Interface
    Policy() policy.Interface
    Rbac() rbac.Interface
    Scheduling() scheduling.Interface
    Settings() settings.Interface
    Storage() storage.Interface
}

// 例如core组下面的资源,f.Core().v1.pods() 就是这个
func (f *sharedInformerFactory) Core() core.Interface {
    return core.New(f, f.namespace, f.tweakListOptions)
}

2.4 总结

通过对sharedInformerFactory的成员和函数介绍,了解到:

(1)factory就是提供了一个构造informer的入口,里面包含了一堆Informer

(2)同一中资源类型共用一个Infomer。这样的话就可以节省不必要的资源。例如kcm中,rs可以需要监听pod资源,gc也需要监听Pod资源,通过factory机制就可以使用同一个

(3)但是监听同一种类型的资源,但是不同的listOption看起来也是不行,例如一个Informer监听running的pod,一个Informer监听error的Pod, 是需要多个factory。

3. podInformer

从上诉可以看出来,sharedInformerFactory只是一个入口。接下来以podInformer为例,看看一个具体的资源Informer需要实现哪些功能。

3.1 PodInformer结构体

// PodInformer provides access to a shared informer and lister for
// Pods.
// 只需要实现Informer,Lister函数
type PodInformer interface {
    Informer() cache.SharedIndexInformer     
    Lister() v1.PodLister
}

type podInformer struct {
    factory          internalinterfaces.SharedInformerFactory   //  是哪一个factory生成的informer
    tweakListOptions internalinterfaces.TweakListOptionsFunc    //  有哪些filter
    namespace        string                                     //  命名空间
}

3.2 PodInformer成员函数

从函数定义可以看出来,informer其实就是 cache.SharedIndexInformer

New SharedIndexInformer的时候指定了ListWatch函数。

listFunc: client.CoreV1().Pods(namespace).List(options)

WatchFunc: client.CoreV1().Pods(namespace).Watch(options)

所以从结构体上推测:

(1) informer最终都是 cache.SharedIndexInformer。但是 cache.SharedIndexInformer需要先定义好list, watch函数

(2)cache.SharedIndexInformer里面的index就是存储+查询。根据定义好的list, watch更新index的数据

接下来继续看看cache.SharedIndexInformer是如何实现的。

// NewPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil)
}

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

// 默认只有namespaces这个indexer
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}


func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

// 返回Lister数据, 这里是从index里面获取,而不是从apiserver中获取
func (f *podInformer) Lister() v1.PodLister {
    return v1.NewPodLister(f.Informer().GetIndexer())
}

cache中的index定义
k8s.io/client-go/tools/cache/index.go
// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
    Store
    // Retrieve list of objects that match on the named indexing function
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the set of keys that match on the named indexing function.
    IndexKeys(indexName, indexKey string) ([]string, error)
    // ListIndexFuncValues returns the list of generated values of an Index func
    ListIndexFuncValues(indexName string) []string
    // ByIndex lists object that match on the named indexing function with the exact key
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    // GetIndexer return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}

4.总结

(1)factory就是提供了一个构造informer的入口,里面包含了一堆Informer

(2)同一中资源类型共用一个Infomer。这样的话就可以节省不必要的资源。例如kcm中,rs可以需要监听pod资源,gc也需要监听Pod资源,通过factory机制就可以使用同一个

(3)但是监听同一种类型的资源,但是不同的listOption看起来也是不行,例如一个Informer监听running的pod,一个Informer监听error的Pod, 是需要多个factory。

(4)当前factory并没有利用到图中表示Informer机制。最终是cache.SharedIndexInformer 包含了所有的参数,实现了上诉图中的Informer机制。下一节开始介绍cache.SharedIndexInformer

相关文章

网友评论

    本文标题:client-go中的SharedInformerFactory

    本文链接:https://www.haomeiwen.com/subject/jftsgrtx.html