关于 channel 实现的文章一大堆,网上好多抄来抄去的,这篇也没啥新意。实现一点也不复杂,直接看代码:src/runtime/chan.go(下文源码摘自约 Go 1.11 的版本,新版本细节有所不同)。
channel 结构体
先看一下 channel 结构体的实现,代码注释已经足够好,字段也足够精简:
// hchan is the runtime's representation of a channel.
//
// Buffered elements live in buf, a circular array of dataqsiz slots:
// sendx is the slot the next buffered send writes to, recvx is the slot the
// next buffered receive reads from, and qcount is how many slots are in use.
// Goroutines that cannot proceed are parked on recvq / sendq.
type hchan struct {
	qcount   uint           // total data in the queue (number of buffered elements)
	dataqsiz uint           // size of the circular queue (capacity; 0 for an unbuffered channel)
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size of one element; makechan rejects element sizes >= 1<<16
	closed   uint32         // non-zero once the channel has been closed
	elemtype *_type         // element type
	sendx    uint           // send index: next slot in buf a buffered send writes to
	recvx    uint           // receive index: next slot in buf a buffered receive reads from
	recvq    waitq          // list of recv waiters (goroutines blocked receiving)
	sendq    waitq          // list of send waiters (goroutines blocked sending)

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	// (The non-blocking fast paths in chansend/chanrecv deliberately peek at
	// closed, qcount and the wait queues without taking it.)
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
- qcount:当前缓存中的元素个数;dataqsiz:环形缓冲区能缓存的最大容量。二者类似 slice 的 len 和 cap,实际上 len(ch)、cap(ch) 取的就是这两个字段。
- buf:指向底层缓存数据的指针,是一个定长的环形数组。
- elemsize 和 elemtype:底层元素的大小和类型,这个好理解,一个 channel 创建时就已经确定了存储的元素类型。
- sendx、recvx:环形数组的写入索引和读取索引。
- recvq、sendq:阻塞等待的消费者(接收者)队列和生产者(发送者)队列。
- closed:非 0 代表已关闭,不允许再写入。
- lock:是一把大锁,有点经验的人就会明白大锁意味着什么,在多生产者、多消费者竞争时性能会受影响。
创建 channel
quitCh1 := make(chan int, 10)         // buffered channel of int, capacity 10
quitCh2 := make(chan int)             // unbuffered channel of int (capacity 0)
quitCh3 := make(chan *SomeStruct, 10) // buffered channel of pointers, capacity 10
上面是常用的创建 channel 的语句:第一个创建 int 类型的管道,缓存容量为 10;第二个创建 int 类型的管道,缓存容量为 0(即非缓冲 channel);第三个容量同样为 10,但元素类型是指针。运行时由 makechan 完成实际的创建:
// makechan allocates and initializes a channel whose element type is t.elem
// and whose buffer holds size elements (size == 0 gives an unbuffered channel).
//
// It throws if the element type is too large (>= 1<<16 bytes, the limit of
// the uint16 elemsize field) or badly aligned. It panics with
// "makechan: size out of range" if size is negative or the buffer would
// exceed the allocation limit.
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	// hchan.elemsize is a uint16, so the element size must fit in 16 bits.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	// The no-pointers case below places buf immediately after the header,
	// which relies on hchanSize being a multiple of maxAlign and on the
	// element needing no stricter alignment than maxAlign.
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// Reject negative sizes and buffers whose total size (plus the header)
	// would exceed maxAlloc. maxSliceCap presumably bounds the element count
	// for this element size — confirm against its definition.
	if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case size == 0 || elem.size == 0:
		// Queue or element size is zero.
		// No buffer storage is needed, so only the header is allocated.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.kind&kindNoPointers != 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		// buf starts right after the (padded) header in the same allocation.
		c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		// buf is a separate allocation typed as elem (unlike the nil type
		// used above), presumably so the GC scans the pointers stored in it.
		c = new(hchan)
		c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}
- 首先判断存储元素的大小 elem.size,必须小于 1<<16(即 64KB,因为 hchan 的 elemsize 字段是 uint16),一般结构体远不会这么大
- hchanSize 是 hchan 结构体本身的大小加上一定的 padding,必须按 maxAlign 对齐;并且元素的对齐要求 elem.align 也不能大于 maxAlign
// maxAlign is the alignment that hchanSize is rounded up to.
maxAlign = 8
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
// maxAlign: -size & (maxAlign-1) is exactly the padding needed to reach that
// multiple (the bit trick requires maxAlign to be a power of two).
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
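看源码文件,maxAlign 的值是 8,hchanSize 是 sizeof(hchan) 按 maxAlign 向上对齐后的值
- 判断 size 是否合法:不能为负数,且缓冲区总大小(加上 hchanSize)不能超过可分配的上限,否则 panic: makechan: size out of range
- switch 语句根据三种情况创建 hchan 结构体并分配底层 buf 空间:
  - size == 0(非缓冲 channel)或元素大小为 0 时,只分配 hchan,不分配真正的 buf;
  - 元素类型不包含指针时(kindNoPointers),hchan 和 buf 一次分配,地址连续。注意这不等于"值类型",比如含 string 字段的结构体本身是值类型,但包含指针;
  - 否则元素包含指针,hchan 和 buf 分开分配,buf 按元素类型分配,以便 GC 扫描其中的指针。
- 最后赋值 elemsize、elemtype、dataqsiz,返回 hchan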
读写数据语法糖
我们常在 select 的 case 中读取 channel 数据。左侧接收变量可以有两个:第一个是从管道读出的数据,第二个 bool 值表示这次是否真正收到了发送方写入的数据。如果管道已经被关闭且缓存数据已读完,读到的是元素类型的零值,第二个值就是 false:
// Two-value form: ok is false when v is the zero value produced by a closed,
// drained channel rather than a value that was actually sent.
case v, ok = <-c:
// Single-value form: a closed channel simply yields the zero value.
case v = <-c:
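上面是常见的两种写法。当 select 只有这样一个接收 case 再加一个 default 时,编译器会把它们分别改写成非阻塞的 selectnbrecv2(带 ok)和 selectnbrecv 调用;普通的阻塞接收(如 v, ok := <-c)则直接调用 chanrecv 系列函数。官方源码中的注释如下: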
compiler implements
// A select with exactly one receive case plus a default case:
select {
case v, ok = <-c:
	... foo
default:
	... bar
}
as
// The c != nil guard sends a nil channel straight to the default branch.
// selectnbrecv2 is assumed to attempt the receive without blocking and report
// whether a case was selected (its body is not shown here).
if c != nil && selectnbrecv2(&v, &ok, c) {
	... foo
} else {
	... bar
}
这是官方源码里的注释。发送数据也有类似的语法糖(对应 selectnbsend),大家可以自行查看。
生产数据
参数有四个:c 是管道;ep 指向将要写入的数据;block 表示是否阻塞发送(利用带 default 的 select 语句可以实现非阻塞发送);callerpc 仅用于 race 检测。
// chansend sends the value pointed to by ep on channel c.
//
// If block is false and the send cannot complete immediately, chansend
// returns false without blocking (this is what a select with a default case
// relies on). Otherwise it returns true once the value has been handed to a
// receiver or copied into the buffer. callerpc is used only for race
// detector annotations.
//
// Sending on a closed channel panics with "send on closed channel". This also
// happens when the channel is closed while this goroutine is blocked in the
// send. A blocking send on a nil channel parks the goroutine forever.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// A nil channel can never become ready for sending.
	if c == nil {
		// Non-blocking send on a nil channel: just report failure.
		if !block {
			return false
		}
		// Blocking send on a nil channel: park with no unlock function and
		// nothing able to ready us, i.e. block forever. This does not panic;
		// the throw below is a guard that should never be reached.
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	//
	// In short, return false without locking only when all of these hold:
	//  1. the send is non-blocking;
	//  2. the channel is not closed;
	//  3. the channel is unbuffered with no waiting receiver, or it is
	//     buffered and the buffer is full.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	// Start the block-profiling clock only when profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Slow path: everything below runs under the channel's single lock.
	lock(&c.lock)

	// Sending on a closed channel panics; release the lock first.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// A receiver is already parked, which means the buffer is empty: hand the
	// value straight to it. send releases c.lock through the callback.
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// There is room in the buffer: enqueue the value there.
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		// qp is the address of slot sendx in the circular buffer.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// Copy the value into the slot.
		typedmemmove(c.elemtype, qp, ep)
		// Advance sendx, wrapping around at the end of the ring.
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	// No waiting receiver and no buffer space: a non-blocking send fails here.
	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	// Wrap this goroutine and the address of its value in a sudog, queue it
	// on sendq, and park until a receiver or a close wakes us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// Non-zero releasetime asks the waker to stamp the wake-up time
		// (send/recv set it to cputicks() when it is non-zero).
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// elem points at our value; the receiver copies it from there.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Park; goparkunlock releases c.lock as part of parking. Execution
	// resumes on the next line once another goroutine readies us.
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

	// someone woke us up.
	// Sanity check: our sudog must still be the one recorded on gp.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	// A receiver that takes our value sets gp.param to our sudog (see recv).
	// A nil param therefore means no receiver took it. That is only legitimate
	// if the channel was closed meanwhile, in which case the send panics.
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	// Report the time spent blocked to the block profiler.
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
如果当前有消费者在等待数据,生产者直接把数据发送过去,看下源码
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
//
// skip is forwarded (plus one for this frame) to goready; presumably it is a
// stack-skip count for tracing — confirm against goready.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	// sg.elem is nil when the receiver discards the value (chanrecv with a
	// nil ep); then there is nothing to copy.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	// A non-nil param tells the woken receiver it got a real value;
	// chanrecv treats gp.param == nil as "woken by close".
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Make the receiver runnable again.
	goready(gp, skip+1)
}
// sendDirect copies one element of type t from src (the sender's value)
// straight into the parked receiver's destination sg.elem, bypassing the
// channel buffer. send uses it when a receiver is already waiting.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	// NOTE(review): typeBitsBulkBarrier presumably issues the GC write
	// barriers for pointer words in [dst, dst+t.size), which the raw memmove
	// below does not do — confirm against its definition.
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size)
}
消费数据
对应的,读取数据是消费者
// chanrecv receives a value from channel c and, if ep is non-nil, writes it
// to ep. A nil ep means the received value is discarded.
//
// Results:
//   - (false, false): block was false and nothing could be received now.
//   - (true, false): the channel was closed with no data left. On the
//     immediate path below, *ep (if non-nil) is cleared to the zero value.
//     When a blocked receiver is woken by a close, clearing *ep is left to
//     the waker, which is not shown here.
//   - (true, true): a value sent on the channel was received.
//
// A blocking receive on a nil channel parks the goroutine forever.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// Receiving from a nil channel can never succeed.
	if c == nil {
		// Non-blocking receive: report "not selected", i.e. (false, false).
		if !block {
			return
		}
		// Blocking receive: park with nothing able to ready us, i.e. block
		// forever. This does not panic; the throw is an unreachable guard.
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not ready for receiving, we observe that the
	// channel is not closed. Each of these observations is a single word-sized read
	// (first c.sendq.first or c.qcount, and second c.closed).
	// Because a channel cannot be reopened, the later observation of the channel
	// being not closed implies that it was also not closed at the moment of the
	// first observation. We behave as if we observed the channel at that moment
	// and report that the receive cannot proceed.
	//
	// The order of operations is important here: reversing the operations can lead to
	// incorrect behavior when racing with a close.
	// Like chansend's fast path, this lets a failing non-blocking receive return
	// without the lock; unlike it, qcount and closed are read with atomic loads.
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	// Start the block-profiling clock only when profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	// Closed and fully drained: complete immediately with the zero value.
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		// Clearing *ep is what makes a receive from a closed channel yield
		// the element type's zero value.
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		// received == false is how callers tell this zero value apart from a
		// zero value that was actually sent.
		return true, false
	}

	// A sender is parked on sendq: take the value via recv, which also
	// releases c.lock and wakes that sender.
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// No waiting sender, but the buffer holds data: take the oldest element.
	if c.qcount > 0 {
		// Receive directly from queue
		// qp is the address of slot recvx, the head of the ring.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// Copy the element out unless the caller discards it.
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Zero the vacated slot, then advance recvx around the ring.
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// Nothing available and the caller does not want to block: unlock and fail.
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// Non-zero releasetime asks the waker to stamp the wake-up time.
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// elem is where a sender will copy the value (nil if it is discarded).
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// Queue ourselves as a waiting receiver, then park. goparkunlock
	// releases c.lock as part of parking.
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// A sender that delivered a value sets gp.param to our sudog (see send).
	// A nil param means we were woken without a value, which is treated as a
	// close.
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
recv 函数处理"有生产者阻塞在 sendq 上"的情况,直接从该生产者取数据:无缓冲时,把数据从生产者直接拷贝给消费者;有缓冲时,缓冲区必然已满,于是先把队头元素交给消费者,再把生产者的数据写入刚空出的位置(即新的队尾)。
// recv completes a receive on c from the parked sender sg, which the caller
// has already dequeued from c.sendq. c must be locked; recv releases it via
// unlockf before waking the sender.
//
// Unbuffered channel: the value is copied directly from the sender into ep.
// Buffered channel: the buffer is full (otherwise the sender would not be
// parked), so the head element goes to ep and the sender's value is written
// into the freed slot, which becomes the new tail.
// If ep is nil the received value is discarded.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		// Unbuffered channel: no buffer involved, copy straight from the sender.
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// The ring is still full, so the tail is right where the new head is.
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	// The sender's value has been consumed; drop the reference to it.
	sg.elem = nil
	gp := sg.g
	unlockf()
	// A non-nil param tells chansend its value was taken; it treats a nil
	// param as "woken by close".
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Wake the sender.
	goready(gp, skip+1)
}
网友评论