美文网首页Docker容器虚拟化技术k8s那点事儿
[k8s源码分析][client-go] informer之st

[k8s源码分析][client-go] informer之st

作者: nicktming | 来源:发表于2019-10-19 15:53 被阅读0次

    1. 前言

    转载请说明原文出处, 尊重他人劳动成果!

    源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
    分支: tming-v13.0 (基于v13.0版本)

    本文将分析tools/cache包中的store. 主要会涉及到store.go, thread_safe_store.goindex.go. 这部分是整个informer其中的一环, 功能是提供本地缓存.

    2. 整体接口与实现类

    architecture.png

    Store: 接口定义了基本的方法.
    Indexer:Store的基础上添加了几个关于index的方法.
    ThreadSafeStore: 定义了一系列方法, 与Indexer中所有方法(会包括Store中的方法)的最大区别是它有key.
    threadSafeMap:ThreadSafeStore的一个实现类.
    cache:IndexerStore的一个实现类, 它会根据keyFunc生成该obj对应的一个key, 然后调用ThreadSafeStore的方法.

    2.1 Store

    // tools/cache/store.go
    type Store interface {
        Add(obj interface{}) error
        Update(obj interface{}) error
        Delete(obj interface{}) error
        List() []interface{}
        ListKeys() []string
        Get(obj interface{}) (item interface{}, exists bool, err error)
        GetByKey(key string) (item interface{}, exists bool, err error)
    
        // Replace will delete the contents of the store, using instead the
        // given list. Store takes ownership of the list, you should not reference
        // it after calling this function.
        // 1. 会删除store里面的内容
        // 2. 用传进来的list代替以前的内容
        Replace([]interface{}, string) error
        // 同步
        Resync() error
    }
    

    可以看到该Store接口中有两个方法ListKeysGetByKey方法, 是与key有关的, 也就是存储一个obj的时候是根据key来存储的, 每一个obj都有一个唯一的key. 等到回头看该key的实现类的时候在仔细说一下.

    2.2 Indexer

    // tools/cache/thread_safe_store.go
    type Indexer interface {
        Store
        // Index returns the stored objects whose set of indexed values
        // intersects the set of indexed values of the given object, for
        // the named index
        Index(indexName string, obj interface{}) ([]interface{}, error)
        // IndexKeys returns the storage keys of the stored objects whose
        // set of indexed values for the named index includes the given
        // indexed value
        IndexKeys(indexName, indexedValue string) ([]string, error)
        // ListIndexFuncValues returns all the indexed values of the given index
        ListIndexFuncValues(indexName string) []string
        // ByIndex returns the stored objects whose set of indexed values
        // for the named index includes the given indexed value
        ByIndex(indexName, indexedValue string) ([]interface{}, error)
        // GetIndexer return the indexers
        GetIndexers() Indexers
    
        // AddIndexers adds more indexers to this store.  If you call this after you already have data
        // in the store, the results are undefined.
        AddIndexers(newIndexers Indexers) error
    }
    // IndexFunc knows how to compute the set of indexed values for an object.
    type IndexFunc func(obj interface{}) ([]string, error)
    // Index maps the indexed value to a set of keys in the store that match on that value
    type Index map[string]sets.String
    
    // Indexers maps a name to a IndexFunc
    type Indexers map[string]IndexFunc
    
    // Indices maps a name to an Index
    type Indices map[string]Index
    

    说实话, 这块不太好理解, 等到它的实现类的时候可以看到这个Indexer的功能, 并且会用一个例子进行说明.

    2.3 ThreadSafeStore

    // tools/cache/thread_safe_store.go
    type ThreadSafeStore interface {
        Add(key string, obj interface{})
        Update(key string, obj interface{})
        Delete(key string)
        Get(key string) (item interface{}, exists bool)
        List() []interface{}
        ListKeys() []string
        Replace(map[string]interface{}, string)
        Index(indexName string, obj interface{}) ([]interface{}, error)
        IndexKeys(indexName, indexKey string) ([]string, error)
        ListIndexFuncValues(name string) []string
        ByIndex(indexName, indexKey string) ([]interface{}, error)
        GetIndexers() Indexers
        AddIndexers(newIndexers Indexers) error
        Resync() error
    }
    
    type threadSafeMap struct {
        lock  sync.RWMutex
        // 存储着key与obj的对应关系
        items map[string]interface{}
    
        // indexers maps a name to an IndexFunc
        // 存着indexer的名字与它对应的生成index的方法
        indexers Indexers
        // indices maps a name to an Index
        indices Indices
    }
    func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
        return &threadSafeMap{
            items:    map[string]interface{}{},
            indexers: indexers,
            indices:  indices,
        }
    }
    

    这里与Store有点区别的是ThreadSafeStoreindex无关的全部都是针对key的操作, 而index方面的操作都是与Indexer方法意义.

    另外threadSafeMapThreadSafeStore接口的实现类, 也是真正实现逻辑的核心实体.

    2.4 cache

    // tools/cache/store.go
    type KeyFunc func(obj interface{}) (string, error)
    type cache struct {
        cacheStorage ThreadSafeStore
        keyFunc KeyFunc
    }
    // 一个Store实例 没有index相关方法
    func NewStore(keyFunc KeyFunc) Store {
        return &cache{
            cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
            keyFunc:      keyFunc,
        }
    }
    // 一个带有indexer实例 有index相关方法
    func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
        return &cache{
            cacheStorage: NewThreadSafeStore(indexers, Indices{}),
            keyFunc:      keyFunc,
        }
    }
    type ExplicitKey string
    // 一般情况下都是<namespace>/<name> unless <namespace> is empty, then it's just <name>.
    func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
        if key, ok := obj.(ExplicitKey); ok {
            return string(key), nil
        }
        meta, err := meta.Accessor(obj)
        if err != nil {
            return "", fmt.Errorf("object has no meta: %v", err)
        }
        if len(meta.GetNamespace()) > 0 {
            return meta.GetNamespace() + "/" + meta.GetName(), nil
        }
        return meta.GetName(), nil
    }
    
    

    cacheIndexer接口的实现类, 那么自然也是Store接口的实现类, 可以看到cacheStorage是一个ThreadSafeStore的对象, 而ThreadSafeStore是一个根据key来操作的类, 所以cache中有一个为obj生成唯一keykeyFunc方法(比如MetaNamespaceKeyFunc), 然后就可以调用ThreadSafeStore的对应方法.

    3. 方法

    此部分将会以一个例子来贯穿整个方法的使用, 与上流调用程序打交道的是Store或者Indexer, 真正的核心实体类是threadSafeMap, 所以接下来会从上流程序的调用的角度来看其如何实现.

    3.1 生成一个Indexer实例

    func testUsersIndexFunc(obj interface{}) ([]string, error) {
        pod := obj.(*v1.Pod)
        usersString := pod.Annotations["users"]
    
        return strings.Split(usersString, ","), nil
    }
    
    func TestMultiIndexKeys(t *testing.T) {
        index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})
    }
    

    注意
    1. keyFuncMetaNamespaceKeyFunc方法.
    2. 一个indexer的名字byUser, 以及该byUser生成index方法.

    3.2 Add

    上流程序调用

        pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
        pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
        pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
    
        index.Add(pod1)
        index.Add(pod2)
        index.Add(pod3)
    

    接下来看一下它的逻辑, 分析以pod1为例.

    // tools/cache/store.go
    func (c *cache) Add(obj interface{}) error {
    // 根据它的keyFunc生成该obj的key
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
    // 会调用threadSafeMap的Add方法
        c.cacheStorage.Add(key, obj)
        return nil
    }
    

    1. 根据MetaNamespaceKeyFunc生成key. (pod1生成的keyone).
    2. 调用threadSafeMapAdd方法. (c.cacheStorage.Add("one", pod1))

    func (c *threadSafeMap) Add(key string, obj interface{}) {
        c.lock.Lock()
        defer c.lock.Unlock()
        oldObject := c.items[key]
        c.items[key] = obj
        c.updateIndices(oldObject, obj, key)
    }
    

    1. 因为以前该key可能存在, 取出oldObject, 不存在则为nil. (oldObject=nil)
    2. 将对应的keyobj存储到一个map结构(item)中.(c.item["one"] = pod1)
    2. 调用updateIndices方法.

    func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
        // if we got an old object, we need to remove it before we add it again
        // 如果以前存在 则删除
        if oldObj != nil {
            c.deleteFromIndices(oldObj, key)
        }
        // 遍历所有的indexers 
        for name, indexFunc := range c.indexers {
            // 根据indexFunc生成该对象newObj的键
            indexValues, err := indexFunc(newObj)
            if err != nil {
                panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
            }
            // 取出当前indexer的结构 是一个map对象
            index := c.indices[name]
            // 如果不存在 则创建一个新的
            if index == nil {
                index = Index{}
                c.indices[name] = index
            }
    
            // 遍历刚刚生成的键
            for _, indexValue := range indexValues {
                set := index[indexValue]
                if set == nil {
                    set = sets.String{}
                    index[indexValue] = set
                }
                set.Insert(key)
            }
        }
    }
    

    不多说, 直接用pod1来说明吧.

    c.indexers = {"byUser": testUsersIndexFunc}
    // 所以只有一次循环
    name = "byUser"
    indexFunc = testUsersIndexFunc
    ===> indexValues = ["ernie", "bert"]
    ===> indices["byUser"] = {}
    ======> indices["byUser"]["ernie"] = [{"one": Empty}]
    ======> indices["byUser"]["bert"] = [{"one": Empty}]
    

    最终加入pod1, pod2pod3的结果如下:

    res1.png

    3.3 查询方法

    理解了Add方法, 接下来看一下几个查询方法, 有了上面的基础, 查询的话基本上对照着图看就差不多可以得到答案了.

    3.3.1 ByIndex
    // 上流程序调用
    index.ByIndex("byUser", "ernie")
    
    // tools/cache/store.go
    func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
        return c.cacheStorage.ByIndex(indexName, indexKey)
    }
    // tools/cache/thread_safe_store.go
    func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
        c.lock.RLock()
        defer c.lock.RUnlock()
    
        indexFunc := c.indexers[indexName]
        if indexFunc == nil {
            return nil, fmt.Errorf("Index with name %s does not exist", indexName)
        }
        index := c.indices[indexName]
        set := index[indexKey]
        list := make([]interface{}, 0, set.Len())
        for key := range set {
            list = append(list, c.items[key])
        }
        return list, nil
    }
    

    可以看到其实就是取indices["byUser"]["ernie"], 所以返回值就是["one", "tre"]

    ListIndexFuncValues
    // 上流程序调用
    index.ListIndexFuncValues("byUser")
    
    // tools/cache/store.go
    func (c *cache) ListIndexFuncValues(indexName string) []string {
        return c.cacheStorage.ListIndexFuncValues(indexName)
    }
    
    // tools/cache/thread_safe_store.go
    func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
        c.lock.RLock()
        defer c.lock.RUnlock()
        index := c.indices[indexName]
        names := make([]string, 0, len(index))
        for key := range index {
            names = append(names, key)
        }
        return names
    }
    

    返回某个indexName生成的所有键. 相当于indices["byUser"].keySet(), 所以返回值将会是["ernie", "bert", "elmo", "oscar"].

    List() 和 Get(obj interface{})
    // 上流程序调用
    index.List()
    index.Get("pod1")
    
    // tools/cache/store.go
    func (c *cache) List() []interface{} {
        return c.cacheStorage.List()
    }
    func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
        key, err := c.keyFunc(obj)
        if err != nil {
            return nil, false, KeyError{obj, err}
        }
        return c.GetByKey(key)
    }
    func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
        item, exists = c.cacheStorage.Get(key)
        return item, exists, nil
    }
    
    // tools/cache/thread_safe_store.go
    func (c *threadSafeMap) List() []interface{} {
        c.lock.RLock()
        defer c.lock.RUnlock()
        list := make([]interface{}, 0, len(c.items))
        for _, item := range c.items {
            list = append(list, item)
        }
        return list
    }
    func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
        c.lock.RLock()
        defer c.lock.RUnlock()
        item, exists = c.items[key]
        return item, exists
    }
    

    很明显List()Get方法是对items的操作.
    所以List()返回[pod1, pod2, pod3], Get方法返回pod1.

    Delete

    有了上面的基础, 这些操作无非都是在维护indicesitems这两个数据结构, 所以可想而知, 删除操作就是从这两个数据结构中删除某个obj带来的数据.

    // 上流程序调用
    index.Delete(pod3)
    
    // tools/cache/store.go
    func (c *cache) Delete(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Delete(key)
        return nil
    }
    
    // tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Delete(key string) {
        c.lock.Lock()
        defer c.lock.Unlock()
        if obj, exists := c.items[key]; exists {
            c.deleteFromIndices(obj, key)
            delete(c.items, key)
        }
    }
    func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
        for name, indexFunc := range c.indexers {
            indexValues, err := indexFunc(obj)
            if err != nil {
                panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
            }
    
            index := c.indices[name]
            if index == nil {
                continue
            }
            for _, indexValue := range indexValues {
                set := index[indexValue]
                if set != nil {
                    set.Delete(key)
                }
            }
        }
    }
    

    其实也没有什么, 就是怎么增加的就怎么删除就行了.

    del.png

    删除完的结果如下:

    res2.png
    update
    // tools/cache/store.go
    func (c *cache) Update(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Update(key, obj)
        return nil
    }
    // tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Update(key string, obj interface{}) {
        c.lock.Lock()
        defer c.lock.Unlock()
        oldObject := c.items[key]
        c.items[key] = obj
        c.updateIndices(oldObject, obj, key)
    }
    

    可以看到updateAdd方法是一模一样的, 因为Add方法是先删除旧的, 然后再添加新的.

    resync
    // tools/cache/store.go
    func (c *cache) Resync() error {
        return c.cacheStorage.Resync()
    }
    // tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Resync() error {
        // Nothing to do
        return nil
    }
    

    该方法在这里没有任何实现, 在一些子类中会有具体的实现. 比如FIFO, DeltaFIFO等等.

    Replace
    // tools/cache/store.go
    func (c *cache) Replace(list []interface{}, resourceVersion string) error {
        items := make(map[string]interface{}, len(list))
        for _, item := range list {
            key, err := c.keyFunc(item)
            if err != nil {
                return KeyError{item, err}
            }
            items[key] = item
        }
        c.cacheStorage.Replace(items, resourceVersion)
        return nil
    }
    
    // tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
        c.lock.Lock()
        defer c.lock.Unlock()
        // 更新items
        c.items = items
    
        // rebuild any index
        // 重新构建indices
        c.indices = Indices{}
        for key, item := range c.items {
            c.updateIndices(nil, item, key)
        }
    }
    

    简单一点理解就是把之前items, indices存的数据全部删除, 然后将list里面的内容一个个添加进去.

    informer整体

    整个informer体系在k8s代码中占有重要一环, 理解informer可以更好理解k8s的工作机制.

    informer.png

    1. [k8s源码分析][client-go] informer之store和index
    2. [k8s源码分析][client-go] informer之delta_fifo
    3. [k8s源码分析][client-go] informer之reflector
    4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
    5. [k8s源码分析][client-go] informer之controller和shared_informer(2)
    6. [k8s源码分析][client-go] informer之SharedInformerFactory

    相关文章

      网友评论

        本文标题:[k8s源码分析][client-go] informer之st

        本文链接:https://www.haomeiwen.com/subject/tbigmctx.html