美文网首页
golang - channel

golang - channel

作者: husky_1 | 来源:发表于2022-04-15 12:04 被阅读0次

1. 原理

hchan

通过var声明或者make函数创建的channel变量是一个存储在函数栈帧上的指针,占用8个字节,指向堆上的hchan结构体
源码包中src/runtime/chan.go定义了hchan的数据结构如下:


hchan
type hchan struct {
    qcount   uint           // total data in the queue   循环数组中的元素数量
    dataqsiz uint           // size of the circular queue  循环数组的长度
        //channel分为无缓冲和有缓冲两种。
       // 对于有缓冲的channel存储数据,使用了 ring buffer(环形缓冲区) 来缓存写入的数据,本质是循环数组
       // 为啥是循环数组?普通数组不行吗,普通数组容量固定更适合指定的空间,弹出元素时,普通数组需要全部都前移
        // 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
    buf      unsafe.Pointer // points to an array of dataqsiz elements  指向底层循环数组的指针(环形缓冲区)
    elemsize uint16     //元素的大小
    closed   uint32       //channel是否关闭的标志
    elemtype *_type // element type  channel中的元素类型
    sendx    uint   // send index   // 下一次写下标的位置
    recvx    uint   // receive index    // 下一次读下标的位置
    recvq    waitq  // list of recv waiters  // 读等待队列
    sendq    waitq  // list of send waiters  // 写等待队列

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex    //互斥锁,保证读写channel时不存在并发竞争问题
}

hchan结构体的主要组成部分有四个:
用来保存goroutine之间传递数据的循环数组:buf
用来记录此循环数组当前发送或接收数据的下标值:sendx和recvx
用于保存向该chan发送和从该chan接收数据被阻塞的goroutine队列: sendq 和 recvq
保证channel写入和读取数据时线程安全的锁:lock

环形数组

环形数组作为channel 的缓冲区 数组的长度就是定义channnel 时channel 的缓冲大小


等待队列 waitq

在hchan 中包括了读/写 等待队列, waitq是一个双向队列,包括了一个头结点和尾节点。 每个节点是一个sudog结构体变量

type waitq struct {
    first *sudog
    last  *sudog
}


type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    // success indicates whether communication over channel c
    // succeeded. It is true if the goroutine was awoken because a
    // value was delivered over channel c, and false if awoken
    // because c was closed.
    success bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}
操作
  • 创建
    使用 make(chan T, cap) 来创建 channel,channel 可分为带缓冲和不带缓冲的, cap 就是缓冲区的大小

    // 带缓冲,缓冲大小为3
    ch := make(chan int, 3)
    // 不带缓冲
    ch := make(chan int)
    

    make 语法会在编译时,转换为 makechan64 和 makechan

    // 源码
    func makechan64(t *chantype, size int64) *hchan {
      if int64(int(size)) != size {
          panic(plainError("makechan: size out of range"))
      }
    
      return makechan(t, int(size))
    }
    
    func makechan(t *chantype, size int) *hchan {
      elem := t.elem
    
      // compiler checks this but be safe.
      if elem.size >= 1<<16 {
          throw("makechan: invalid channel element type")
      }
      if hchanSize%maxAlign != 0 || elem.align > maxAlign {
          throw("makechan: bad alignment")
      }
    
      mem, overflow := math.MulUintptr(elem.size, uintptr(size))
      if overflow || mem > maxAlloc-hchanSize || size < 0 {
          panic(plainError("makechan: size out of range"))
      }
    
      // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
      // buf points into the same allocation, elemtype is persistent.
      // SudoG's are referenced from their owning thread so they can't be collected.
      // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
      var c *hchan
      switch {
      case mem == 0:
          // Queue or element size is zero.
          c = (*hchan)(mallocgc(hchanSize, nil, true))
          // Race detector uses this location for synchronization.
          c.buf = c.raceaddr()
      case elem.ptrdata == 0:
          // Elements do not contain pointers.
          // Allocate hchan and buf in one call.
          c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
          c.buf = add(unsafe.Pointer(c), hchanSize)
      default:
          // Elements contain pointers.
          c = new(hchan)
          c.buf = mallocgc(mem, elem, true)
      }
    
      c.elemsize = uint16(elem.size)
      c.elemtype = elem
      c.dataqsiz = uint(size)
      lockInit(&c.lock, lockRankHchan)
    
      if debugChan {
          print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
      }
      return c
    }
    

    创建时的会做一些检查:

    • 元素大小不能超过 64K
    • 元素的对齐大小不能超过 maxAlign 也就是 8 字节
    • 计算出来的内存是否超过限制

    创建时的策略:

    • 如果是无缓冲的 channel,会直接给 hchan 分配内存
    • 如果是有缓冲的 channel,并且元素不包含指针,那么会为 hchan 和底层数组分配一段连续的地址
    • 如果是有缓冲的 channel,并且元素包含指针,那么会为 hchan 和底层数组分别分配地址
  • 发送
    发送操作,编译时转换为runtime.chansend函数

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 
    

    通过block 参数,确认是否是阻塞式发送
    阻塞式:调用chansend 函数时 block=true

        ch<- 1
    

    非阻塞式:调用chansend 函数时 block=false

    select {
     case ch <- 10:
     ...
    
    default
    }
    

    向 channel 中发送数据时大概分为两大块:检查和数据发送,数据发送流程如下:

    • 如果 channel 的读等待队列存在接收者goroutine
      将数据直接发送给第一个等待的 goroutine, 唤醒接收的 goroutine
    • 如果 channel 的读等待队列不存在接收者goroutine
      a. 如果循环数组buf未满,那么将会把数据发送到循环数组buf的队尾
      b. 如果循环数组buf已满,这个时候就会走阻塞发送的流程,将当前 goroutine 加入写等待队列,并挂起等待唤醒
  • 接收
    接收操作,编译时转换为runtime.chanrecv函数

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 
    

    阻塞式:

    调用chanrecv函数,并且block=true

    <ch
    
    v := <ch
    
    v, ok := <ch
    
    // 当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的  channel,造成读到数据为通道所存储的数据类型的零值
    for i := range ch {
        fmt.Println(i)
    }
    

    非阻塞式:

    调用chanrecv函数,并且block=false

    select {
        case <-ch:
        ...
    
      default
    }
    

    向 channel 中接收数据时大概分为两大块,检查和数据发送,而数据接收流程如下:

    • 如果 channel 的写等待队列存在发送者goroutine
      如果是无缓冲 channel,直接从第一个发送者goroutine那里把数据拷贝给接收变量,唤醒发送的 goroutine
      如果是有缓冲 channel(已满),将循环数组buf的队首元素拷贝给接收变量,将第一个发送者goroutine的数据拷贝到 buf循环数组队尾,唤醒发送的 goroutine
    • 如果 channel 的写等待队列不存在发送者goroutine
      如果循环数组buf非空,将循环数组buf的队首元素拷贝给接收变量
      如果循环数组buf为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列,并挂起等待唤醒
  • 关闭

    关闭操作,调用close函数,编译时转换为runtime.closechan函数

    close(ch)
    func closechan(c *hchan) 
    

2. 特点

channel有2种类型:无缓冲、有缓冲, 在创建时make(chan type cap) 通过cap 设定缓冲大小
channel有3种模式:写操作模式(单向通道)、读操作模式(单向通道)、读写操作模式(双向通道)

写操作模式 读操作模式 读写操作模式
创建 make(chan<- int) make(<-chan int) make(chan int)

channel有3种状态:未初始化、正常、关闭

未初始化 关闭 正常
关闭 panic panic 正常关闭
发送 永远阻塞导致死锁 panic 阻塞或者成功发送
接收 永远阻塞导致死锁 缓冲区为空则为零值, 否则可以继续读 阻塞或者成功接收

如下几种状态会引发panic

1.关闭未初始化的channel 和已经关闭的channel

  1. 向已经关闭的channel 中发送数据
3. 线程安全

channel 是线程安全的,channel的底层实现中,hchan结构体中采用Mutex锁来保证数据读写安全。在对循环数组buf中的数据进行入队和出队操作时,必须先获取互斥锁,才能操作channel数据

相关文章

网友评论

      本文标题:golang - channel

      本文链接:https://www.haomeiwen.com/subject/uhntertx.html