美文网首页
Go 深入源码 —— channel

Go 深入源码 —— channel

作者: 哎呦卧槽_960f | 来源:发表于2020-11-06 23:00 被阅读0次

    Don't communicate by sharing memory, share memory by communicating.

    不要通过共享内存来通信,而要通过通信来实现内存共享。

    数据结构

    我们可以把 Channel 看做是一个先进先出(FIFO)的数据队列,那么如何实现这种队列

    channel 的底层数据结构是一个 *hchan,在编译时期会将 make(chan int) 语句转换成 makechan 函数调用

    hchan

    // runtime/chan.go
    type hchan struct {
        lock mutex  // lock 用来保护 hchan 上所有的字段
        
        // 缓冲区实际是一个循环队列
        buf unsafe.Pointer  // 指向缓冲区的指针
        dataqsiz uint   // 缓冲区循环队列的大小
        sendx uint      // 缓冲区循环队列接收下一个元素的索引
        recvx uint      //  缓冲区循环队列中下一个会返回的元素的索引
        
        qcount uint // 当前 hchan 缓存的元素数量
        closed uint32   // hchan 是否关闭
        
        elemsize uint16 // hchan 的元素大小
        elemtype *_type // hchan 的元素类型
        
        recvq waitq     // 等待接收的 goroutine 队列
        sendq waitq     // 等待发送的 goroutine 队列
    }
    

    可以看出 channel 的底层数据结构

    • 缓冲区 buf 底层是一个循环队列,dataqsizqcount 分别记录了缓冲区的大小和当前缓冲的元素数量,sendxrecvx 用来记录位置索引
    • elemsizeelemtype 表示元素大小和类型
    • recvqsendq 来记录被发送接收阻塞的 goroutine 队列
    • closed 用来记录是否关闭
    • lock 用来保护hchan中的字段,更新其他字段的时候都需要加锁

    对于无缓冲 channel 是不需要和缓冲区相关的字段的

    channel 在实现中依然使用到了锁,Go 所说的 使用通信来实现共享内存,实际上依然在底层使用锁来保证读写的原子性,实现出了一个面向数据流式的数据结构

    待发送者和待接收者

    注意到 recvqsendq 类型 waitq 是一个双向链表,提供了等待 goroutine 的出队入队

    // runtime/chan.go
    type waitq struct {
        first *sudog
        last * sudog
    }
    
    func(q *waitq) enqueue(sgp *sudog){
    // ...
    }
    func (q *waitq) dequeue(sgp *sudog){
    // ...
    }
    

    sudog 是对被阻塞的 goroutine 的封装,简单看一下 channel 会使用到的一些字段

    // runtime/runtime2.go
    type sudog struct {
        g       *g              //阻塞的 goroutine
        elem    unsafe.Pointer
        c       *hchan          // 阻塞的 channel
    

    elem 字段是一个指针,在 channel 会被用来指向待发送者要发送的数据或者待接收者的接收位置

    // 从 ch 接收数据被阻塞,那么 sudog.elem 会指向 x
    x <- ch 
    
    // 向 ch 发送数据被阻塞,那么 sudog.elem 会指向 y
    ch <- y 
    

    makechan 创建 channel

    channel 分为无缓冲 channel 和 缓冲 channel,虽然两种 channel 的创建方式不同,但是都是调用 makechan

    ch := make(chan int)    // 无缓冲 channel
        
    ch := make(chan int, 10)// 有缓冲 channel
    

    makechan 函数会接受元素的类型和缓冲的大小,如果 size 为 0,就是无缓冲 channel 了

    // src/runtime/chan.go
    func makechan(t *chantype, size int) *hchan{
        elem := t.elem
        
        // 检查 elem size,align
        
        // 计算出缓冲区的大小,如果是非缓冲 channel 或者元素为 struct{},那么 mem 就是 0
        mem, overflow := math.MulUintptr(elem.size, uintptr(size))
        if overflow || mem > maxAlloc-hchanSize || size < 0{
            panic(plainError("makechan: size out of range"))
        }
        
        var c *hchan
        switch{
        // 非缓冲 channel 或者 缓冲区元素 为 struct{}
        case mem == 0:
            c = (*hchan)(mallocgc(hchanSize, nil, true))
            // 如果是非缓冲,则buf并没有用
            // 如果缓冲元素类型为 struct{}, 则只会用到 sendx 和 recvx, 并不会真正拷贝数据到缓冲区
            c.buf = unsafe.Pointer(&c.buf)
            
        // channel 中元素不包含指针
        case elem.ptrdata == 0:
            // 将 hchan 结构和缓冲区的内存一起分配
            c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
            // buf 指向 hchan 后边的地址
            c.buf = add(unsafe.Pointer(c), hchanSize)
            
        // 默认,分别分配 chan 和 buf 的内存    
        default:
            c = new(hchan)
            c.buf = mallocgc(mem, elem, true)
        }
        
        // 设置 hchan 的其他字段
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        // 底层循环队列长度
        c.datasiz = uint(size)
        return c
    

    通过 makechan 函数,可以总结出 hchan 结构的特点

    • 无缓冲或者缓冲的元素类型为 struct{} 时,并不会为缓冲区(hcha.buf)分配内存
    • 缓冲的元素结构中不包含指针时,会将 hchan 和 缓冲区buf 是一块连续的内存

    make 与 makechan

    make 函数在编译阶段又是如何转换成 makechan 函数调用的呢

    首先编译器会将 make 的调用转换成 OMAKE 类型的节点,然后判断 make 的对象类型,如果是 TCHAN 的话,将节点类型置为 OMAKECHAN,并且检查 make 的第二个参数,也就是缓冲区大小

    // src/cmd/compile/internal/gc/typecheck.go
    func typecheck1(n *Node, top int) (res *Node) {
        // ...
        switch n.Op{
        case OMAKE:
            switch t.Etype {
            case TCHAN:
                l = nil
                if i < len(args){
                    // ... 对缓冲区大小进行检测
                    n.Left = l  // 带缓冲区,赋值缓冲区大小
                }else{
                    n.Left = nodintconst(0) // 不带缓冲区
                }
                n.Op = OMAKECHAN
            }
        }
    }
    

    然后OMAKECHAN 节点会在 walkexpr 函数中转换成调用 makechan 或者 makechan64 函数

    // src/cmd/compile/internal/gc/walk.go
    func walkexpr(n *Node, init *Nodes) *Node {
        switch n.Op {
        case OMAKECHAN:
            size := n.Left
            fnname := "makechan64"
            argtype := types.Types[TINT64]
    
            if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
                fnname = "makechan"
                argtype = types.Types[TINT]
            }
            n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
        }
    }
    

    发送数据

    向 channel 发送数据的语句会在编译期间转换成 chansend 函数

    ch := make(chan int)
    ch <- 10
    

    发送语句非常简单,但是真正的函数执行会区分很多的情况,做一些小的优化,可以称为特性

    发送操作的特性

    • 向 nil channel 发送数据会被永久阻塞,并且不会被 select 语句选中
    • 如果 channel 未关闭,非缓冲并且没有待接收的 goroutine,或者缓冲区已满,那么不会被 select 语句选中
    • 向关闭的 channel 发送数据,会 panic ,并且可以被 select 语句选中,意味着 select 语句中可能会 panic
    • 如果有待接收者,那么会将发送的数据直接 copy 到待接收者的接收位置,然后唤醒接收者
    • 如果有缓冲区,并且缓冲区未满,那么就把发送的数据 copy 到缓冲区中
    • 如果 channel 未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前 goroutine, 等待被唤醒
    • 发送者被阻塞后,可以被关闭 channel 操作或者被接收操作唤醒,关闭 channel 导致发送者被唤醒后,会panic
    • 当 channel 中有待接收 goroutine,那么 channel 的状态必然是 非缓冲或者缓冲区为空
    发送数据,可以被 select 选中的情况
    • channel 已关闭
    • channel 未关闭,channel有待接收的 goroutine,或者缓冲区不为空并且缓冲区未满

    深入源码

    ch <- i 发送语句实际会被转换为 chansend1

    func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
    }
    

    chansend1 会直接调用 chansend 来发送数据,并且 block 为 true,说明 ch <- i 语句可以被阻塞

    // src/runtime/chan.go
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 
    

    c 表示操作的 channel
    ep 是一个指针,指向发送的数据 ch <- i
    block 表示是否是阻塞调用,在 select case 语句中才会设置为 false
    callerpc 暂时不需要关心

    返回值是个 bool 类型,表示是否发送成功,未发送成功的操作也不会被 select 语句选中

    首先看一下 channel 为 nil 的情况,这时并不需要加锁

        if c == nil{
            if !block {
                // block 为 false, 则直接返回 false, 表示发送失败
                return false
            }
            // 对于 nil channel,直接挂起当前 goroutine,并永久阻塞
            gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
            // 不会执行到这一步
            throw("unreadable")
        }
    

    如果是非阻塞调用,也就是 select case 语句中调用,那么直接返回 false,意味着向 nil channel 发送数据不会被选中
    阻塞调用就被 gopark 挂起,永久阻塞


    在 channel 加锁之前,对于非阻塞并且未关闭的情况会有一步快速检测的判断,可以快速返回

        // 快速检测,非阻塞时,有些情况不需要获取锁就可以直接返回
        // 非阻塞,未关闭,非缓冲+没有等待接收的 goroutine 或者 缓冲+缓冲区已满
        if !block && c.closed == 0 &&
            ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            ((c.dataqsiz < 0 && c.qcount == c.dataqsiz)) {
            // 返回 false,表示未发送成功
            return false
        }
    

    缓冲区没有空间,并且待接收的 goroutine 时,可以直接返回未发送成功


    加锁,判断 channel 是否关闭,如果已关闭,直接 panic

    // 加锁
        lock(&c.lock)
        
        // 如果 channel 已关闭,则 panic
        if c.closed != 0{
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
    

    channel 待接收队列中有等待的 goroutine

        lock(&c.lock)
        
        // ...
        
        // 从待接收队列中获取等待的 goroutine
        if sg := c.recvq.dequeue(); seq != nil {
    
            // 只要可以从待接收队列中获取到 goroutine,那么发送操作都是只需要 copy 一次
            send(c, sg, ep, func() { unlock(&c.lock) },  3)
            return true
        }
    

    如果待接收队列中有等待的接收者的话,说明 channel 的缓冲区为空
    调用 send 函数,无论是否是无缓冲 channel,都直接复制给待接收者

    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // sg.elem 是指向待接收 goroutine 中接收数据的指针 s <- ch
        // 如果待接收 goroutine 需要接收具体的数据,那么直接将数据 copy 到 sg.elem
        if sg.elem != nil{
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        
        gp := sg.g
        unlockf()   // unlock(&c.lock)
        
        // 赋值 param,待接收者被唤醒后会根据 param 来判断是否是被发送者唤醒的
        gp.param = unsafe.Pointer(sg)
        goready(gp, skip+1) // 唤醒待接收者
    }
    

    会判断一下接收者是否需要接收数据,也就是 sudog.elem 是否为 nil
    如果不为 nil,就调用 sendDirect 把发送的数据(ep 指向的数据) 复制到 sudog.elem

    func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
        // src 是发送的数据源地址,dst 是接收数据的地址
        // src 在当前的 goroutine 栈中,而 dst 在其他栈上
        dst := sg.elem
        
        // 使用 memove 直接进行内存 copy
        // 因为 dst 指向其他 goroutine 的栈,如果它发生了栈收缩,那么就没有修改真正的 dst 位置
        // 所以会加读写前加一个屏障
        typebitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
        memove(dst, src, t.size)    
    

    sendDirect 在进行跨 goroutine 内存 copy 时,调用 typebitsBulkBarrier 来加上了写屏障
    因为 GC 会假设对栈的写操作只会发生在 goroutine 正在运行时,并且是由当前 goroutine 写的,
    sendDirect 跨 goroutine 的栈读写会违背这个假设,为了避免出现问题,需要加上写屏障


    缓冲区未满,直接将数据发送到缓冲区中

        lock(&c.lock)
        // ...
        
        if c.qcount < c.dataqsiz {
            //  获取缓冲发送数据的指针
            // add(c.buf, uintptr(i)*uintptr(c.elemsize))
            qp := chanbuf(c, c.sendx)
            
            // copy 数据,ep, gp 都是指针,分别指向数据源和数据目的地
            typedmemove(c.elemtype, qp, ep)
            
            // 递增存放发送数据的索引
            c.sendx++
            if c.sendx == c.dataqsiz{
                // 缓冲区是一个循环数组,调整索引
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock)
            return true
        }
    

    chanbuf 函数通过 hchan.sendx 获取到缓冲区存放发送的数据的地址,然后调整循环数组的sendx 索引


    channel 未关闭,对于非缓冲 channel,待接收队列为空,对于缓冲 channel,缓冲区已满
    逻辑依次执行到这里:

        lock(&c.lock)
        // ...
        
        // 如果非阻塞发送,那么可以直接解锁返回,未发送成功
        if !block{
            unlock(&c.lock)
            return false
        }
        
        // 阻塞发送,那么就挂起当前 goroutine
        gp := getg()
        // 生成配置 sudo,省略部分赋值操作
        mysg := acquireSudog()
        mysg.elem = ep  // 将指向发送数据的指针保存到 elem 中
        mysg.g = gp
        mysg.c = c  // 当前阻塞的 channel
        gp.wait = mysg
        
        // param 可以用来传递数据,其他 goroutine 唤醒该 goroutine 时可以设置该字段,然后根据该字段做一些判断
        pg.param = nil  
        
        // 入队待发送队列
        c.sendq.enqueue(mysg)
    
        // 挂起goroutine,等待唤醒
        // chanparkcommit 函数会解锁 ch.lock
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    

    非阻塞的话,会直接返回为发送成功
    阻塞调用,则会构建 sudog 对象,然后添加到待发送队列,解锁,挂起当前 goroutine

    会被唤醒的情况有两种

    • 关闭 channel
    • 发生接收操作,接收者可能会唤醒该发送者
        // 被唤醒,执行检查清理操作
        // ...
        
        // param 字段为 nil 表示是由于 close channel 导致的关闭,panic
        // close channel 和接收操作都可能唤醒等待发送的 goroutine, 但是他们设置 param 不一样
        if gp.param == nil {
            if c.closed = 0 {
                throw("chansend: suprious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        // 清理,释放 sudog
        pg.param == nil
        mysq.c = nil
        releaseSudog(mysg)
        // 发送成功
        return true
    }
    

    被唤醒后会判断 g.param 是否为 nil,因为关闭 channel 时会将待发送 goroutine 的 param 字段置为 nil,会根据这个字段决定是否 panic

    select & 发送操作

    golang 会对 select 语句进行一些优化

    单个发送 case

    select {
    case ch <- i:
        // ...
    }
    
    // 会被优化为
    
    if ch == nil {
        block()
    }
    ch <- i
    

    会在编译期间转换为阻塞发送语句

    非阻塞操作,发送 + default

    select {
    case ch <- i:
        // ...
    default:
        // ...
    }
    
    // =====>
    
    if selectnbsend(ch, i) {
        // ...
    } else {
        // ...
    }
    

    非阻塞操作实际调用 selectnbsend,根据函数返回值决定是否执行 default 逻辑

    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        // block 参数为 false,非阻塞调用
        return chansend(c,elem, false, getcallerpc())
    }
    

    返回 false 表示未发送成功,select 便会执行 default

    思考:为什么向关闭的 channel 发送数据需要 panic

    接收数据

    如何从 channel 中接收数据

    // 接收单个值,如果 channel 被关闭后,会返回 channel 中元素的零值
    i <- ch // 调用 `chanrecv1` 函数
    
    // 如果 channel 被关闭并且缓冲区为空,那么 ok 的值就是 false
    i, ok <- ch // 调用 `chanrecv2` 函数
    

    i 是接收操作的接收值ok 表示是否从 channel 中接收到有效的数据,即使 channel 已经关闭,但是缓冲区中依然存在数据,那么 ok 也会是 true

    接收操作的特性

    • 从 nil channel 中接收数据会永久阻塞,而且不会被select 语句选中
    • 如果 channel 未关闭,没有待发送者或者缓冲 channel 的缓冲区为空的话,不会被 select 语句选中
    • 从已关闭并且缓冲区为空的 channel 中接收数据的话,会把接收值置为空值,而且可以被 select 语句选中
    • 如果待发送队列不为空,说明无缓冲或者缓冲已满,对于无缓冲直接从待发送者复制数据到接收值,如果缓冲区已满,那么先将缓冲区中数据复制给接收者,然后将待发送者的数据复制到缓冲区中并唤醒发送者
    • 只要缓冲区不为空,即使channel已关闭,依然可以从缓冲区中获取到数据
    • 如果缓冲为空并且没有待发送者,不会被 select 语句选中,如果是阻塞接收操作的话,会被阻塞直到 channel 被关闭或者被发送者唤醒
    • 接收者被关闭操作唤醒,那么接收值会被置为空值
    接收操作被 select 语句选中的情况
    • channel 已关闭
    • 缓冲区中有数据
    • 待发送队列不为空

    深入源码

    单值的接收语句实际调用 chanrecv1

    // src/runtime/chan.go
    i <- ch
    
    // ===>
    
    func chanrecv1(c *hchan, elem unsafe.Pointer){
        chanrecv(c, elem, true)
    }
    

    接收两个值实际调用 chanrecv2

    i, ok <- ch
    
    // ===>
    
    func chanrecv2(c *hchan, elem unsafe.Pointer)(received bool) {
        _, received = chanrecv(c, elem, true)
    }
    

    chanrecv1chanrecv2 实际都是调用 chanrecv ,他们两个之间的区别就是是否返回接收到有效数据


    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
    

    c 表示接收操作的 channel
    ep 是一个指针,指向接收值i <- ch语句 ep 就是 接收值 i 的地址
    block 是否是阻塞操作,chanrecv1chanrecv2 函数中block为 true,说明是阻塞操作

    返回值 selected 表示是否可以被 select 语句选中
    返回值 received 表示是否可以接收到有效数据


    **channel 在加锁前会判断一下是否为 nil **

        if c == nil {
            // 非阻塞下会直接返回
            if !block {
                return
            }
            // 永久挂起
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    

    阻塞接收会被永久阻塞,非阻塞的话就直接返回,而且不会被 select 选中


    阻塞接收时,对于未关闭 channel 满足一些条件不需要加锁就可以直接返回

        // 快速检测,在非阻塞模式下,和发送一样有些条件不需要加锁就可以直接判断返回
        // 非阻塞并且未关闭,非缓冲+没有待发送者或者有缓冲+缓冲为空
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    
    • 非缓冲 channel 如果没有待发送者
    • 缓冲 channel 但是缓冲区为空

    加锁,首先判断 channel 是否已关闭,缓冲区中是否还有数据

        lock(&c.lock)
        
        // channel 处于关闭,并且缓冲区已空
        if c.closed != 0 && c.qcount == 0{
            unlock(&c.lock)
            if ep != nil{
                // 如果接收的值需要赋值到变量 x <- ch
                // 将接收的值置为空值
                typedmemclr(c.elemtype, ep)
            }
            // 可被 select 语句选中,但是未接收到有效数据
            return true, false
        }
    

    channel 已经关闭,而且缓冲区没有数据,如果 ep 不为nil ,也就是说存在接收值,那么就把接收值置为空值

    ep 为空的情况是 <- chan 接收操作没有接收值

    selected 返回 true,表示可以被 select 语句选中


    待发送队列不为空,存在待发送者

        lock(&c.lock)
        // ...
        
        // 待发送队列中有 goroutine,说明是非缓冲 channel 或者 缓冲已满的 channel
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func(){ unlock(&c.lock) }, 3)
            return true, true   // 可被选中,并且接收成功
        }
    

    如果待发送队列中有等待发送的 goroutine,说明 channel 是非缓冲channel,或者缓冲区已经满了

    • 非缓冲channel,会将数据从待发送者复制给接收者
    • 缓冲区已满的话,会先从缓冲区中接收数据,然后将待发送者的数据发送到缓冲区中

    这里和发送操作时,channel 的待接收队列不为空的情况不一样,因为待接收队列不为空,说明缓冲区肯定是没有数据的,可以跳过缓冲区,直接将数据发送到等待接收的 goroutine

    因为要区分 channel 的类型所以 recv 函数的逻辑就会有一点复杂
    对于非缓冲 channel,如果有接收值,直接调用 recvDirect 从待发送者复制值

    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // 无缓冲 channel
        if c.dataqsiz == 0 {
            // 如果ep 不为 nil,那么直接从发送 goroutine 中将数据 copy 到接收位置
            if ep != nil{
                recvDirect(c.elemtype, sg, ep)
            }
        } 
    

    对于缓冲区有数据的情况

    • 先从缓冲区复制数据到接收值,也就是 ep 指向的地址
    • 然后将待发送者要发送的数据复制到缓冲区中
    • 调整缓冲区循环数据的接收索引 recvx
        } else {
            // 获取缓冲区中待接收的地址
            gp := chanbuf(c, c.recvx)
            if ep != nil {
                // 将待接收数据复制到接收位置
                typedmemmove(c.elemtype, ep, qp)
            }
            // 将待发送者发送的数据复制到相应缓冲区的位置
            typedmemmove(c.elemtype, qp,sq.elem)
            // 调整 recvx
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            // 由于缓冲区已满,sendx 和 recvx 必然相等
            c.sendx = c.recvx
        }
    

    无论是缓冲还是非缓冲 channel,recv 函数最后都会唤醒发送者

        // 赋值发送者的 param,发送者被唤醒后会根据 param 来判断是否是关闭唤醒的
        sg.elem = nil
        gp := sg,g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        goready(gp, skip+1)
    }
    

    接收操作会赋值发送者 goroutine 的 param 字段,发送者被唤醒后,会根据 param 参数来判断是有接收操作唤醒还是被关闭 channel 操作唤醒


    缓冲区中有数据,无论 channel 被关闭,都会发送给接收者

        lock(&c.lock)
        // ...
        
        // 如果缓冲区不为空,依然有未发送的数据
        // 需要注意,这时 channel 可能已经处于关闭状态了,但是依然可以从关闭的缓冲区中接收到数据
        if c.qcount > 0{
            // 获取指向缓冲区中待接收数据的指针
            gp ;= chanbuf(c, c.recvx)
            if ep != nil{
                // 如果接收操作有接收值,那么直接 copy 到 ep
                typedmemmove(c.elemtype, ep, gp)
            }
            // 清理缓冲区中已接收到的数据内存
            typedememclr(c.elemtype, gp)
            // 调整待接收索引
            c.recv++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock)
            // 可以被选中,并且接收成功
            return true, true
        }
    

    这一部分的逻辑就比较简单

    • 获取缓冲区的待接收数据的地址 gp,如果有接收者,便将数据复制给接收者
    • 调整缓冲区循环数据的待接收索引recvx

    channel 未关闭, 缓冲区没有元素,并且没有待接收者
    非阻塞操作,可以直接解锁返回,并且不会被 select 语句选中

        lock(&c.lock)
        // ...
    
        // 缓冲区没有元素并且没有待发送者
        if !block {
            unblock(&c.block)
            // 不会被选中,并且没有接收到有效数据
            return false, false
        }
    

    阻塞操作,挂起当前 goroutine,等待被发送操作或者关闭操作唤醒

        lock(&c.lock)
        // ...
    
        gp = getg()
        mysg := acquireSudog()
        mysg.elem = ep
        mysg.g = gp
        mysg.c = c
        gp.param = nil
    
        // 入队到待发送者队列中
        c.recvq.enqueue(mysg)
        // 挂起 goroutine,等待由关闭操作或者发送操作唤醒
        goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
        
        // 被唤醒,做一些检测,和清理操作
        
        // 根据 param 判断是否是由关闭唤醒的
        // 有 closed 唤醒时,param 会被置为 nil
        closed := gp.param == nil   
        pg.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        
        // 可以被选中,但是 closed 反应是否接受到有效数据
        return true, !closed
    }
    

    被唤醒后会根据 param 字段,判断是否是由关闭操作唤醒


    select 与 接收操作

    单个接收 case
    select {
    case i <- ch:
    }
    // ====>
    
    if ch == nil{
        block()
    }
    i <- ch
    
    非阻塞接收
    select {
    case v <- ch: // case v, received <- ch:
        // ...
    default:
        // ...
    }
    // ===>
    
    // if ch != nil && selectnbrecv2(&v, &ok, ch) {
    if selectnbrecv(&v, ch) { 
        // ...
    } else {
        // ...
    }
    

    非阻塞接收会调用 selectnbrecvselectnbrecv2

    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
        selected, _ = chanrecv(c, elem, false)
        return
    }
    func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
        elected, *received = chanrecv(c, elem, false)
        return
    }
    

    关闭 channel

    关闭 channel 直接调用 close 函数即可,但是贸然关闭 channel 会引发很多的问题

    ch := make(chan int)
    
    // 关闭 goroutine
    close(ch)
    

    关闭操作的特性

    • 关闭 nil channel 会 panic
    • 关闭已关闭的 channel 会 panic
    • 关闭操作会将待接收者的接收值置为空值,唤醒所有待发送者和待接收者

    关于如何优雅的关闭 channel,可以看一下 go101如何优雅地关闭通道

    深入源码

    关闭 nil channel 会panic

    func closechan(c *hchan) {
        // 关闭 nil channel 会 panic
        if c == nil{
            panic(plainError("close of nil channel"))
        }
    

    重复关闭 channel,也会 panic

        // 加锁
        lock(&c.lock)
        if c.closed != 0 {
            // 重复关闭会 panic
            unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }
    

    需要注意关闭操作中,判断 channel 是否关闭前会加锁


    处理待接收者,如果有接收者,那么就置为空值

        c.closed = 1
    
        var glist gList
        // 处理待接收者
        for {
            sg := c.recvq.dequeue()
            if sg == nil {
                break
            }
            if sg.elem != nil {
                // 将待接收位置置为空值
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil   //  清理 elem 指针
            }
            gp := sg.g
            // param 置为 nil,接收者被唤醒后会返回未接收到有效数据
            gp.param = nil
            glist.push(gp)
        }
    

    处理待发送者

        // 处理待发送的队列
        for {
            sg := c.sendq.dequeue()
            if sg == nil {
                // 没有待发送的goroutine了
                break
            }
            sg.elem = nil
            gp := sg.g
            // 将 param 置为 nil, 待发送者被唤醒后,会 panic
            gp.param = nil
            glist.push(gp)
        }
    

    解锁,唤醒所有待发送者和待接收者

        unlock(&c.lock)
        
        // 唤醒所有阻塞的 goroutine
        for !glist.empty(){
            gp := glist.pop()
            gpready(gp, 3)
        }
    }
    

    关闭操作唤醒 channel 中阻塞的 goroutine

    在处理待发送者和待接收者时,都会将 goroutine 的 param 字段置为 nil,然后当被唤醒后待发送者和待接收者就能区分如何被唤醒的

    发送操作

    // runtime/chan.go
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // ...
    
        // 阻塞,挂起 goroutine
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    
        if gp.param == nil {
            if c.closed = 0 {
                throw("chansend: suprious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        // ...
    

    可以看到发送操作被唤醒后会判断 param 字段
    如果是由于 channel 关闭导致被唤醒,那么直接 panic

    • 关闭操作唤醒,goroutine param 字段为 nil
    func closechan(c *hchan) {
        // ...
        for {
            sg := c.recvq.dequeue()
            // ...
            pg := sg.pg
            gp.param = nil
            // ...
        }
        // ... 唤醒 goroutine
    }
    
    • 接收操作唤醒,goroutine param 不为 nil
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // ... 数据复制
        
        pg := sg.g
        pg.param = unsafe.Pointer(sg)
        goready(gp, skip+1)
    }
        
    

    接收操作

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        // ...
        
        // 阻塞,挂起当前 goroutine
        goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
        // 被唤醒
        // ...
        
        closed := gp.parma == nil
        // ...
        return true, !closed
    

    接收操作在关闭后并不会 panic,而是会作为 received 返回,表示是否接收到有效的数据

    参考资料

    深度解密Go语言之channel
    Go 语言设计与实现 —— Channel

    推荐阅读

    Go101 通道
    如何优雅地关闭通道
    浅谈 Go 语言 select 的实现原理
    图解Go的channel底层原理
    走进Golang之Channel的使用
    走进Golang之Channel的数据结构

    相关文章

      网友评论

          本文标题:Go 深入源码 —— channel

          本文链接:https://www.haomeiwen.com/subject/wbrbvktx.html