美文网首页
【源码】FreeCache - Go的缓存库,具有零GC开销

【源码】FreeCache - Go的缓存库,具有零GC开销

作者: iceinto | 来源:发表于2019-02-28 21:35 被阅读0次

    FreeCache

    FreeCache 是一个 Go 语言的缓存库,无额外的 GC 负荷。数百万对象的垃圾收集延迟仅在数百毫秒。您可以在内存中缓存无限数量的对象,而不会增加延迟和降低吞吐量。

    特征

    • 可存储数以百万计条目
    • 零垃圾收集负荷
    • 高并发而且线程安全的访问
    • 纯 Go 语言实现
    • 支持对象失效
    • 近乎 LRU 的算法
    • 严格限制内存使用
    • 提供一个测试用的服务器,支持一些基本 Redis 命令

    案例

    cacheSize := 1024*1024
    cache := freecache.NewCache(cacheSize)
    key := []byte("abc")
    val := []byte("def")
    expire := 60 // expire in 60 seconds
    cache.Set(key, val, expire)
    got, err := cache.Get(key)
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(string(got))
    }
    affected := cache.Del(key)
    fmt.Println("deleted key ", affected)
    fmt.Println("entry count ", cache.EntryCount())
    

    注意事项

    • 内存是预先分配的
    • 如果你分配的内存非常大,那么应该设置 debug.SetGCPercent() 到一个很小的比例来获得正常的 GC 频率

    如何做到的

    FreeCache通过减少指针数来避免GC开销。 无论存储多少个条目,只有512个指针。 数据集通过密钥的哈希值分片为256个段。 每个段只有两个指针,一个是存储键和值的环形缓冲区,另一个是用于查找条目的索引片。 每个段都有自己的锁,因此它支持高并发访问。

    案例解释

    // 设置开始内存大小
    cacheSize := 1024*1024
    // NewCache按大小返回一个新初始化缓存。
    // 缓存大小至少设置为512KB。
    // 如果大小设置得比较大,你应该打电话
    // `debug.SetGCPercent()`,将其设置为更小的值
    // 限制内存消耗和GC暂停时间。
    cache := freecache.NewCache(cacheSize)
    
    key := []byte("abc")
    val := []byte("def")
    expire := 60 // expire in 60 seconds
    
    // Set设置缓存条目的key,值和过期,并将其存储在缓存中。
    // 如果密钥大于65535或者值大于缓存大小的1/1024,
    // 该条目不会写入缓存。 expireSeconds <= 0表示没有过期,
    // 但是当缓存已满时可以将其逐出。
    cache.Set(key, val, expire)
    // 获取返回值或未找到错误。
    got, err := cache.Get(key)
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(string(got))
    }
    
    affected := cache.Del(key)
    fmt.Println("deleted key ", affected)
    // EntryCount返回当前缓存中的项目数。
    fmt.Println("entry count ", cache.EntryCount())
    

    源码分析

    依赖说明

    本项目主要依赖项目:xxhash 具体使用参考 xxhash 源码分析

    文件结构

    cache.go
    cache_test.go
    segment.go
    ringbuf.go
    ringbuf_test.go
    iterator.go
    

    具体代码

    从 cache.go 开始,主要是对外方法

    声明的常量

    const (
        // segmentCount表示freecache实例中的段数
        segmentCount    = 256
        // segmentAndOpVal按位并应用于hashVal以查找段ID
        segmentAndOpVal = 255
        // 最小声明内存大小
        minBufSize      = 512 * 1024
    )
    

    Cache 结构体

    type Cache struct {
        locks    [segmentCount]sync.Mutex
        segments [segmentCount]segment
    }
    
    // segment.go 文件中 segment 结构体
    type segment struct 
    

    xxhash Sum64 计算

    func hashFunc(data []byte) uint64 {
        return xxhash.Sum64(data)
    }
    

    初始化一个内存分配

    func NewCache(size int) (cache *Cache) {
        if size < minBufSize { // 判断初始化空间是不是小于最小化设置
            size = minBufSize
        }
        cache = new(Cache)
        for i := 0; i < segmentCount; i++ { // 循环分配内存变量
            cache.segments[i] = newSegment(size/segmentCount, i)
        }
        return
    }
    

    设置一个键值,里面还有其他的设置方法例如:

    • func (cache *Cache) SetInt(key int64, value []byte, expireSeconds int) (err error)
    func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
        hashVal := hashFunc(key) // 调用key 的 hash 生成
        segID := hashVal & segmentAndOpVal
        cache.locks[segID].Lock()
        err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
        cache.locks[segID].Unlock()
        return
    }
    

    获取一个键值,里面还有其他的方法例如:

    • func (cache *Cache) GetInt(key int64) (value []byte, err error)
    func (cache *Cache) Get(key []byte) (value []byte, err error) {
        hashVal := hashFunc(key)
        segID := hashVal & segmentAndOpVal
        cache.locks[segID].Lock()
        value, _, err = cache.segments[segID].get(key, hashVal)
        cache.locks[segID].Unlock()
        return
    }
    

    其他可能会用到的方法

    • func (cache *Cache) HitCount() (count int64) HitCount是一个指标值,用于返回在缓存中找到密钥的次数
    • func (cache *Cache) MissCount() (count int64) MissCount是一个指标值,用于返回缓存中未命中的次数
    • func (cache *Cache) LookupCount() int64 LookupCount是一个度量值,它返回给定键的查找发生的次数
    • func (cache *Cache) HitRate() float64 HitRate是命中率与查找率之比
    • func (cache *Cache) OverwriteCount() (overwriteCount int64)

    有啦上面的一些方法我们就可以比较容易的基于一些监控提交做监控报表,例如:写一个 Prometheus exporter 就可以做监控啦

    下面看看核心存储如何做的,来看 segment.go

    const HASH_ENTRY_SIZE = 16  // 没有看到哪里有用到?
    const ENTRY_HDR_SIZE = 24   // byte 什么默认大小
    
    // 环形缓冲区中的条目头结构,后跟键和值。
    type entryHdr struct {
        accessTime uint32
        expireAt   uint32
        keyLen     uint16
        hash16     uint16
        valLen     uint32
        valCap     uint32
        deleted    bool
        slotId     uint8
        reserved   uint16
    }
    

    segment 结构体说明

    // 一个段包含256个槽,一个槽是由hash16值排序的入口指针数组
    // 可以通过密钥的哈希值查找条目。
    type segment struct {
        rb            RingBuf // ring buffer that stores data
        segId         int
        _             uint32
        missCount     int64
        hitCount      int64
        entryCount    int64
        totalCount    int64      // 环缓冲区中的条目数,包括已删除
        totalTime     int64      // 用于计算最近最少使用的条目.
        totalEvacuate int64      // used for debug
        totalExpired  int64      // used for debug
        overwrites    int64      // used for debug
        vacuumLen     int64      // 直到 vacuumLen,可以写入新数据而不会覆盖旧数据
        slotLens      [256]int32 // 每个插槽的实际长度
        slotCap       int32      // 插槽可以容纳的最大入口指针数
        slotsData     []entryPtr // 所有256个插槽
    }
    

    newSegment 初始化声明内存空间

    func newSegment(bufSize int, segId int) (seg segment) {
        seg.rb = NewRingBuf(bufSize, 0)
        seg.segId = segId
        seg.vacuumLen = int64(bufSize)
        seg.slotCap = 1
        seg.slotsData = make([]entryPtr, 256*seg.slotCap)
        return
    }
    

    顺着这个进入 NewRingBuf

    // 当数据超过时,环形缓冲区具有固定大小
    // 大小,旧数据将被新数据覆盖。
    // 它只包含从开始到结束的流中的数据
    type RingBuf struct {
        begin int64 // beginning offset of the data stream.
        end   int64 // ending offset of the data stream.
        data  []byte
        index int //range from '0' to 'len(rb.data)-1'
    }
    // 声明 RingBuf size 为之前传递的大小(size/segmentCount)或最小值
    func NewRingBuf(size int, begin int64) (rb RingBuf) {
        rb.data = make([]byte, size) // 声明一个空值
        rb.begin = begin
        rb.end = begin
        rb.index = 0
        return
    }
    

    声明完成后顺着这个顺序进入 segment:Set 和 Get 方法

    func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
        if len(key) > 65535 { // 对大 key 限制
            return ErrLargeKey
        }
        maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE
        if len(key)+len(value) > maxKeyValLen { // 限制值得大小
            // Do not accept large entry.
            return ErrLargeEntry
        }
        now := uint32(time.Now().Unix()) // 获取当前时间戳
        expireAt := uint32(0)
        if expireSeconds > 0 {
            expireAt = now + uint32(expireSeconds)
        }
    
        slotId := uint8(hashVal >> 8)
        hash16 := uint16(hashVal >> 16)
    
        var hdrBuf [ENTRY_HDR_SIZE]byte // 声明默认大小
        hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
    
        slotOff := int32(slotId) * seg.slotCap
        slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] // 获取槽的位置
        idx, match := seg.lookup(slot, hash16, key)
        if match {
            matchedPtr := &slot[idx]
            seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
            hdr.slotId = slotId
            hdr.hash16 = hash16
            hdr.keyLen = uint16(len(key))
            originAccessTime := hdr.accessTime
            hdr.accessTime = now
            hdr.expireAt = expireAt
            hdr.valLen = uint32(len(value))
            if hdr.valCap >= hdr.valLen {
                //in place overwrite
                atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
                seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
                seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
                atomic.AddInt64(&seg.overwrites, 1)
                return
            }
            // avoid unnecessary memory copy.
            seg.delEntryPtr(slotId, hash16, slot[idx].offset)
            //seg.delEntryPtr(slotId, hash16, seg.slotsData[idx].offset)
            match = false
            // increase capacity and limit entry len.
            for hdr.valCap < hdr.valLen {
                hdr.valCap *= 2
            }
            if hdr.valCap > uint32(maxKeyValLen-len(key)) {
                hdr.valCap = uint32(maxKeyValLen - len(key))
            }
        } else {
            hdr.slotId = slotId
            hdr.hash16 = hash16
            hdr.keyLen = uint16(len(key))
            hdr.accessTime = now
            hdr.expireAt = expireAt
            hdr.valLen = uint32(len(value))
            hdr.valCap = uint32(len(value))
            if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
                hdr.valCap = 1
            }
        }
    
        entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
        slotModified := seg.evacuate(entryLen, slotId, now)
        if slotModified {
            // the slot has been modified during evacuation, we need to looked up for the 'idx' again.
            // otherwise there would be index out of bound error.
            slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
            idx, match = seg.lookup(slot, hash16, key)
            // assert(match == false)
        }
        newOff := seg.rb.End()
        seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
        seg.rb.Write(hdrBuf[:])
        seg.rb.Write(key)
        seg.rb.Write(value)
        seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
        atomic.AddInt64(&seg.totalTime, int64(now))
        atomic.AddInt64(&seg.totalCount, 1)
        seg.vacuumLen -= entryLen
        return
    }
    
    func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint32, err error) {
        slotId := uint8(hashVal >> 8)
        hash16 := uint16(hashVal >> 16)
        slotOff := int32(slotId) * seg.slotCap
        var slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
        idx, match := seg.lookup(slot, hash16, key)
        if !match {
            err = ErrNotFound
            atomic.AddInt64(&seg.missCount, 1)
            return
        }
        ptr := &slot[idx]
        now := uint32(time.Now().Unix())
    
        var hdrBuf [ENTRY_HDR_SIZE]byte
        seg.rb.ReadAt(hdrBuf[:], ptr.offset)
        hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
        expireAt = hdr.expireAt
    
        if hdr.expireAt != 0 && hdr.expireAt <= now {
            seg.delEntryPtr(slotId, hash16, ptr.offset)
            atomic.AddInt64(&seg.totalExpired, 1)
            err = ErrNotFound
            atomic.AddInt64(&seg.missCount, 1)
            return
        }
        atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime))
        hdr.accessTime = now
        seg.rb.WriteAt(hdrBuf[:], ptr.offset)
        value = make([]byte, hdr.valLen)
    
        seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
        atomic.AddInt64(&seg.hitCount, 1)
        return
    }
    
    func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
        slotId := uint8(hashVal >> 8)
        hash16 := uint16(hashVal >> 16)
        slotOff := int32(slotId) * seg.slotCap
        slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
        idx, match := seg.lookup(slot, hash16, key)
        if !match {
            return false
        }
        ptr := &slot[idx]
        seg.delEntryPtr(slotId, hash16, ptr.offset)
        return true
    }
    

    总结

    流程总结

    申请内存空间->初始化槽位和值->设置键值->获取键值

    代码总结

    项目核心文件有 4 个:

    • cache.go 主要方法
    • iterator.go 迭代
    • segment.go 存储操作
    • ringbuf.go 环形存储

    相关文章

      网友评论

          本文标题:【源码】FreeCache - Go的缓存库,具有零GC开销

          本文链接:https://www.haomeiwen.com/subject/rpkluqtx.html