美文网首页go语言
Go语言——channel详解

Go语言——channel详解

作者: 陈先生_9e91 | 来源:发表于2018-11-01 12:54 被阅读0次

    Go语言——channel详解

    channel和goroutine是go语言最具特色是结构,有必要仔细研究。

    源码路径:go1.10\src\runtime\chan.go

    struct

    type hchan struct {
       qcount   uint           // total data in the queue
       dataqsiz uint           // size of the circular queue
       buf      unsafe.Pointer // points to an array of dataqsiz elements
       elemsize uint16
       closed   uint32
       elemtype *_type // element type
       sendx    uint   // send index
       recvx    uint   // receive index
       recvq    waitq  // list of recv waiters
       sendq    waitq  // list of send waiters
    
       lock mutex
    }
    
    • qcount:当前chann元素个数,len()
    • dataqsiz:chann容量,cap()
    • buf:元素缓冲区
    • elemsize:元素大小
    • closed:close标志位
    • elemtype:元素类型
    • sendx:下一个发送元素所在缓冲区索引
    • recvx:下一个接收元素所在缓冲区索引
    • recvq:接收G的等待队列
    • sendq:发送G的等待队列

    从上面可以看出,waitq非常重要

    type waitq struct {
       first *sudog
       last  *sudog
    }
    
    // sudog represents a g in a wait list, such as for sending/receiving
    // on a channel.
    //
    // sudogs are allocated from a special pool. Use acquireSudog and
    // releaseSudog to allocate and free them.
    type sudog struct {
        g *g
    
        isSelect bool
        next     *sudog
        prev     *sudog
        elem     unsafe.Pointer // data element (may point to stack)
    
        acquiretime int64
        releasetime int64
        ticket      uint32
        parent      *sudog // semaRoot binary tree
        waitlink    *sudog // g.waiting list or semaRoot
        waittail    *sudog // semaRoot
        c           *hchan // channel
    }
    
    

    waitq是一个queue,具有头尾指针。

    sudog封装了等待chann接收/发送的G。sudogs有一个特殊的池,提高性能。

    new

    chan不能直接new,只能make,所以观察makechan方法。make的时候,会根据size判断是缓冲chan还是非缓冲chan,这个逻辑需要注意。

    func makechan(t *chantype, size int) *hchan {
       elem := t.elem
    
       var c *hchan
       switch {
       case size == 0 || elem.size == 0:
          // Queue or element size is zero.
          c = (*hchan)(mallocgc(hchanSize, nil, true))
          // Race detector uses this location for synchronization.
          c.buf = unsafe.Pointer(c)
       case elem.kind&kindNoPointers != 0:
          // Elements do not contain pointers.
          // Allocate hchan and buf in one call.
          c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
          c.buf = add(unsafe.Pointer(c), hchanSize)
       default:
          // Elements contain pointers.
          c = new(hchan)
          c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
       }
    
       c.elemsize = uint16(elem.size)
       c.elemtype = elem
       c.dataqsiz = uint(size)
        
       return c
    }
    
    

    这里看到由于要对buf对象分配,所以没有提供new方法,只能make。除了对default情况比较容易理解,另外两只buf的内存分配都不是很理解=。=,先遗留。

    chansend

    send & revc
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
       lock(&c.lock)
    
       if c.closed != 0 {
          unlock(&c.lock)
          panic(plainError("send on closed channel"))
       }
    
       if sg := c.recvq.dequeue(); sg != nil {
          // Found a waiting receiver. We pass the value we want to send
          // directly to the receiver, bypassing the channel buffer (if any).
          send(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true
       }
    
    1. lock
    2. 如果chann close,就直接panic,因为不能往close chan send数据
    3. 如果有G在等待接收数据,就直接将数据发给它。
       if c.qcount < c.dataqsiz {
          // Space is available in the channel buffer. Enqueue the element to send.
          qp := chanbuf(c, c.sendx)
          if raceenabled {
             raceacquire(qp)
             racerelease(qp)
          }
          typedmemmove(c.elemtype, qp, ep)
          c.sendx++
          if c.sendx == c.dataqsiz {
             c.sendx = 0
          }
          c.qcount++
          unlock(&c.lock)
          return true
       }
    
    1. 如果buf还有剩余,就将数据保存在buf里面
    2. sendx++,跟上面的gif动图一样,指向下一个send位置
    3. sendx == 0表示buf满了
       // Block on the channel. Some receiver will complete our operation for us.
       gp := getg()
       mysg := acquireSudog()
    
       mysg.elem = ep
       mysg.g = gp
       c.sendq.enqueue(mysg)
    
       releaseSudog(mysg)
       return true
    }
    

    以上情况不满足的话,接下来要做的就是block发送数据的g,看看chan具体怎么做的

    1. 获取当前g,注意这里的g是往chan发数据的g
    2. 从pool中获取一个sudog,保存当前g信息,方便后面唤醒
    3. 将sudog入队到sendq发送等待队列
    4. 释放sudog这里不是很懂

    chanrecv

    没找到动图,就不贴了。

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
       lock(&c.lock)
        
        if c.closed != 0 && c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }    
    
       if sg := c.sendq.dequeue(); sg != nil {
          // Found a waiting sender. If buffer is size 0, receive value
          // directly from sender. Otherwise, receive from head of queue
          // and add sender's value to the tail of the queue (both map to
          // the same buffer slot because the queue is full).
          recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true, true
       }
    
    1. lock
    2. 如果chan已经close,并且没有数据,就直接返回空值
    3. 如果有G正在等待发送数据,分情况
    4. 如果chan没有buf,就直接将G的数据发送给当前g
    5. 如果chan有buf, 就从buf头中取出数据G发送给当前g,并且唤醒G,然后把数据写入buf尾部

    这里有两个g,一个是G表示等待发送数据的g,一个g表示接受chan数据的g。

    这里ep对象有点意思,send中ep表示发送到的数据,揣测recv会从堆中取出数据,然后将ep指针指向这个堆数据。

        if c.qcount > 0 {
          // Receive directly from queue
          qp := chanbuf(c, c.recvx)
    
          if ep != nil {
             typedmemmove(c.elemtype, ep, qp)
          }
          typedmemclr(c.elemtype, qp)
          c.recvx++
          if c.recvx == c.dataqsiz {
             c.recvx = 0
          }
          c.qcount--
          unlock(&c.lock)
          return true, true
       }
    

    如果chan有数据,就直接从buf里面取出来,然后维护recvx和qcount

       // no sender available: block on this channel.
       gp := getg()
       mysg := acquireSudog()
       
       mysg.elem = ep
       mysg.g = gp
       c.recvq.enqueue(mysg)
    
       releaseSudog(mysg)
       return true, !closed
    }
    

    以上情况都不满足,即chan没有数据,也没有等待发送的g。那就需要block当前接收的g

    1. 获取当前等待接收数据的g
    2. 获取sudog
    3. 用sudog封装g信息
    4. 将sudog入队到recvq等待接收队列
    5. 释放sudog

    相关文章

      网友评论

        本文标题:Go语言——channel详解

        本文链接:https://www.haomeiwen.com/subject/rmtxxqtx.html