美文网首页
Go Chan 源码解析

Go Chan 源码解析

作者: litesky | 来源:发表于2021-11-30 17:01 被阅读0次

    本篇文章内容基于go1.14.2分析

    golang的chan是一个内置类型,作为csp编程的核心数据结构,其底层数据结构是一个叫hchan的struct:

    // hchan is the runtime data structure behind a chan: a circular queue
    // (used only when the chan is buffered) plus two wait queues of blocked goroutines.
    type hchan struct {
        qcount   uint           // number of elements currently in the queue
        dataqsiz uint           // capacity of the circular queue
        buf      unsafe.Pointer // pointer to the circular queue's storage
        elemsize uint16 // size of one element
        closed   uint32 // non-zero once the chan has been closed (closechan sets it to 1)
        elemtype *_type // element type
        sendx    uint   // send index into the circular queue
        recvx    uint   // receive index into the circular queue
        recvq    waitq  // queue of goroutines waiting to receive
        sendq    waitq  // queue of goroutines waiting to send
        lock mutex // mutex; closechan holds it while mutating the chan
    }
    
    image

    如图所示,chan最核心的部分由一个环形队列和2个waitq组成,环形队列用于存放数据(带缓冲的情况下),waitq用于实现阻塞和恢复goroutine。

    chan的相关操作

    对chan的操作有:make、读、写、close,当然还有select,这里只讨论前面四个操作。

    创建 chan

    当在代码中使用make创建chan时,编译器会根据情况自动替换成makechan64 或者makechan,makechan64 其实还是调用了makechan函数。

    // makechan allocates and initializes the hchan for a chan of type t with
    // room for size buffered elements. It throws on an oversized element type or
    // bad alignment, and panics when size is negative or the buffer would be too large.
    func makechan(t *chantype, size int) *hchan {
        elem := t.elem
        
      // the element size must be < 2^16: it is stored in the uint16 elemsize field below
        if elem.size >= 1<<16 {
            throw("makechan: invalid channel element type")
        }
      // check memory alignment
        if hchanSize%maxAlign != 0 || elem.align > maxAlign {
            throw("makechan: bad alignment")
        }
    
      // compute the number of bytes the buffer needs (elem.size * size), detecting overflow
        mem, overflow := math.MulUintptr(elem.size, uintptr(size))
        if overflow || mem > maxAlloc-hchanSize || size < 0 {
            panic(plainError("makechan: size out of range"))
        }
    
        var c *hchan
        switch {
        case mem == 0:
            // no buffer memory is needed (size is 0 or the element size is 0):
            // allocate only the hchan itself
            c = (*hchan)(mallocgc(hchanSize, nil, true))
            // there is no buffer to point at, so buf is set to c.raceaddr()
            // NOTE(review): presumably just a placeholder address used by the race detector — confirm in chan.go
            c.buf = c.raceaddr()
        case elem.ptrdata == 0:
            // elements contain no pointers: allocate hchan and buffer in one mallocgc call
            c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
            // the buffer starts right after the hchan header
            c.buf = add(unsafe.Pointer(c), hchanSize)
        default:
            // elements contain pointers: allocate hchan and buffer separately,
            // passing the element type to mallocgc for the buffer
            c = new(hchan)
            c.buf = mallocgc(mem, elem, true)
        }
    
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
    
        if debugChan {
            print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
        }
        return c
    }
    

    chan 写操作

    当对chan进行写入“ch <- interface{}” 时,会被编译器替换成chansend1函数的调用,最终还是调用了chansend函数:

    image
    // chansend1 is what a `c <- x` statement is compiled into.
    // elem is the address of the value to send; the call is forwarded to
    // chansend with block=true, i.e. as a blocking send.
    func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
    }
    

    先看看chansend的函数签名,只需关注ep和block这两个参数即可,ep是要写入数据的地址,block表示是否阻塞式的调用

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 
    

    chansend有以下几种处理流程:

    1. 当对一个nil chan进行写操作时,如果是非阻塞调用,直接返回;否则将当前协程挂起

      // chansend 对一个 nil chan发送数据时,如果是非阻塞则直接返回,否则将当前协程挂起
      if c == nil {
           if !block {
               return false
           }
           gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
           throw("unreachable")
       }
      
    2. 非阻塞模式且chan未close,没有缓冲区且没有等待接收或者缓冲区满的情况下,直接return false。

      // 1. 非阻塞模式且chan未close
        // 2. 没有缓冲区且没有等待接收 或者 缓冲区满的情况下
        // 满足以上条件直接return false
      if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
           (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
           return false
       }
      
    3. c.recvq中有等待读的接收者,将其出队,将数据直接copy给接收者,并唤醒接收者。

      // 有等待的接收的goroutine
       // 出队,传递数据
       if sg := c.recvq.dequeue(); sg != nil {
           // Found a waiting receiver. We pass the value we want to send
           // directly to the receiver, bypassing the channel buffer (if any).
           send(c, sg, ep, func() { unlock(&c.lock) }, 3)
           return true
       }
      

      recvq是一个双向链表,每个sudog会关联上一个reader(被阻塞的g)

      image

      当sudog出队后,会调用send方法,通过sendDirect 实现数据在两个地址之间拷贝,最后调用goready唤醒reader(被阻塞的g)

      // send hands the value at ep directly to the waiting receiver described by sg,
      // releases the chan lock through unlockf, and makes the receiver runnable again.
      func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
       // ... unrelated code removed from this excerpt
       if sg.elem != nil {
           // copy the value from ep (the sender's data) straight to the receiver,
           // bypassing the chan buffer
           sendDirect(c.elemtype, sg, ep)
           sg.elem = nil
       }
       gp := sg.g
       unlockf()
       // a non-nil param lets the woken goroutine tell a real send apart from a
       // close (closechan sets param to nil)
       gp.param = unsafe.Pointer(sg)
       if sg.releasetime != 0 {
           sg.releasetime = cputicks()
       }
       // wake up the receiver's goroutine
       goready(gp, skip+1)
      }
      
      
    4. 缓冲区未满的情况下,数据放入环形缓冲区即可。

       // 缓冲区未满
       // 将数据放到缓冲区
       if c.qcount < c.dataqsiz {
           // Space is available in the channel buffer. Enqueue the element to send.
           // 存放位置
           qp := chanbuf(c, c.sendx)
           if raceenabled {
               raceacquire(qp)
               racerelease(qp)
           }
           typedmemmove(c.elemtype, qp, ep)
           // 指针自增
           c.sendx++
           if c.sendx == c.dataqsiz {
               c.sendx = 0
           }
           c.qcount++
           unlock(&c.lock)
           return true
       }
      
    5. 缓冲区已满,阻塞模式下关联一个sudog数据结构并进入c.sendq队列,挂起当前协程。

       // 阻塞的情况
       gp := getg() //拿到当前g
       mysg := acquireSudog() // 获取一个sudog
       mysg.releasetime = 0
       if t0 != 0 {
           mysg.releasetime = -1
       }
       mysg.elem = ep //关联ep,即待写入的数据地址
       mysg.waitlink = nil
       mysg.g = gp
       mysg.isSelect = false
       mysg.c = c
       gp.waiting = mysg
       gp.param = nil
       c.sendq.enqueue(mysg) // 入队
       // Signal to anyone trying to shrink our stack that we're about
       // to park on a channel. The window between when this G's status
       // changes and when we set gp.activeStackChans is not safe for
       // stack shrinking.
       atomic.Store8(&gp.parkingOnChan, 1)
       // 将g休眠,让出cpu
        // gopark后,需等待reader来唤醒它
       gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
       // 唤醒过后
       // Ensure the value being sent is kept alive until the
       // receiver copies it out. The sudog has a pointer to the
       // stack object, but sudogs aren't considered as roots of the
       // stack tracer.
       // 保持数据不被回收
       KeepAlive(ep)
      
       // someone woke us up.
       if mysg != gp.waiting {
           throw("G waiting list is corrupted")
       }
       gp.waiting = nil
       gp.activeStackChans = false
       if gp.param == nil {
           if c.closed == 0 {
               throw("chansend: spurious wakeup")
           }
           panic(plainError("send on closed channel"))
       }
       gp.param = nil
       if mysg.releasetime > 0 {
           blockevent(mysg.releasetime-t0, 2)
       }
       mysg.c = nil
       releaseSudog(mysg)
       return true
      

    chan 读操作

    当对chan进行读操作时,编译器会替换成 chanrecv1或者chanrecv2函数,最终会调用chanrecv函数处理读取

    image
    // chanrecv1 is what `v := <-ch` is compiled into.
    // elem is the address that receives the value; the call is forwarded to
    // chanrecv with block=true and both results are discarded.
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }
    // chanrecv2 is what `v, ok := <-ch` is compiled into.
    // It performs a blocking chanrecv and returns its second result (received),
    // which becomes ok.
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
    }
    

    和chansend一样,chanrecv也是支持非阻塞式的调用

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 
    

    chanrecv有以下几种处理流程:

    1. 读nil chan,如果是非阻塞,直接返回;如果是阻塞式,将当前协程挂起。

       // 读阻塞
       if c == nil {
           if !block {
               return
           }
           gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
           throw("unreachable")
       }
      
    2. 非阻塞模式且chan未close,没有缓冲区且没有等待写的writer,或者缓冲区没有数据的情况下,直接返回。

       if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
           c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
           atomic.Load(&c.closed) == 0 {
           return
       }
      
    3. chan已经被close,并且队列中没有数据时,会将存放值的变量清零,然后返回。

       // c已经被close 并且 没有数据
       // 清除ep指针
       if c.closed != 0 && c.qcount == 0 {
           if raceenabled {
               raceacquire(c.raceaddr())
           }
           unlock(&c.lock)
           if ep != nil {
               typedmemclr(c.elemtype, ep)
           }
           return true, false
       }
      
    4. sendq中有等待的writer,writer出队,并调用recv函数

      // 从sendq中取出sender
       if sg := c.sendq.dequeue(); sg != nil {
           // Found a waiting sender. If buffer is size 0, receive value
           // directly from sender. Otherwise, receive from head of queue
           // and add sender's value to the tail of the queue (both map to
           // the same buffer slot because the queue is full).
           // 从sender中读取数据
           recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
           return true, true
       }
      
      image

      recv在这分两种处理:如果ch不带缓冲区的话,直接将writer的sg.elem数据拷贝到ep;如果带缓冲区的话,此时缓冲区肯定满了,那么就从缓冲区队列头部取出数据拷贝至ep,然后将writer的sg.elem数据拷贝到缓冲区中,最后唤醒writer(g)

      // recv completes a receive when a sender sg is already waiting on c.
      // Unbuffered chan: the value is copied straight from the sender to ep.
      // Buffered chan: the head of the queue goes to ep and the sender's value
      // is written into the slot that was just freed.
      // Afterwards the lock is released through unlockf and the sender is woken.
      func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
         // unbuffered chan:
         // copy directly from the sender
         if c.dataqsiz == 0 {
            if raceenabled {
               racesync(c, sg)
            }
            if ep != nil {
               // copy data from sender
               recvDirect(c.elemtype, sg, ep)
            }
         } else {
            // Queue is full. Take the item at the
            // head of the queue. Make the sender enqueue
            // its item at the tail of the queue. Since the
            // queue is full, those are both the same slot.
            // (a sender only waits when the queue is full)
            // take the element at the head of the queue
            qp := chanbuf(c, c.recvx)
            if raceenabled {
               raceacquire(qp)
               racerelease(qp)
               raceacquireg(sg.g, qp)
               racereleaseg(sg.g, qp)
            }
            // copy data from queue to receiver
            // (skipped when the caller passed a nil ep)
            if ep != nil {
               typedmemmove(c.elemtype, ep, qp)
            }
            // copy data from sender to queue
            // (the sender's value goes into the slot that was just read)
            typedmemmove(c.elemtype, qp, sg.elem)
            c.recvx++
            if c.recvx == c.dataqsiz {
               c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
         }
         // drop the reference to the sender's value
         sg.elem = nil
         gp := sg.g
         unlockf()
         // non-nil param: the sender was woken by a receive, not by close
         gp.param = unsafe.Pointer(sg)
         if sg.releasetime != 0 {
            sg.releasetime = cputicks()
         }
         // wake up the sender's goroutine
         goready(gp, skip+1)
      }
      
    5. 直接从缓冲队列中读数。

       // 带缓冲区
       if c.qcount > 0 {
           // Receive directly from queue
           // 直接buf中取
           qp := chanbuf(c, c.recvx)
           if raceenabled {
               raceacquire(qp)
               racerelease(qp)
           }
           // 拷贝数据到ep指针
           if ep != nil {
               typedmemmove(c.elemtype, ep, qp)
           }
           // 清除qp
           typedmemclr(c.elemtype, qp)
           c.recvx++
           if c.recvx == c.dataqsiz {
               c.recvx = 0
           }
           c.qcount--
           unlock(&c.lock)
           return true, true
       }
      
    6. 阻塞的情况,缓冲区没有数据,且没有writer

      
       // 阻塞
       gp := getg() //拿到当前的goroutine
       mysg := acquireSudog() // 获取一个sudog
       mysg.releasetime = 0
       if t0 != 0 {
           mysg.releasetime = -1
       }
       
       //sudog 关联
       mysg.elem = ep
       mysg.waitlink = nil
       gp.waiting = mysg
       mysg.g = gp
       mysg.isSelect = false
       mysg.c = c
       gp.param = nil
       c.recvq.enqueue(mysg) //入队
       // Signal to anyone trying to shrink our stack that we're about
       // to park on a channel. The window between when this G's status
       // changes and when we set gp.activeStackChans is not safe for
       // stack shrinking.
       atomic.Store8(&gp.parkingOnChan, 1)
        // 挂起当前goroutine,等待writer唤醒
       gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
      
       // 唤醒后
       if mysg != gp.waiting {
           throw("G waiting list is corrupted")
       }
       gp.waiting = nil
       gp.activeStackChans = false
       if mysg.releasetime > 0 {
           blockevent(mysg.releasetime-t0, 2)
       }
       closed := gp.param == nil
       gp.param = nil
       // sudog解除关联
       mysg.c = nil
        // 释放sudog
       releaseSudog(mysg)
      
      

    close 关闭操作

    当close一个chan时,编译器会替换成对closechan函数的调用,将closed字段置为1,并将recvq和sendq中的goroutine释放唤醒,对sendq中未写入的数据做清除,且writer会发生panic异常。

    // closechan implements close(c). It panics on a nil or already-closed chan,
    // sets c.closed to 1, dequeues every goroutine waiting in recvq and sendq,
    // and makes them runnable after the chan lock has been released.
    func closechan(c *hchan) {
        if c == nil {
            panic(plainError("close of nil channel"))
        }
        
      // take the chan lock
        lock(&c.lock)
      // a chan must not be closed twice
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }
    
        if raceenabled {
            callerpc := getcallerpc()
            racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
            racerelease(c.raceaddr())
        }
    
        c.closed = 1
    
        // goroutines to wake; they are readied only after c.lock is released
        var glist gList
    
        // release all waiting receivers
        for {
            // dequeue
            sg := c.recvq.dequeue()
            if sg == nil {
                break
            }
            // zero the receiver's destination, so it observes the zero value
            if sg.elem != nil {
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil
            }
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            // nil param tells the woken goroutine it was woken by close
            gp.param = nil
            if raceenabled {
                raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }
    
        // release all waiting senders
        for {
            // dequeue
            sg := c.sendq.dequeue()
            if sg == nil {
                break
            }
            // drop the pending value; it is never delivered
            sg.elem = nil
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            // nil param tells the woken goroutine it was woken by close
            gp.param = nil
            if raceenabled {
                raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }
        unlock(&c.lock)
    
        // wake up every collected goroutine
        for !glist.empty() {
            gp := glist.pop()
            gp.schedlink = 0
            goready(gp, 3)
        }
    }
    

    chan使用小技巧

    1. 避免read、write一个nil chan

      // main demonstrates why a nil chan must never be written to (or read from).
      func main() {
       // ch is declared but never created with make, so it is a nil chan.
       var ch chan int
      
       // A send on a nil chan blocks forever: the goroutine is parked and nothing
       // can ever wake it. Since this is the only goroutine, the runtime aborts
       // with "fatal error: all goroutines are asleep - deadlock!".
       ch <- 1
      }
      
    2. 从chan中read时,使用带指示(v, ok := <-ch)的访问方式,否则读取的时候无法感知到chan已被close

      // main receives from ch with the comma-ok form so that it notices when the
      // sender closes the chan.
      func main() {
       ch := make(chan int)
      
       go func() {
           ch <- 10
           close(ch)
       }()
      
       for {
           select {
           case i, ok := <-ch:
               if !ok {
                   // ch is closed and drained. A plain `i := <-ch` would keep
                   // yielding the zero value here and the loop would never end.
                   // return is used because break would only leave the select.
                   return
               }
               fmt.Println(i)
               time.Sleep(100 * time.Millisecond)
           }
       }
      }
      
    3. 从chan中read时,不要使用已存在变量接收, chan close之后,缓冲区没有数据的话,使用存在变量读取时,会将变量清零

      // main shows that receiving from a closed, empty chan into an existing
      // variable overwrites that variable with the zero value.
      func main() {
       ch := make(chan int, 1)
       a := 10
      
       fmt.Println("before close a is: ", a) // a is still 10
      
       close(ch)
       // nothing is buffered, so the receive stores int's zero value into a
       a = <-ch
      
       fmt.Println("after close a is: ", a) // a is now 0
      }
      
    4. 使用select+default可以实现 chan的无阻塞读写

      // 使用select反射包实现无阻塞读写
      // tryRead performs a non-blocking receive on ch using reflect.Select with a
      // receive case and a default case. It returns the received value and true
      // when a value was available; otherwise (nothing ready, or ch closed) it
      // returns 0 and false.
      func tryRead(ch chan int) (int, bool) {
       branches := []reflect.SelectCase{
           {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)},
           {Dir: reflect.SelectDefault},
       }
      
       _, got, ok := reflect.Select(branches)
       if !ok {
           // default branch taken, or the chan is closed
           return 0, false
       }
       return got.Interface().(int), true
      }
      
      // tryWrite performs a non-blocking send of data on ch using reflect.Select
      // with a send case and a default case. It reports whether the send case
      // (index 0) was the one chosen, i.e. whether data was actually sent.
      func tryWrite(ch chan int, data int) bool {
       branches := []reflect.SelectCase{
           {
               Dir:  reflect.SelectSend,
               Chan: reflect.ValueOf(ch),
               Send: reflect.ValueOf(data),
           },
           {Dir: reflect.SelectDefault},
       }
      
       picked, _, _ := reflect.Select(branches)
       sent := picked == 0
       return sent
      }
      
      // 使用select + default实现无阻塞读写
      // tryRead2 performs a non-blocking receive on ch with select + default.
      // It returns the value and ok flag of the receive when one could proceed,
      // and (0, false) when nothing was ready.
      func tryRead2(ch chan int) (int, bool) {
       var (
           val int
           ok  bool
       )
       select {
       case val, ok = <-ch:
           // received (or ch is closed, in which case val is 0 and ok is false)
       default:
           // nothing ready: val and ok keep their zero values
       }
       return val, ok
      }
      
      // tryWrite2 performs a non-blocking send of data on ch with select + default
      // and reports whether the value was sent.
      func tryWrite2(ch chan int, data int) bool {
       sent := false
       select {
       case ch <- data:
           sent = true
       default:
           // the send would block; give up immediately
       }
       return sent
      }
      
      

      原因是如果select的case中存在default,对chan的读写会使用无阻塞的方法

      // selectnbsend is the non-blocking send used for a select that has a send
      // case and a default case: it calls chansend with block=false and returns
      // its result as selected.
      func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
       return chansend(c, elem, false, getcallerpc())
      }
      
      // selectnbrecv is the non-blocking receive used for a select that has a
      // receive case and a default case: it calls chanrecv with block=false and
      // returns only the selected result, discarding received.
      func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
       selected, _ = chanrecv(c, elem, false)
       return
      }
      

    相关文章

      网友评论

          本文标题:Go Chan 源码解析

          本文链接:https://www.haomeiwen.com/subject/efmdxrtx.html