美文网首页
Go并发01-管道

Go并发01-管道

作者: 王侦 | 来源:发表于2022-11-07 15:26 被阅读0次

Go语言的并发模型基于CSP(Communicating Sequential Processes)理论。Go的并发哲学强调:

“Do not communicate by sharing memory; instead, share memory by communicating."

goroutine和channel是Go语言CSP并发模型的两大核心概念。


chan T // 声明一个双向通道
chan<- T // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道

ch := make(chan int) //初始化一个无缓冲区的int类型通道
ch := make(chan int, 3) //初始化一个容量为3有缓冲区的int类型通道
func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}

1.数据结构

// 管道
type hchan struct {
 qcount   uint           // 环形队列缓存的实际大小,即当前缓存的消息个数。
 dataqsiz uint           // 环形队列缓存的容量,即允许缓存的消息最大个数
 buf      unsafe.Pointer // 指向缓冲区的指针
 elemsize uint16 // 每个元素的大小
 closed   uint32 // 管道状态
 elemtype *_type // 元素类型
 sendx    uint   // 发送的索引
 recvx    uint   // 接收的索引
 recvq    waitq  // 当缓冲区已空且仍有recv请求,recv方将会阻塞,该双向链表存放阻塞sudog。
 sendq    waitq  // 当缓冲区已满且仍有send请求,send方将会阻塞,该双向链表存放阻塞sudog。

 lock mutex //互斥锁
}

// 等待队列
type waitq struct {
 first *sudog
 last  *sudog
}


type sudog struct {
 g *g

 next *sudog
 prev *sudog
 elem unsafe.Pointer // data element (may point to stack)
        ...

 isSelect bool
 
 ...
 c        *hchan // channel
}

channel的结构主要由以下几个部分组成:

  • 环形队列缓存:当channel缓冲区未满且仍有send请求的时候,该队列将用于缓存此时收到的消息。
  • 双向链表:当channel缓冲区已满且仍有send请求或者已空但仍有recv请求的时候,channel将进入阻塞状态。通过该双向链表存放阻塞的Goroutine信息,以sudog的形式包裹G。
  • 锁:go语言的channel基于悲观锁实现。

2.channel的创建

func main() {
    ch1 := make(chan struct{})     // 不带缓冲
    ch2 := make(chan struct{}, 10) // 带缓冲
    fmt.Println(len(ch1), len(ch2))
}

不带缓冲和带缓冲的本质区别其实就在于底层的环形队列缓存。对于带缓冲的情况而言,消息可以被暂时存放进队列缓存中,不会导致发送方的G直接进入阻塞状态。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe. 
  // 算术溢出判断
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  // 根据不同情况来分配内存:
  //     1. 不带缓冲:只给hchan分配内存
  //     2. 带缓冲且不包括指针类型:给hchan和环形队列缓存分配一段连续的内存空间
  //     3. 带缓冲且包括指针类型:给hchan和环形队列缓存分别分配内存空间
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

  // 更新以下字段的值
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

runtime.makechan()根据不同情况来分配内存:

  • 1)不带缓冲:只给hchan分配内存
  • 2)带缓冲且不包括指针类型:给hchan和环形队列缓存分配一段连续的内存空间
  • 3)带缓冲且包括指针类型:给hchan和环形队列缓存分别分配内存空间

3.发送和接收

channel的发送和接收必须位于不同的goroutine内,在相同的goroutine内同时给channel发送和接收数据则会引发死锁问题。

3.1 channel的发送和接收主要考虑三种情况

3.1.1 移动缓冲区索引

当channel缓冲区 未满 的时候,底层环形队列缓存将会保存消息,此时发送方和接收方都从队列缓存中直接操作消息。

1)发送-写入channel

当channel新增发送方,hchan首先会加锁,其次将发送方的消息体存放进sendx,接着移动sendx指针,最后释放锁。


2)接收-从channel获取

channel新增接收方,hchan首先会加锁,其次将recvx的消息体取出,接着移动recvx指针,最后释放锁。


3.1.2 阻塞

当channel缓冲区 已满且仍有发送方 或者 已空且仍有接收方 的时候,此时新增的发送方G/接收方G将进入阻塞状态。此时,阻塞的G将被包装进sudog结构体中,并以双向链表的形式保存下来。

1)发送-写入channel

当channel缓冲区已满且仍有发送方,新增的发送方G将进入阻塞状态,以sudog的形式加入sendq中。


2)接收-从channel获取

当channel缓冲区已空且仍有接收方,新增的接收方G将进入阻塞状态,以sudog的形式加入recvq中。


3.1.3 缓冲区复制 && 直接发送

1)发送-写入channel(满了,加一个接收方)

在上述第二种情况的基础上,当channel 存在阻塞发送方且新增了接收方 的时候,channel会将缓冲区的数据返回给接收方,同时将阻塞在双向链表队头的发送方唤醒,并将其消息复制进缓冲区。


2)接收-从channel获取(空了,加一个发送方)

当channel 存在阻塞接收方且新增了发送方 的时候,channel会唤醒阻塞的接收方,然后直接把数据从发送方复制到接收方,无需再经过缓冲区,即直接发送。


3.2 发送到channel

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

具体看

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ... 省略不相关的部分

  // 直接发送:当存在阻塞的接收方,且新增了发送方,那么直接把发送方的消息复制给接收方
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // 缓冲区:当缓冲区未满,那么通过移动环形队列缓存的指针来存储消息
  if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++ // 移动指针
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

  if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
  // 阻塞:当缓冲区已满,此时当前的发送方需要进入阻塞状态。
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 将goroutine阻塞
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

  ... 省略不相关的部分
}

3.从channel接收

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

具体看runtime.chanrecv:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ... 省略不相关的部分

  // 缓冲区复制:当存在阻塞的发送方,会从缓冲区读取消息,并且唤醒发送方将消息放入缓冲区。
  // 这里面同时存在一个临界状态,即当缓冲区长度是0的时候,则会将发送方的消息直接复制到接收方
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

  // 缓冲区:当缓冲区未空的时候,那么通过移动环形队列缓存的指针来发送消息
  if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++ // 移动指针
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

  if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
  // 阻塞:当缓冲区已空,那么需要阻塞当前的接收方
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 将goroutine阻塞

  ... 省略不相关的部分
}

4.channel的关闭

channel的关闭并不复杂。但由于channel本身是悲观锁实现,因此channel在关闭的时候,为了减少悲观锁的占用时间,channel会创建一个glist队列,将当前仍然阻塞的sendq和recvq上面的sudog加入glist,然后快速释放掉锁。紧接着,channel会将glist上面阻塞的goroutine以此唤醒。

channel的关闭遵循两大原则:

  • channel并不一定要关闭。Go语言本身可以通过GC来回收channel的内存空间,因此channel不关闭的情况也经常存在。但当不关闭channel引发goroutine无法正常退出的时候,我们需要考虑尽可能地关闭channel以解决goroutine泄露问题。
  • 不要在接收方关闭channel,而要在发送方关闭channel,并发存在多个发送方的时候除外。

5.无锁channel

通常将锁的类型分为乐观锁悲观锁。而所谓的无锁channel更准确的说是基于乐观锁的实现,也即基于CAS实现。

Go社区曾在2014年提出无锁channel的提案,但是该提案虽然很早就提出,但至今仍然没有被接受。主要有如下原因:

  • 1)Go官方希望channel能够符合FIFO的顺序被唤醒,保证数据公平性,这是未使用无锁channel的最主要原因。
  • 2) 无锁channel并不是无等待算法,是否能够有效提高channel在大规模应用的性能并没有得到验证。一个社区的实现2甚至比基于futex的channel还要慢。
  • 3)无锁channel的可维护性非常差。

参考:

相关文章

  • Go并发01-管道

    Go语言的并发模型基于CSP(Communicating Sequential Processes)理论。Go的并...

  • 15 Go并发编程(二):管道 —— Go并发的通信机制

    Go管道 1.什么是管道? 管道最早由CSP模型提出,以点对点管道代替内存共享实现并发进程间的数据交互,相比内存共...

  • Go 并发实战--管道浅析

    在讲 channel 之前,有必要先提一下 CSP 模型,传统的并发模型主要分为 Actor模型和CSP模型,CS...

  • Go并发模式:管道和终止

    本文翻译自Sameer Ajmani的文章《Go Concurrency Patterns: Pipelines ...

  • Go管道初识

    Go管道初识 Go管道基础知识 管道分类 无缓冲(unbuffered channel)无缓冲的通道是指在接收前没...

  • Go语言并发

    Go语言并发 Go语言级别支持协程,叫做goroutine Go 语言从语言层面支持并发和并行的开发操作 Go并发...

  • Go基础语法(九)

    Go语言并发 Go 是并发式语言,而不是并行式语言。 并发是指立即处理多个任务的能力。 Go 编程语言原生支持并发...

  • Go-高并发之管道模型

    即多个函数同时从同一个channel里读取数据。直至channel被关闭可以更好的利用多核这里演示案例是从mysq...

  • Go并发模型:并发协程chan的优雅退出

    Go并发模型:并发协程chan的优雅退出 go chan的使用

  • Go并发

    并发和并行 Go是并发语言,而不是并行语言。(Go is a concurrent language and no...

网友评论

      本文标题:Go并发01-管道

      本文链接:https://www.haomeiwen.com/subject/udnmtdtx.html