美文网首页
Golang Channel实现

Golang Channel实现

作者: 梦工厂 | 来源:发表于2020-10-28 01:28 被阅读0次

Channel是Golang实现CSP的核心。

基于channel通信主要涉及buf(数据)和sendq、recvq(维护阻塞的G),lock保证并发访问安全;
本质是一个基于环形缓存的有锁队列,但G的阻塞是在用户空间;

图片来源:https://i6448038.github.io/2019/04/11/go-channel/

目录
新建channel
发送数据
协程直接发送数据
接收数据
协程直接接收数据
关闭channel
Select原理

新建channel

channel的运行时结构是runtime.hchan
make chan在创建channel的时候会在该进程的heap区申请一块内存,创建一个hchan结构体,返回执行该内存的指针,所以获取的的ch变量本身就是一个指针,在函数之间传递的时候是同一个channel。

type hchan struct {
   qcount   uint           // total data in the queue   长度
   dataqsiz uint           // size of the circular queue 容量
   buf      unsafe.Pointer // points to an array of dataqsiz  elements 环形队列
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index 环形数组的index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

//FIFO的队列
type waitq struct {
   first *sudog //sudog represents a g in a wait list, such as for sending/receiving on a channel.
   last  *sudog
}

发送数据

  1. 加锁;
  2. 存在等待的接受者时,直接发给接收者;
  3. 缓冲区存在剩余空间时,写入缓冲区;
  4. 不存在缓冲区或者满了的情况下,挂在sendq上;
  5. 被阻塞的发送者,接收者会负责消息的传输,所以被唤醒后进行收尾工作;
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   //向nil的channel发消息会持续阻塞
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   //获取channel的锁
   lock(&c.lock)

   //向close的channel发消息会Panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   //已有g阻塞在接收队列,直接发消息,绕过channel的buf; (没有缓冲也就是这样了)
   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   // 没有阻塞的
   // 没满,加入buf,然后返回;
   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx) //返回位置的指针
      typedmemmove(c.elemtype, qp, ep) //数据拷贝
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

   // 满了,发送方会阻塞
   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep  //发送的数据地址
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg) //当前g+数据封装的mysg,挂在channel的发送队列上;
   //当前协程用户态阻塞,释放lock
   goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   // 重新恢复调度,此时以及不需要传输数据了,因为数据以及被接受了,释放资源即可;
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}

协程直接发送数据

如果存在挂在channel的接收者时,发送者直接将数据传输给最早的接收者FIFO,绕过环形缓存;

send(c, sg, ep, func() { unlock(&c.lock) }, 3)

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if sg.elem != nil { //接收者的变量
      sendDirect(c.elemtype, sg, ep)//直接拷贝过去
      sg.elem = nil
   }
   gp := sg.g
   unlockf()//拷贝完毕再释放channel锁,避免多个发送者;
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)//唤醒接受者
}

接收数据

  1. 加锁;
  2. channel关闭&数据为空,返回零值;
  3. 如果有挂在sendq的发送者,从环形缓存拿到第一个数据,然后帮发送者将数据写入环形缓存的末尾;和发送时绕过缓存不同,保证消息FIFO,避免缓存的数据被饿死;
  4. 从环形缓存中接收数据;
  5. 数据为空,挂在recvq上;被唤醒,收尾工作;
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
   chanrecv(c, elem, true)
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // 向nil发消息普通会阻塞,select直接返回;
   if c == nil {
      if !block {
         return
      }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }


   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   // 获取channel的锁
   lock(&c.lock)

   // case1:channel关闭&数据为空,清空ep->拿到零值,返回;
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }
    
   // channel关闭&数据不为空   ||  channel没关闭

   // channel已满的情况,直接接收阻塞的发送者消息,绕过channel;
   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }

//有数据
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx) //位置
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp) //数据copy
      }
      typedmemclr(c.elemtype, qp)//清楚buf的数据
      c.recvx++ //更改位置
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }

//没数据
   // no sender available: block on this channel.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg) //封装mysg信息,阻塞在recvq队列;
   //让出调度
   goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
   //恢复调度,此时已经接受了数据,做收尾工作。
   // someone woke us up
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil //没关闭会赋值mysg的地址
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}

协程直接接收数据

对于带缓冲的channel,此处接收者和发送者并没有直接数据传输。

recv(c, sg, ep, func() { unlock(&c.lock) }, 3)

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if c.dataqsiz == 0 {
      if ep != nil {
         // copy data from sender
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // Queue is full. Take the item at the
      // head of the queue. Make the sender enqueue
      // its item at the tail of the queue. Since the
      // queue is full, those are both the same slot.
      //接收先拿buf的数据,然后将发送者的数据放到buf中。
         //避免数据buf的数据被饿死;发的时候不用,因为buf是空的。
      qp := chanbuf(c, c.recvx)
      // copy data from queue to receiver
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // copy data from sender to queue
      typedmemmove(c.elemtype, qp, sg.elem)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   sg.elem = nil
   gp := sg.g
   unlockf() //释放锁
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1) //唤醒发送者
}

关闭channel

主要是处理channel的recvq和sendq队列:recvq会拿到零值,sendq中的G都是在关闭之前阻塞的;

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
   closechan(c)
}

func closechan(c *hchan) {
    // 关闭nil,panic
   if c == nil {
      panic(plainError("close of nil channel"))
   }

   // 加锁
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("close of closed channel")) //重复关闭
   }

   c.closed = 1

   var glist gList

   // release all readers
    //如果有recvq,此时的buf肯定是空的,相当于给零值然后唤醒;
   for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil //此时才为nil,被唤醒的g就知道是否关闭了。
      glist.push(gp)
   }

   // release all writers (they will panic)
   // 如果有sendq,
   for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
      }
      sg.elem = nil 
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      glist.push(gp)
   }

   //释放锁
   unlock(&c.lock)

   // Ready all Gs now that we've dropped the channel lock. 唤醒
   for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

Select原理

  • 特点
    1. 可以在channel上进行非阻塞的收发操作;
    2. 遇到多个channel同时响应时,随机选择case执行,避免饥饿;
  • 实现 https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
    1. 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成锁定顺序 lockOrder
    2. 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 Channel;
      1. 如果存在就直接获取 case 对应的索引并返回;
      2. 如果不存在就会创建 runtime.sudog 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒;
    3. 当调度器唤醒当前 Goroutine 时就会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudog 结构对应的索引;
// compiler implements
//
// select {
// case v = <-c:
//    ... foo
// default:
//    ... bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
//    ... foo
// } else {
//    ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
   selected, _ = chanrecv(c, elem, false)  //非阻塞
   return
}

资料
图解Go的channel底层实现
深入理解Golang Channel
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel

相关文章

网友评论

      本文标题:Golang Channel实现

      本文链接:https://www.haomeiwen.com/subject/omnfvktx.html