美文网首页
golang笔记——channel底层原理

golang笔记——channel底层原理

作者: 无昵称啊 | 来源:发表于2022-08-25 22:09 被阅读0次

    一、什么是CSP

    Do not communicate by sharing memory; instead, share memory by communicating.

    不要通过共享内存来通信,而要通过通信来实现内存共享。
    这就是 Go 的并发哲学,它依赖 CSP 模型,基于 channel 实现。

    CSP 经常被认为是 Go 在并发编程上成功的关键因素。CSP 全称是 “Communicating Sequential Processes”,这也是 Tony Hoare 在 1978 年发表在 ACM 的一篇论文。论文里指出一门编程语言应该重视 input 和 output 的原语,尤其是并发编程的代码。

    Go 一开始就把 CSP 的思想融入到语言的核心里,所以并发编程成为 Go 的一个独特的优势,而且很容易理解。

    Go 的并发原则非常优秀,目标就是简单:尽量使用 channel;把 goroutine 当作免费的资源,随便用。

    二、channel底层数据结构

    底层数据结构源码:

    type hchan struct {
        // chan 里元素数量
        qcount   uint
        // chan 底层循环数组的长度
        dataqsiz uint
        // 指向底层循环数组的指针
        // 只针对有缓冲的 channel
        buf      unsafe.Pointer
        // chan 中元素大小
        elemsize uint16
        // chan 是否被关闭的标志
        closed   uint32
        // chan 中元素类型
        elemtype *_type // element type
        //有缓冲channel内的缓冲数组会被作为一个“环型”来使用。
        //当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
        sendx    uint   // 下一次发送数据的下标位置
        recvx    uint   // 下一次读取数据的下标位置
        //当循环数组中没有数据时,收到了接收请求,那么接收数据的变量地址将会写入读等待队列
        //当循环数组中数据已满时,收到了发送请求,那么发送数据的变量地址将写入写等待队列
        recvq    waitq  // 读等待队列
        sendq    waitq  // 写等待队列
    
        // 保护 hchan 中所有字段
        lock mutex
    }
    

    waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:

    type waitq struct {
        first *sudog
        last  *sudog
    }
    

    例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :


    总结hchan结构体的主要组成部分有四个:

    • 用来保存goroutine之间传递数据的循环链表。=====> buf。
    • 用来记录此循环链表当前发送或接收数据的下标值。=====> sendx和recvx。
    • 用于保存向该chan发送和从改chan接收数据的goroutine的队列。=====> sendq 和 recvq
    • 保证channel写入和读取数据时线程安全的锁。 =====> lock

    创建

    我们知道,通道有两个方向,发送和接收。理论上来说,我们可以创建一个只发送或只接收的通道,但是这种通道创建出来后,怎么使用呢?一个只能发的通道,怎么接收呢?同样,一个只能收的通道,如何向其发送数据呢?

    创建 chan 的函数是 makechan:

    const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    
    func makechan(t *chantype, size int64) *hchan {
        elem := t.elem
    
        // 省略了检查 channel size,align 的代码
        // ……
    
        var c *hchan
        // 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
        // 只进行一次内存分配
        if elem.kind&kindNoPointers != 0 || size == 0 {
            // 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
            // 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
            c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
            // 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
            if size > 0 && elem.size != 0 {
                c.buf = add(unsafe.Pointer(c), hchanSize)
            } else {
                // race detector uses this location for synchronization
                // Also prevents us from pointing beyond the allocation (see issue 9401).
                // 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
                // 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
                // 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
                c.buf = unsafe.Pointer(c)
            }
        } else {
            // 进行两次内存分配操作
            c = new(hchan)
            c.buf = newarray(elem, int(size))
        }
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        // 循环数组长度
        c.dataqsiz = uint(size)
    
        // 返回 hchan 指针
        return c
    }
    

    从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。

    新建一个 chan 后,内存在堆上分配,大概长这样:


    三、向channel发送数据

    源码分析

    发送操作最终转化为 chansend 函数,直接上源码,同样大部分都注释了,可以看懂主流程:

    // 位于 src/runtime/chan.go
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // 如果 channel 是 nil
        if c == nil {
            // 不能阻塞,直接返回 false,表示未发送成功
            if !block {
                return false
            }
            // 当前 goroutine 被挂起
            gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
            throw("unreachable")
        }
    
        // 省略 debug 相关……
    
        // 对于不阻塞的 send,快速检测失败场景
        //
        // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
        // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
        // 2. channel 是缓冲型的,但循环数组已经装满了元素
        if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
            return false
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        // 锁住 channel,并发安全
        lock(&c.lock)
    
        // 如果 channel 关闭了
        if c.closed != 0 {
            // 解锁
            unlock(&c.lock)
            // 直接 panic
            panic(plainError("send on closed channel"))
        }
    
        // 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
        if sg := c.recvq.dequeue(); sg != nil {
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
    
        // 对于缓冲型的 channel,如果还有缓冲空间
        if c.qcount < c.dataqsiz {
            // qp 指向 buf 的 sendx 位置
            qp := chanbuf(c, c.sendx)
    
            // ……
    
            // 将数据从 ep 处拷贝到 qp
            typedmemmove(c.elemtype, qp, ep)
            // 发送游标值加 1
            c.sendx++
            // 如果发送游标值等于容量值,游标值归 0
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            // 缓冲区的元素数量加一
            c.qcount++
    
            // 解锁
            unlock(&c.lock)
            return true
        }
    
        // 如果不需要阻塞,则直接返回错误
        if !block {
            unlock(&c.lock)
            return false
        }
    
        // channel 满了,发送方会被阻塞。接下来会构造一个 sudog
    
        // 获取当前 goroutine 的指针
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
    
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.selectdone = nil
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
    
        // 当前 goroutine 进入发送等待队列
        c.sendq.enqueue(mysg)
    
        // 当前 goroutine 被挂起
        goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
    
        // 从这里开始被唤醒了(channel 有机会可以发送了)
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if gp.param == nil {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            // 被唤醒后,channel 关闭了。坑爹啊,panic
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        // 去掉 mysg 上绑定的 channel
        mysg.c = nil
        releaseSudog(mysg)
        return true
    }
    

    上面的代码注释地比较详细了,我们来详细看看。

    • 如果检测到 channel 是空的,当前 goroutine 会被挂起。
    • 对于不阻塞的发送操作,如果 channel 未关闭并且没有多余的缓冲空间(说明:a. channel 是非缓冲型的,且等待接收队列里没有 goroutine;b. channel 是缓冲型的,但循环数组已经装满了元素)

    对于这一点,runtime 源码里注释了很多。这一条判断语句是为了在不阻塞发送的场景下快速检测到发送失败,好快速返回。

    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }
    
    1. 如果检测到 channel 已经关闭,直接 panic。
    2. 如果能从等待接收队列 recvq 里出队一个 sudog(代表一个 goroutine),说明此时 channel 是空的,没有元素,所以才会有等待接收者。这时会调用 send 函数将元素直接从发送者的栈拷贝到接收者的栈,关键操作由 sendDirect 函数完成。
    // send 函数处理向一个空的 channel 发送操作
    
    // ep 指向被发送的元素,会被直接拷贝到接收的 goroutine
    // 之后,接收的 goroutine 会被唤醒
    // c 必须是空的(因为等待队列里有 goroutine,肯定是空的)
    // c 必须被上锁,发送操作执行完后,会使用 unlockf 函数解锁
    // sg 必须已经从等待队列里取出来了
    // ep 必须是非空,并且它指向堆或调用者的栈
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // 省略一些用不到的
        // ……
    
        // sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
        if sg.elem != nil {
            // 直接拷贝内存(从发送者到接收者)
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        // sudog 上绑定的 goroutine
        gp := sg.g
        // 解锁
        unlockf()
        gp.param = unsafe.Pointer(sg)
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 唤醒接收的 goroutine. skip 和打印栈相关,暂时不理会
        goready(gp, skip+1)
    }
    

    继续看 sendDirect 函数:

    // 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
    // 接收数据,都会导致一个 goroutine 直接操作另一个 goroutine 的栈
    // 由于 GC 假设对栈的写操作只能发生在 goroutine 正在运行中并且由当前 goroutine 来写
    // 所以这里实际上违反了这个假设。可能会造成一些问题,所以需要用到写屏障来规避
    func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
        // src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
    
        // 直接进行内存"搬迁"
        // 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
        // 就不能修改真正的 dst 位置的值了
        // 因此需要在读和写之前加上一个屏障
        dst := sg.elem
        typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
        memmove(dst, src, t.size)
    }
    

    这里涉及到一个 goroutine 直接写另一个 goroutine 栈的操作,一般而言,不同 goroutine 的栈是各自独有的。而这也违反了 GC 的一些假设。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,没有中间商赚差价,效率得以提高,完美。

    然后,解锁、唤醒接收者,等待调度器的光临,接收者也得以重见天日,可以继续执行接收操作之后的代码了。

    • 如果 c.qcount < c.dataqsiz,说明缓冲区可用(肯定是缓冲型的 channel)。先通过函数取出待发送元素应该去到的位置:
    qp := chanbuf(c, c.sendx)
    
    // 返回循环队列里第 i 个元素的地址处
    func chanbuf(c *hchan, i uint) unsafe.Pointer {
        return add(c.buf, uintptr(i)*uintptr(c.elemsize))
    }
    

    c.sendx 指向下一个待发送元素在循环数组中的位置,然后调用 typedmemmove 函数将其拷贝到循环数组中。之后 c.sendx 加 1,元素总量加 1 :c.qcount++,最后,解锁并返回。

    • 如果没有命中以上条件的,说明 channel 已经满了。不管这个 channel 是缓冲型的还是非缓冲型的,都要将这个 sender “关起来”(goroutine 被阻塞)。如果 block 为 false,直接解锁,返回 false。
    • 最后就是真的需要被阻塞的情况。先构造一个 sudog,将其入队(channel 的 sendq 字段)。然后调用 goparkunlock 将当前 goroutine 挂起,并解锁,等待合适的时机再唤醒。

    唤醒之后,从 goparkunlock 下一行代码开始继续往下执行。

    这里有一些绑定操作,sudog 通过 g 字段绑定 goroutine,而 goroutine 通过 waiting 绑定 sudog,sudog 还通过 elem 字段绑定待发送元素的地址,以及 c 字段绑定被“坑”在此处的 channel。

    所以,待发送的元素地址其实是存储在 sudog 结构体里,也就是当前 goroutine 里。

    四、从channel接收数据

    接收操作有两种写法,一种带 “ok”,反应 channel 是否关闭;一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。两种写法,都有各自的应用场景。

    经过编译器的处理后,这两种写法最后对应源码里的这两个函数:

    // entry points for <- c from compiled code
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }
    
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
    }
    

    chanrecv1 函数处理不带 “ok” 的情形,chanrecv2 则通过返回 “received” 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem 所指向的地址了,这很像 C/C++ 里的写法。如果代码里忽略了接收值,这里的 elem 为 nil。

    无论如何,最终转向了 chanrecv 函数:

    // chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
    // 如果 ep 是 nil,说明忽略了接收值。
    // 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
    // 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
    // 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
    // 如果 ep 非空,则应该指向堆或者函数调用者的栈
    
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        // 省略 debug 内容 …………
    
        // 如果是一个 nil 的 channel
        if c == nil {
            // 如果不阻塞,直接返回 (false, false)
            if !block {
                return
            }
            // 否则,接收一个 nil 的 channel,goroutine 挂起
            gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
            // 不会执行到这里
            throw("unreachable")
        }
    
        // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
        // 当我们观察到 channel 没准备好接收:
        // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
        // 2. 缓冲型,但 buf 里没有元素
        // 之后,又观察到 closed == 0,即 channel 未关闭。
        // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
        // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        // 加锁
        lock(&c.lock)
    
        // channel 已关闭,并且循环数组 buf 里没有元素
        // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
        // 也就是说即使是关闭状态,但在缓冲型的 channel,
        // buf 里有元素的情况下还能接收到元素
        if c.closed != 0 && c.qcount == 0 {
            if raceenabled {
                raceacquire(unsafe.Pointer(c))
            }
            // 解锁
            unlock(&c.lock)
            if ep != nil {
                // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
                // 那么接收的值将是一个该类型的零值
                // typedmemclr 根据类型清理相应地址的内存
                typedmemclr(c.elemtype, ep)
            }
            // 从一个已关闭的 channel 接收,selected 会返回true
            return true, false
        }
    
        // 等待发送队列里有 goroutine 存在,说明 buf 是满的
        // 这有可能是:
        // 1. 非缓冲型的 channel
        // 2. 缓冲型的 channel,但 buf 满了
        // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
        // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    
        // 缓冲型,buf 里有元素,可以正常接收
        if c.qcount > 0 {
            // 直接从循环数组里找到要接收的元素
            qp := chanbuf(c, c.recvx)
    
            // …………
    
            // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            // 清理掉循环数组里相应位置的值
            typedmemclr(c.elemtype, qp)
            // 接收游标向前移动
            c.recvx++
            // 接收游标归零
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            // buf 数组里的元素个数减 1
            c.qcount--
            // 解锁
            unlock(&c.lock)
            return true, true
        }
    
        if !block {
            // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
            unlock(&c.lock)
            return false, false
        }
    
        // 接下来就是要被阻塞的情况了
        // 构造一个 sudog
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
    
        // 待接收数据的地址保存下来
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.selectdone = nil
        mysg.c = c
        gp.param = nil
        // 进入channel 的等待接收队列
        c.recvq.enqueue(mysg)
        // 将当前 goroutine 挂起
        goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
    
        // 被唤醒了,接着从这里继续执行一些扫尾工作
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        closed := gp.param == nil
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, !closed
    }
    
    1. 如果 channel 是一个空值(nil),在非阻塞模式下,会直接返回。在阻塞模式下,会调用 gopark 函数挂起 goroutine,这个会一直阻塞下去。因为在 channel 是 nil 的情况下,要想不阻塞,只有关闭它,但关闭一个 nil 的 channel 又会发生 panic,所以没有机会被唤醒了。更详细地可以在 closechan 函数的时候再看。
    2. 和发送函数一样,接下来搞了一个在非阻塞模式下,不用获取锁,快速检测到失败并且返回的操作。顺带插一句,我们平时在写代码的时候,找到一些边界条件,快速返回,能让代码逻辑更清晰,因为接下来的正常情况就比较少,更聚焦了,看代码的人也更能专注地看核心代码逻辑了。
    // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回 (false, false)
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    

    当我们观察到 channel 没准备好接收:

    • 非缓冲型,等待发送列队里没有 goroutine 在等待
    • 缓冲型,但 buf 里没有元素

    之后,又观察到 closed == 0,即 channel 未关闭。
    因为 channel 不可能被重复打开,所以前一个观测的时候, channel 也是未关闭的,因此在这种情况下可以直接宣布接收失败,快速返回。因为没被选中,也没接收到数据,所以返回值为 (false, false)。

    接下来的操作,首先会上一把锁,粒度比较大。如果 channel 已关闭,并且循环数组 buf 里没有元素。对应非缓冲型关闭和缓冲型关闭但 buf 无元素的情况,返回对应类型的零值,但 received 标识是 false,告诉调用者此 channel 已关闭,你取出来的值并不是正常由发送者发送过来的数据。但是如果处于 select 语境下,这种情况是被选中了的。很多将 channel 用作通知信号的场景就是命中了这里。

    接下来,如果有等待发送的队列,说明 channel 已经满了,要么是非缓冲型的 channel,要么是缓冲型的 channel,但 buf 满了。这两种情况下都可以正常接收数据。

    于是,调用 recv 函数:

    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // 如果是非缓冲型的 channel
        if c.dataqsiz == 0 {
            if raceenabled {
                racesync(c, sg)
            }
            // 未忽略接收的数据
            if ep != nil {
                // 直接拷贝数据,从 sender goroutine -> receiver goroutine
                recvDirect(c.elemtype, sg, ep)
            }
        } else {
            // 缓冲型的 channel,但 buf 已满。
            // 将循环数组 buf 队首的元素拷贝到接收数据的地址
            // 将发送者的数据入队。实际上这时 revx 和 sendx 值相等
            // 找到接收游标
            qp := chanbuf(c, c.recvx)
            // …………
            // 将接收游标处的数据拷贝给接收者
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
    
            // 将发送者数据拷贝到 buf
            typedmemmove(c.elemtype, qp, sg.elem)
            // 更新游标值
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx
        }
        sg.elem = nil
        gp := sg.g
    
        // 解锁
        unlockf()
        gp.param = unsafe.Pointer(sg)
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
    
        // 唤醒发送的 goroutine。需要等到调度器的光临
        goready(gp, skip+1)
    }
    

    如果是非缓冲型的,就直接从发送者的栈拷贝到接收者的栈。

    func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
        // dst is on our stack or the heap, src is on another stack.
        src := sg.elem
        typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
        memmove(dst, src, t.size)
    }
    

    否则,就是缓冲型 channel,而 buf 又满了的情形。说明发送游标和接收游标重合了,因此需要先找到接收游标:

    // chanbuf(c, i) is pointer to the i'th slot in the buffer.
    func chanbuf(c *hchan, i uint) unsafe.Pointer {
        return add(c.buf, uintptr(i)*uintptr(c.elemsize))
    }
    

    将该处的元素拷贝到接收地址。然后将发送者待发送的数据拷贝到接收游标处。这样就完成了接收数据和发送数据的操作。接着,分别将发送游标和接收游标向前进一,如果发生“环绕”,再从 0 开始。

    最后,取出 sudog 里的 goroutine,调用 goready 将其状态改成 “runnable”,待发送者被唤醒,等待调度器的调度。

    然后,如果 channel 的 buf 里还有数据,说明可以比较正常地接收。注意,这里,即使是在 channel 已经关闭的情况下,也是可以走到这里的。这一步比较简单,正常地将 buf 里接收游标处的数据拷贝到接收数据的地址。

    到了最后一步,走到这里来的情形是要阻塞的。当然,如果 block 传进来的值是 false,那就不阻塞,直接返回就好了。

    先构造一个 sudog,接着就是保存各种值了。注意,这里会将接收数据的地址存储到了 elem 字段,当被唤醒时,接收到的数据就会保存到这个字段指向的地址。然后将 sudog 添加到 channel 的 recvq 队列里。调用 goparkunlock 函数将 goroutine 挂起。

    接下来的代码就是 goroutine 被唤醒后的各种收尾工作了。

    channel操作总结:


    注意:关闭已经关闭的channel也会引发panic。


    References:
    https://cloud.tencent.com/developer/article/1750350
    https://golang.design/go-questions/channel
    https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/channel.html
    https://juejin.cn/post/7037656471210819614

    相关文章

      网友评论

          本文标题:golang笔记——channel底层原理

          本文链接:https://www.haomeiwen.com/subject/agjmgrtx.html