Go: channel source code implementation

Author: 董泽润 | Published 2019-02-15 10:58

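    There are plenty of articles about how channels are implemented, and many of them online are copied from one another; this one doesn't add much that's new either. The implementation isn't complicated at all, so let's go straight to the code in src/runtime/chan.go. (The code quoted here is as it stood when this article was written; later Go releases have changed some details.)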

    The channel struct

    First, the channel struct itself. The code comments are already good enough, and the fields are lean.

    type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters
    
        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
        lock mutex
    }
    

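    qcount is the number of elements currently buffered and dataqsiz is the maximum number the buffer can hold, much like a slice's len and cap; in fact len(ch) and cap(ch) simply read these two fields. buf points to the underlying storage, a fixed-length circular array. elemsize and elemtype describe the element's size and type; this is straightforward, since a channel's element type is fixed when it is created. sendx and recvx are the indexes into the circular array, and recvq and sendq are the queues of waiting consumers (receivers) and producers (senders). A non-zero closed means the channel has been closed and may no longer be written to. lock is one big lock, and anyone with some experience knows what that implies: with many producers contending on the same lock, performance can suffer.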
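    From user code only len and cap are observable, but they line up with qcount and dataqsiz as described above. A minimal sketch (chanStats is a hypothetical helper name, not part of any API):

```go
package main

import "fmt"

// chanStats returns len(ch) and cap(ch). Per the runtime source above,
// len reads hchan.qcount and cap reads hchan.dataqsiz.
func chanStats(ch chan int) (length, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	ch := make(chan int, 3) // dataqsiz = 3
	ch <- 1
	ch <- 2
	fmt.Println(chanStats(ch)) // 2 3

	<-ch                       // each receive from the buffer decrements qcount
	fmt.Println(chanStats(ch)) // 1 3

	fmt.Println(chanStats(make(chan int))) // 0 0: unbuffered, no buf
}
```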

    Creating a channel

    quitCh1 := make(chan int, 10)
    quitCh2 := make(chan int)
    quitCh3 := make(chan *SomeStruct, 10)
    

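    These are the usual ways to create a channel. The first creates an int channel with a buffer capacity of 10, the second an int channel with a capacity of 0 (unbuffered), and the third again has capacity 10 but its element type is a pointer. At run time these make calls are handled by makechan: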

    func makechan(t *chantype, size int) *hchan {
        elem := t.elem
    
        // compiler checks this but be safe.
        if elem.size >= 1<<16 {
            throw("makechan: invalid channel element type")
        }
        if hchanSize%maxAlign != 0 || elem.align > maxAlign {
            throw("makechan: bad alignment")
        }
    
        if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
            panic(plainError("makechan: size out of range"))
        }
    
        // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
        var c *hchan
        switch {
        case size == 0 || elem.size == 0:
            // Queue or element size is zero.
            c = (*hchan)(mallocgc(hchanSize, nil, true))
            // Race detector uses this location for synchronization.
            c.buf = c.raceaddr()
        case elem.kind&kindNoPointers != 0:
            // Elements do not contain pointers.
            // Allocate hchan and buf in one call.
            c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
            c.buf = add(unsafe.Pointer(c), hchanSize)
        default:
            // Elements contain pointers.
            c = new(hchan)
            c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
        }
    
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
    
        if debugChan {
            print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
        }
        return c
    }
    
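    1. First it checks the element size elem.size, which must be less than 1<<16 (64 KiB); ordinary structs are nowhere near that large.
    2. hchanSize is the size of the channel struct itself plus some padding. It must be a multiple of maxAlign, and the element alignment elem.align must not exceed maxAlign either: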
        maxAlign  = 8
        hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    

    From the source file, maxAlign is 8, and hchanSize is sizeof(hchan) rounded up to a multiple of maxAlign.
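    The hchanSize expression is the usual round-up-to-a-power-of-two trick: for a power of two a, n + (-n & (a-1)) is the smallest multiple of a that is no smaller than n. A small sketch reproducing the arithmetic with maxAlign = 8 (roundUpToAlign is a hypothetical name):

```go
package main

import "fmt"

// maxAlign is 8, as in the runtime source quoted above.
const maxAlign = 8

// roundUpToAlign applies the same expression used for hchanSize:
// n + (-n & (maxAlign-1)) rounds n up to the next multiple of maxAlign.
// It relies on maxAlign being a power of two.
func roundUpToAlign(n uintptr) uintptr {
	return n + uintptr(-int(n)&(maxAlign-1))
}

func main() {
	for _, n := range []uintptr{1, 8, 90, 96} {
		fmt.Println(n, "->", roundUpToAlign(n))
	}
	// prints 1 -> 8, 8 -> 8, 90 -> 96, 96 -> 96 (one per line)
}
```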

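    3. Check that the requested channel size is valid: it must not be negative, and the total buffer size must not exceed the allocation limit; otherwise makechan panics with "makechan: size out of range".
    4. The switch block allocates the hchan struct and the underlying buf in one of three ways. If the buffer needs no memory (size == 0, i.e. an unbuffered channel, or elem.size == 0, e.g. chan struct{}), only hchan is allocated and no buf. If the element type contains no pointers (kindNoPointers), hchan and buf are allocated together in a single call, so their addresses are contiguous. Otherwise the elements contain pointers, so hchan is allocated with new and buf is allocated separately with the element type, so that the GC scans it.
    5. Finally it fills in elemsize, elemtype and dataqsiz and returns the hchan.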

    Syntactic sugar for reading and writing

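    We usually read channel data in a select case. The left-hand side can take two variables: the first is the value read from the channel, and the second is a bool saying whether that value is valid. If the channel has been closed and its buffer is drained, the value read is the zero value and the bool is false.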

    case v, ok = <-c:
    case v = <-c:
    

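    These are the two common forms. When such a case sits in a select together with a default branch, as in the source comment below, the compiler rewrites it into a call to selectnbrecv2 or selectnbrecv respectively.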

    compiler implements
    
        select {
        case v, ok = <-c:
            ... foo
        default:
            ... bar
        }
    
    as
    
        if c != nil && selectnbrecv2(&v, &ok, c) {
            ... foo
        } else {
            ... bar
        }
    

    That is the comment from the official source. Sends have a similar rewrite (selectnbsend), which you can look up yourself.
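    In practice this means a select with a default case never parks the goroutine. A minimal sketch of a non-blocking receive built on that form (tryRecv is a hypothetical helper):

```go
package main

import "fmt"

// tryRecv performs a non-blocking receive. got reports whether the receive
// case was chosen; ok is the usual comma-ok flag (false once the channel is
// closed and drained).
func tryRecv(c chan int) (v int, ok bool, got bool) {
	select {
	case v, ok = <-c:
		return v, ok, true
	default: // nothing ready: return immediately instead of parking
		return 0, false, false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(tryRecv(c)) // 0 false false: empty, default taken
	c <- 7
	fmt.Println(tryRecv(c)) // 7 true true
	close(c)
	fmt.Println(tryRecv(c)) // 0 false true: closed and drained
}
```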

    Producing data (send)

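    chansend takes four parameters: c is the channel, ep points to the data to be written, block says whether the send may block (a select with a default case produces a non-blocking send), and callerpc is the caller's program counter, used by the race detector.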

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        if c == nil {  // check whether the channel is nil
            if !block { // nil and non-blocking: return false right away
                return false
            } // nil and blocking: gopark with no way to be woken, so the caller blocks forever (no panic)
            gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        if debugChan {
            print("chansend: chan=", c, "\n")
        }
    
        if raceenabled {
            racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
        }
    
        // Fast path: check for failed non-blocking operation without acquiring the lock.
        //
        // After observing that the channel is not closed, we observe that the channel is
        // not ready for sending. Each of these observations is a single word-sized read
        // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
        // Because a closed channel cannot transition from 'ready for sending' to
        // 'not ready for sending', even if the channel is closed between the two observations,
        // they imply a moment between the two when the channel was both not yet closed
        // and not ready for sending. We behave as if we observed the channel at that moment,
        // and report that the send cannot proceed.
        //
        // It is okay if the reads are reordered here: if we observe that the channel is not
        // ready for sending and then observe that it is not closed, that implies that the
        // channel wasn't closed during the first observation.
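        // The fast path is a quick check: not every case needs the lock, and sometimes we can return right away. Returning false here requires all three of:
        // 1. the send is non-blocking
        // 2. the channel is not closed
        // 3. the capacity is 0 and no receiver is waiting, or the capacity is non-zero but the buffer is already full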
        if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
            return false
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        lock(&c.lock) // from here on we hold the dreaded lock
    
        if c.closed != 0 { // writing to a closed channel: unlock, then panic
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
    
        if sg := c.recvq.dequeue(); sg != nil { // a receiver is waiting, which means buf is empty and that receiver is blocked: hand the value to it directly, bypassing buf
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3) // send calls this unlockf, so the lock is always released
            return true
        }
    
        if c.qcount < c.dataqsiz { // buf has free space: write the value into buf
            // Space is available in the channel buffer. Enqueue the element to send.
            qp := chanbuf(c, c.sendx) // compute the slot where the new element goes
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
            }
            typedmemmove(c.elemtype, qp, ep) // copy the value into that slot
            c.sendx++ // advance the ring index sendx and qcount, then unlock
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock) // paired with the lock above; never forget to unlock
            return true // the write succeeded
        }
    
        if !block { // no room in buf and the send is non-blocking: unlock and return false (the write failed)
            unlock(&c.lock)
            return false
        }
    
        // Block on the channel. Some receiver will complete our operation for us. No room and a blocking send: park here until someone wakes us up
        gp := getg() // get the current goroutine
        mysg := acquireSudog() // mysg wraps this goroutine and its pending value; it matters a lot below
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep // stash a pointer to the value to send in mysg.elem for later use
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        c.sendq.enqueue(mysg) // enqueue ourselves on the channel's sendq
        goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // yield: this producer goroutine parks here (releasing the lock) until it is woken
    
        // someone woke us up. If gp.waiting is no longer our mysg, the wait list is corrupted, so throw (a fatal error)
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil // reset
        if gp.param == nil { // a nil gp.param means no receiver woke us (recv sets gp.param = sg); closechan did, so panic if closed, otherwise this is a spurious wakeup
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        return true // the producer's value has been delivered and mysg released: return true
    }
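    The non-blocking branches of chansend can be observed with a select-with-default send. A sketch (trySend and sendOnClosed are hypothetical helpers) covering paths walked through above: a buffer with free space, the lock-free fast path on a full buffer, a non-blocking send on a nil channel, and the panic on a closed channel:

```go
package main

import "fmt"

// trySend is a non-blocking send (block == false in chansend terms).
// It returns false when the send cannot proceed immediately.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// sendOnClosed reports whether sending on a closed channel panics.
func sendOnClosed() (panicked bool) {
	c := make(chan int, 1)
	close(c)
	defer func() { panicked = recover() != nil }()
	c <- 1 // panics: "send on closed channel"
	return false
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: qcount < dataqsiz, value goes into buf
	fmt.Println(trySend(c, 2)) // false: buffer full, fast path returns false

	var nilCh chan int
	fmt.Println(trySend(nilCh, 3)) // false: nil channel with block == false

	fmt.Println(sendOnClosed()) // true
}
```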
    

    If a consumer is already waiting for data, the producer sends the value to it directly. Here is the source:

    // send processes a send operation on an empty channel c.
    // The value ep sent by the sender is copied to the receiver sg.
    // The receiver is then woken up to go on its merry way.
    // Channel c must be empty and locked.  send unlocks c with unlockf.
    // sg must already be dequeued from c.
    // ep must be non-nil and point to the heap or the caller's stack.
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if raceenabled {
            if c.dataqsiz == 0 {
                racesync(c, sg)
            } else {
                // Pretend we go through the buffer, even though
                // we copy directly. Note that we need to increment
                // the head/tail locations only when raceenabled.
                qp := chanbuf(c, c.recvx)
                raceacquire(qp)
                racerelease(qp)
                raceacquireg(sg.g, qp)
                racereleaseg(sg.g, qp)
                c.recvx++
                if c.recvx == c.dataqsiz {
                    c.recvx = 0
                }
                c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
            }
        }
        if sg.elem != nil {
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        goready(gp, skip+1)
    }
    
    
    func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
        // src is on our stack, dst is a slot on another stack.
    
        // Once we read sg.elem out of sg, it will no longer
        // be updated if the destination's stack gets copied (shrunk).
        // So make sure that no preemption points can happen between read & use.
        dst := sg.elem
        typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
        // No need for cgo write barrier checks because dst is always
        // Go memory.
        memmove(dst, src, t.size)
    }
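    On an unbuffered channel there is no buf at all, so every transfer is a direct copy between the two goroutines. A sketch (handoff is a hypothetical helper); which side parks first depends on scheduling, so either send (receiver already waiting) or recv (sender already waiting) performs the copy:

```go
package main

import "fmt"

// handoff passes v across an unbuffered channel (dataqsiz == 0, so there is
// no buf). Whichever goroutine arrives second finds the other one parked in
// recvq or sendq and the value is copied directly between them.
func handoff(v int) int {
	c := make(chan int)
	result := make(chan int)
	go func() {
		result <- <-c // receive from c, then report what arrived
	}()
	c <- v // returns only after the receiver has taken the value
	return <-result
}

func main() {
	fmt.Println(handoff(42)) // 42
}
```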
    

    Consuming data (receive)

    Correspondingly, reading data is the consumer side, implemented by chanrecv:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        // raceenabled: don't need to check ep, as it is always on the stack
        // or is new memory allocated by reflect.
    
        if debugChan {
            print("chanrecv: chan=", c, "\n")
        }
    
        if c == nil { // receiving from a nil channel
            if !block { // non-blocking: just return (not selected)
                return
            } // blocking: gopark with no way to be woken, so the caller blocks forever (no panic)
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        // Fast path: check for failed non-blocking operation without acquiring the lock.
        //
        // After observing that the channel is not ready for receiving, we observe that the
        // channel is not closed. Each of these observations is a single word-sized read
        // (first c.sendq.first or c.qcount, and second c.closed).
        // Because a channel cannot be reopened, the later observation of the channel
        // being not closed implies that it was also not closed at the moment of the
        // first observation. We behave as if we observed the channel at that moment
        // and report that the receive cannot proceed.
        //
        // The order of operations is important here: reversing the operations can lead to
        // incorrect behavior when racing with a close. As in chansend, this lock-free fast check lets us skip taking the lock.
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        lock(&c.lock)
    
        if c.closed != 0 && c.qcount == 0 { // closed and no unread data left: unlock and return the zero value
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
            if ep != nil { // the key point: typedmemclr sets *ep to the zero value
                typedmemclr(c.elemtype, ep)
            } // the second result is false, so a zero value read from a channel is not necessarily real data; the channel may have been closed
            return true, false
        }
    
        if sg := c.sendq.dequeue(); sg != nil { // a sender is blocked on the channel, so take the data from that sender directly
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    
        if c.qcount > 0 { // no blocked sender, but buf has data: read from buf
            // Receive directly from queue: locate the first unread element
            qp := chanbuf(c, c.recvx)
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
            }
            if ep != nil { // copy the first unread element into ep
                typedmemmove(c.elemtype, ep, qp)
            }
            typedmemclr(c.elemtype, qp) // clear the consumed slot in the ring buffer, then advance recvx
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock) // unlock and return true, true
            return true, true
        }
    
        if !block { // no data available and a non-blocking receive: unlock and return
            unlock(&c.lock)
            return false, false
        }
    
        // no sender available: block on this channel.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.param = nil
        c.recvq.enqueue(mysg) // enqueue the consumer's mysg, then gopark here until woken
        goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
    
        // someone woke us up
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        closed := gp.param == nil
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, !closed
    }
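    The closed-channel branch only fires once qcount is 0, so values buffered before close are still delivered normally. A sketch (drain is a hypothetical helper) showing the sequence of results:

```go
package main

import "fmt"

// drain receives with the comma-ok form until ok is false. Values buffered
// before close arrive with ok == true; once qcount reaches zero, chanrecv
// returns the zero value and ok == false, which drain reports as zeroAfterClose.
func drain(c chan int) (vals []int, zeroAfterClose int) {
	for {
		v, ok := <-c
		if !ok {
			return vals, v
		}
		vals = append(vals, v)
	}
}

func main() {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	close(c)
	vals, z := drain(c)
	fmt.Println(vals, z) // [1 2] 0
}
```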
    

    The recv function handles the case where a sender is blocked on the channel, taking the data from that sender directly:

    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if c.dataqsiz == 0 { // no buffer: copy the data straight from the sender
            if raceenabled {
                racesync(c, sg)
            }
            if ep != nil {
                // copy data from sender
                recvDirect(c.elemtype, sg, ep)
            }
        } else {
            // Queue is full. Take the item at the
            // head of the queue. Make the sender enqueue
            // its item at the tail of the queue. Since the
            // queue is full, those are both the same slot.
            qp := chanbuf(c, c.recvx) // a sender is blocked, so buf must be full; take the element at the head
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
                raceacquireg(sg.g, qp)
                racereleaseg(sg.g, qp)
            }
            // copy data from queue to receiver
            if ep != nil { // copy the element in that slot to the receiver
                typedmemmove(c.elemtype, ep, qp)
            }
            // copy data from sender to queue: write the blocked sender's value into the same slot
            typedmemmove(c.elemtype, qp, sg.elem)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
        sg.elem = nil // the sender's value (pointed to by sg.elem) has now been consumed, so clear it
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg) // gp.param is set to sg; this is why chansend checks that field after waking up
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        goready(gp, skip+1) // wake the producer
    }
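    The "same slot" trick in recv works because a full ring has recvx == sendx. The toy model below is a hypothetical sketch, not runtime code: it mirrors only the index arithmetic (no locking, no sudogs) to show that taking the head and writing the sender's value into that slot keeps FIFO order:

```go
package main

import "fmt"

// ring is a toy model of hchan's circular buffer: buf has dataqsiz slots,
// recvx/sendx index into it, and qcount counts buffered items.
type ring struct {
	buf          []int
	recvx, sendx int
	qcount       int
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push enqueues at sendx, like chansend when qcount < dataqsiz.
func (r *ring) push(v int) {
	r.buf[r.sendx] = v
	r.sendx = (r.sendx + 1) % len(r.buf)
	r.qcount++
}

// recvWithWaitingSender models recv when the buffer is full and a sender is
// parked holding senderVal: take the head at recvx, put the sender's value in
// that same slot (the tail, since the queue is full), advance recvx and set
// sendx = recvx. qcount does not change.
func (r *ring) recvWithWaitingSender(senderVal int) int {
	head := r.buf[r.recvx]
	r.buf[r.recvx] = senderVal
	r.recvx = (r.recvx + 1) % len(r.buf)
	r.sendx = r.recvx
	return head
}

// pop dequeues at recvx, like chanrecv reading from buf
// (the runtime also zeroes the vacated slot; omitted here).
func (r *ring) pop() int {
	v := r.buf[r.recvx]
	r.recvx = (r.recvx + 1) % len(r.buf)
	r.qcount--
	return v
}

func main() {
	r := newRing(3)
	r.push(1)
	r.push(2)
	r.push(3) // full: qcount == 3, sendx has wrapped back to recvx == 0
	got := []int{r.recvWithWaitingSender(4)}
	for r.qcount > 0 {
		got = append(got, r.pop())
	}
	fmt.Println(got) // [1 2 3 4]: FIFO order is preserved
}
```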
    

          Article link: https://www.haomeiwen.com/subject/zeueeqtx.html