Pool是一个可以被单独保存和回收的临时对象集合。
在pool中保存的对象会在任意时间没有通知的情况下自动清除。如果发生了这种情况,且pool持有对象的唯一的指针,那么这个对象可能会被回收掉。
pool是在多个goroutine使用时并发安全的。
Pool的目的是缓存已经被分配但是还没有被使用以便之后使用,缓解gc的压力。也就是说,它可以轻易的建立起有效的,线程安全的free lists。但是它并不适合于所有的 free lists。
Pool比较合适用法是来管理一组临时对象,这些对象可能会被一个包的不同独立客户端同时复用。Pool提供了一种方法,可以用来分摊分配的开销跨多个客户端。
fmt包是Pool用的比较好的一个例子,它维护了一个动态大小的临时输出缓存区存储。这个存储区会扫描负载(当goroutine拼命打印的时候),当不打印的时候会收缩大小。
另一方面,一个短生命周期的free list并不适合于pool,因为并不能很好的分摊开销。让这些短生命周期的对象实现自己的free list的话效率会更高。
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
local代表pool的数组对象,localsizepool数组对象的大小通常也gomaxprocs一样。
New是初始化pool的时候需要提供的new函数
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
Pool结构体中的local其实指向的是poolLocal,localsize其实也是pollLocal的大小,而poolLocal的值就是poolLocalInternal.
poolLocal里面有两个部分一个是private,另一个是shared。private就只能使用当前P为索引的数据,shared是一个链表,数据可以被所有的数据使用,当前的P可以执行pushHead和popHead操作,其余P可以执行popTail操作。当G运行在某个P上时,获取pool中的数据总是先从private找,找不到再从shared里面找。
如果发生gc,那么就直接把local和localSize的值直接赋值给victim和victimSize,如果以前的victim和victimSize有值的话,就会被直接覆盖(这大概为什么就叫 victim吧)。如果上面shared还没找到数据就会从victim里面去找。
var poolRaceHash [128]uint64
// poolRaceAddr returns an address to use as the synchronization point
// for race detector logic. We don't use the actual pointer stored in x
// directly, for fear of conflicting with other synchronization on that address.
// Instead, we hash the pointer to get an index into poolRaceHash.
// See discussion on golang.org/cl/31589.
func poolRaceAddr(x interface{}) unsafe.Pointer {
ptr := uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&x))[1])
h := uint32((uint64(uint32(ptr)) * 0x85ebca6b) >> 16)
return unsafe.Pointer(&poolRaceHash[h%uint32(len(poolRaceHash))])
}
poolRaceAddr返回一个poolRaceHash索引的指针,在存储数据时不会使用实际的指针,因为可能相同的地址会有别的同步操作,所以就把指针hash存到poolRaceHash中返回。
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
pin方法将当前的goroutine绑定在P上,关闭抢占返回当前P的poolLocal和p的id。调用者必须调用runtime_procUnpin()方法,在完成pool的时候。
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
put方法将x放入pool,除去冲突问题,首先是调用pin方法,返回当前p的poolLocal,先去判断private有没有值,如果没有值就直接赋值给private,但是如果有值的话会把值放在shared里面,然后调用unpin解除。
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
get方法获得当前P在pool中的值,先执行pin()方法获得当前P的localPool,检测private的值,并将private的值置为空,如果之前有值就返回了,否则就会从shared里面找,并且移除该值,如果还为空,就去执行getSlow()方法,去别的P中找值,如果还是找不到就会去victim中找,如果依旧找不到就会调用pool中New()并且返回p.New()。
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools.
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}
当发生STW,在垃圾回收开始的时候就会调用poolCleanup()函数。这个方法不允许调用任何runtime的方法,因为发生STW,任何东西都调用不到pin()。
poolCleanup()流程是,先将所有pool的victim丢弃掉,然后将所有pool中的localPool放到victim中,然后将当前allPools赋值给oldPools,将allPools置为空。
以上为粗略理解,如有不妥之处,请不吝赐教,谢谢。
网友评论