美文网首页
Go并发01-管道

Go并发01-管道

作者: 王侦 | 来源:发表于2022-11-07 15:26 被阅读0次

    Go语言的并发模型基于CSP(Communicating Sequential Processes)理论。Go的并发哲学强调:

    “Do not communicate by sharing memory; instead, share memory by communicating."
    

    goroutine和channel是Go语言CSP并发模型的两大核心概念。


    chan T // 声明一个双向通道
    chan<- T // 声明一个只能用于发送的通道
    <-chan T // 声明一个只能用于接收的通道
    
    ch := make(chan int) //初始化一个无缓冲区的int类型通道
    ch := make(chan int, 3) //初始化一个容量为3有缓冲区的int类型通道
    
    func goroutineA(a <-chan int) {
        val := <- a
        fmt.Println("G1 received data: ", val)
        return
    }
    
    func goroutineB(b <-chan int) {
        val := <- b
        fmt.Println("G2 received data: ", val)
        return
    }
    
    func main() {
        ch := make(chan int)
        go goroutineA(ch)
        go goroutineB(ch)
        ch <- 3
        time.Sleep(time.Second)
    }
    

    1.数据结构

    // 管道
    type hchan struct {
     qcount   uint           // 环形队列缓存的实际大小,即当前缓存的消息个数。
     dataqsiz uint           // 环形队列缓存的容量,即允许缓存的消息最大个数
     buf      unsafe.Pointer // 指向缓冲区的指针
     elemsize uint16 // 每个元素的大小
     closed   uint32 // 管道状态
     elemtype *_type // 元素类型
     sendx    uint   // 发送的索引
     recvx    uint   // 接收的索引
     recvq    waitq  // 当缓冲区已空且仍有recv请求,recv方将会阻塞,该双向链表存放阻塞sudog。
     sendq    waitq  // 当缓冲区已满且仍有send请求,send方将会阻塞,该双向链表存放阻塞sudog。
    
     lock mutex //互斥锁
    }
    
    // 等待队列
    type waitq struct {
     first *sudog
     last  *sudog
    }
    
    
    type sudog struct {
     g *g
    
     next *sudog
     prev *sudog
     elem unsafe.Pointer // data element (may point to stack)
            ...
    
     isSelect bool
     
     ...
     c        *hchan // channel
    }
    

    channel的结构主要由以下几个部分组成:

    • 环形队列缓存:当channel缓冲区未满且仍有send请求的时候,该队列将用于缓存此时收到的消息。
    • 双向链表:当channel缓冲区已满且仍有send请求或者已空但仍有recv请求的时候,channel将进入阻塞状态。通过该双向链表存放阻塞的Goroutine信息,以sudog的形式包裹G。
    • 锁:go语言的channel基于悲观锁实现。

    2.channel的创建

    func main() {
        ch1 := make(chan struct{})     // 不带缓冲
        ch2 := make(chan struct{}, 10) // 带缓冲
        fmt.Println(len(ch1), len(ch2))
    }
    

    不带缓冲和带缓冲的本质区别其实就在于底层的环形队列缓存。对于带缓冲的情况而言,消息可以被暂时存放进队列缓存中,不会导致发送方的G直接进入阻塞状态。

    func makechan(t *chantype, size int) *hchan {
        elem := t.elem
    
        // compiler checks this but be safe. 
      // 算术溢出判断
        if elem.size >= 1<<16 {
            throw("makechan: invalid channel element type")
        }
        if hchanSize%maxAlign != 0 || elem.align > maxAlign {
            throw("makechan: bad alignment")
        }
    
        mem, overflow := math.MulUintptr(elem.size, uintptr(size))
        if overflow || mem > maxAlloc-hchanSize || size < 0 {
            panic(plainError("makechan: size out of range"))
        }
    
        // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
      // 根据不同情况来分配内存:
      //     1. 不带缓冲:只给hchan分配内存
      //     2. 带缓冲且不包括指针类型:给hchan和环形队列缓存分配一段连续的内存空间
      //     3. 带缓冲且包括指针类型:给hchan和环形队列缓存分别分配内存空间
        var c *hchan
        switch {
        case mem == 0:
            // Queue or element size is zero.
            c = (*hchan)(mallocgc(hchanSize, nil, true))
            // Race detector uses this location for synchronization.
            c.buf = c.raceaddr()
        case elem.ptrdata == 0:
            // Elements do not contain pointers.
            // Allocate hchan and buf in one call.
            c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
            c.buf = add(unsafe.Pointer(c), hchanSize)
        default:
            // Elements contain pointers.
            c = new(hchan)
            c.buf = mallocgc(mem, elem, true)
        }
    
      // 更新以下字段的值
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
        lockInit(&c.lock, lockRankHchan)
    
        if debugChan {
            print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
        }
        return c
    }
    

    runtime.makechan()根据不同情况来分配内存:

    • 1)不带缓冲:只给hchan分配内存
    • 2)带缓冲且不包括指针类型:给hchan和环形队列缓存分配一段连续的内存空间
    • 3)带缓冲且包括指针类型:给hchan和环形队列缓存分别分配内存空间

    3.发送和接收

    channel的发送和接收必须位于不同的goroutine内,在相同的goroutine内同时给channel发送和接收数据则会引发死锁问题。

    3.1 channel的发送和接收主要考虑三种情况

    3.1.1 移动缓冲区索引

    当channel缓冲区 未满 的时候,底层环形队列缓存将会保存消息,此时发送方和接收方都从队列缓存中直接操作消息。

    1)发送-写入channel

    当channel新增发送方,hchan首先会加锁,其次将发送方的消息体存放进sendx,接着移动sendx指针,最后释放锁。


    2)接收-从channel获取

    channel新增接收方,hchan首先会加锁,其次将recvx的消息体取出,接着移动recvx指针,最后释放锁。


    3.1.2 阻塞

    当channel缓冲区 已满且仍有发送方 或者 已空且仍有接收方 的时候,此时新增的发送方G/接收方G将进入阻塞状态。此时,阻塞的G将被包装进sudog结构体中,并以双向链表的形式保存下来。

    1)发送-写入channel

    当channel缓冲区已满且仍有发送方,新增的发送方G将进入阻塞状态,以sudog的形式加入sendq中。


    2)接收-从channel获取

    当channel缓冲区已空且仍有接收方,新增的接收方G将进入阻塞状态,以sudog的形式加入recvq中。


    3.1.3 缓冲区复制 && 直接发送

    1)发送-写入channel(满了,加一个接收方)

    在上述第二种情况的基础上,当channel 存在阻塞发送方且新增了接收方 的时候,channel会将缓冲区的数据返回给接收方,同时将阻塞在双向链表队头的发送方唤醒,并将其消息复制进缓冲区。


    2)接收-从channel获取(空了,加一个发送方)

    当channel 存在阻塞接收方且新增了发送方 的时候,channel会唤醒阻塞的接收方,然后直接把数据从发送方复制到接收方,无需再经过缓冲区,即直接发送。


    3.2 发送到channel

    func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
    }
    

    具体看

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ... 省略不相关的部分
    
      // 直接发送:当存在阻塞的接收方,且新增了发送方,那么直接把发送方的消息复制给接收方
        if sg := c.recvq.dequeue(); sg != nil {
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
    
      // 缓冲区:当缓冲区未满,那么通过移动环形队列缓存的指针来存储消息
      if c.qcount < c.dataqsiz {
            // Space is available in the channel buffer. Enqueue the element to send.
            qp := chanbuf(c, c.sendx)
            if raceenabled {
                racenotify(c, c.sendx, nil)
            }
            typedmemmove(c.elemtype, qp, ep)
            c.sendx++ // 移动指针
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock)
            return true
        }
    
      if !block {
            unlock(&c.lock)
            return false
        }
    
        // Block on the channel. Some receiver will complete our operation for us.
      // 阻塞:当缓冲区已满,此时当前的发送方需要进入阻塞状态。
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        c.sendq.enqueue(mysg)
        // Signal to anyone trying to shrink our stack that we're about
        // to park on a channel. The window between when this G's status
        // changes and when we set gp.activeStackChans is not safe for
        // stack shrinking.
        atomic.Store8(&gp.parkingOnChan, 1)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 将goroutine阻塞
        // Ensure the value being sent is kept alive until the
        // receiver copies it out. The sudog has a pointer to the
        // stack object, but sudogs aren't considered as roots of the
        // stack tracer.
        KeepAlive(ep)
    
      ... 省略不相关的部分
    }
    

    3.从channel接收

    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }
    
    //go:nosplit
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
    }
    

    具体看runtime.chanrecv:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ... 省略不相关的部分
    
      // 缓冲区复制:当存在阻塞的发送方,会从缓冲区读取消息,并且唤醒发送方将消息放入缓冲区。
      // 这里面同时存在一个临界状态,即当缓冲区长度是0的时候,则会将发送方的消息直接复制到接收方
        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    
      // 缓冲区:当缓冲区未空的时候,那么通过移动环形队列缓存的指针来发送消息
      if c.qcount > 0 {
            // Receive directly from queue
            qp := chanbuf(c, c.recvx)
            if raceenabled {
                racenotify(c, c.recvx, nil)
            }
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            typedmemclr(c.elemtype, qp)
            c.recvx++ // 移动指针
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock)
            return true, true
        }
    
      if !block {
            unlock(&c.lock)
            return false, false
        }
    
        // no sender available: block on this channel.
      // 阻塞:当缓冲区已空,那么需要阻塞当前的接收方
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.param = nil
        c.recvq.enqueue(mysg)
        // Signal to anyone trying to shrink our stack that we're about
        // to park on a channel. The window between when this G's status
        // changes and when we set gp.activeStackChans is not safe for
        // stack shrinking.
        atomic.Store8(&gp.parkingOnChan, 1)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 将goroutine阻塞
    
      ... 省略不相关的部分
    }
    

    4.channel的关闭

    channel的关闭并不复杂。但由于channel本身是悲观锁实现,因此channel在关闭的时候,为了减少悲观锁的占用时间,channel会创建一个glist队列,将当前仍然阻塞的sendq和recvq上面的sudog加入glist,然后快速释放掉锁。紧接着,channel会将glist上面阻塞的goroutine以此唤醒。

    channel的关闭遵循两大原则:

    • channel并不一定要关闭。Go语言本身可以通过GC来回收channel的内存空间,因此channel不关闭的情况也经常存在。但当不关闭channel引发goroutine无法正常退出的时候,我们需要考虑尽可能地关闭channel以解决goroutine泄露问题。
    • 不要在接收方关闭channel,而要在发送方关闭channel,并发存在多个发送方的时候除外。

    5.无锁channel

    通常将锁的类型分为乐观锁悲观锁。而所谓的无锁channel更准确的说是基于乐观锁的实现,也即基于CAS实现。

    Go社区曾在2014年提出无锁channel的提案,但是该提案虽然很早就提出,但至今仍然没有被接受。主要有如下原因:

    • 1)Go官方希望channel能够符合FIFO的顺序被唤醒,保证数据公平性,这是未使用无锁channel的最主要原因。
    • 2) 无锁channel并不是无等待算法,是否能够有效提高channel在大规模应用的性能并没有得到验证。一个社区的实现2甚至比基于futex的channel还要慢。
    • 3)无锁channel的可维护性非常差。

    参考:

    相关文章

      网友评论

          本文标题:Go并发01-管道

          本文链接:https://www.haomeiwen.com/subject/udnmtdtx.html