golang sync .pool

作者: Stevennnmmm | 来源:发表于2020-12-22 16:21 被阅读0次

    Go 是一个自动垃圾回收的编程语言,它的算法我们后续会讲到,主要就是采用三色并发标记算法标记对象并回收。我们可以不用考虑为golang来节省什么,但是我们如果想将程序做到优秀我们就不得不考虑减少它gc的次数,毕竟,Go 的自动垃圾回收机制还是有一个 STW(stop-the-world,程序暂停)的时间,而且,大量地创建在堆上的对象,也会影响垃圾回收标记的时间

    所以,一般我们做性能优化的时候,会采用对象池的方式,把不用的对象回收起来,避免被垃圾回收掉,这样使用的时候就不必在堆上重新创建了
    按照惯例我们先看官方文档

    官方文档

    // A Pool is a set of temporary objects that may be individually saved and
    // retrieved.
    //
    // Any item stored in the Pool may be removed automatically at any time without
    // notification. If the Pool holds the only reference when this happens, the
    // item might be deallocated.
    //
    // A Pool is safe for use by multiple goroutines simultaneously.
    //
    // Pool's purpose is to cache allocated but unused items for later reuse,
    // relieving pressure on the garbage collector. That is, it makes it easy to
    // build efficient, thread-safe free lists. However, it is not suitable for all
    // free lists.
    //
    // An appropriate use of a Pool is to manage a group of temporary items
    // silently shared among and potentially reused by concurrent independent
    // clients of a package. Pool provides a way to amortize allocation overhead
    // across many clients.
    //
    // An example of good use of a Pool is in the fmt package, which maintains a
    // dynamically-sized store of temporary output buffers. The store scales under
    // load (when many goroutines are actively printing) and shrinks when
    // quiescent.
    //
    // On the other hand, a free list maintained as part of a short-lived object is
    // not a suitable use for a Pool, since the overhead does not amortize well in
    // that scenario. It is more efficient to have such objects implement their own
    // free list.
    //
    // A Pool must not be copied after first use.
    

    引用:/src/sync/pool.go:44

    //池是一组临时对象,可以分别保存和
    //检索到。
    //
    //池中存储的任何项目都可以随时自动删除,而无需
    //通知。如果发生这种情况时,池中只有唯一的引用,则
    //可能已释放项目。
    //
    //一个Pool可以安全地同时被多个goroutine使用。
    //
    //池的目的是缓存已分配但未使用的项目,以供以后重用,
    //减轻垃圾收集器的压力。也就是说,它很容易
    //建立有效的,线程安全的空闲列表。但是,它并不适合所有人
    //免费列表。
    //
    //池的适当用法是管理一组临时项
    //在并发的独立服务器之间静默共享并有可能被重用
    //包的客户。池提供了一种摊销分配开销的方法
    //在许多客户中。
    //
    // fmt包中有一个很好使用Pool的示例,它维护了一个
    //动态大小的临时输出缓冲区存储。店铺规模在
    //加载(当许多goroutine正在活动打印时),并在收缩时收缩
    //静止。
    //
    //另一方面,作为短期对象的一部分维护的空闲列表是
    //不适合作为Pool的用途,因为间接费用无法在
    //这种情况。使此类对象实现自己的效率更高
    //空闲列表。
    //
    //池在第一次使用后不得复制。
    
    

    具体使用

    sync.Pool 数据类型用来保存一组可独立访问的临时对象,为什么说是临时,因为可能随时被移除掉,在stw的时候也有可能被移除掉。所以我们不要使用这个做长链接保存

    pool是一个线程安全的对象,池中的东西随时可能被销毁
    

    New

    Pool struct 包含一个 New 字段,这个字段的类型是函数 func() interface{}。当调用 Pool 的 Get 方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新的元素。如果你没有设置 New 字段,没有更多的空闲元素可返回时,Get 方法将返回 nil,表明当前没有可用的元素

    Put

    这个方法用于将一个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如果 Put 一个 nil 值,Pool 就会忽略这个值。

    Get

    如果调用这个方法,就会从 Pool取走一个元素,这也就意味着,这个元素会从 Pool 中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get 方法的返回值还可能会是一个 nil(Pool.New 字段没有设置,又没有空闲元素可以返回),所以你在使用的时候,可能需要判断。

    sync.pool源码解析

    这个是池的结构

    type Pool struct {
        noCopy noCopy          //这个好像是为了使用govet 可以检测冲突
        local     unsafe.Pointer // 这是一个本地的环的指针
        localSize uintptr        // 本地数组大小
        victim     unsafe.Pointer // local from previous cycle
        victimSize uintptr        // size of victims array
        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() interface{}
    }
    

    在这段代码中,你需要关注一下 local 字段,因为所有当前主要的空闲可用的元素都存放在 local 字段中,请求元素时也是优先从 local 字段中查找可用的元素。local 字段包含一个 poolLocalInternal 字段,并提供 CPU 缓存对齐,从而避免 false sharing。

    // Local per-P Pool appendix.
    type poolLocalInternal struct {
        private interface{} // Can be used only by the respective P.
        shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
    }
    type poolLocal struct {
        poolLocalInternal
        // Prevents false sharing on widespread platforms with
        // 128 mod (cache line size) = 0 .
        pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
    }
    

    而 poolLocalInternal 也包含两个字段:private 和 shared。private,代表一个缓存的元素,而且只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,所以不会有并发的问题。shared,可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail,相当于只有一个本地的 P 作为生产者(Producer),多个 P 作为消费者(Consumer),它是使用一个 local-free 的 queue 列表实现的。

    • 其实首先我们得明确几个点存储数据指针是local

    • 当发生GC的时候,pool是怎么运行的:实际上会调用一个函数叫做poolcleanup

    1.poolCleanup方法

    func poolCleanup() {
        // This function is called with the world stopped, at the beginning of a garbage collection.
        // It must not allocate and probably should not call any runtime functions.
        // Because the world is stopped, no pool user can be in a
        // pinned section (in effect, this has all Ps pinned).
        // Drop victim caches from all pools.
        for _, p := range oldPools {
            p.victim = nil
            p.victimSize = 0
        }
        // Move primary cache to victim cache.
        for _, p := range allPools {
            p.victim = p.local
            p.victimSize = p.localSize
            p.local = nil
            p.localSize = 0
        }
        // The pools with non-empty primary caches now have non-empty
        // victim caches and no pools have primary caches.
        oldPools, allPools = allPools, nil
    }
    

    整体大概意思是:将victim的数据给清空,将local池中的数据丢给victim,

    2.get方法

    // Get selects an arbitrary item from the Pool, removes it from the
    // Pool, and returns it to the caller.
    // Get may choose to ignore the pool and treat it as empty.
    // Callers should not assume any relation between values passed to Put and
    // the values returned by Get.
    //
    // If Get would otherwise return nil and p.New is non-nil, Get returns
    // the result of calling p.New.
    func (p *Pool) Get() interface{} {
        if race.Enabled {
            race.Disable()
        }
        l, pid := p.pin() //将goroutine 绑定到P上
        x := l.private  
        l.private = nil
        if x == nil {
            // Try to pop the head of the local shard. We prefer
            // the head over the tail for temporal locality of
            // reuse.
            x, _ = l.shared.popHead()
            if x == nil {
                x = p.getSlow(pid)
            }
        }
        runtime_procUnpin()
        if race.Enabled {
            race.Enable()
            if x != nil {
                race.Acquire(poolRaceAddr(x))
            }
        }
        if x == nil && p.New != nil {
            x = p.New()
        }
        return x
    }
    

    其实就四种可能
    1.从本地的private取出来x
    2.当第一步取不出的时候,从本地的分片头部取一个出来
    3.当本地分片没有了,走慢方法getslow
    4.当大家都没有了,生成一个新的

    3.put方法

    // Put adds x to the pool.
    func (p *Pool) Put(x interface{}) {
        if x == nil {
            return
        }
        if race.Enabled {
            if fastrand()%4 == 0 {
                // Randomly drop x on floor.
                return
            }
            race.ReleaseMerge(poolRaceAddr(x))
            race.Disable()
        }
        l, _ := p.pin()
        if l.private == nil {
            l.private = x
            x = nil
        }
        if x != nil {
            l.shared.pushHead(x)
        }
        runtime_procUnpin()
        if race.Enabled {
            race.Enable()
        }
    }
    
    

    这个要比get方法简单一些
    1.如果放入的东西是nil ,return
    2.如果private !=nil ,直接放到private中
    3.从头部放入本地分片中

    3.getslow方法

    func (p *Pool) getSlow(pid int) interface{} {
        // See the comment in pin regarding ordering of the loads.
        size := atomic.LoadUintptr(&p.localSize) // load-acquire
        locals := p.local                        // load-consume
        // Try to steal one element from other procs.
        for i := 0; i < int(size); i++ {
            l := indexLocal(locals, (pid+i+1)%int(size))
            if x, _ := l.shared.popTail(); x != nil {
                return x
            }
        }
        // Try the victim cache. We do this after attempting to steal
        // from all primary caches because we want objects in the
        // victim cache to age out if at all possible.
        size = atomic.LoadUintptr(&p.victimSize)
        if uintptr(pid) >= size {
            return nil
        }
        locals = p.victim
        l := indexLocal(locals, pid)
        if x := l.private; x != nil {
            l.private = nil
            return x
        }
        for i := 0; i < int(size); i++ {
            l := indexLocal(locals, (pid+i)%int(size))
            if x, _ := l.shared.popTail(); x != nil {
                return x
            }
        }
        // Mark the victim cache as empty for future gets don't bother
        // with it.
        atomic.StoreUintptr(&p.victimSize, 0)
        return nil
    }
    

    这里其实是一个内置函数,我就不做过多讲解,其实就是:当本地分片没有了资源了后,尝试去窃取一个资源,窃取不到的时候我们,就会从victim的内容中去获取一个对象,也就是垃圾分拣站获取一个旧的对象,但是他们的注释其实还是很有意思的:

        // Try the victim cache. We do this after attempting to steal
        // from all primary caches because we want objects in the
        // victim cache to age out if at all possible.
    

    我们想尽可能的不去用这个·victim对象,至于原因我目前还不太了解。

    踩坑点

    1.内存泄露

    
    var buffers = sync.Pool{
      New: func() interface{} { 
        return new(bytes.Buffer)
      },
    }
    
    func GetBuffer() *bytes.Buffer {
      return buffers.Get().(*bytes.Buffer)
    }
    
    func PutBuffer(buf *bytes.Buffer) {
      buf.Reset()
      buffers.Put(buf)
    }
    

    这段代码其实看上去人畜无害,但是实际上在我们应用中,buf底层是一个包含切片的结构体,当我们将这个切片扩展到一定长度后归还,仅仅是将len重置了,实际上的cap会保留,那么我们知道切片的底层就是array,那么这个内存我们一直回收不了就造成了泄露。

    2.内存浪费

    除了内存泄漏以外,还有一种浪费的情况,就是池子中的 buffer 都比较大,但在实际使用的时候,很多时候只需要一个小的 buffer,这也是一种浪费现象。其实,我们可以将 buffer 池分成几层。首先,小于 512 byte 的元素的 buffer 占一个池子;其次,小于 1K byte 大小的元素占一个池子;再次,小于 4K byte 大小的元素占一个池子。这样分成几个池子以后,就可以根据需要,到所需大小的池子中获取 buffer 了。

    相关文章

      网友评论

        本文标题:golang sync .pool

        本文链接:https://www.haomeiwen.com/subject/tdrlnktx.html