美文网首页
【源码】FreeCache - Go的缓存库,具有零GC开销

【源码】FreeCache - Go的缓存库,具有零GC开销

作者: iceinto | 来源:发表于2019-02-28 21:35 被阅读0次

FreeCache

FreeCache 是一个 Go 语言的缓存库,无额外的 GC 负荷。数百万对象的垃圾收集延迟仅在数百毫秒。您可以在内存中缓存无限数量的对象,而不会增加延迟和降低吞吐量。

特征

  • 可存储数以百万计条目
  • 零垃圾收集负荷
  • 高并发而且线程安全的访问
  • 纯 Go 语言实现
  • 支持对象失效
  • 近乎 LRU 的算法
  • 严格限制内存使用
  • 提供一个测试用的服务器,支持一些基本 Redis 命令

案例

cacheSize := 1024*1024
cache := freecache.NewCache(cacheSize)
key := []byte("abc")
val := []byte("def")
expire := 60 // expire in 60 seconds
cache.Set(key, val, expire)
got, err := cache.Get(key)
if err != nil {
    fmt.Println(err)
} else {
    fmt.Println(string(got))
}
affected := cache.Del(key)
fmt.Println("deleted key ", affected)
fmt.Println("entry count ", cache.EntryCount())

注意事项

  • 内存是预先分配的
  • 如果你分配的内存非常大,那么应该设置 debug.SetGCPercent() 到一个很小的比例来获得正常的 GC 频率

如何做到的

FreeCache通过减少指针数来避免GC开销。 无论存储多少个条目,只有512个指针。 数据集通过密钥的哈希值分片为256个段。 每个段只有两个指针,一个是存储键和值的环形缓冲区,另一个是用于查找条目的索引片。 每个段都有自己的锁,因此它支持高并发访问。

案例解释

// 设置开始内存大小
cacheSize := 1024*1024
// NewCache按大小返回一个新初始化缓存。
// 缓存大小至少设置为512KB。
// 如果大小设置得比较大,你应该打电话
// `debug.SetGCPercent()`,将其设置为更小的值
// 限制内存消耗和GC暂停时间。
cache := freecache.NewCache(cacheSize)

key := []byte("abc")
val := []byte("def")
expire := 60 // expire in 60 seconds

// Set设置缓存条目的key,值和过期,并将其存储在缓存中。
// 如果密钥大于65535或者值大于缓存大小的1/1024,
// 该条目不会写入缓存。 expireSeconds <= 0表示没有过期,
// 但是当缓存已满时可以将其逐出。
cache.Set(key, val, expire)
// 获取返回值或未找到错误。
got, err := cache.Get(key)
if err != nil {
    fmt.Println(err)
} else {
    fmt.Println(string(got))
}

affected := cache.Del(key)
fmt.Println("deleted key ", affected)
// EntryCount返回当前缓存中的项目数。
fmt.Println("entry count ", cache.EntryCount())

源码分析

依赖说明

本项目主要依赖项目:xxhash 具体使用参考 xxhash 源码分析

文件结构

cache.go
cache_test.go
segment.go
ringbuf.go
ringbuf_test.go
iterator.go

具体代码

从 cache.go 开始,主要是对外方法

声明的常量

const (
    // segmentCount表示freecache实例中的段数
    segmentCount    = 256
    // segmentAndOpVal按位并应用于hashVal以查找段ID
    segmentAndOpVal = 255
    // 最小声明内存大小
    minBufSize      = 512 * 1024
)

Cache 结构体

type Cache struct {
    locks    [segmentCount]sync.Mutex
    segments [segmentCount]segment
}

// segment.go 文件中 segment 结构体
type segment struct 

xxhash Sum64 计算

func hashFunc(data []byte) uint64 {
    return xxhash.Sum64(data)
}

初始化一个内存分配

func NewCache(size int) (cache *Cache) {
    if size < minBufSize { // 判断初始化空间是不是小于最小化设置
        size = minBufSize
    }
    cache = new(Cache)
    for i := 0; i < segmentCount; i++ { // 循环分配内存变量
        cache.segments[i] = newSegment(size/segmentCount, i)
    }
    return
}

设置一个键值,里面还有其他的设置方法例如:

  • func (cache *Cache) SetInt(key int64, value []byte, expireSeconds int) (err error)
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
    hashVal := hashFunc(key) // 调用key 的 hash 生成
    segID := hashVal & segmentAndOpVal
    cache.locks[segID].Lock()
    err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
    cache.locks[segID].Unlock()
    return
}

获取一个键值,里面还有其他的方法例如:

  • func (cache *Cache) GetInt(key int64) (value []byte, err error)
func (cache *Cache) Get(key []byte) (value []byte, err error) {
    hashVal := hashFunc(key)
    segID := hashVal & segmentAndOpVal
    cache.locks[segID].Lock()
    value, _, err = cache.segments[segID].get(key, hashVal)
    cache.locks[segID].Unlock()
    return
}

其他可能会用到的方法

  • func (cache *Cache) HitCount() (count int64) HitCount是一个指标值,用于返回在缓存中找到密钥的次数
  • func (cache *Cache) MissCount() (count int64) MissCount是一个指标值,用于返回缓存中未命中的次数
  • func (cache *Cache) LookupCount() int64 LookupCount是一个度量值,它返回给定键的查找发生的次数
  • func (cache *Cache) HitRate() float64 HitRate是命中率与查找率之比
  • func (cache *Cache) OverwriteCount() (overwriteCount int64)

有啦上面的一些方法我们就可以比较容易的基于一些监控提交做监控报表,例如:写一个 Prometheus exporter 就可以做监控啦

下面看看核心存储如何做的,来看 segment.go

const HASH_ENTRY_SIZE = 16  // 没有看到哪里有用到?
const ENTRY_HDR_SIZE = 24   // byte 什么默认大小

// 环形缓冲区中的条目头结构,后跟键和值。
type entryHdr struct {
    accessTime uint32
    expireAt   uint32
    keyLen     uint16
    hash16     uint16
    valLen     uint32
    valCap     uint32
    deleted    bool
    slotId     uint8
    reserved   uint16
}

segment 结构体说明

// 一个段包含256个槽,一个槽是由hash16值排序的入口指针数组
// 可以通过密钥的哈希值查找条目。
type segment struct {
    rb            RingBuf // ring buffer that stores data
    segId         int
    _             uint32
    missCount     int64
    hitCount      int64
    entryCount    int64
    totalCount    int64      // 环缓冲区中的条目数,包括已删除
    totalTime     int64      // 用于计算最近最少使用的条目.
    totalEvacuate int64      // used for debug
    totalExpired  int64      // used for debug
    overwrites    int64      // used for debug
    vacuumLen     int64      // 直到 vacuumLen,可以写入新数据而不会覆盖旧数据
    slotLens      [256]int32 // 每个插槽的实际长度
    slotCap       int32      // 插槽可以容纳的最大入口指针数
    slotsData     []entryPtr // 所有256个插槽
}

newSegment 初始化声明内存空间

func newSegment(bufSize int, segId int) (seg segment) {
    seg.rb = NewRingBuf(bufSize, 0)
    seg.segId = segId
    seg.vacuumLen = int64(bufSize)
    seg.slotCap = 1
    seg.slotsData = make([]entryPtr, 256*seg.slotCap)
    return
}

顺着这个进入 NewRingBuf

// 当数据超过时,环形缓冲区具有固定大小
// 大小,旧数据将被新数据覆盖。
// 它只包含从开始到结束的流中的数据
type RingBuf struct {
    begin int64 // beginning offset of the data stream.
    end   int64 // ending offset of the data stream.
    data  []byte
    index int //range from '0' to 'len(rb.data)-1'
}
// 声明 RingBuf size 为之前传递的大小(size/segmentCount)或最小值
func NewRingBuf(size int, begin int64) (rb RingBuf) {
    rb.data = make([]byte, size) // 声明一个空值
    rb.begin = begin
    rb.end = begin
    rb.index = 0
    return
}

声明完成后顺着这个顺序进入 segment:Set 和 Get 方法

func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
    if len(key) > 65535 { // 对大 key 限制
        return ErrLargeKey
    }
    maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE
    if len(key)+len(value) > maxKeyValLen { // 限制值得大小
        // Do not accept large entry.
        return ErrLargeEntry
    }
    now := uint32(time.Now().Unix()) // 获取当前时间戳
    expireAt := uint32(0)
    if expireSeconds > 0 {
        expireAt = now + uint32(expireSeconds)
    }

    slotId := uint8(hashVal >> 8)
    hash16 := uint16(hashVal >> 16)

    var hdrBuf [ENTRY_HDR_SIZE]byte // 声明默认大小
    hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))

    slotOff := int32(slotId) * seg.slotCap
    slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] // 获取槽的位置
    idx, match := seg.lookup(slot, hash16, key)
    if match {
        matchedPtr := &slot[idx]
        seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
        hdr.slotId = slotId
        hdr.hash16 = hash16
        hdr.keyLen = uint16(len(key))
        originAccessTime := hdr.accessTime
        hdr.accessTime = now
        hdr.expireAt = expireAt
        hdr.valLen = uint32(len(value))
        if hdr.valCap >= hdr.valLen {
            //in place overwrite
            atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
            seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
            seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
            atomic.AddInt64(&seg.overwrites, 1)
            return
        }
        // avoid unnecessary memory copy.
        seg.delEntryPtr(slotId, hash16, slot[idx].offset)
        //seg.delEntryPtr(slotId, hash16, seg.slotsData[idx].offset)
        match = false
        // increase capacity and limit entry len.
        for hdr.valCap < hdr.valLen {
            hdr.valCap *= 2
        }
        if hdr.valCap > uint32(maxKeyValLen-len(key)) {
            hdr.valCap = uint32(maxKeyValLen - len(key))
        }
    } else {
        hdr.slotId = slotId
        hdr.hash16 = hash16
        hdr.keyLen = uint16(len(key))
        hdr.accessTime = now
        hdr.expireAt = expireAt
        hdr.valLen = uint32(len(value))
        hdr.valCap = uint32(len(value))
        if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
            hdr.valCap = 1
        }
    }

    entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
    slotModified := seg.evacuate(entryLen, slotId, now)
    if slotModified {
        // the slot has been modified during evacuation, we need to looked up for the 'idx' again.
        // otherwise there would be index out of bound error.
        slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
        idx, match = seg.lookup(slot, hash16, key)
        // assert(match == false)
    }
    newOff := seg.rb.End()
    seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
    seg.rb.Write(hdrBuf[:])
    seg.rb.Write(key)
    seg.rb.Write(value)
    seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
    atomic.AddInt64(&seg.totalTime, int64(now))
    atomic.AddInt64(&seg.totalCount, 1)
    seg.vacuumLen -= entryLen
    return
}

func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint32, err error) {
    slotId := uint8(hashVal >> 8)
    hash16 := uint16(hashVal >> 16)
    slotOff := int32(slotId) * seg.slotCap
    var slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
    idx, match := seg.lookup(slot, hash16, key)
    if !match {
        err = ErrNotFound
        atomic.AddInt64(&seg.missCount, 1)
        return
    }
    ptr := &slot[idx]
    now := uint32(time.Now().Unix())

    var hdrBuf [ENTRY_HDR_SIZE]byte
    seg.rb.ReadAt(hdrBuf[:], ptr.offset)
    hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
    expireAt = hdr.expireAt

    if hdr.expireAt != 0 && hdr.expireAt <= now {
        seg.delEntryPtr(slotId, hash16, ptr.offset)
        atomic.AddInt64(&seg.totalExpired, 1)
        err = ErrNotFound
        atomic.AddInt64(&seg.missCount, 1)
        return
    }
    atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime))
    hdr.accessTime = now
    seg.rb.WriteAt(hdrBuf[:], ptr.offset)
    value = make([]byte, hdr.valLen)

    seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
    atomic.AddInt64(&seg.hitCount, 1)
    return
}

func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
    slotId := uint8(hashVal >> 8)
    hash16 := uint16(hashVal >> 16)
    slotOff := int32(slotId) * seg.slotCap
    slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
    idx, match := seg.lookup(slot, hash16, key)
    if !match {
        return false
    }
    ptr := &slot[idx]
    seg.delEntryPtr(slotId, hash16, ptr.offset)
    return true
}

总结

流程总结

申请内存空间->初始化槽位和值->设置键值->获取键值

代码总结

项目核心文件有 4 个:

  • cache.go 主要方法
  • iterator.go 迭代
  • segment.go 存储操作
  • ringbuf.go 环形存储

相关文章

  • 【源码】FreeCache - Go的缓存库,具有零GC开销

    FreeCache FreeCache 是一个 Go 语言的缓存库,无额外的 GC 负荷。数百万对象的垃圾收集延迟...

  • Go语言——垃圾回收GC

    Go语言——垃圾回收GC 参考: Go 垃圾回收原理 Golang源码探索(三) GC的实现原理 Getting ...

  • 如何在Mac源码安装Go1.5开发环境

    到这里下载Go语言源码包。Go1.4到Go1.5是Go语言脱胎换骨的一次变更,The gc tool chain ...

  • Go中的缓存现状(BigCache&FreeCache&Grou

    Go中的缓存现状 这篇文章登上了Golang 在Reddit subreddit板块的顶部,并在Hacker Ne...

  • go gc 源码分析

    简介 go 堆上内存分配是通过 newobject 来分配的 runtime/malloc.go 中 runtim...

  • go-内存机制(4)

    go的GC机制 GO的GC是并行GC,也就是说GC的大部分清理和普通的go代码是同时运行的,这让GO的GC流程比较...

  • Go GC

    1、什么是GC?2、为什么会有GC?3、GC的优点?4、GC的缺点?5、Go中的GC历史6、Go中的GC实现原理(...

  • go mod vendor小计

    调试库代码 go mod现在已经深入人心。想用fmt.Printf大法调库代码,会发现缓存的go mod代码是不能...

  • client-go的使用及源码分析

    client-go[1]是Kubernetes 官方团队支持的Go语言客户端库。 示例代码 client-go源码...

  • 2018-05-02 Go Reflect

    转载自: Go Reflect 最近在看一些go语言标准库以及第三方库的源码时,发现go的reflect被大量使用...

网友评论

      本文标题:【源码】FreeCache - Go的缓存库,具有零GC开销

      本文链接:https://www.haomeiwen.com/subject/rpkluqtx.html