美文网首页
Golang Channel实现

Golang Channel实现

作者: 梦工厂 | 来源:发表于2020-10-28 01:28 被阅读0次

    Channel是Golang实现CSP的核心。

    基于channel通信主要涉及buf(数据)和sendq、recvq(维护阻塞的G),lock保证并发访问安全;
    本质是一个基于环形缓存的有锁队列,但G的阻塞是在用户空间;

    图片来源:https://i6448038.github.io/2019/04/11/go-channel/

    目录
    新建channel
    发送数据
    协程直接发送数据
    接收数据
    协程直接接收数据
    关闭channel
    Select原理
    

    新建channel

    channel的运行时结构是runtime.hchan
    make chan在创建channel的时候会在该进程的heap区申请一块内存,创建一个hchan结构体,返回执行该内存的指针,所以获取的的ch变量本身就是一个指针,在函数之间传递的时候是同一个channel。

    type hchan struct {
       qcount   uint           // total data in the queue   长度
       dataqsiz uint           // size of the circular queue 容量
       buf      unsafe.Pointer // points to an array of dataqsiz  elements 环形队列
       elemsize uint16
       closed   uint32
       elemtype *_type // element type
       sendx    uint   // send index 环形数组的index
       recvx    uint   // receive index
       recvq    waitq  // list of recv waiters
       sendq    waitq  // list of send waiters
    
       // lock protects all fields in hchan, as well as several
       // fields in sudogs blocked on this channel.
       //
       // Do not change another G's status while holding this lock
       // (in particular, do not ready a G), as this can deadlock
       // with stack shrinking.
       lock mutex
    }
    
    //FIFO的队列
    type waitq struct {
       first *sudog //sudog represents a g in a wait list, such as for sending/receiving on a channel.
       last  *sudog
    }
    

    发送数据

    1. 加锁;
    2. 存在等待的接受者时,直接发给接收者;
    3. 缓冲区存在剩余空间时,写入缓冲区;
    4. 不存在缓冲区或者满了的情况下,挂在sendq上;
    5. 被阻塞的发送者,接收者会负责消息的传输,所以被唤醒后进行收尾工作;
    /*
     * generic single channel send/recv
     * If block is not nil,
     * then the protocol will not
     * sleep but return if it could
     * not complete.
     *
     * sleep can wake up with g.param == nil
     * when a channel involved in the sleep has
     * been closed.  it is easiest to loop and re-run
     * the operation; we'll see that it's now closed.
     */
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
       //向nil的channel发消息会持续阻塞
       if c == nil {
          if !block {
             return false
          }
          gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
          throw("unreachable")
       }
    
       var t0 int64
       if blockprofilerate > 0 {
          t0 = cputicks()
       }
    
       //获取channel的锁
       lock(&c.lock)
    
       //向close的channel发消息会Panic
       if c.closed != 0 {
          unlock(&c.lock)
          panic(plainError("send on closed channel"))
       }
    
       //已有g阻塞在接收队列,直接发消息,绕过channel的buf; (没有缓冲也就是这样了)
       if sg := c.recvq.dequeue(); sg != nil {
          // Found a waiting receiver. We pass the value we want to send
          // directly to the receiver, bypassing the channel buffer (if any).
          send(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true
       }
    
       // 没有阻塞的
       // 没满,加入buf,然后返回;
       if c.qcount < c.dataqsiz {
          // Space is available in the channel buffer. Enqueue the element to send.
          qp := chanbuf(c, c.sendx) //返回位置的指针
          typedmemmove(c.elemtype, qp, ep) //数据拷贝
          c.sendx++
          if c.sendx == c.dataqsiz {
             c.sendx = 0
          }
          c.qcount++
          unlock(&c.lock)
          return true
       }
    
       // 满了,发送方会阻塞
       // Block on the channel. Some receiver will complete our operation for us.
       gp := getg()
       mysg := acquireSudog()
       mysg.releasetime = 0
       if t0 != 0 {
          mysg.releasetime = -1
       }
       // No stack splits between assigning elem and enqueuing mysg
       // on gp.waiting where copystack can find it.
       mysg.elem = ep  //发送的数据地址
       mysg.waitlink = nil
       mysg.g = gp
       mysg.isSelect = false
       mysg.c = c
       gp.waiting = mysg
       gp.param = nil
       c.sendq.enqueue(mysg) //当前g+数据封装的mysg,挂在channel的发送队列上;
       //当前协程用户态阻塞,释放lock
       goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
       // Ensure the value being sent is kept alive until the
       // receiver copies it out. The sudog has a pointer to the
       // stack object, but sudogs aren't considered as roots of the
       // stack tracer.
       KeepAlive(ep)
    
       // someone woke us up.
       // 重新恢复调度,此时以及不需要传输数据了,因为数据以及被接受了,释放资源即可;
       if mysg != gp.waiting {
          throw("G waiting list is corrupted")
       }
       gp.waiting = nil
       if gp.param == nil {
          if c.closed == 0 {
             throw("chansend: spurious wakeup")
          }
          panic(plainError("send on closed channel"))
       }
       gp.param = nil
       if mysg.releasetime > 0 {
          blockevent(mysg.releasetime-t0, 2)
       }
       mysg.c = nil
       releaseSudog(mysg)
       return true
    }
    

    协程直接发送数据

    如果存在挂在channel的接收者时,发送者直接将数据传输给最早的接收者FIFO,绕过环形缓存;

    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    
    // send processes a send operation on an empty channel c.
    // The value ep sent by the sender is copied to the receiver sg.
    // The receiver is then woken up to go on its merry way.
    // Channel c must be empty and locked.  send unlocks c with unlockf.
    // sg must already be dequeued from c.
    // ep must be non-nil and point to the heap or the caller's stack.
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
       if sg.elem != nil { //接收者的变量
          sendDirect(c.elemtype, sg, ep)//直接拷贝过去
          sg.elem = nil
       }
       gp := sg.g
       unlockf()//拷贝完毕再释放channel锁,避免多个发送者;
       gp.param = unsafe.Pointer(sg)
       if sg.releasetime != 0 {
          sg.releasetime = cputicks()
       }
       goready(gp, skip+1)//唤醒接受者
    }
    

    接收数据

    1. 加锁;
    2. channel关闭&数据为空,返回零值;
    3. 如果有挂在sendq的发送者,从环形缓存拿到第一个数据,然后帮发送者将数据写入环形缓存的末尾;和发送时绕过缓存不同,保证消息FIFO,避免缓存的数据被饿死;
    4. 从环形缓存中接收数据;
    5. 数据为空,挂在recvq上;被唤醒,收尾工作;
    // entry points for <- c from compiled code
    //go:nosplit
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
       chanrecv(c, elem, true)
    }
    // chanrecv receives on channel c and writes the received data to ep.
    // ep may be nil, in which case received data is ignored.
    // If block == false and no elements are available, returns (false, false).
    // Otherwise, if c is closed, zeros *ep and returns (true, false).
    // Otherwise, fills in *ep with an element and returns (true, true).
    // A non-nil ep must point to the heap or the caller's stack.
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
       // 向nil发消息普通会阻塞,select直接返回;
       if c == nil {
          if !block {
             return
          }
          gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
          throw("unreachable")
       }
    
    
       var t0 int64
       if blockprofilerate > 0 {
          t0 = cputicks()
       }
    
       // 获取channel的锁
       lock(&c.lock)
    
       // case1:channel关闭&数据为空,清空ep->拿到零值,返回;
       if c.closed != 0 && c.qcount == 0 {
          unlock(&c.lock)
          if ep != nil {
             typedmemclr(c.elemtype, ep)
          }
          return true, false
       }
        
       // channel关闭&数据不为空   ||  channel没关闭
    
       // channel已满的情况,直接接收阻塞的发送者消息,绕过channel;
       if sg := c.sendq.dequeue(); sg != nil {
          // Found a waiting sender. If buffer is size 0, receive value
          // directly from sender. Otherwise, receive from head of queue
          // and add sender's value to the tail of the queue (both map to
          // the same buffer slot because the queue is full).
          recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true, true
       }
    
    //有数据
       if c.qcount > 0 {
          // Receive directly from queue
          qp := chanbuf(c, c.recvx) //位置
          if ep != nil {
             typedmemmove(c.elemtype, ep, qp) //数据copy
          }
          typedmemclr(c.elemtype, qp)//清楚buf的数据
          c.recvx++ //更改位置
          if c.recvx == c.dataqsiz {
             c.recvx = 0
          }
          c.qcount--
          unlock(&c.lock)
          return true, true
       }
    
    //没数据
       // no sender available: block on this channel.
       gp := getg()
       mysg := acquireSudog()
       mysg.releasetime = 0
       if t0 != 0 {
          mysg.releasetime = -1
       }
       // No stack splits between assigning elem and enqueuing mysg
       // on gp.waiting where copystack can find it.
       mysg.elem = ep
       mysg.waitlink = nil
       gp.waiting = mysg
       mysg.g = gp
       mysg.isSelect = false
       mysg.c = c
       gp.param = nil
       c.recvq.enqueue(mysg) //封装mysg信息,阻塞在recvq队列;
       //让出调度
       goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
       //恢复调度,此时已经接受了数据,做收尾工作。
       // someone woke us up
       if mysg != gp.waiting {
          throw("G waiting list is corrupted")
       }
       gp.waiting = nil
       if mysg.releasetime > 0 {
          blockevent(mysg.releasetime-t0, 2)
       }
       closed := gp.param == nil //没关闭会赋值mysg的地址
       gp.param = nil
       mysg.c = nil
       releaseSudog(mysg)
       return true, !closed
    }
    

    协程直接接收数据

    对于带缓冲的channel,此处接收者和发送者并没有直接数据传输。

    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    
    // recv processes a receive operation on a full channel c.
    // There are 2 parts:
    // 1) The value sent by the sender sg is put into the channel
    //    and the sender is woken up to go on its merry way.
    // 2) The value received by the receiver (the current G) is
    //    written to ep.
    // For synchronous channels, both values are the same.
    // For asynchronous channels, the receiver gets its data from
    // the channel buffer and the sender's data is put in the
    // channel buffer.
    // Channel c must be full and locked. recv unlocks c with unlockf.
    // sg must already be dequeued from c.
    // A non-nil ep must point to the heap or the caller's stack.
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
       if c.dataqsiz == 0 {
          if ep != nil {
             // copy data from sender
             recvDirect(c.elemtype, sg, ep)
          }
       } else {
          // Queue is full. Take the item at the
          // head of the queue. Make the sender enqueue
          // its item at the tail of the queue. Since the
          // queue is full, those are both the same slot.
          //接收先拿buf的数据,然后将发送者的数据放到buf中。
             //避免数据buf的数据被饿死;发的时候不用,因为buf是空的。
          qp := chanbuf(c, c.recvx)
          // copy data from queue to receiver
          if ep != nil {
             typedmemmove(c.elemtype, ep, qp)
          }
          // copy data from sender to queue
          typedmemmove(c.elemtype, qp, sg.elem)
          c.recvx++
          if c.recvx == c.dataqsiz {
             c.recvx = 0
          }
          c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
       }
       sg.elem = nil
       gp := sg.g
       unlockf() //释放锁
       gp.param = unsafe.Pointer(sg)
       if sg.releasetime != 0 {
          sg.releasetime = cputicks()
       }
       goready(gp, skip+1) //唤醒发送者
    }
    

    关闭channel

    主要是处理channel的recvq和sendq队列:recvq会拿到零值,sendq中的G都是在关闭之前阻塞的;

    //go:linkname reflect_chanclose reflect.chanclose
    func reflect_chanclose(c *hchan) {
       closechan(c)
    }
    
    func closechan(c *hchan) {
        // 关闭nil,panic
       if c == nil {
          panic(plainError("close of nil channel"))
       }
    
       // 加锁
       lock(&c.lock)
       if c.closed != 0 {
          unlock(&c.lock)
          panic(plainError("close of closed channel")) //重复关闭
       }
    
       c.closed = 1
    
       var glist gList
    
       // release all readers
        //如果有recvq,此时的buf肯定是空的,相当于给零值然后唤醒;
       for {
          sg := c.recvq.dequeue()
          if sg == nil {
             break
          }
          if sg.elem != nil {
             typedmemclr(c.elemtype, sg.elem)
             sg.elem = nil
          }
          if sg.releasetime != 0 {
             sg.releasetime = cputicks()
          }
          gp := sg.g
          gp.param = nil //此时才为nil,被唤醒的g就知道是否关闭了。
          glist.push(gp)
       }
    
       // release all writers (they will panic)
       // 如果有sendq,
       for {
          sg := c.sendq.dequeue()
          if sg == nil {
             break
          }
          sg.elem = nil 
          if sg.releasetime != 0 {
             sg.releasetime = cputicks()
          }
          gp := sg.g
          gp.param = nil
          glist.push(gp)
       }
    
       //释放锁
       unlock(&c.lock)
    
       // Ready all Gs now that we've dropped the channel lock. 唤醒
       for !glist.empty() {
          gp := glist.pop()
          gp.schedlink = 0
          goready(gp, 3)
       }
    }
    

    Select原理

    • 特点
      1. 可以在channel上进行非阻塞的收发操作;
      2. 遇到多个channel同时响应时,随机选择case执行,避免饥饿;
    • 实现 https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
      1. 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成锁定顺序 lockOrder
      2. 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 Channel;
        1. 如果存在就直接获取 case 对应的索引并返回;
        2. 如果不存在就会创建 runtime.sudog 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒;
      3. 当调度器唤醒当前 Goroutine 时就会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudog 结构对应的索引;
    // compiler implements
    //
    // select {
    // case v = <-c:
    //    ... foo
    // default:
    //    ... bar
    // }
    //
    // as
    //
    // if selectnbrecv(&v, c) {
    //    ... foo
    // } else {
    //    ... bar
    // }
    //
    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
       selected, _ = chanrecv(c, elem, false)  //非阻塞
       return
    }
    

    资料
    图解Go的channel底层实现
    深入理解Golang Channel
    https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel

    相关文章

      网友评论

          本文标题:Golang Channel实现

          本文链接:https://www.haomeiwen.com/subject/omnfvktx.html