Go语言的并发模型基于CSP(Communicating Sequential Processes)理论。Go的并发哲学强调:
“Do not communicate by sharing memory; instead, share memory by communicating."
goroutine和channel是Go语言CSP并发模型的两大核心概念。
chan T // 声明一个双向通道
chan<- T // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道
ch := make(chan int) //初始化一个无缓冲区的int类型通道
ch := make(chan int, 3) //初始化一个容量为3有缓冲区的int类型通道
func goroutineA(a <-chan int) {
val := <- a
fmt.Println("G1 received data: ", val)
return
}
func goroutineB(b <-chan int) {
val := <- b
fmt.Println("G2 received data: ", val)
return
}
func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
1.数据结构
// 管道
type hchan struct {
qcount uint // 环形队列缓存的实际大小,即当前缓存的消息个数。
dataqsiz uint // 环形队列缓存的容量,即允许缓存的消息最大个数
buf unsafe.Pointer // 指向缓冲区的指针
elemsize uint16 // 每个元素的大小
closed uint32 // 管道状态
elemtype *_type // 元素类型
sendx uint // 发送的索引
recvx uint // 接收的索引
recvq waitq // 当缓冲区已空且仍有recv请求,recv方将会阻塞,该双向链表存放阻塞sudog。
sendq waitq // 当缓冲区已满且仍有send请求,send方将会阻塞,该双向链表存放阻塞sudog。
lock mutex //互斥锁
}
// 等待队列
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
...
isSelect bool
...
c *hchan // channel
}
channel的结构主要由以下几个部分组成:
- 环形队列缓存:当channel缓冲区未满且仍有send请求的时候,该队列将用于缓存此时收到的消息。
- 双向链表:当channel缓冲区已满且仍有send请求或者已空但仍有recv请求的时候,channel将进入阻塞状态。通过该双向链表存放阻塞的Goroutine信息,以sudog的形式包裹G。
- 锁:go语言的channel基于悲观锁实现。
2.channel的创建
func main() {
ch1 := make(chan struct{}) // 不带缓冲
ch2 := make(chan struct{}, 10) // 带缓冲
fmt.Println(len(ch1), len(ch2))
}
不带缓冲和带缓冲的本质区别其实就在于底层的环形队列缓存。对于带缓冲的情况而言,消息可以被暂时存放进队列缓存中,不会导致发送方的G直接进入阻塞状态。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 算术溢出判断
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
// 根据不同情况来分配内存:
// 1. 不带缓冲:只给hchan分配内存
// 2. 带缓冲且不包括指针类型:给hchan和环形队列缓存分配一段连续的内存空间
// 3. 带缓冲且包括指针类型:给hchan和环形队列缓存分别分配内存空间
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 更新以下字段的值
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
runtime.makechan()根据不同情况来分配内存:
- 1)不带缓冲:只给hchan分配内存
- 2)带缓冲且不包括指针类型:给hchan和环形队列缓存分配一段连续的内存空间
- 3)带缓冲且包括指针类型:给hchan和环形队列缓存分别分配内存空间
3.发送和接收
channel的发送和接收必须位于不同的goroutine内,在相同的goroutine内同时给channel发送和接收数据则会引发死锁问题。
3.1 channel的发送和接收主要考虑三种情况
3.1.1 移动缓冲区索引
当channel缓冲区 未满 的时候,底层环形队列缓存将会保存消息,此时发送方和接收方都从队列缓存中直接操作消息。
1)发送-写入channel
当channel新增发送方,hchan首先会加锁,其次将发送方的消息体存放进sendx,接着移动sendx指针,最后释放锁。
2)接收-从channel获取
channel新增接收方,hchan首先会加锁,其次将recvx的消息体取出,接着移动recvx指针,最后释放锁。
3.1.2 阻塞
当channel缓冲区 已满且仍有发送方 或者 已空且仍有接收方 的时候,此时新增的发送方G/接收方G将进入阻塞状态。此时,阻塞的G将被包装进sudog结构体中,并以双向链表的形式保存下来。
1)发送-写入channel
当channel缓冲区已满且仍有发送方,新增的发送方G将进入阻塞状态,以sudog的形式加入sendq中。
2)接收-从channel获取
当channel缓冲区已空且仍有接收方,新增的接收方G将进入阻塞状态,以sudog的形式加入recvq中。
3.1.3 缓冲区复制 && 直接发送
1)发送-写入channel(满了,加一个接收方)
在上述第二种情况的基础上,当channel 存在阻塞发送方且新增了接收方 的时候,channel会将缓冲区的数据返回给接收方,同时将阻塞在双向链表队头的发送方唤醒,并将其消息复制进缓冲区。
2)接收-从channel获取(空了,加一个发送方)
当channel 存在阻塞接收方且新增了发送方 的时候,channel会唤醒阻塞的接收方,然后直接把数据从发送方复制到接收方,无需再经过缓冲区,即直接发送。
3.2 发送到channel
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
具体看
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
... 省略不相关的部分
// 直接发送:当存在阻塞的接收方,且新增了发送方,那么直接把发送方的消息复制给接收方
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 缓冲区:当缓冲区未满,那么通过移动环形队列缓存的指针来存储消息
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++ // 移动指针
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
// 阻塞:当缓冲区已满,此时当前的发送方需要进入阻塞状态。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 将goroutine阻塞
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
... 省略不相关的部分
}
3.从channel接收
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
具体看runtime.chanrecv:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
... 省略不相关的部分
// 缓冲区复制:当存在阻塞的发送方,会从缓冲区读取消息,并且唤醒发送方将消息放入缓冲区。
// 这里面同时存在一个临界状态,即当缓冲区长度是0的时候,则会将发送方的消息直接复制到接收方
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓冲区:当缓冲区未空的时候,那么通过移动环形队列缓存的指针来发送消息
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++ // 移动指针
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
// 阻塞:当缓冲区已空,那么需要阻塞当前的接收方
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 将goroutine阻塞
... 省略不相关的部分
}
4.channel的关闭
channel的关闭并不复杂。但由于channel本身是悲观锁实现,因此channel在关闭的时候,为了减少悲观锁的占用时间,channel会创建一个glist队列,将当前仍然阻塞的sendq和recvq上面的sudog加入glist,然后快速释放掉锁。紧接着,channel会将glist上面阻塞的goroutine以此唤醒。
channel的关闭遵循两大原则:
- channel并不一定要关闭。Go语言本身可以通过GC来回收channel的内存空间,因此channel不关闭的情况也经常存在。但当不关闭channel引发goroutine无法正常退出的时候,我们需要考虑尽可能地关闭channel以解决goroutine泄露问题。
- 不要在接收方关闭channel,而要在发送方关闭channel,并发存在多个发送方的时候除外。
5.无锁channel
通常将锁的类型分为乐观锁和悲观锁。而所谓的无锁channel更准确的说是基于乐观锁的实现,也即基于CAS实现。
Go社区曾在2014年提出无锁channel的提案,但是该提案虽然很早就提出,但至今仍然没有被接受。主要有如下原因:
- 1)Go官方希望channel能够符合FIFO的顺序被唤醒,保证数据公平性,这是未使用无锁channel的最主要原因。
- 2) 无锁channel并不是无等待算法,是否能够有效提高channel在大规模应用的性能并没有得到验证。一个社区的实现2甚至比基于futex的channel还要慢。
- 3)无锁channel的可维护性非常差。
参考:
网友评论