Three Axes of Golang Concurrency, Part 2: goroutine pools for concurrency

Author: 白想519 | Published 2019-05-20 09:45

    This is the second post in this series; the first is here: Three Axes of Golang Concurrency, Part 1: channels for communication and synchronization

    The previous post described the handicraft-workshop era, in which a master craftsman and his apprentices worked on a job concurrently. Now we are ready to enter the industrial age.

    The Pipeline model

    A pipeline, i.e. an assembly line, is everywhere in modern industry. The model splits the work into several stages; each stage does something different, but all of them can run in parallel. Take tractor manufacturing as an example of how an assembly line works, and suppose assembling a vehicle takes four steps:

    • Step one, stamping: press the body panels, chassis, and other parts into shape.
    • Step two, welding: weld the stamped parts together into a body.
    • Step three, painting: clean, chemically treat, sand, spray-paint, and dry the body and other major parts.
    • Step four, final assembly: put all the parts (including the engine and externally sourced components) together into a vehicle.

    Say we want to build a hundred tractors. If every stage waited for the previous stage to finish all one hundred before starting, the production line would be enormously underutilized. The modern industrial approach is to have every stage working at the same time, parallelizing in time. The pipeline concept is just as pervasive in computing: a pipelined CPU can complete one instruction per clock cycle, instead of walking through all four stages of fetch, decode, operand fetch, and execute before finishing each instruction:


    Go's designers absorbed this classic concept, using channels to chain consecutive stages into a pipeline:

    // gen feeds the given numbers into a channel and closes it when done.
    func gen(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }
    
    // sq squares every number it reads from in, then closes its output.
    func sq(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n * n
            }
            close(out)
        }()
        return out
    }
    
    func TestPipeline(t *testing.T) {
        c := gen(2, 3, 4)
        out := sq(c)
    
        for n := range out {
            log.Print(n)
        }
    }
    

    Since sq's input and output types are identical, we can chain as many stages as we like:

    func TestPipelines(t *testing.T) {
        for n := range sq(sq(sq(gen(1, 2, 3, 4)))) {
            log.Print(n)
        }
    }
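
    As a side note (this is the standard companion to the pattern, not code from the original post): if a downstream consumer stops reading early, the upstream goroutines above would block forever on their sends. The usual fix is to thread a done channel through every stage so it can abandon its sends; a minimal sketch, where gen2 is a hypothetical name:

    func gen2(done <-chan struct{}, nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for _, n := range nums {
                select {
                case out <- n:
                case <-done: // the consumer gave up; stop sending and exit
                    return
                }
            }
        }()
        return out
    }

    Closing done from the consumer side unblocks every stage built this way, so the whole pipeline can be torn down cleanly.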
    

    The Fan model

    The Fan model can be considered a refinement of the pipeline. Notice that in the pipeline above each stage runs only a single goroutine, yet when actually building tractors, four workers can mount wheels at once, raising concurrency further. Go adopts this model as well: in the task-distribution phase, multiple goroutines read from the same channel until it closes, which is called fan-out; in the result-collection phase, a single goroutine reads from multiple channels until they close, which is called fan-in:


    // merge fans in any number of channels into one, closing the output
    // once every input is drained.
    func merge(cs ...<-chan int) <-chan int {
        var wg sync.WaitGroup
        out := make(chan int)
    
        output := func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c)
        }
    
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }
    
    func TestFan(t *testing.T) {
        c := gen(2, 3, 4)
        out1 := sq(c)
        out2 := sq(c)
    
        for n := range merge(out1, out2) {
            log.Print(n)
        }
    }
    

    Once things run in parallel, the order of the final results is no longer under our control:

    2019/02/22 21:39:27 9
    2019/02/22 21:39:27 16
    2019/02/22 21:39:27 4
    
    Process finished with exit code 0
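
    If the output order matters, one common workaround (my own sketch; the indexed type and collect function are hypothetical, not from the post's code) is to tag every value with its position on the way in and put it back into its slot on the way out:

    type indexed struct {
        pos int
        val int
    }
    
    // collect drains a fan-in channel of tagged results and
    // restores the original ordering in a slice.
    func collect(in <-chan indexed, n int) []int {
        out := make([]int, n)
        for r := range in {
            out[r.pos] = r.val
        }
        return out
    }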
    

    Do these models actually improve parallel efficiency?

    CPU-bound

    We simulate a CPU-bound workload with floating-point exponentiation and set up the following variants for comparison:

    // helper functions
    func benchmarkList() []float64 {
        list := make([]float64, MAX)
        for n := 0; n < MAX; n++ {
            list[n] = float64(n)
        }
        return list
    }
    
    func cpubound(n float64) float64 {
        return math.Pow(3.1415926, n)
    }
    
    func genChan(nums []float64) <-chan float64 {
        out := make(chan float64)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }
    
    func genChanBuffer(nums []float64) <-chan float64 {
        out := make(chan float64, BUFFERSIZE)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }
    
    func cpuChan(in <-chan float64) <-chan float64 {
        out := make(chan float64)
        go func() {
            for n := range in {
                out <- cpubound(n)
            }
            close(out)
        }()
        return out
    }
    
    func cpuChanBuffer(in <-chan float64) <-chan float64 {
        out := make(chan float64, BUFFERSIZE)
        go func() {
            for n := range in {
                out <- cpubound(n)
            }
            close(out)
        }()
        return out
    }
    
    func mergeChan(cs ...<-chan float64) <-chan float64 {
        var wg sync.WaitGroup
        out := make(chan float64)
    
        output := func(c <-chan float64) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c)
        }
    
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }
    
    func mergeChanBuffer(cs ...<-chan float64) <-chan float64 {
        var wg sync.WaitGroup
        out := make(chan float64, BUFFERSIZE)
    
        output := func(c <-chan float64) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c)
        }
    
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }
    
    // benchmark functions
    func BenchmarkCPUSequential(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            var sum float64
            for _, n := range list {
                sum += cpubound(n)
            }
        }
    }
    
    func BenchmarkCPUPipeline(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChan(list)
            out := cpuChan(c)
    
            var sum float64
            for n := range out {
                sum += n
            }
        }
    }
    
    func BenchmarkCPUPipelineBuffer(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChanBuffer(list)
            out := cpuChanBuffer(c)
    
            var sum float64
            for n := range out {
                sum += n
            }
        }
    }
    
    func BenchmarkCPUFan(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChan(list)
            gonum := runtime.NumCPU() / 2
            outs := make([]<-chan float64, gonum)
            for i := 0; i < gonum; i++ {
                outs[i] = cpuChan(c)
            }
    
            var sum float64
            for n := range mergeChan(outs...) {
                sum += n
            }
        }
    }
    
    func BenchmarkCPUFanBuffer(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChanBuffer(list)
            gonum := runtime.NumCPU() / 2
            outs := make([]<-chan float64, gonum)
            for i := 0; i < gonum; i++ {
                outs[i] = cpuChanBuffer(c)
            }
    
            var sum float64
            for n := range mergeChanBuffer(outs...) {
                sum += n
            }
        }
    }
    
    func BenchmarkCPUParallelize(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
            gonum := runtime.NumCPU()
    
            var sum float64
            num := len(list)
            stride := num / gonum
    
            var wg sync.WaitGroup
            wg.Add(gonum)
            var mux sync.Mutex
    
            for g := 0; g < gonum; g++ {
                go func(g int) {
                    start := g * stride
                    end := start + stride
                    if g == gonum-1 {
                        end = num
                    }
    
                    var sumin float64
                    for _, n := range list[start:end] {
                        sumin += cpubound(n)
                    }
    
                    mux.Lock()
                    sum += sumin
                    mux.Unlock()
    
                    wg.Done()
                }(g)
            }
    
            wg.Wait()
        }
    }
    

    Set MAX=1000000 and BUFFERSIZE=1000. The results come as a real shock: neither the Pipeline model nor the Fan model can keep up with plain sequential code, and only the plain parallel version gives an effective speedup:

    [baixiao@localhost go_concurrency]$ GOGC=off go test -cpu 1,8 -run none -bench CPU -benchtime 3s 
    goos: linux
    goarch: amd64
    BenchmarkCPUSequential                50          95148037 ns/op
    BenchmarkCPUSequential-8              30         101450384 ns/op
    BenchmarkCPUPipeline                  10         512093124 ns/op
    BenchmarkCPUPipeline-8                 5         864946495 ns/op
    BenchmarkCPUPipelineBuffer            20         219850707 ns/op
    BenchmarkCPUPipelineBuffer-8          10         370302165 ns/op
    BenchmarkCPUFan                        5         715223945 ns/op
    BenchmarkCPUFan-8                      5         913448396 ns/op
    BenchmarkCPUFanBuffer                 10         320494600 ns/op
    BenchmarkCPUFanBuffer-8               10         427863250 ns/op
    BenchmarkCPUParallelize              100          95482003 ns/op
    BenchmarkCPUParallelize-8            200          19398520 ns/op
    PASS
    ok      _/home/baixiao/go_concurrency   77.085s
    

    We can draw the following conclusions:

    • Sequential completely crushes Pipeline and Fan
    • Both Pipeline and Fan are slower on multiple cores than on one, because the bottleneck is not parallelism at all but the blocking caused by channels
    • With buffered channels, PipelineBuffer beats Pipeline and FanBuffer beats Fan, because channel blocking is reduced
    • And Parallelize on multiple cores? "Everyone else here is trash"

    IO-bound

    We simulate an IO-bound workload with a random sleep and set up the following variants for comparison:

    // helper functions
    func iobound(n float64) float64 {
        time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
        return n
    }
    
    func ioChan(in <-chan float64) <-chan float64 {
        out := make(chan float64)
        go func() {
            for n := range in {
                out <- iobound(n)
            }
            close(out)
        }()
        return out
    }
    
    func ioChanBuffer(in <-chan float64) <-chan float64 {
        out := make(chan float64, BUFFERSIZE)
        go func() {
            for n := range in {
                out <- iobound(n)
            }
            close(out)
        }()
        return out
    }
    
    // benchmark functions
    func BenchmarkIOSequential(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            var sum float64
            for _, n := range list {
                sum += iobound(n)
            }
        }
    }
    
    func BenchmarkIOPipeline(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChan(list)
            out := ioChan(c)
    
            var sum float64
            for n := range out {
                sum += n
            }
        }
    }
    
    func BenchmarkIOPipelineBuffer(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChanBuffer(list)
            out := ioChanBuffer(c)
    
            var sum float64
            for n := range out {
                sum += n
            }
        }
    }
    
    func BenchmarkIOFan(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChan(list)
            gonum := runtime.NumCPU() / 2
            outs := make([]<-chan float64, gonum)
            for i := 0; i < gonum; i++ {
                outs[i] = ioChan(c)
            }
    
            var sum float64
            for n := range mergeChan(outs...) {
                sum += n
            }
        }
    }
    
    func BenchmarkIOFanBuffer(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
    
            c := genChanBuffer(list)
            gonum := runtime.NumCPU() / 2
            outs := make([]<-chan float64, gonum)
            for i := 0; i < gonum; i++ {
                outs[i] = ioChanBuffer(c)
            }
    
            var sum float64
            for n := range mergeChanBuffer(outs...) {
                sum += n
            }
        }
    }
    
    func BenchmarkIOParallelize(b *testing.B) {
        for i := 0; i < b.N; i++ {
            list := benchmarkList()
            gonum := runtime.NumCPU()
    
            var sum float64
            num := len(list)
            stride := num / gonum
    
            var wg sync.WaitGroup
            wg.Add(gonum)
            var mux sync.Mutex
    
            for g := 0; g < gonum; g++ {
                go func(g int) {
                    start := g * stride
                    end := start + stride
                    if g == gonum-1 {
                        end = num
                    }
    
                    var sumin float64
                    for _, n := range list[start:end] {
                        sumin += iobound(n)
                    }
    
                    mux.Lock()
                    sum += sumin
                    mux.Unlock()
    
                    wg.Done()
                }(g)
            }
    
            wg.Wait()
        }
    }
    

    Set MAX=100 and BUFFERSIZE=1000.

    [baixiao@localhost go_concurrency]$ GOGC=off go test -cpu 1,8 -run none -bench IO -benchtime 3s
    goos: linux
    goarch: amd64
    BenchmarkIOSequential                 10         441609349 ns/op
    BenchmarkIOSequential-8               10         457200927 ns/op
    BenchmarkIOPipeline                   10         454147034 ns/op
    BenchmarkIOPipeline-8                 10         467264740 ns/op
    BenchmarkIOPipelineBuffer             10         456459492 ns/op
    BenchmarkIOPipelineBuffer-8           10         452286832 ns/op
    BenchmarkIOFan                       100          36995490 ns/op
    BenchmarkIOFan-8                     100          36950275 ns/op
    BenchmarkIOFanBuffer                 100          37555702 ns/op
    BenchmarkIOFanBuffer-8               100          36851459 ns/op
    BenchmarkIOParallelize                50          88653231 ns/op
    BenchmarkIOParallelize-8              50          87061568 ns/op
    PASS
    ok      _/home/baixiao/go_concurrency   54.007s
    

    We can draw the following conclusions:

    • In IO-bound scenarios, the Fan model wipes the floor with everything else
    • Buffered channels bring no efficiency gain here
    • None of the models improve under multiple cores

    So what do we conclude?

    1. Unbuffered channels, because of their "strong synchronization" semantics, cannot increase parallelism and may even drag efficiency down (see the sketch after this list)
    2. In CPU-bound scenarios, multi-core parallelism improves efficiency
    3. In IO-bound scenarios, multi-core parallelism does not improve efficiency
    4. What is the Pipeline model actually good for? I honestly can't tell
    5. The WaitGroup model has the edge in CPU-bound scenarios
    6. The Fan model has the edge in IO-bound scenarios
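
    Regarding point 1: "strong synchronization" means that a send on an unbuffered channel blocks until a receiver is simultaneously ready to take the value, as this two-line illustration shows:

    ch := make(chan int)
    go func() {
        ch <- 1 // blocks right here until the main goroutine executes <-ch
    }()
    <-ch // sender and receiver rendezvous; only now does the send complete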

    Gotcha: runtime.NumCPU() does not change when you call runtime.GOMAXPROCS(). The former reports the total number of cores in the system; the latter controls how many may execute simultaneously.
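
    A throwaway snippet (not from the benchmark code) makes the difference visible:

    package main
    
    import (
        "fmt"
        "runtime"
    )
    
    func main() {
        runtime.GOMAXPROCS(2)              // cap simultaneous execution at 2 cores
        fmt.Println(runtime.NumCPU())      // still reports all logical cores on the machine
        fmt.Println(runtime.GOMAXPROCS(0)) // an argument of 0 only queries the limit: prints 2
    }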

    Goroutine pools

    The pool model

    Designing a pool means answering a few questions: what is the input, what work is performed, and how many workers execute it together?

    Below is an abstraction of a goroutine pool whose input type, handler function, and worker count are all customizable.

    func genPoolChanBuffer(nums []float64) <-chan interface{} {
        out := make(chan interface{}, BUFFERSIZE)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }
    
    type Handler func(*Work)
    type Work struct {
        input interface{}
    }
    
    type Pool struct {
        channum   int
        workernum int
        wg        *sync.WaitGroup
        ch        chan Work
        Func      Handler
    }
    
    // RunWorker launches workernum goroutines that consume from the work
    // channel until it is closed.
    func (p Pool) RunWorker() {
        for i := 0; i < p.workernum; i++ {
            p.wg.Add(1)
            go func() {
                defer p.wg.Done()
                for work := range p.ch {
                    p.Func(&work)
                }
            }()
        }
    }
    
    // FeedWorker wraps each input value in a Work and closes the work
    // channel once the input channel is drained.
    func (p Pool) FeedWorker(in <-chan interface{}) {
        go func() {
            for n := range in {
                work := Work{
                    input: n,
                }
                p.ch <- work
            }
            close(p.ch)
        }()
    }
    
    // Wait blocks until every worker goroutine has exited.
    func (p Pool) Wait() {
        p.wg.Wait()
    }
    
    // InitPool builds a pool with the given work-channel capacity, worker
    // count, and handler.
    func InitPool(channum, workernum int, f Handler) *Pool {
        return &Pool{
            channum:   channum,
            workernum: workernum,
            wg:        &sync.WaitGroup{},
            ch:        make(chan Work, channum),
            Func:      f,
        }
    }
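
    Note that this pool discards results: the Handler only ever sees the input. A minimal way to get output back (the results channel below is my own sketch, not part of the pool) is to let the handler close over a buffered channel:

    results := make(chan float64, BUFFERSIZE)
    f := func(w *Work) {
        if v, ok := w.input.(float64); ok {
            results <- cpubound(v)
        }
    }
    p := InitPool(100, runtime.NumCPU(), f)
    p.RunWorker()
    p.FeedWorker(genPoolChanBuffer(benchmarkList()))
    go func() {
        p.Wait() // every worker has exited, so no further sends can happen
        close(results)
    }()
    var sum float64
    for r := range results {
        sum += r
    }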
    

    Comparative benchmarks

    We designed comparative benchmarks to observe how the pool behaves under CPU-bound and IO-bound workloads, and also to probe when more workers actually help. The Min variants set gonum to half the machine's core count and the Max variants to ten times the core count, where gonum is the number of workers.

    // benchmark functions
    func BenchmarkCPUPool(b *testing.B) {
        channum := 100
        gonum := runtime.NumCPU()
        for i := 0; i < b.N; i++ {
            f := func(w *Work) {
                if v, ok := w.input.(float64); ok {
                    cpubound(v)
                }
            }
            list := benchmarkList()
            c := genPoolChanBuffer(list)
    
            p := InitPool(channum, gonum, f)
            p.RunWorker()
            p.FeedWorker(c)
            p.Wait()
        }
    }
    
    func BenchmarkCPUPoolMin(b *testing.B) {
        channum := 100
        gonum := runtime.NumCPU() / 2
        for i := 0; i < b.N; i++ {
            f := func(w *Work) {
                if v, ok := w.input.(float64); ok {
                    cpubound(v)
                }
            }
            list := benchmarkList()
            c := genPoolChanBuffer(list)
    
            p := InitPool(channum, gonum, f)
            p.RunWorker()
            p.FeedWorker(c)
            p.Wait()
        }
    }
    
    func BenchmarkCPUPoolMax(b *testing.B) {
        channum := 100
        gonum := 10 * runtime.NumCPU()
        for i := 0; i < b.N; i++ {
            f := func(w *Work) {
                if v, ok := w.input.(float64); ok {
                    cpubound(v)
                }
            }
            list := benchmarkList()
            c := genPoolChanBuffer(list)
    
            p := InitPool(channum, gonum, f)
            p.RunWorker()
            p.FeedWorker(c)
            p.Wait()
        }
    }
    
    func BenchmarkIOPool(b *testing.B) {
        channum := 100
        gonum := runtime.NumCPU()
        for i := 0; i < b.N; i++ {
            f := func(w *Work) {
                if v, ok := w.input.(float64); ok {
                    iobound(v)
                }
            }
            list := benchmarkList()
            c := genPoolChanBuffer(list)
    
            p := InitPool(channum, gonum, f)
            p.RunWorker()
            p.FeedWorker(c)
            p.Wait()
        }
    }
    
    func BenchmarkIOPoolMin(b *testing.B) {
        channum := 100
        gonum := runtime.NumCPU() / 2
        for i := 0; i < b.N; i++ {
            f := func(w *Work) {
                if v, ok := w.input.(float64); ok {
                    iobound(v)
                }
            }
            list := benchmarkList()
            c := genPoolChanBuffer(list)
    
            p := InitPool(channum, gonum, f)
            p.RunWorker()
            p.FeedWorker(c)
            p.Wait()
        }
    }
    
    func BenchmarkIOPoolMax(b *testing.B) {
        channum := 100
        gonum := 10 * runtime.NumCPU()
        for i := 0; i < b.N; i++ {
            f := func(w *Work) {
                if v, ok := w.input.(float64); ok {
                    iobound(v)
                }
            }
            list := benchmarkList()
            c := genPoolChanBuffer(list)
    
            p := InitPool(channum, gonum, f)
            p.RunWorker()
            p.FeedWorker(c)
            p.Wait()
        }
    }
    

    Set MAX=100 and BUFFERSIZE=1000.

    [baixiao@localhost go_concurrency]$ GOGC=off go test -cpu 1,8 -run none -bench Pool -benchtime 3s
    goos: linux
    goarch: amd64
    BenchmarkCPUPool                  100000             38070 ns/op
    BenchmarkCPUPool-8                 50000             77872 ns/op
    BenchmarkCPUPoolMin               100000             33286 ns/op
    BenchmarkCPUPoolMin-8              50000             71690 ns/op
    BenchmarkCPUPoolMax                30000            143250 ns/op
    BenchmarkCPUPoolMax-8              20000            281619 ns/op
    BenchmarkIOPool                      200          21382667 ns/op
    BenchmarkIOPool-8                    200          21467617 ns/op
    BenchmarkIOPoolMin                   100          37068480 ns/op
    BenchmarkIOPoolMin-8                 100          37350682 ns/op
    BenchmarkIOPoolMax                   500           9438800 ns/op
    BenchmarkIOPoolMax-8                 500           9519574 ns/op
    PASS
    ok      _/home/baixiao/go_concurrency   64.149s 
    

    We can draw the following conclusions:

    • In CPU-bound scenarios, multiple cores are consistently slower than a single core
    • In CPU-bound scenarios, too many workers are actively counterproductive
    • In IO-bound scenarios, multi-core parallelism does not improve efficiency
    • In IO-bound scenarios, more workers, within a certain range, improve efficiency effectively
    • Because the Pool model is built on channels, multiple cores never bring an improvement

    Is channel really such a good thing?

    In the first post we saw that channel is the key tool for communication and synchronization between goroutines and one of the language's important keywords, which shows how much Go's designers value the feature.

    In practice, however, channel performance is mediocre: a look at the runtime source shows that every channel read and write acquires a mutex, which becomes a serious bottleneck under heavy concurrency, as our benchmarks above also demonstrate.
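
    For reference, here is an abridged sketch of the runtime's hchan structure (the exact field set varies across Go versions), showing the mutex that every send, receive, and close must acquire:

    // runtime/chan.go (abridged)
    type hchan struct {
        qcount   uint           // number of elements sitting in the buffer
        dataqsiz uint           // capacity of the buffer
        buf      unsafe.Pointer // ring buffer for buffered channels
        sendq    waitq          // goroutines blocked sending
        recvq    waitq          // goroutines blocked receiving
        lock     mutex          // taken by every send, receive, and close
    }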


    Moreover, channel still has performance problems that the whole community has been unable to tune away, for example runtime: select on a shared channel is slow with many Ps


    The models in this post can be likened to you now being a factory owner, running industrial-scale production.


    All the code can be found at https://github.com/baixiaoustc/go_concurrency/blob/master/second_post_test.go.
