Go语言——channel详解
channel和goroutine是go语言最具特色是结构,有必要仔细研究。
源码路径:go1.10\src\runtime\chan.go
struct
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
- qcount:当前chann元素个数,len()
- dataqsiz:chann容量,cap()
- buf:元素缓冲区
- elemsize:元素大小
- closed:close标志位
- elemtype:元素类型
- sendx:下一个发送元素所在缓冲区索引
- recvx:下一个接收元素所在缓冲区索引
- recvq:接收G的等待队列
- sendq:发送G的等待队列
从上面可以看出,waitq
非常重要
type waitq struct {
first *sudog
last *sudog
}
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
g *g
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
waitq
是一个queue,具有头尾指针。
sudog
封装了等待chann接收/发送的G。sudogs有一个特殊的池,提高性能。
new
chan不能直接new,只能make,所以观察makechan方法。make的时候,会根据size判断是缓冲chan还是非缓冲chan,这个逻辑需要注意。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
var c *hchan
switch {
case size == 0 || elem.size == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = unsafe.Pointer(c)
case elem.kind&kindNoPointers != 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
这里看到由于要对buf对象分配,所以没有提供new方法,只能make。除了对default情况比较容易理解,另外两只buf的内存分配都不是很理解=。=,先遗留。
chansend
send & revcfunc chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
- lock
- 如果chann close,就直接panic,因为不能往close chan send数据
- 如果有G在等待接收数据,就直接将数据发给它。
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
- 如果buf还有剩余,就将数据保存在buf里面
- sendx++,跟上面的gif动图一样,指向下一个send位置
- sendx == 0表示buf满了
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
c.sendq.enqueue(mysg)
releaseSudog(mysg)
return true
}
以上情况不满足的话,接下来要做的就是block发送数据的g,看看chan具体怎么做的
- 获取当前g,注意这里的g是往chan发数据的g
- 从pool中获取一个sudog,保存当前g信息,方便后面唤醒
- 将sudog入队到sendq发送等待队列
- 释放sudog这里不是很懂
chanrecv
没找到动图,就不贴了。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
- lock
- 如果chan已经close,并且没有数据,就直接返回空值
- 如果有G正在等待发送数据,分情况
- 如果chan没有buf,就直接将G的数据发送给当前g
- 如果chan有buf, 就从buf头中取出数据G发送给当前g,并且唤醒G,然后把数据写入buf尾部
这里有两个g,一个是G表示等待发送数据的g,一个g表示接受chan数据的g。
这里ep对象有点意思,send中ep表示发送到的数据,揣测recv会从堆中取出数据,然后将ep指针指向这个堆数据。
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
如果chan有数据,就直接从buf里面取出来,然后维护recvx和qcount
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
c.recvq.enqueue(mysg)
releaseSudog(mysg)
return true, !closed
}
以上情况都不满足,即chan没有数据,也没有等待发送的g。那就需要block当前接收的g
- 获取当前等待接收数据的g
- 获取sudog
- 用sudog封装g信息
- 将sudog入队到recvq等待接收队列
- 释放sudog
网友评论