美文网首页
GO阅读-同步编程-channel和select

GO阅读-同步编程-channel和select

作者: 温岭夹糕 | 来源:发表于2022-08-11 10:42 被阅读0次

    非原创,搬运工

    环境

    go1.20
    dlv1.9.x

    1.创建通道makechan

    后续就以该demo代码进行逻辑梳理

    func main() {
        c := make(chan int)
        c1 := make(chan int, 5)
        close(c)
        close(c1)
        fmt.Println("vim-go")
    }
    

    对变量c定义的那一行打断点

    dlv debug main.go
    b main.go:6
    c
    disass
    
    channel汇编代码
    通过查看汇编代码我们发现实际上是调用runtime.makechan方法(在之前学习make方法的时候我们就知道底层是这样调用的,现在是验证)
    runtime/chan.go/makechan函数的定义
    func makechan(t *chantype, size int) *hchan {
    }
    

    这个方法有点难看懂,但是看签名可以知道,最后得到了一个hchan的结构体(1.17和1.20没区别)

    type hchan struct {
            qcount   uint           // total data in the queue
            dataqsiz uint           // size of the circular queue
            buf      unsafe.Pointer // points to an array of dataqsiz elements
            elemsize uint16
            closed   uint32
            elemtype *_type // element type
            sendx    uint   // send index
            recvx    uint   // receive index
            recvq    waitq  // list of recv waiters
            sendq    waitq  // list of send waiters
    
            lock mutex
    }
    
    • qcount chan中元素个数
    • datasize chan中的循环链表长度,就是make创建指定的长度
    • buf chan的缓冲区数据指针
    • elemtype 接受元素类型
    • elemtsize 接收单个元素大小
    • sendx 发送指针位置
    • recvx 接收指针位置
    • sendq和recvq 使用双向链(环状)表用于存储等待的gorutine

    我们再回头看makechan做的事,根据传入通道元素和通道大小,分配hchan和缓冲区内存,再初始化hchan成员

    func makechan(t *chantype, size int) *hchan {
    // elem是channel元素,这里我们是chan int
            elem := t.elem
    
            ...一系列参数的边界和限制校验...
    //math.MulUintptr是判断申请的内存空间会不会超过最大申请限制
            mem, overflow := math.MulUintptr(elem.size, uintptr(size))
           ......  
    
            var c *hchan
            switch {
    //无缓冲区情况分支,只给hchan分配一段内存
            case mem == 0:
                    c = (*hchan)(mallocgc(hchanSize, nil, true))
                    c.buf = c.raceaddr()
    //channel不包含指针的情况
    //给hchan和缓冲区c.buf分配内存
            case elem.ptrdata == 0:
                    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
                    c.buf = add(unsafe.Pointer(c), hchanSize)
    //上面两种情况内存都是连续的
    //默认是单独给hchan和缓冲区分配内存
            default:
                    c = new(hchan)
                    c.buf = mallocgc(mem, elem, true)
            }
    
            c.elemsize = uint16(elem.size)
            c.elemtype = elem
            c.dataqsiz = uint(size)
    //locakinit不太重要,根据函数名字可以推测是锁初始化
            lockInit(&c.lock, lockRankHchan)
    
            return c
    }
    
    func (c *hchan) raceaddr() unsafe.Pointer {
            return unsafe.Pointer(&c.buf)
    }
    

    1.1小结

    makechan无非是干了两件数:1.参数校验和初始化hchan结构体,其中一共初始化了4个字段:buf,elemsize,elemtype,dataqsize

    1.2另一种方法查看汇编

    ssa是编译器经过优化生成的中间代码,在网页写代码之后直接查看,跟dlv一样,只是多一种同类型工具

    2.数据发送chansend

    查看底层汇编


    image.png ssa网页查看结果
    chan的发送实际上是调用runtime.chansend1方法(是chansend的封装)
    func chansend1(c *hchan, elem unsafe.Pointer) {
            chansend(c, elem, true, getcallerpc())
    }
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
    

    block为true时表示发送操作是阻塞的
    chansend方法:

    前面一堆参数验证
         lock(&c.lock)
    //发送数据前加锁(所以channel发送是线程安全的),
    //验证通道是否关闭
    //这里可以得知往已经关闭的通道发送会引发panic
            if c.closed != 0 {
                    unlock(&c.lock)
                    panic(plainError("send on closed channel"))
            }
    

    后面代码是分别讨论发送数据时遇到的情况

    2.1 直接发送

         if sg := c.recvq.dequeue(); sg != nil {
                    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
                    return true
            }
    

    这种情况是投递数据时,刚好有人在等待,那么直接调用send方法投递给他
    关于recvq是用于存储等待的gorutine的,是waitq类型,可以看到waitq是一个链表/队列结构

    type waitq struct {
            first *sudog
            last  *sudog
    }
    

    waitq.dequeue逻辑很简单,就是递归读取链表

    func (q *waitq) dequeue() *sudog {
            for {
    //先获取头部元素,这个是最后要被弹出去的
    //头部都没有就说明是个空链表
                    sgp := q.first
                    if sgp == nil {
                            return nil
                    }
                    y := sgp.next
    //弹出头部后整理链表,更新first
    //没有下一个就意味着弹出头部后是个空链表
    //有就将下一个作为头部元素
                    if y == nil {
                            q.first = nil
                            q.last = nil
                    } else {
                            y.prev = nil
                            q.first = y
                            sgp.next = nil 
                    }
    //如果第一个不满足条件就扔掉重新来
                    if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
                            continue
                    }
    
                    return sgp
            }
    }
    

    回到我们场景1调用send方法,主要逻辑是以下部分

    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    //拷贝变量地址到接收端的内存地址
            if sg.elem != nil {
                    sendDirect(c.elemtype, sg, ep)
                    sg.elem = nil
            }
            gp := sg.g
            unlockf()
            gp.param = unsafe.Pointer(sg)
            sg.success = true
            if sg.releasetime != 0 {
                    sg.releasetime = cputicks()
            }
            goready(gp, skip+1)
    

    主要干了以下两件事情:

    • 调用sendDirect将发送数据直接拷贝到 接收channel通道元素的变量内存地址上去
      即 x = c <-a ,拷贝到x内存地址上
    func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    //memove是内存拷贝
            memmove(dst, src, t.size)
    }
    
    • 调用goready
    //这里进行协程调度是为了唤醒那个在等待数据的协程
    func goready(gp *g, traceskip int) {
        // 切换到g0的栈
        systemstack(func() {
            ready(gp, traceskip, true)
        })
    }
    

    该函数主要功能就是切换的g0栈执行ready方法,核心方法是gorutine状态切换到runable,放入队列等待P调度,这个在之前学习GMP的时候已经很熟悉了,这就是唤醒协程(这里是数据接收方)的方法(注意是放入runnext,并没有立即执行,要等待P调度)

    至此发送数据的第一种情况分析完毕:若有等待协程则直接发送,具体发送过程是将元素拷贝给接收变量内存地址,唤醒数据接收方进入协程调度 发送过程

    2.2缓冲区未满

    回到chansend
    若没有等待协程则放入缓冲区

    //datasize缓冲区大小,qcount为放入元素数量
    //进入该分支时此时缓冲区未满,且接收端没人等
            if c.qcount < c.dataqsiz {
    //计算出下一个存储数据位置,sendx是发送指针位置
                    qp := chanbuf(c, c.sendx)
    //将刚进入channel的元素发送到计算出的位置
                    typedmemmove(c.elemtype, qp, ep)
    //发送指针自增
                    c.sendx++
    // 缓冲区满了,
    //重置sendx
                    if c.sendx == c.dataqsiz {
                            c.sendx = 0
                    }
                    c.qcount++
                    unlock(&c.lock)
                    return true
            }
    

    qcount<datasize即buf中还有剩余空间,利用chanbuf计算出下一个存储数据的位置,再利用typedmemove将发送数据拷贝到计算出的位置,发送指针sendx+1,通道元素数量qcount+1

    2.3关于阻塞发送

    走到这里说明是条件都不满足:
    1.没有缓冲区,没人消费
    2.缓冲区大小不够,没人消费
    需要阻塞当前协程

    //block是传入的参数,是否要阻塞
    //上面那两条件出现明显说明要休眠等待元素被消费了
    // 如果block还是为false ,要求不阻塞
    //那就直接返回
        if !block {
            unlock(&c.lock)
            return false
        }
    //func getg() *g尝试获取发送数据使用的协程
    //就是自己了没错
        gp := getg()
    //获取sudog结构体并设置一次阻塞发送的配置
    //将要发送的元素进行单独特例包装
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
    //扔到sendq链表里面去
        c.sendq.enqueue(mysg)
        atomic.Store8(&gp.parkingOnChan, 1)
    //你个协程睡觉去吧你
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    

    acquireSudog返回一个sudog结构体,并设置这一次阻塞发送的相关信息,包括发送的协程,是否在select中发送等
    sendq.enqueue就是和recvq的dequeue相反,是将其入队,等待条件满足唤醒
    调用gopark切走协程主动让出cpu执行权限
    剩下代码就是被唤醒后做一些收尾的工作,如释放内存,return true表示成功发送数据

    小结

    至此通过阅读chansend源码,数据发送(chan<-i)的三种情况已经分析完毕:
    1.刚好有协程在recvq中等待则直接给他
    2.缓冲区有空位置(qcount<datasize)就放上去
    3.都不满足就创建sudo结构体,包装元素,扔到sendq链表中,阻塞当前协程等待被唤醒
    发送数据时触发Gorutine调度的几个时机:
    1.直接发送时会唤醒协程(send调用goready),等待调度
    2.发送数据时没找到接收方且缓存已满就将自己入队等待调度(gopark)

    3.接收数据

    ssa截图

    本质上是调用runtime.chanrecv1(是对chanrecv的封装),
    函数签名

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
    

    chanrecv先对通道进行验证

        if c == nil {
            if !block {
                return
            }
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        lock(&c.lock)
    
        if c.closed != 0 {
            if c.qcount == 0 {
                if raceenabled {
                    raceacquire(c.raceaddr())
                }
                unlock(&c.lock)
                if ep != nil {
                    typedmemclr(c.elemtype, ep)
                }
                return true, false
            }
        } else {
    //这个是1.20新增的代码,但是总体逻辑还是一样的
    //如果通道被关闭且缓冲区还有数据
    //还能继续接收
            if sg := c.sendq.dequeue(); sg != nil {
                recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
                return true, true
            }
        }
    

    如果c已经被关闭,且不存在任何数据就返回true和false,也就是说有这样代码

    data,ok := chan<-
    

    ok = false 代表的是关闭且没有数据 ,ok = true代表关闭但是可能还有数据
    同理接收数据分上面三种情况

    3.1直接接收

        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    

    这里要注意recv会根据缓冲区大小分别处理

    • 不存在则调用recvDirect从sendq队列上取
    • 存在则将数据拷贝到内存,将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方

    3.2缓冲区寻找

        if c.qcount > 0 {
            qp := chanbuf(c, c.recvx)
    
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            typedmemclr(c.elemtype, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock)
            return true, true
        }
    

    逻辑类似上面发送代码,存到缓冲区中

    3.3阻塞接收

        if !block {
            unlock(&c.lock)
            return false, false
        }
    
        gp := getg()
        mysg := acquireSudog()
        mysg.elem = ep
        gp.waiting = mysg
        mysg.g = gp
        mysg.c = c
        c.recvq.enqueue(mysg)
        goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
    

    还是获取接收协程信息,sudog包装入队然后阻塞,最后收尾工作

    4.关闭通道

    调用closechan方法 image.png

    前面是异常处理,跳过

    c.closed = 1
    
        var glist gList
        for {
            sg := c.recvq.dequeue()
            if sg == nil {
                break
            }
            if sg.elem != nil {
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil
            }
            gp := sg.g
            gp.param = nil
            glist.push(gp)
        }
    
        for {
            sg := c.sendq.dequeue()
            ...
        }
        for !glist.empty() {
            gp := glist.pop()
            gp.schedlink = 0
            goready(gp, 3)
        }
    }
    

    清除sendq上的未被处理协程,最后为所有被阻塞的调用goready触发调度

    5.select

    在上面分析阻塞接收和阻塞发送时,我们都遇到结构体sudo(进入recvq或sendq队列的结构体,保存协程信息),它有一个字段叫做isSelect,判断当前执行环境是否在select中,其实在学习select的时候我们就知道了,我们常将select和channel搭配使用,这也说明了两者代码是存在一定关系的。
    但是当我们想使用dlv对select打断点时发现有些没有效果,这段代码会在编译阶段被优化成其他!

    5.1无case的select

    select{}
    
    image.png

    这个可以通过dlv捕捉到,内部实现是runtime.block方法

    func block() {
        gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // 永久阻塞
    }
    

    同时内部也调用gopark(让出当前 Goroutine 对处理器的使用权并传入等待原因 waitReasonSelectNoCases,上文的非阻塞发送有遇到过)进行永久阻塞

    5.2单一channel

    demo.go

    func main() {
        c := make(chan int)
        x := 0
        select {
        case c <- x:
            fmt.Println("hi")
        }
    }
    
    image.png

    实际上是直接被编译器优化成接收方法,并没有select的痕迹

    5.3单一channel+default

    demo.go

        select {
        case c <- x:
            fmt.Println("hi")
        default:
            fmt.Println("bye")
        }
    
    image.png

    调用selectnbsend,将c<-x改为 <-c


    image.png

    变成了调用selectnbrecv
    阻塞发送的情况

    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
            return chansend(c, elem, false, getcallerpc())
    }
    

    内部是对chansend和chanrecv的封装,区别在于传入的block=false,还记得上文分析过,当刚好没有等待协程且缓冲区已满时会进入block判断分支:

    //chansend函数
    if sg:= c.recvq.dequeue
       ....
    if c.qucount < c.dataqsize
      ...
    
            if !block {
                    unlock(&c.lock)
                    return false
            }
    

    也就是说遇到上面情况(没有消费协程且没缓冲区)会快速失败,而不是继续向下执行阻塞协程,来让select后面的代码有可执行机会

    再复杂点,多路复用(多个channel)

    demo.go

        select {
        case c <- x:
            fmt.Println("hi")
        case <-c:
            fmt.Println(x)
        default:
            fmt.Println(x)
        }
    
    
    image.png

    变成了selectgo函数,这是select里面的重头戏(位于runtime/select.go)
    先来看函数签名

    func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
    
    • case0是一个类型为[ncases]scase的数组
    • order0 是一个指向[2*ncases]uint16数组,值都为0

    为什么 selectgo 还需要传递一个 order0,而不是直接根据 ncase 直接分配呢
    编译转换会使用 temp 函数来构造生成数组的语句,而这个语句便可以保证数据会分配到栈上,而不是堆上,避免了不必要的堆分配

    • selectgo 会返回选中的序号,如果是个接收,还会返回是否接收到一个值

    select在go语言不存在相应的结构体,但是使用的分支case在go中使用scase结构体表示

    type scase struct {
        c    *hchan         // chan
        elem unsafe.Pointer // data element
    }
    
    • c 类型hchan已经很熟悉了,就是一个通道的结构体,来存储case中使用的channel
      回到selectgo函数
    //将case0数组和order0转为slice结构
        cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
        order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
       // [:n:n]的方式会让slice的len和cap相等
        ncases := nsends + nrecvs
        scases := cas1[:ncases:ncases]
        pollorder := order1[:ncases:ncases]
        lockorder := order1[ncases:][:ncases:ncases]
    

    下面看不懂没关系,我也看不懂索性直接copy,只要知道它是随机选取分支就好
    首先会进行执行必要的初始化,决定处理case的两个顺序 :

    • 轮询顺序pollorder
    • 加锁顺序 lockorder

    order1会被分为pollorder和lockorder,这两个slice将会真正决定select的随机选择以及死锁问题

        norder := 0
        for i := range scases {
            cas := &scases[i]
    //对于channel为nil的收发操作,将他们的elem设置为nil
            if cas.c == nil {
                cas.elem = nil // allow GC
                continue
            }
    // fastrabdb为生成随机数函数
    //porder刚开始是0,循环结束后是随机顺序的scases索引
            j := fastrandn(uint32(norder + 1))
            pollorder[norder] = pollorder[j]
            pollorder[j] = uint16(i)
            norder++
        }
        pollorder = pollorder[:norder]
        lockorder = lockorder[:norder]
    
         ....
         sellock(scases, lockorder)
         ....
    

    这里的轮询顺序pollorder是随机的,避免channel的饥饿问题,保证公平性,之后根据channel的地址顺序确定加锁顺序来避免死锁发生(sellock函数)

    如果多个 goroutine 都需要锁定 ch1 ch2,而他们加锁的顺序不固定,那么很可能会出现死锁问题
    这个时候,对加锁的顺序就有要求了,按照同样的顺序的话,没有竞争到 ch1.lock 的 goroutine,会等待加锁 ch1.lcok,而不会直接去加锁 ch2.lock

    func sellock(scases []scases, lockorder []int16) {
        var c *hchan
        for _, o := range lockorder {
            c0 := scases[0].c // 根据加锁顺序获取 case
    
            // c 记录了上次加锁的 hchan 地址,如果和当前 *hchan 相同,那么就不会再次加锁
            if c0 != nil && c0 != c {
                c = c0
                lock(&c.lock)
            }
        }
    }
    

    加锁完成,进入selectgo主循环逻辑
    第一阶段的主要职责是查找所有 case 中是否有可以立刻被处理的 Channel。无论是在等待的 Goroutine 上还是缓冲区中,只要存在数据满足条件就会立刻处理

    func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
        ...
        gp = getg()
        nextp = &gp.waiting
        for _, casei := range lockorder {
            casi = int(casei)
            cas = &scases[casi]
            c = cas.c
            sg := acquireSudog()
            sg.g = gp
            sg.c = c
    
            if casi < nsends {
                c.sendq.enqueue(sg)
            } else {
                c.recvq.enqueue(sg)
            }
        }
    
        gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
        ...
    }
    

    如果不能立刻找到活跃的 Channel 就会进入循环的下一阶段,按照需要将当前 Goroutine 加入到 Channel 的 sendq 或者 recvq 队列中
    除了将当前 Goroutine 对应的 runtime.sudog 结构体加入队列之外,这些结构体都会被串成链表附着在 Goroutine 上。在入队之后会调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒。
    等到 select 中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgo 函数的第三部分,从 runtime.sudog 中读取数据:

    func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
        ...
        sg = (*sudog)(gp.param)
        gp.param = nil
    
        casi = -1
        cas = nil
        sglist = gp.waiting
        for _, casei := range lockorder {
            k = &scases[casei]
            if sg == sglist {
                casi = int(casei)
                cas = k
            } else {
                c = k.c
                if int(casei) < nsends {
                    c.sendq.dequeueSudoG(sglist)
                } else {
                    c.recvq.dequeueSudoG(sglist)
                }
            }
            sgnext = sglist.waitlink
            sglist.waitlink = nil
            releaseSudog(sglist)
            sglist = sgnext
        }
    
        c = cas.c
        goto retc
        ...
    }
    

    第三次遍历全部 case 时,我们会先获取当前 Goroutine 接收到的参数 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

    由于当前的 select 结构找到了一个 case 执行,那么剩下 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。

    当我们在循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto 关键字跳转到 bufrecv 和 bufsend 两个代码段,这两段代码的执行过程都很简单,它们只是向 Channel 中发送数据或者从缓冲区中获取新数据:

    bufrecv:
        recvOK = true
        qp = chanbuf(c, c.recvx)
        if cas.elem != nil {
            typedmemmove(c.elemtype, cas.elem, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        selunlock(scases, lockorder)
        goto retc
    
    bufsend:
        typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        selunlock(scases, lockorder)
        goto retc
    

    这里在缓冲区进行的操作和直接调用 runtime.chansendruntime.chanrecv 差不多,上述两个过程在执行结束之后都会直接跳到 retc 字段。

    两个直接收发 Channel 的情况会调用运行时函数 runtime.sendruntime.recv,这两个函数会与处于休眠状态的 Goroutine 打交道:

    recv:
        recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
        recvOK = true
        goto retc
    
    send:
        send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
        goto retc
    

    两个直接收发 Channel 的情况会调用运行时函数 runtime.sendruntime.recv,这两个函数会与处于休眠状态的 Goroutine 打交道:

    recv:
        recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
        recvOK = true
        goto retc
    
    send:
        send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
        goto retc
    

    不过如果向关闭的 Channel 发送数据或者从关闭的 Channel 中接收数据,情况就稍微有一点复杂了:

    • 从一个关闭 Channel 中接收数据会直接清除 Channel 中的相关内容(1.20版本需要判断有无缓冲区);
    • 向一个关闭的 Channel 发送数据就会直接 panic 造成程序崩溃:
    rclose:
        selunlock(scases, lockorder)
        recvOK = false
        if cas.elem != nil {
            typedmemclr(c.elemtype, cas.elem)
        }
        goto retc
    
    sclose:
        selunlock(scases, lockorder)
        panic(plainError("send on closed channel"))
    

    总体来看,select 语句中的 Channel 收发操作和直接操作 Channel 没有太多出入,只是由于 select 多出了 default 关键字所以会支持非阻塞的收发。

    参考

    1.go语言channel
    2.go 深入刨析
    3.go ready函数
    4.go夜读

    相关文章

      网友评论

          本文标题:GO阅读-同步编程-channel和select

          本文链接:https://www.haomeiwen.com/subject/nxyxgrtx.html