美文网首页Golang
gcache 源码学习

gcache 源码学习

作者: 0xE8551CCB | 来源:发表于2019-12-04 20:43 被阅读0次

    引言

    在 Web 请求中,后端可以引入内存缓存来改善接口的响应速度,方法就是对部分热点数据增加本地缓存。例如,我们经常会获取一个课程的详情页数据,对于其中基本不怎么变化的部分可以缓存到本地内存中,这样可以避免频繁回源(数据库、Redis 或者 RPC 调用)而造成额外的性能开销。当然,缓存的使用有利有弊,需要根据情况进行权衡。比如可能存在短时间的不一致性,或者容器内存使用增加等。

    不过,我们一般只会对一些请求量较大、且明显适合使用本地缓存来改善的接口应用本地缓存。在使用这种缓存时,通常需要设置过期时间和最大容量,避免过渡占用内存的问题。所以带失效时间的 LRU Cache 就非常适合。

    gocache 正好提供了我们期望的功能,整体来说,接口设计比较优雅,提供了灵活的 Hook 机制,对于一般业务来说已经很实用了。它的基本特点如下:

    1. 提供了 LRU, LFU 和 ARC 三种缓存失效策略;

    2. 支持基于过期时间的失效策略;

    3. 考虑了并发访问失效 key 并回源的场景(可能引起击穿问题),采用单一 goroutine 执行回调函数,避免并发回源造成下游请求压力过大的问题;

    4. 提供了非常丰富的 Hook,如:

      • LoaderFunc
      • LoaderWithExpireFunc
      • EvictedFunc
      • AddedFunc
      • PurgeVisitorFunc
    5. 最最重要的是,goroutine safe。

    本文对 gocache 核心源码进行了分析,重点是学习它提供的几种 Cache 的实现思想,正所谓知其然,还要知其所以然~

    保证单个 goroutine 执行回源函数

    由于提供了 LoaderFunc,那么在 key 失效的情况下,假设存在 100 个并发的 goroutine 来访问的时候,就会存在一个问题,也就是它们可能会同时调用 LoaderFunc,从而对下游服务造成较大的请求压力。那么如何避免呢?如何保证只有一个真正干活的 goroutine 呢?其它 goroutine 是继续等待呢,还是直接返回 key 不存在的错误呢?

    我们来看看源码库中的 singleflight.go 文件中提供的算法。整体描述如下:

    1. 将调用抽象为 call 结构,并且只会有一个 goroutine 能够成功创建这个 call 对象;
    2. 使用 map 维护每个 key 对应的 call 对象,如果 key 对应的 call 对象存在,则表明当前 key 对应的回源回调已经在执行了,其它的 goroutine 则可以选择等待,或者认为 key 不存在;
    3. 当完成回源回调执行后,所有等待的 goroutine (如果有的话)会得到 key 对应的结果。
    type call struct {
        wg sync.WaitGroup
    
        // 这里存放调用结果
        val interface{}
        err error
    }
    
    // Group represents a class of work and forms a namespace in which
    // units of work can be executed with duplicate suppression.
    type Group struct {
        cache Cache
        mu sync.Mutex // protects m
        m map[interface{}]*call // 这个对象惰性初始化的
    }
    
    // Do 会执行并返回给定回源函数回调,并会保证在并发的情况下,任意时刻
    // 只有一个正在执行该函数的 worker。如果出现该 key 对应的 fn 并发重复调用的
    // 情况,则该 worker 会直接等待,直到获取到最终的结果;或者也可以直接被告知
    // key 不存在。
    func (g *Group) Do(key interface{}, fn func() (interface{}, error), isWait bool) (interface{}, bool, error) {
        // 首先尝试看有没有缓存的结果了,如果有的话,就直接返回了
        g.mu.Lock()
        v, err := g.cache.get(key, true)
        if err == nil {
            g.mu.Unlock()
            return v, false, nil
        }
    
        if g.m == nil {
            // 这里执行惰性初始化
            g.m = make(map[interface{}]*call)
        }
    
        if c, ok := g.m[key]; ok {
            // 此时表明已经有其它 goroutine 在执行了
            g.mu.Unlock()
            if !isWait {
                // 可以不等待,从而被告知 key 不存在
                return nil, false, KeyNotFoundError
            }
            // 阻塞等待被唤醒,当 c.wg.Done 的时候,会得到最终的结果
            c.wg.Wait()
            return c.val, false, c.err
        }
        // 只有一个 worker 有权利创建 call 对象,代表一次调用
        c := new(call)
        c.wg.Add(1)
        g.m[key] = c
        g.mu.Unlock()
        if !isWait {
            // 对于当前 worker 而言,如果是异步的回源,则同样是直接被告知 key
            // 不存在
            go g.call(c, key, fn)
            return nil, false, KeyNotFoundError
        }
    
        // 否则同步等待最终执行结果
        v, err = g.call(c, key, fn)
        return v, true, err
    }
    
    func (g *Group) call(c *call, key interface{}, fn func() (interface{}, error)) (interface{}, error) {
        c.val, c.err = fn()
        c.wg.Done()
        g.mu.Lock()
        delete(g.m, key)
        g.mu.Unlock()
        return c.val, c.err
    }
    

    缓存实现源码

    Simple Cache

    type SimpleCache struct {
        baseCache
        items map[interface{}]*simpleItem
    }
    
    type simpleItem struct {
        clock Clock
        value interface{}
        expiration *time.Time
    }
    

    这是一个简单的基于 map 实现的内存缓存,基本上来说几乎是无限大小的,限定于实际的内存。它的 evict 策略也很简单,如果 key 指定了过期时间的的话,那么会在如下两种条件下进行 evict:

    1. 当存储的 key 的个数超出限定大小时,执行 c.evict(1)
    2. 在执行查询某个 key 对应的 value 时,也会检查过期时间,如果过期,则执行 remove()

    总结来说,实现的比较简单,满足基本的使用场景,对于一般性能要求不高的简单场景是绝对够用了。但可能的改进点还是有的:

    1. GC 的问题可能比较严重,尤其是如果字典中存放的 key 特别多的时候,GC Scan 自然会很耗时;
    2. 对于内存的感知能力较弱,如果有一个硬限制就好了,至少保证别把内存耗尽(这个自然是 key-value 特别的多的情况下才需要考虑,不过结合过期时间来做缓存的话,实际中应该是不会太担心内存耗尽的问题)。

    LRU Cache

    关于 LRU 实现思想其实很简单,只需要一个链表维护和访问活跃度有关的 entry 顺序,再使用一个字典存放 key 对应的 entry 映射,从而实现快速查找。

    LRU 存在的问题:针对循环出现的数字,缓存利用率不高。比如缓存空间为 3,访问列表 1,1,1,2,2,2,3,4,1,1,1,2,2,2, ... ,则当访问到 4 的时候,前面重复出现的 1 和 2 都被剔除了。

    看看这个库的实现源码吧,为了方便学习其核心思想,这里移除了一些不太要紧的代码:

    type LRUCache struct {
        baseCache
        items map[interface{}]*list.Element
        evictList *list.List
    }
    
    type lruItem struct {
        clock Clock
        key interface{}
        value interface{}
        expiration *time.Time
    }
    
    func (c *LRUCache) set(key, value interface{}) (interface{}, error) {
        var err error
    
        var item *lruItem
        // 检查 key 是否已经缓存了
        if it, ok := c.items[key]; ok {
            // 如果已经存在了,则直接更新该 key 对应的 value
            // 同时将该 key 移到链表头部
            c.evictList.MoveToFront(it)
            item = it.Value.(*lruItem)
            item.value = value
        } else {
            // 如果链表长度超过限制,则需要移除一个节点
            if c.evictList.Len() >= c.size {
                c.evict(1)
            }
            item = &lruItem{
                clock: c.clock,
                key: key,
                value: value,
            }
            c.items[key] = c.evictList.PushFront(item)
        }
        if c.expiration != nil {
            t := c.clock.Now().Add(*c.expiration)
            item.expiration = &t
        }
        if c.addedFunc != nil {
            c.addedFunc(key, value)
        }
        return item, nil
    }
    
    func (c *LRUCache) getValue(key interface{}, onLoad bool) (interface{}, error) {
        c.mu.Lock()
        item, ok := c.items[key]
        if ok {
            it := item.Value.(*lruItem)
            if !it.IsExpired(nil) {
                // 如果 cache hits,直接将该 key 对应的 entry 
                // 移动到链表头部
                c.evictList.MoveToFront(item)
                v := it.value
                c.mu.Unlock()
                return v, nil
            }
            c.removeElement(item)
        }
        c.mu.Unlock()
        return nil, KeyNotFoundError
    }
    

    LFU Cache

    LFU 的全称就是 Least Frequently Used,也就是基于该策略的 Cache 会将访问频次低的数据剔除掉。所以它在实现的时候,需要记录每个 Key 访问的频率,并根据频率排序,在需要剔除的时候,找到访问频率最低的条目剔除即可。

    LFU 存在的问题是,对于交替出现的请求,缓存利用率较低。比如请求组合:1,1,1,2,2,3,4,3,4,3,4,3,4,3, 4,3,4

    接下来看看该库中是如何实现 LFU Cache 的。首先是它的数据结构定义:

    type LFUCache struct {
        baseCache
        // 这里使用一个 map 记录 key ->  item 的映射
        items map[interface{}]*lfuItem
        // 链表中维护的是基于访问频次排列的 Entry
        freqList *list.List // list for freqEntry
    }
    
    type freqEntry struct {
        // freq 记录访问的频率
        freq uint 
        // 相同访问频次下,关联的 items 集合
        items map[*lfuItem]struct{}
    }
    
    type lfuItem struct {
        clock Clock
        key interface{}
        value interface{}
        // 关联最长访问的链表节点,即 freqEntry
        freqElement *list.Element
        expiration *time.Time
    }
    

    示例图如下:


    image

    然后来看看核心操作的源码:

    func (c *LFUCache) init() {
        c.freqList = list.New()
        c.items = make(map[interface{}]*lfuItem, c.size+1)
        // 链表中是按照访问频次对 Entry 排序的
        // Entry(freq 0) -> Entry(freq 1) -> Entry(freq 2) -> ...
        c.freqList.PushFront(&freqEntry{
            freq: 0,
            items: make(map[*lfuItem]struct{}),
        })
    }
    
    func (c *LFUCache) set(key, value interface{}) (interface{}, error) {
        var err error
        item, ok := c.items[key]
        if ok {
            // key 命中时,只需要更新 value 即可
            // 这里不像 LRU,在 set 时将数据调整到链表头部
            item.value = value
        } else {
            if len(c.items) >= c.size {
                // 确保有足够空间容纳新的 key-value
                c.evict(1)
            }
            item = &lfuItem{
                clock: c.clock,
                key: key,
                value: value,
                freqElement: nil,
            }
            // 链表首节点是访问频次为 0 的 items 集合
            el := c.freqList.Front()
            fe := el.Value.(*freqEntry)
            // 把新建的 item 放到集合中,同时更新 item 关联的
            // Entry 指针
            fe.items[item] = struct{}{}
            item.freqElement = el
            c.items[key] = item
        }
        if c.expiration != nil {
            t := c.clock.Now().Add(*c.expiration)
            item.expiration = &t
        }
        return item, nil
    }
    
    func (c *LFUCache) getValue(key interface{}, onLoad bool) (interface{}, error) {
        c.mu.Lock()
        item, ok := c.items[key]
        if ok {
            if !item.IsExpired(nil) {
                // 更新 item 的访问频率
                c.increment(item)
                v := item.value
                c.mu.Unlock()
                if !onLoad {
                    c.stats.IncrHitCount()
                }
                return v, nil
            }
            c.removeItem(item)
        }
        c.mu.Unlock()
        return nil, KeyNotFoundError
    }
    
    // increment 实际上是将 item 访问频率 +1。
    // 它的实际操作如下:
    // 1. 获得 item 在链表上的 Entry,该 Entry 记录了 item 此前的访问频率
    // 2. 计算得到新的访问频率(+1)
    // 3. 将 item 从原先的 Entry 关联的 items 集合中移除
    // 4. 将 item 加入到下一个(在旧 Entry 之后)Entry 关联的 items 集合中
    func (c *LFUCache) increment(item *lfuItem) {
        currentFreqElement := item.freqElement
        currentFreqEntry := currentFreqElement.Value.(*freqEntry)
        nextFreq := currentFreqEntry.freq + 1
        delete(currentFreqEntry.items, item)
        nextFreqElement := currentFreqElement.Next()
        if nextFreqElement == nil {
            nextFreqElement = c.freqList.InsertAfter(&freqEntry{
                freq: nextFreq,
                items: make(map[*lfuItem]struct{}),
            }, currentFreqElement)
        }
        nextFreqElement.Value.(*freqEntry).items[item] = struct{}{}
        item.freqElement = nextFreqElement
    }
    
    func (c *LFUCache) removeItem(item *lfuItem) {
        // 从大字典中删除 key->item 映射关系
        delete(c.items, item.key)
        // 从维护访问频率的链表对应的节点的关联 items 集合移除当前的 item
        delete(item.freqElement.Value.(*freqEntry).items, item)
    }
    

    Adaptive Replacement Cache, ARC

    ARC 算法是由 IBM 所属的实验室开发出来的一种置换算法(甚至还为它申请了相关专利),它的性能要比传统的 LRU 算法更好。它主要的改进点就是自适应的特性,具体来说,它将管理缓存元素的 LRU 列表分成两组 T1 和 T2,其中 T1 存放的是最近访问的元素,而 T2 存放的是最近频繁访问(一次及以上)的元素。假设我们的缓存容器的容量为 M,则 T1 和 T2 的总大小最多也是 M。ARC 引入了另外两组 LRU 列表用来存放由 T1 和 T2 剔除的元素元信息(实际指向的数据可能已经被释放,这样也不必占用过多内存),即 B1 和 B2,它们被称为 ghost 列表。算法会根据元素访问在 B1 或者 B2 的命中情况来调整 T1 和 T2 容量,在下面介绍的源码中使用了 part 来表示 T1 需要占用的容量,剩下 T2 容量的自然是 M-part

    image

    具体策略

    image

    我们以维基百科上的图为例,了解下在添加和置换时,使用了什么策略。在上图中,! 表示 L1(B1+T1) 和 L2(T2+B2) 实际分隔的位置;^ 表示 T1 期望扩充或者缩小到的位置。

    元素访问、添加策略

    1. 当新的元素加入时,会先进入 T1,也就是 ! 左边。随着新元素的增加,之前加入的元素会逐渐移到最左边,然后被剔除到 B1,最终被从 L1 中移除;
    2. 任意在 L1 中的元素被访问超过一次,都有机会移动到 T2 中,也就是 ! 右边。随着访问时间推移,在 T2 中的元素逐渐右移,然后进入 B2,最终从 L2 移除。

    元素置换和 T1 & T2 容量调整策略:当元素进入 (T1, T2) 缓存时,会导致 ! 移到到目标位置 ^,实现 T1 的扩容或缩容。如果没有空闲空间的话,还需要考虑从 T1 或者 T2 中剔除一个元素。

    1. 当命中 B1 时(表明不久前元素才被从 T1 中剔除,T1 容量应该再加点就好了),会导致 ^ 向右移动。T2 中最后一个元素会被剔除到 B2 中;
    2. 当命中 B2 时(同理,表明 T2 的容量还要增加点才能放更多频繁访问的元素),会导致 ^ 向左移动。T1 中最后一个元素会被剔除到 B1 中;
    3. 如果二者都不满足,则 ^ 不会收到影响,但是 ! 会更加接近于 ^

    源码实现

    一旦了解了 ARC 的基本原理,再来阅读代码就比较轻松了。由于实际的源码比较多,这里只是挑选了比较重要的,且能体现核心策略的部分源码加以注释,具体如下:

    // ARC 通过自动调整的方式,在 LRU 和 LFU 策略之间进行
    // 自动权衡(实际就是调整二者的容量,但保持总容量不变)
    type ARC struct {
        baseCache
        items map[interface{}]*arcItem
        part int
        t1 *arcList // 最近使用的元素,基于 LRU 剔除策略
        t2 *arcList // 频繁使用的元素,基于 LRU 剔除策略
        b1 *arcList // t1 关联的 ghost list,从 t1 剔除的元素会进来
        b2 *arcList // t2 关联的 ghost list,从 t2 剔除的元素会进来
    }
    
    func (c *ARC) set(key, value interface{}) (interface{}, error) {
        var err error
        item, ok := c.items[key]
        if ok {
            // 如果 key 存在的话,直接更新对应的值
            // 此时并不关心 key 是在 t1/t2 中
            item.value = value
        } else {
            // 如果 key 不存在,则新建一个并加到字典
            // 注意,此时还没有决定要将新的 item 放在 t1 还是 t2
            // 精彩操作还在后面
            item = &arcItem{
                clock: c.clock,
                key: key,
                value: value,
            }
            c.items[key] = item
        }
    
        // 这里是针对缓存命中的情况,直接返回,无需执行复杂的剔除和调整大小等操作
        if c.t1.Has(key) || c.t2.Has(key) {
            return item, nil
        }
    
        // 接下里需要决定将新的元素放在哪里?是否需要剔除元素?如果是,该从 t1/t2 剔除?
        // 是否需要根据 b1/b2 来调整 t1/t2 的大小(也就是 part 值的调整)?
        if elt := c.b1.Lookup(key); elt != nil {
            // 处理在 t1 关联的 ghost 链表 b1 中命中 key 的情况
            // 此时表明该 key 之前处于 t1 中,并且刚被移除不久,此时表明
            // t1 的大小需要再调大点,说明最存储近访问的元素空间不太够了
            // `c.b2.Len()/c.b1.Len()` 被称为 regret ratio,也就是需要增加的大小(delta)
            c.setPart(minInt(c.size, c.part+maxInt(c.b2.Len()/c.b1.Len(), 1)))
            // 看看缓存空间是否已经满了,如果是的话,才会执行复杂的剔除逻辑
            c.replace(key)
    
            // 这种情况下,表明该 key 算频繁访问了,会被移动到 t2
            // 自然以后过期会移动到 b2,所以需要在 b1 中移除该 key
            c.b1.Remove(key, elt)
            c.t2.PushFront(key)
            return item, nil
        }
    
        if elt := c.b2.Lookup(key); elt != nil {
            // 如果 key 在 b2 中命中,表明 key 曾经在 t2 存在过
            // 且是频繁访问过的,所以会放到 t2 中。同时会对 t2 的
            // 大小进行调整,delta = -b1.Len()/b2.Len()
            c.setPart(maxInt(0, c.part-maxInt(c.b1.Len()/c.b2.Len(), 1)))
            c.replace(key)
            c.b2.Remove(key, elt)
            c.t2.PushFront(key)
            return item, nil
        }
    
        // 下面的部分处理 key 从来没有存在于 b1/b2 的情况,新元素
        // 最后会先加到 t1 中。
        if c.isCacheFull() && c.t1.Len()+c.b1.Len() == c.size {
            if c.t1.Len() < c.size {
                c.b1.RemoveTail()
                c.replace(key)
            } else {
                pop := c.t1.RemoveTail()
                _, ok := c.items[pop]
                if ok {
                    delete(c.items, pop)
                }
            }
        } else {
            total := c.t1.Len() + c.b1.Len() + c.t2.Len() + c.b2.Len()
            if total >= c.size {
                if total == (2 * c.size) {
                    if c.b2.Len() > 0 {
                        c.b2.RemoveTail()
                    } else {
                        c.b1.RemoveTail()
                    }
                }
                c.replace(key)
            }
        }
        c.t1.PushFront(key)
        return item, nil
    }
    
    func (c *ARC) replace(key interface{}) {
        if !c.isCacheFull() {
            return
        }
    
        var old interface{}
        if c.t1.Len() > 0 && ((c.b2.Has(key) && c.t1.Len() == c.part) || (c.t1.Len() > c.part)) {
            // 给 t1 瘦身
            old = c.t1.RemoveTail()
            c.b1.PushFront(old)
        } else if c.t2.Len() > 0 {
            // 给 t2 瘦身
            old = c.t2.RemoveTail()
            c.b2.PushFront(old)
        } else {
            old = c.t1.RemoveTail()
            c.b1.PushFront(old)
        }
        _, ok := c.items[old]
        if ok {
            delete(c.items, old)
        }
    }
    
    // get 从缓存中查找指定的 key 队应的值,同时会调整元素到对应的
    // LRU 列表中。
    func (c *ARC) get(key interface{}) (interface{}, error) {
        // 如果在 t1 中找到了对应的元素,标记它为频繁访问的元素
        // 因此会被移动到 t2 中。
        if elt := c.t1.Lookup(key); elt != nil {
            c.t1.Remove(key, elt)
            item := c.items[key]
            c.t2.PushFront(key)
            return item.value, nil
        }
        // 如果在高频访问 LRU 缓存中找到,则依然保留在原 LRU 缓存
        // 只是需要将其移动到链表头部,表明是最近访问过的元素。
        if elt := c.t2.Lookup(key); elt != nil {
            item := c.items[key]
            c.t2.MoveToFront(elt)
            return item.value, nil
        }
        return nil, KeyNotFoundError
    }
    
    // arcList 实际上是个 LRU 缓存
    type arcList struct {
        l *list.List
        keys map[interface{}]*list.Element
    }
    
    type arcItem struct {
        clock Clock
        key interface{}
        value interface{}
        expiration *time.Time
    }
    

    参考

    1. gcache 文档
    2. 缓存淘汰算法 LRU 和 LFU
    3. Adaptive Replacement Cache
    4. Adaptive replacement cache algorithm
    5. ARC 介绍,看起来是翻译的,有点不太顺畅,但大体思想再加配图表达得比较清晰了

    声明

    相关文章

      网友评论

        本文标题:gcache 源码学习

        本文链接:https://www.haomeiwen.com/subject/uxfrgctx.html