美文网首页
go-channel初识

go-channel初识

作者: GGBond_8488 | 来源:发表于2020-03-16 23:21 被阅读0次

    了解过go的都知道,go最为突出的优点就是它天然支持高并发,但是所有高并发情况都面临着一个很明显的问题,就是并发的多线程或多协程之间如何通信,而channel就是go中goroutine通信的‘管道’。

    channel在go中时如何使用的

    package main
    
    import (
      "fmt"
      "os"
      "os/signal"
      "syscall"
      "time"
    )
    
    var exit = make(chan string, 1)
    
    func main() {
      go dealSignal()
      exited := make(chan struct{}, 1)
      go channel1(exited)
      count := 0
      t := time.Tick(time.Second)
    Loop:
      for {
        select {
        case <-t:
          count++
          fmt.Printf("main run %d\n", count)
        case <-exited:
          fmt.Println("main exit begin")
          break Loop
        }
      }
      fmt.Println("main exit end")
    }
    
    func dealSignal() {
      c := make(chan os.Signal, 1)
      signal.Notify(c, os.Interrupt, syscall.SIGTERM)
      go func() {
        <-c
        exit <- "shutdown"
      }()
    }
    
    func channel1(exited chan<- struct{}) {
      t := time.Tick(time.Second)
      count := 0
      for {
        select {
        case <-t:
          count++
          fmt.Printf("channel1 run %d\n", count)
        case <-exit:
          fmt.Println("channel1 exit")
          close(exited)
          return
        }
      }
    }
    

    这个例子首先并发出一个dealsign方法,用来接收关闭信号,如果接收到关闭信号后往exit channel发送一条消息,然后并发运行channel1,channel1中定了一个ticker,正常情况下channel1每秒打印第一个case语句,如果接收到exit的信号,进入第二个case,然后关闭传入的exited channel,那么main中的Loop,接收到exited关闭的信号后,打印“main exit begin”, 然后退出循环,进程成功退出。这个例子演示了channel在goroutine中起到的传递消息的作用。这个例子是为了向大家展示channel在多个goroutine之间进行通信。

    Channel在底层是什么样的

    
    type hchan struct {
      qcount   uint           // total data in the queue;chan中的元素总数
      dataqsiz uint           // size of the circular queue;底层循环数组的size
      buf      unsafe.Pointer // points to an array of dataqsiz elements,指向底层循环数组的指针,只针对有缓冲的channel
      elemsize uint16  //chan中元素的大小
      closed   uint32  //chan是否关闭
      elemtype *_type // element type;元素类型
      sendx    uint   // send index;已发送元素在循环数组中的索引
      recvx    uint   // receive index;已接收元素在循环数组中的索引
      recvq    waitq  // list of recv waiters,等待接收消息的goroutine队列
      sendq    waitq  // list of send waiters,等待发送消息的goroutine队列
    
      // lock protects all fields in hchan, as well as several
      // fields in sudogs blocked on this channel.
      //
      // Do not change another G's status while holding this lock
      // (in particular, do not ready a G), as this can deadlock
      // with stack shrinking.
      lock mutex
    }
    
    type waitq struct {
      first *sudog
      last  *sudog
    }
    

    创建一个底层数组容量为5,元素类型为int,那么channel的数据结构如下图所示:


    创建channel的时候到底发生了什么

    创建channel的时候,其实底层是调用makechan方法,我们来看下源码:

    func makechan(t *chantype, size int) *hchan {
      elem := t.elem
    
      // compiler checks this but be safe.
      if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
      }
      if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
      }
    
      mem, overflow := math.MulUintptr(elem.size, uintptr(size))
      if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
      }
    
      // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
      // buf points into the same allocation, elemtype is persistent.
      // SudoG's are referenced from their owning thread so they can't be collected.
      // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
      var c *hchan
      switch {
      case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
      case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
      default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
      }
    
      c.elemsize = uint16(elem.size)
      c.elemtype = elem
      c.dataqsiz = uint(size)
    
      if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
      }
      return c
    }
    

    从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。

    具体来看下代码:
    可以看出makechan中其实主要的代码就是一个switch,针对不同的情况:

    1、case mem == 0代表无缓冲型channel,只分配hchan本身结构体大小的内存
    2、case elem.ptrdata==0 代表元素类型不含指针,只分配hchan本身结构体大小+元素大小*个数的内存,是连续的内存空间
    3、default元素类型包括指针,两次分配内存的操作

    channel的接收与发送

    func goroutineA(a <-chan int) {
        val := <- a
        fmt.Println("G1 received data: ", val)
        return
    }
    
    func goroutineB(b <-chan int) {
        val := <- b
        fmt.Println("G2 received data: ", val)
        return
    }
    
    func main() {
        ch := make(chan int)
        go goroutineA(ch)
        go goroutineB(ch)
        ch <- 3
        time.Sleep(time.Second)
    }
    

    首先创建了一个无缓冲型的channel,然后启动两个goroutine去消费channel的数据,紧接着向channel中发送数据。我们一步一步来分析channel是如何接收和发送数据的,首先来看接收,golang中接收channel数据有两种方式:

    i <- ch
    i, ok <- ch

    // 位于 src/runtime/chan.go
    
    // chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
    // 如果 ep 是 nil,说明忽略了接收值。
    // 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
    // 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
    // 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
    // 如果 ep 非空,则应该指向堆或者函数调用者的栈
    
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        // 省略 debug 内容 …………
    
        // 如果是一个 nil 的 channel
        if c == nil {
            // 如果不阻塞,直接返回 (false, false)
            if !block {
                return
            }
            // 否则,接收一个 nil 的 channel,goroutine 挂起
            gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
            // 不会执行到这里
            throw("unreachable")
        }
    
        // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
        // 当我们观察到 channel 没准备好接收:
        // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
        // 2. 缓冲型,但 buf 里没有元素
        // 之后,又观察到 closed == 0,即 channel 未关闭。
        // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
        // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        // 加锁
        lock(&c.lock)
    
        // channel 已关闭,并且循环数组 buf 里没有元素
        // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
        // 也就是说即使是关闭状态,但在缓冲型的 channel,
        // buf 里有元素的情况下还能接收到元素
        if c.closed != 0 && c.qcount == 0 {
            if raceenabled {
                raceacquire(unsafe.Pointer(c))
            }
            // 解锁
            unlock(&c.lock)
            if ep != nil {
                // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
                // 那么接收的值将是一个该类型的零值
                // typedmemclr 根据类型清理相应地址的内存
                typedmemclr(c.elemtype, ep)
            }
            // 从一个已关闭的 channel 接收,selected 会返回true
            return true, false
        }
    
        // 等待发送队列里有 goroutine 存在,说明 buf 是满的
        // 这有可能是:
        // 1. 非缓冲型的 channel
        // 2. 缓冲型的 channel,但 buf 满了
        // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
        // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    
        // 缓冲型,buf 里有元素,可以正常接收
        if c.qcount > 0 {
            // 直接从循环数组里找到要接收的元素
            qp := chanbuf(c, c.recvx)
    
            // …………
    
            // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            // 清理掉循环数组里相应位置的值
            typedmemclr(c.elemtype, qp)
            // 接收游标向前移动
            c.recvx++
            // 接收游标归零
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            // buf 数组里的元素个数减 1
            c.qcount--
            // 解锁
            unlock(&c.lock)
            return true, true
        }
    
        if !block {
            // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
            unlock(&c.lock)
            return false, false
        }
    
        // 接下来就是要被阻塞的情况了
        // 构造一个 sudog
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
    
        // 待接收数据的地址保存下来
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.selectdone = nil
        mysg.c = c
        gp.param = nil
        // 进入channel 的等待接收队列
        c.recvq.enqueue(mysg)
        // 将当前 goroutine 挂起
        goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
    
        // 被唤醒了,接着从这里继续执行一些扫尾工作
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        closed := gp.param == nil
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, !closed
    }
    
    Step1

    如果channel是nil:如果是非阻塞模式,直接返回(false,false);如果是阻塞模式,调用goprak挂起goroutine,会阻塞下去。

    if c == nil {
        if !block {
          return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
      }
    
    Step2

    快速操作(不用获取锁,快速返回),三组条件全部满足,快速返(false,false)

    条件1:首先是在非阻塞模式下
    条件2:如果是非缓冲型(datasiz=0)并且等待发送goroutine队列为空(sendq.first=nil,就是没人往channel写数据),或者缓冲型channel(datasiz>0)并且buf中没有数据;
    条件3:channel未关闭

    
    //##################step2####################
      if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
      }
    
    Step3

    首先加锁,如果channel已经关闭,并且buf中没有元素,返回对应类型的0值,但是received为false;两种情况

    情形1:非缓冲型,channel已关闭
    情形2:缓冲型,channel已关闭,并且buf无元素

    也就是说即使是关闭状态,但在缓冲型的 channel,
    buf 里有元素的情况下还能接收到元素

    
    //##################step3####################
      lock(&c.lock)
    
      if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
          raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
          typedmemclr(c.elemtype, ep)
        }
        return true, false
      }
    
    step4

    如果等待发送队列中有元素,证明channel已经满了,两种情形

    情形1:非缓冲型,无buf
    情形2:缓冲型,buf满了

    //##################step4####################
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
      }
    

    两种情形都正常进入recv方法,我们来看下源码:

    
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      //##################step4-1####################
      if c.dataqsiz == 0 {
        if raceenabled {
          racesync(c, sg)
        }
        if ep != nil {
          // copy data from sender
          recvDirect(c.elemtype, sg, ep)
        }
      } else {
         //##################step4-2####################
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
          raceacquire(qp)
          racerelease(qp)
          raceacquireg(sg.g, qp)
          racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
          typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
          c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
      sg.elem = nil
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      if sg.releasetime != 0 {
        sg.releasetime = cputicks()
      }
      goready(gp, skip+1)
    }
    

    针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)(从发送者的栈copy到接收者的栈)
    针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部.
    然后唤醒等待发送队列中的goroutine,等待调度器调度。

    step5

    没有等待发送的队列,并且buf中有元素,直接把接收游标处的数据copy到接收数据的地址,然后改变hchan中元素数据。

    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
          raceacquire(qp)
          racerelease(qp)
        }
        if ep != nil {
          typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
          c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
      }
    
    step6

    如果是非阻塞,那么直接返回;如果是阻塞的,构造sudog,保存各种值;将sudog保存到channel的recvq中,调用goparkunlock将goroutine挂起

    if !block {
        unlock(&c.lock)
        return false, false
      }
    // no sender available: block on this channel.
      gp := getg()
      mysg := acquireSudog()
      mysg.releasetime = 0
      if t0 != 0 {
        mysg.releasetime = -1
      }
      // No stack splits between assigning elem and enqueuing mysg
      // on gp.waiting where copystack can find it.
      mysg.elem = ep
      mysg.waitlink = nil
      gp.waiting = mysg
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.param = nil
      c.recvq.enqueue(mysg)
      goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
    
      // someone woke us up
      if mysg != gp.waiting {
        throw("G waiting list is corrupted")
      }
      gp.waiting = nil
      if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
      }
      closed := gp.param == nil
      gp.param = nil
      mysg.c = nil
      releaseSudog(mysg)
      return true, !closed
    

    非阻塞接收,解锁。selected 返回 false,因为没有接收到值

    我们继续之前的例子。前面说到第 14 行,创建了一个非缓冲型的 channel,接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。

    在程序的 17 行之前,chan 的整体数据结构如下:


    buf 指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvq 和 sendq,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g 字段,g 表示一个 goroutine,所以 sudog 可以看成一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。

    此时,我们可以看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。因为没有 goroutine 接收,而 channel 又是无缓冲类型,所以 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。

    再从整体上来看一下 chan 此时的状态:


    当一个channel关闭后,我们依然可以从中读出数据,如果chan的buf中有元素,则读出的是chan中buf的数据,如果buf为空,则输出对应元素类型的零值。那么我们来看下如下的一段程序:

    package main
    
    import (
      "fmt"
      "os"
      "os/signal"
      "syscall"
      "time"
    )
    
    var exit1 = make(chan struct{}, 1)
    
    func main() {
      go dealSignal1()
      count := 0
      t := time.Tick(time.Second)
      for {
        select {
        case <-t:
          count++
          fmt.Printf("main run %d\n", count)
        case <-exit1:
          fmt.Println("main exit begin")
        }
      }
      fmt.Println("main exit over")
    }
    
    func dealSignal1() {
      c := make(chan os.Signal, 2)
      signal.Notify(c, os.Interrupt, syscall.SIGTERM)
      go func() {
        <-c
        close(exit1)
      }()
    }
    

    发送

    接着上面的例子,G1 和 G2 现在都在 recvq 队列里了。
    17 行向 channel 发送了一个元素 3。

    发送操作最终转化为 chansend 函数,直接上源码,同样大部分都注释了,可以看懂主流程:

    // 位于 src/runtime/chan.go
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // 如果 channel 是 nil
        if c == nil {
            // 不能阻塞,直接返回 false,表示未发送成功
            if !block {
                return false
            }
            // 当前 goroutine 被挂起
            gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
            throw("unreachable")
        }
    
        // 省略 debug 相关……
    
        // 对于不阻塞的 send,快速检测失败场景
        //
        // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
        // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
        // 2. channel 是缓冲型的,但循环数组已经装满了元素
        if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
            return false
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        // 锁住 channel,并发安全
        lock(&c.lock)
    
        // 如果 channel 关闭了
        if c.closed != 0 {
            // 解锁
            unlock(&c.lock)
            // 直接 panic
            panic(plainError("send on closed channel"))
        }
    
        // 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
        if sg := c.recvq.dequeue(); sg != nil {
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
    
        // 对于缓冲型的 channel,如果还有缓冲空间
        if c.qcount < c.dataqsiz {
            // qp 指向 buf 的 sendx 位置
            qp := chanbuf(c, c.sendx)
    
            // ……
    
            // 将数据从 ep 处拷贝到 qp
            typedmemmove(c.elemtype, qp, ep)
            // 发送游标值加 1
            c.sendx++
            // 如果发送游标值等于容量值,游标值归 0
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            // 缓冲区的元素数量加一
            c.qcount++
    
            // 解锁
            unlock(&c.lock)
            return true
        }
    
        // 如果不需要阻塞,则直接返回错误
        if !block {
            unlock(&c.lock)
            return false
        }
    
        // channel 满了,发送方会被阻塞。接下来会构造一个 sudog
    
        // 获取当前 goroutine 的指针
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
    
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.selectdone = nil
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
    
        // 当前 goroutine 进入发送等待队列
        c.sendq.enqueue(mysg)
    
        // 当前 goroutine 被挂起
        goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
    
        // 从这里开始被唤醒了(channel 有机会可以发送了)
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if gp.param == nil {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            // 被唤醒后,channel 关闭了。坑爹啊,panic
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        // 去掉 mysg 上绑定的 channel
        mysg.c = nil
        releaseSudog(mysg)
        return true
    }
    

    我们继续往下走,G1、G2被挂起后,往channel中发送一个数据3,其实调用的是chansend方法,我们还是逐步的去讲解

    step1

    如果channel=nil,当前goroutine会被挂起

    
    if c == nil {
        if !block {
          return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
      }
    
    step2

    依然是一个不加锁的快速操作,三组条件

    条件1:非阻塞
    条件2:channel未关闭
    条件3:channel是非缓冲型,并且等待接收队列为空;或者缓冲型,并且循环数组已经满了

    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
      }
    
    step3

    加锁,如果channel已经关闭,直接panic

    lock(&c.lock)
    
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    
    step4

    如果等待接收队列不为空,说明什么?

    情形1:非缓冲型,等待接收队列不为空
    情形2:缓冲型,等待接收队列不为空(说明buf为空)

    两种情形,都是直接将待发送数据直接copy到接收处

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)//直接从ep copy到sg
        return true
    }
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      if raceenabled {
        if c.dataqsiz == 0 {
          racesync(c, sg)
        } else {
          // Pretend we go through the buffer, even though
          // we copy directly. Note that we need to increment
          // the head/tail locations only when raceenabled.
          qp := chanbuf(c, c.recvx)
          raceacquire(qp)
          racerelease(qp)
          raceacquireg(sg.g, qp)
          racereleaseg(sg.g, qp)
          c.recvx++
          if c.recvx == c.dataqsiz {
            c.recvx = 0
          }
          c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
      }
      if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
      }
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      if sg.releasetime != 0 {
        sg.releasetime = cputicks()
      }
      goready(gp, skip+1)
    }
    

    两种情形,都直接从一个用一个goroutine操作另一个goroutine的栈,因此在sendDirect方法中会有一次写屏障

    step5

    如果等待队列为空,并且缓冲区未满,肯定是缓冲型的channel

    
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
          raceacquire(qp)
          racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
          c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
      }
    

    将元素放在sendx处,然后sendx加1,channel总量加1

    step6

    如果以上情况都没有命中,说明什么?说明channel已经满了,如果是非阻塞的直接返回,否则需要调用gopack将这个goroutine挂起,等待被唤醒。

    if !block {
        unlock(&c.lock)
        return false
      }
    
      // Block on the channel. Some receiver will complete our operation for us.
      gp := getg()
      mysg := acquireSudog()
      mysg.releasetime = 0
      if t0 != 0 {
        mysg.releasetime = -1
      }
      // No stack splits between assigning elem and enqueuing mysg
      // on gp.waiting where copystack can find it.
      mysg.elem = ep
      mysg.waitlink = nil
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.waiting = mysg
      gp.param = nil
      c.sendq.enqueue(mysg)
      goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
      // Ensure the value being sent is kept alive until the
      // receiver copies it out. The sudog has a pointer to the
      // stack object, but sudogs aren't considered as roots of the
      // stack tracer.
      KeepAlive(ep)
    

    我们对照程序分析下,在前一个小节G1、G2被挂起来了,等待sender的解救;这时候往ch中发送了一个3,(step4)这时sender发现ch的等待接收队列recvq中有receiver,就会出队一个sudog,然后将元素直接copy到sudog的elem处,然后调用goready将G1唤醒,继续执行G1原来的代码,打印出结果。如下图:



    当调度器光顾 G1 时,将 G1 变成 running 状态,执行 goroutineA 接下来的代码。G 表示其他可能有的 goroutine。

    这里其实涉及到一个协程写另一个协程栈的操作。有两个 receiver 在 channel 的一边虎视眈眈地等着,这时 channel 另一边来了一个 sender 准备向 channel 发送数据,为了高效,用不着通过 channel 的 buf “中转”一次,直接从源地址把数据 copy 到目的地址就可以了,效率高啊!

    关闭

    close一个channel会调用closechan方法,比较简单,我们也来看下

    func closechan(c *hchan) {
        // 关闭一个 nil channel,panic
        if c == nil {
            panic(plainError("close of nil channel"))
        }
    
        // 上锁
        lock(&c.lock)
        // 如果 channel 已经关闭
        if c.closed != 0 {
            unlock(&c.lock)
            // panic
            panic(plainError("close of closed channel"))
        }
    
        // …………
    
        // 修改关闭状态
        c.closed = 1
    
        var glist *g
    
        // 将 channel 所有等待接收队列的里 sudog 释放
        for {
            // 从接收队列里出队一个 sudog
            sg := c.recvq.dequeue()
            // 出队完毕,跳出循环
            if sg == nil {
                break
            }
    
            // 如果 elem 不为空,说明此 receiver 未忽略接收数据
            // 给它赋一个相应类型的零值
            if sg.elem != nil {
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil
            }
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            // 取出 goroutine
            gp := sg.g
            gp.param = nil
            if raceenabled {
                raceacquireg(gp, unsafe.Pointer(c))
            }
            // 相连,形成链表
            gp.schedlink.set(glist)
            glist = gp
        }
    
        // 将 channel 等待发送队列里的 sudog 释放
        // 如果存在,这些 goroutine 将会 panic
        for {
            // 从发送队列里出队一个 sudog
            sg := c.sendq.dequeue()
            if sg == nil {
                break
            }
    
            // 发送者会 panic
            sg.elem = nil
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = nil
            if raceenabled {
                raceacquireg(gp, unsafe.Pointer(c))
            }
            // 形成链表
            gp.schedlink.set(glist)
            glist = gp
        }
        // 解锁
        unlock(&c.lock)
    
        // Ready all Gs now that we've dropped the channel lock.
        // 遍历链表
        for glist != nil {
            // 取最后一个
            gp := glist
            // 向前走一步,下一个唤醒的 g
            glist = glist.schedlink.ptr()
            gp.schedlink = 0
            // 唤醒相应 goroutine
            goready(gp, 3)
        }
    }
    
    step1

    如果channel为nil,会直接panic

    if c == nil {
        panic(plainError("close of nil channel"))
      }
    
    step2

    加锁,如果channel已经关闭,再次关闭会panic

    
    lock(&c.lock)
      if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
      }
    
    step3

    首选将hchan对应close标志置为1,然后声明一个链表;将等待接收队列中的所有sudog加入到链表,并将其elem赋予一个相应类型的0值;

    
    c.closed = 1
    
      var glist gList
    
      // release all readers
      for {
        sg := c.recvq.dequeue()
        if sg == nil {
          break
        }
        if sg.elem != nil {
          typedmemclr(c.elemtype, sg.elem)
          sg.elem = nil
        }
        if sg.releasetime != 0 {
          sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
          raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
      }
    
    step4

    向所有等待发送队列的sudog加入链表

    
    // release all writers (they will panic)
      for {
        sg := c.sendq.dequeue()
        if sg == nil {
          break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
          sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
          raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
      }
      unlock(&c.lock)
    
    step5

    唤醒sudog所有goroutine

    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
      }
    

    close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。

    close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。

    唤醒之后,该干嘛干嘛。sender 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工作后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不同的值。如果 channel 关闭,received 为 false,否则为 true。

    总结

    总结一下,发生 panic 的情况有三种:

    1.向一个关闭的 channel 进行写操作;

    1. 关闭一个 nil 的 channel;
    2. 重复关闭一个 channel。

    读、写一个 nil channel 都会被阻塞。

    channel发送和接收元素的本质还是值得拷贝
    channel是并发安全的(加锁)

    参考:博客园-深度解密Go语言只channel
    好未来Golang源码系列三:Channel实现原理分析

    相关文章

      网友评论

          本文标题:go-channel初识

          本文链接:https://www.haomeiwen.com/subject/bfxaehtx.html