1. 原理
hchan
通过var声明或者make函数创建的channel变量是一个存储在函数栈帧上的指针,占用8个字节,指向堆上的hchan结构体
源码包中src/runtime/chan.go定义了hchan的数据结构如下:
data:image/s3,"s3://crabby-images/56ff8/56ff80848094afde613d36db43a843ed9633d41b" alt=""
type hchan struct {
qcount uint // total data in the queue 循环数组中的元素数量
dataqsiz uint // size of the circular queue 循环数组的长度
//channel分为无缓冲和有缓冲两种。
// 对于有缓冲的channel存储数据,使用了 ring buffer(环形缓冲区) 来缓存写入的数据,本质是循环数组
// 为啥是循环数组?普通数组不行吗,普通数组容量固定更适合指定的空间,弹出元素时,普通数组需要全部都前移
// 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
buf unsafe.Pointer // points to an array of dataqsiz elements 指向底层循环数组的指针(环形缓冲区)
elemsize uint16 //元素的大小
closed uint32 //channel是否关闭的标志
elemtype *_type // element type channel中的元素类型
sendx uint // send index // 下一次写下标的位置
recvx uint // receive index // 下一次读下标的位置
recvq waitq // list of recv waiters // 读等待队列
sendq waitq // list of send waiters // 写等待队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}
hchan结构体的主要组成部分有四个:
用来保存goroutine之间传递数据的循环数组:buf
用来记录此循环数组当前发送或接收数据的下标值:sendx和recvx
用于保存向该chan发送和从该chan接收数据被阻塞的goroutine队列: sendq 和 recvq
保证channel写入和读取数据时线程安全的锁:lock
环形数组
环形数组作为channel 的缓冲区 数组的长度就是定义channnel 时channel 的缓冲大小
data:image/s3,"s3://crabby-images/db4de/db4de04384a59a0ac2a6d83375f5651cab068bef" alt=""
等待队列 waitq
在hchan 中包括了读/写 等待队列, waitq是一个双向队列,包括了一个头结点和尾节点。 每个节点是一个sudog结构体变量
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
操作
-
创建
使用 make(chan T, cap) 来创建 channel,channel 可分为带缓冲和不带缓冲的, cap 就是缓冲区的大小// 带缓冲,缓冲大小为3 ch := make(chan int, 3) // 不带缓冲 ch := make(chan int)
make 语法会在编译时,转换为 makechan64 和 makechan
// 源码 func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError("makechan: size out of range")) } return makechan(t, int(size)) } func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
创建时的会做一些检查:
- 元素大小不能超过 64K
- 元素的对齐大小不能超过 maxAlign 也就是 8 字节
- 计算出来的内存是否超过限制
创建时的策略:
- 如果是无缓冲的 channel,会直接给 hchan 分配内存
- 如果是有缓冲的 channel,并且元素不包含指针,那么会为 hchan 和底层数组分配一段连续的地址
- 如果是有缓冲的 channel,并且元素包含指针,那么会为 hchan 和底层数组分别分配地址
-
发送
发送操作,编译时转换为runtime.chansend函数func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
通过block 参数,确认是否是阻塞式发送
阻塞式:调用chansend 函数时 block=truech<- 1
非阻塞式:调用chansend 函数时 block=false
select { case ch <- 10: ... default }
向 channel 中发送数据时大概分为两大块:检查和数据发送,数据发送流程如下:
- 如果 channel 的读等待队列存在接收者goroutine
将数据直接发送给第一个等待的 goroutine, 唤醒接收的 goroutine - 如果 channel 的读等待队列不存在接收者goroutine
a. 如果循环数组buf未满,那么将会把数据发送到循环数组buf的队尾
b. 如果循环数组buf已满,这个时候就会走阻塞发送的流程,将当前 goroutine 加入写等待队列,并挂起等待唤醒
- 如果 channel 的读等待队列存在接收者goroutine
-
接收
接收操作,编译时转换为runtime.chanrecv函数func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
阻塞式:
调用chanrecv函数,并且block=true
<ch v := <ch v, ok := <ch // 当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的 channel,造成读到数据为通道所存储的数据类型的零值 for i := range ch { fmt.Println(i) }
非阻塞式:
调用chanrecv函数,并且block=false
select { case <-ch: ... default }
向 channel 中接收数据时大概分为两大块,检查和数据发送,而数据接收流程如下:
- 如果 channel 的写等待队列存在发送者goroutine
如果是无缓冲 channel,直接从第一个发送者goroutine那里把数据拷贝给接收变量,唤醒发送的 goroutine
如果是有缓冲 channel(已满),将循环数组buf的队首元素拷贝给接收变量,将第一个发送者goroutine的数据拷贝到 buf循环数组队尾,唤醒发送的 goroutine - 如果 channel 的写等待队列不存在发送者goroutine
如果循环数组buf非空,将循环数组buf的队首元素拷贝给接收变量
如果循环数组buf为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列,并挂起等待唤醒
- 如果 channel 的写等待队列存在发送者goroutine
-
关闭
关闭操作,调用close函数,编译时转换为runtime.closechan函数
close(ch) func closechan(c *hchan)
2. 特点
channel有2种类型:无缓冲、有缓冲, 在创建时make(chan type cap)
通过cap 设定缓冲大小
channel有3种模式:写操作模式(单向通道)、读操作模式(单向通道)、读写操作模式(双向通道)
写操作模式 | 读操作模式 | 读写操作模式 | |
---|---|---|---|
创建 | make(chan<- int) | make(<-chan int) | make(chan int) |
channel有3种状态:未初始化、正常、关闭
未初始化 | 关闭 | 正常 | |
---|---|---|---|
关闭 | panic | panic | 正常关闭 |
发送 | 永远阻塞导致死锁 | panic | 阻塞或者成功发送 |
接收 | 永远阻塞导致死锁 | 缓冲区为空则为零值, 否则可以继续读 | 阻塞或者成功接收 |
如下几种状态会引发panic
1.关闭未初始化的channel 和已经关闭的channel
- 向已经关闭的channel 中发送数据
3. 线程安全
channel 是线程安全的,channel的底层实现中,hchan结构体中采用Mutex锁来保证数据读写安全。在对循环数组buf中的数据进行入队和出队操作时,必须先获取互斥锁,才能操作channel数据
网友评论