美文网首页kubernets学习kuberne...
informer机制之cache.indexer机制

informer机制之cache.indexer机制

作者: zoux | 来源:发表于2022-08-18 10:18 被阅读0次

    Table of Contents

    1. 背景

    tool/cache.indexer是informer中提供本地缓存,并且带有丰富索引的机制。

    index是索引的实现。类似于数据库的索引一样,index可以加快查找速度。

    本节就是弄清楚cache中indexer是如何实现的

    本节研究的内容位置整个informer机制的红色圈起来区域

    image.png

    如何存储+如何索引

    2. Indexer结构说明

    Indexer是一个接口,包含两个部分:

    (1)Store。从Store定义来看,Store是真正保存数据的结构体。Store本身也是一个接口,具体的存储需要实现这些接口。

    (2)Index,IndexKeys,ListIndexFuncValues,ByIndex,GetIndexers,AddIndexers 等和操作索引有关的函数

    // IndexFunc knows how to provide an indexed value for an object.
    type IndexFunc func(obj interface{}) ([]string, error)
    
    // Index maps the indexed value to a set of keys in the store that match on that value
    type Index map[string]sets.String
    
    // Indexers maps a name to a IndexFunc
    type Indexers map[string]IndexFunc
    
    // Indices maps a name to an Index
    type Indices map[string]Index
    
    
    // Indexer接口是为了添加或者查询索引用的。当前可能一下子看注释很迷惑,先看看后面的例子就清楚了
    type Indexer interface {
        Store
        // 通过indexName获得索引函数,然后obj(pod)对象作为函数输入,输出所有检索值。然后找出来所有包含检索值的对象(pod)
        // 举例pod1 通过byuser这个函数,检索出来有ernie,bert两个检索值
        // 然后Index("byuser",pod1) 会输出pod1, pod2(包含bert),pod3(包含ernie)
        // Retrieve list of objects that match on the named indexing function
        Index(indexName string, obj interface{}) ([]interface{}, error)
        
        // 通过索引函数的名字(byUser)+具体的值(bert),获得pod的名字(ns/podName)
        // IndexKeys returns the set of keys that match on the named indexing function.
        IndexKeys(indexName, indexKey string) ([]string, error)
        
        // 通过索引函数的名字(byUser), 获得所有的索引值。这里输入byuser, 输出:ernie, bert, elmo, oscar
        // ListIndexFuncValues returns the list of generated values of an Index func
        ListIndexFuncValues(indexName string) []string
        
        // 通过索引函数的名字(byUser)+具体的值(bert),获得pod对象
        // ByIndex lists object that match on the named indexing function with the exact key
        ByIndex(indexName, indexKey string) ([]interface{}, error)
        
        // 返回所有的索引函数
        // GetIndexer return the indexers
        GetIndexers() Indexers
        
        // AddIndexers adds more indexers to this store.  If you call this after you already have data
        // in the store, the results are undefined.
        // 添加 索引函数。每个索引函数都有一个唯一的名字,那就是 indexName 
        AddIndexers(newIndexers Indexers) error
    }
    

    Store是一个存储的接口,后面结合具体存储实现再讲。这里先讲一下 Index, Indexers, Indices的关系。

    IndexFunc:索引函数。输入对象,输出对象在该索引函数下匹配的字段(索引值)列表。

    Index: 索引表。 map结构,key索引值, value是对象名(初始化Indexer的时候需要指定,默认是ns+metadata.name表示一个对象)

    Indexers:索引函数表。 map结构,索引函数可以有多个,所以每个索引函数需要起一个名字来表示。map的key是一个索引函数的名称,value是一个个的索引函数。

    Indices:Index的复数形式。每个索引函数名对应一个索引函数,每个索引函数对应很多索引值。每个索引值会对应很多实际的对象。

    index只能知道索引值对应对象。

    Indices可以通过函数索引名,知道每个索引值对应的对象。

    3 store结构说明

    store可以认为只是一个父类,它只是一个接口,说明了要想实现存储,必须要实现这些函数。

    // Store is a generic object storage interface. Reflector knows how to watch a server
    // and update a store. A generic store is provided, which allows Reflector to be used
    // as a local caching system, and an LRU store, which allows Reflector to work like a
    // queue of items yet to be processed.
    //
    // Store makes no assumptions about stored object identity; it is the responsibility
    // of a Store implementation to provide a mechanism to correctly key objects and to
    // define the contract for obtaining objects by some arbitrary key type.
    type Store interface {
        Add(obj interface{}) error         //往存储增加,更新,删除元素
        Update(obj interface{}) error  
        Delete(obj interface{}) error
        List() []interface{}              
        ListKeys() []string
        Get(obj interface{}) (item interface{}, exists bool, err error)
        GetByKey(key string) (item interface{}, exists bool, err error)
    
        // Replace will delete the contents of the store, using instead the
        // given list. Store takes ownership of the list, you should not reference
        // it after calling this function.
        Replace([]interface{}, string) error
        Resync() error
    }
    

    4. cache

    4.1 cache结构说明

    cache结构体本身只有 cacheStorage + keyFunc两个元素。

    // cache responsibilities are limited to:
    // 1. Computing keys for objects via keyFunc
    //  2. Invoking methods of a ThreadSafeStorage interface
    type cache struct {
       // cacheStorage bears the burden of thread safety for the cache
       cacheStorage ThreadSafeStore
       // keyFunc is used to make the key for objects stored in and retrieved from items, and
       // should be deterministic.
       keyFunc KeyFunc
    }
    
    

    cacheStorage是真正的存储结构。

    keyFunc 就是如何通过一个 String 定位到一个对象(例如pod)

    查看k8s.io/client-go/tools/cache/store.go 中的函数定义。

    可以发现 cache即实现了 indexer的所有函数,又实现了store的所有函数。但是cache结构的所有方法都是调用了成员变量cacheStorage的方法。如下:

    // Add inserts an item into the cache.
    func (c *cache) Add(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Add(key, obj)
        return nil
    }
    

    所以ThreadSafeStore才是真正实现了 缓存+索引 功能的结构体。

    4.2 ThreadSafeStore结构说明

    ThreadSafeStore本身就是一个接口,定义了 store + indexer的所有函数。threadSafeMap是真正的实现类。

    在thread_safe_store.go文件一看就非常清楚

    k8s.io/client-go/tools/cache/thread_safe_store.go

    // threadSafeMap implements ThreadSafeStore
    type threadSafeMap struct {
        lock  sync.RWMutex
        items map[string]interface{}     //真正的存储,存储所有的元数据
    
        // indexers maps a name to an IndexFunc
        indexers Indexers
        // indices maps a name to an Index
        indices Indices
    }
    

    4.3 举例说明

    threadSafeMap的实现都非常简单。看看代码就明白了。但是结合上面对Indexer的文字描述太过于枯燥,所以这里以一个例子说明cache.indexer是如何实现 存储+索引 的。该例子来源于 k8s.io/client-go/tools/cache/index_test.go 具体如下:

    // 1. 先定义一个IndexFunc
    // testUsersIndexFunc 就是上面提到的索引函数
    // 从函数的实现可以看出来。这个就是想根据 pod Annotations中users的名字做索引
    func testUsersIndexFunc(obj interface{}) ([]string, error) {
        pod := obj.(*v1.Pod)
        usersString := pod.Annotations["users"]
    
        return strings.Split(usersString, ","), nil
    }
    
    // 2. 初始化一个NewIndexer
    // NewIndexer必须指定一个func,这个func的作用就是KeyFunc, 能用一个string代表 pod对象。这里就是MetaNamespaceKeyFunc,用ns/name来表示一个pod
    // 同时还指定一个Indexers。这个表示,当前Indexers只有一个索引函数testUsersIndexFunc,索引函数名为byUser
    index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})
    
    查看NewIndexer的定义可以发现,就是生成了cache结构体
    // NewIndexer returns an Indexer implemented simply with a map and a lock.
    func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
        return &cache{
            cacheStorage: NewThreadSafeStore(indexers, Indices{}),
            keyFunc:      keyFunc,
        }
    }
    
    
    // 3.定义三个pod
    // pod1 -> ernie,bert
    // pod2 -> bert,oscar
    // pod3 -> ernie,elmo
        pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
        pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
        pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
    
    // 4.将三个pod放入pod
        index.Add(pod1)
        index.Add(pod2)
        index.Add(pod3)
        
    到这里先暂停一下,看看上面提到的IndexFunc,Index,Indexers,Indices都有哪些内容
    IndexFunc:testUsersIndexFunc
    threadSafeMap.Indexers: {
       "byUser": testUsersIndexFunc
    }
    
    threadSafeMap.Indices: {
       "byUser": {
           "ernie": ["one","tre"],
           "bert": ["one","two"],
           "oscar": ["two"],
           "elmo": ["tre"],
       }
    }
    
    Index:就是上面的Indices的一个个数据,就是byUser。因为只有一个索引函数
    "byUser": {
           "ernie": ["one","tre"],
           "bert": ["one","two"],
           "oscar": ["two"],
           "elmo": ["tre"],
       }
       
    
    threadSafeMap.items {
          "one" : pod1,
          "two" : pod2,
          "tre" : pod3
    }
    其中。pod1,pod2,pod3都是一个个pod结构的对象。
    所以可以看到 threadSafeMap 通过 items实现了存储,Indices + Indexers实现了索引
    
    // 增加一个元素,处理操作items外,还要更新Indices
    func (c *threadSafeMap) Add(key string, obj interface{}) {
        c.lock.Lock()
        defer c.lock.Unlock()
        oldObject := c.items[key]
        c.items[key] = obj
        c.updateIndices(oldObject, obj, key)
    }
    
    
    接下来再看看 threadSafeMap 是如何实现 索引的各个函数的。代码不在贴了,直接写输出。
    
    
      // indexName就是索引函数名,obj(pod)就是pod对象。
      // 该函数的功能是, 通过索引函数名,找到索引函数,在将pod作为索引函数的输入,得到所有的检索值。然后再找出来所有包含检索值的对象列表
        // Index("byuser",pod1) 会输出[pod1, pod2,pod3]
        // 原因:pod1通过byuser这个函数,检索出来有ernie,bert两个检索值
        // pod1,pod2,pod3都包含ernie,bert之一,所有都符合条件
        Index(indexName string, obj interface{}) ([]interface{}, error)
        
        
        // 该函数的功能是:通过索引函数名+索引值,得到所有的对象的名字
        // 举例:IndexKeys("byUser", "bert")的输出是: ["one","two"]
        // IndexKeys returns the set of keys that match on the named indexing function.
        IndexKeys(indexName, indexKey string) ([]string, error)
        
        
        // 该函数的功能是:根据索引函数名,得到所有的索引值
      // 举例:ListIndexFuncValues("byuser")  输出为:ernie, bert, elmo, oscar
        // ListIndexFuncValues returns the list of generated values of an Index func
        ListIndexFuncValues(indexName string) []string
        
        // 该函数的功能是:通过索引函数名+索引值,得到所有的对象
        // 举例:IndexKeys("byUser", "bert")的输出是: [pod1,pod2]
        // IndexKeys得到的是对象的名字(key)
        // ByIndex lists object that match on the named indexing function with the exact key
        ByIndex(indexName, indexKey string) ([]interface{}, error)
        
        
        // 返回所有的索引函数
        // GetIndexer return the indexers
        GetIndexers() Indexers
        
        // AddIndexers adds more indexers to this store.  If you call this after you already have data
        // in the store, the results are undefined.
        // 添加 索引函数。每个索引函数都有一个唯一的名字,那就是 indexName 
        AddIndexers(newIndexers Indexers) error
    

    4.4 Cache总结

    (1)cache提供了 存储+索引的功能,最终是通过threadSafeMap实现的

    (2)threadSafeMap中items实现了存储。indexers + Indices 实现了索引

    (3)add, del, update元素除了更新items这个map,还要更新indexers + Indices

    (4)吐槽一下,indexers,Indices,index这些名字感觉没起好,咋一看莫名其妙

    5. cache.index在informer中的应用

    以podinformer为例介绍cache这一套在informer中的应用。本节只是介绍podinformer是如何生成cache的。具体cache的更新,结合list watcher再做说明。


    k8s.io/client-go/informers/core/v1/pod.go

    (1)defaultInformer传入的是cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}

    indexers就是一个map。因为索引函数有很多,所以就需要一个名字来区分不同的索引函数。

    比如MetaNamespaceIndexFunc,就是根据对象的namespace来做索引

    func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
        return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
    }
    
    key是一个string
    const (
        NamespaceIndex string = "namespace"
    )
    
    type IndexFunc func(obj interface{}) ([]string, error)
    // MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
    func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
        meta, err := meta.Accessor(obj)
        if err != nil {
            return []string{""}, fmt.Errorf("object has no meta: %v", err)
        }
        return []string{meta.GetNamespace()}, nil
    }
    

    (2)cache.Indexers是一个参数,传入到了SharedIndexInformer的实例化

    func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
        return cache.NewSharedIndexInformer(
            &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.CoreV1().Pods(namespace).List(options)   //直接调用apiserver的list接口
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    if tweakListOptions != nil {
                        tweakListOptions(&options)
                    }
                    return client.CoreV1().Pods(namespace).Watch(options)  // 直接调用apiserver的watch接口
                },
            },
            &corev1.Pod{},   //说明是pod对象
            resyncPeriod,
            indexers,    //指定indexer
        )
    }
    

    从这里可以看出来,cache只管做缓存+索引。数据来源都定义好了,不用管。

    (3) 实例化时调用了 NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

    func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
        realClock := &clock.RealClock{}
        sharedIndexInformer := &sharedIndexInformer{
            processor:                       &sharedProcessor{clock: realClock},
            indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
            listerWatcher:                   lw,
            objectType:                      objType,
            resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
            defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
            cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
            clock: realClock,
        }
        return sharedIndexInformer
    }
    
    
    // NewIndexer returns an Indexer implemented simply with a map and a lock.
    func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
        return &cache{
            cacheStorage: NewThreadSafeStore(indexers, Indices{}),
            keyFunc:      keyFunc,
        }
    }
    
    DeletionHandlingMetaNamespaceKeyFunc 最终调用了 MetaNamespaceKeyFunc
    所以 ns/podname 就能代表一个 Pod实例
    // MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
    // keys for API objects which implement meta.Interface.
    // The key uses the format <namespace>/<name> unless <namespace> is empty, then
    // it's just <name>.
    //
    // TODO: replace key-as-string with a key-as-struct so that this
    // packing/unpacking won't be necessary.
    func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
        if key, ok := obj.(ExplicitKey); ok {
            return string(key), nil
        }
        meta, err := meta.Accessor(obj)
        if err != nil {
            return "", fmt.Errorf("object has no meta: %v", err)
        }
        if len(meta.GetNamespace()) > 0 {
            return meta.GetNamespace() + "/" + meta.GetName(), nil
        }
        return meta.GetName(), nil
    }
    

    (4)informer.indexer 最终就是一个熟悉的cache结构体

    // 
    // cache responsibilities are limited to:
    //  1. Computing keys for objects via keyFunc
    //  2. Invoking methods of a ThreadSafeStorage interface
    type cache struct {
        // cacheStorage bears the burden of thread safety for the cache
        cacheStorage ThreadSafeStore
        // keyFunc is used to make the key for objects stored in and retrieved from items, and
        // should be deterministic.
        keyFunc KeyFunc
    }
    

    总结:

    到这里就可以看出来一个Informer是如何定义本地存储+索引的。至于整个系统如何运转,看后面的Informer分析。

    相关文章

      网友评论

        本文标题:informer机制之cache.indexer机制

        本文链接:https://www.haomeiwen.com/subject/aqtsgrtx.html