美文网首页
【译】goroutine调度器,三:并发

【译】goroutine调度器,三:并发

作者: 豆腐匠 | 来源:发表于2019-01-18 17:51 被阅读0次

    [toc]

    原文:Scheduling In Go : Part III - Concurrency

    前言

    这是本系列的第三篇文章,本篇文章重点在并发。

    简介

    当我解决问题时,尤其是新问题,我并不会一开始就考虑是否使用并发进行处理,我会首先找寻一个顺序执行的解决方法并确保可行,然后在可读性与技术方案review后,我会开始考虑并发是否适合并可行,有时候并发是个很好的解决方案,但有些时候却并不一定。

    在此系列的第一部分,我解释了OS调度器的机制和语义,我觉得理解这些对多线程编程很重要,在第二部分,我解释了GO调度器的语义,我认为这个对于理解如何使用GO编写并发很有意义。本篇文章中,我会将OS和GO调度器的机制和语义结合起来,以便更深入的理解什么并发。

    本篇文章的目标是:

    • 提供必要的语义的指导,来帮助你决定是否你的服务需要使用并发

    • 向你展示不同类型的服务如何改变语义,因此你在工程上的决策要如何转变

    什么是并发

    并发意味着“无序”执行。给定一组指令,既可以按顺序执行,也可以找一种方式无序执行,但仍能得到同样的结果。那么你面前的问题是,显然无序执行能够提供额外的价值。我所说的价值,是考虑复杂性的成本,能得到到足够的性能收益。考虑你的实际问题,可能无序执行根本无法实现。

    理解并发并不等于并行也很重要。并行表示同一时间执行两个或更多的指令。这里有一个不同与并发的概念,只有当您拥有至少2个操作系统(OS)和硬件线程并且至少有2个Goroutines时才能实现并行性,每个Goroutines在每个OS /硬件线程上独立执行指令。

    图1:并发与并行

    96_figure1.png

    上图中,你看到有两个逻辑处理器(P),每个都分配了一个独立的系统线程(M),线程被关联到了独立的 Core上。你可以看到两个Goroutines(G1和G2)并行执行,同时在各自的OS /硬件线程上执行它们的指令,在每个逻辑处理器中,三个Goroutines轮流共享各自的OS线程,所有这些Goroutines同时运行,无序执行他们的指令并在OS线程上共享时间。

    这里有个问题,有时没有利用并行性的并发实际会降低你的性能,有趣的是利用了并行性的并发有时也不会得到如期的更好的效果。

    工作负载

    你要如何知道什么时候无序执行是可行的?理解你正在处理的工作负载类型是个很好的开始。下面是两种类型的工作负载,当考虑并发的时候理解这些很有帮助。

    • CPU-Bound:这种类型的工作负载,Goroutine永远不会自然切换到等待状态,这是不断进行计算的工作,一个线程计算π的第n位数就是这种类型。

    • IO-Bound:这种类型的工作负载,Goroutine会自然切换到等待状态,这类事件包括通过网络请求资源,或进行系统调用,或等待事件发生。一个Goroutine读取文件是IO-Bound。我会把同步事件(互斥,原子)导致Goroutine进入等待,也归到此类里。

    使用CPU-Bound工作负载,你需要并行来利用并发性。处理多个Goroutines的单个OS /硬件线程效率不高,因为Goroutines不会进入和退出等待状态。拥有比系统/硬件线程更多的Goroutine会降低性能,因为在操作系统线程上移动和关闭Goroutines存在延迟成本(花费的时间)。上下文切换正在为你的工作负载创建“停止世界”的事件,因为在切换期间您的工作都不会被执行。

    使用IO-Bound工作负载,你不需要并行来使用并发。单个OS /硬件线程可以高效地处理多个Goroutines,因为Goroutines作为其工作负载的一部分自然地进出等待状态。拥有比操作系统/硬件线程更多的Goroutines可以加快工作执行速度,因为在操作系统线程上移动和移除Goroutines的延迟成本并不会产生“停止世界”事件。您的工作负载自然停止,这可以使不同的Goroutine有效地利用相同的OS /硬件线程,而不是让OS /硬件线程闲置。

    您如何知道每个硬件线程有多少Goroutines可以提供最佳吞吐量?太少的Goroutine导致你有太多的空闲资源,太多的Goroutine导致你有太多的延迟时间。这是你需要考虑的事情,但是超过了这篇文章的讨论范围。

    现在,更重要的是要通过仔细推敲代码来帮助我们准确识别什么情况需要并发,什么情况不能用并发,以及是否需要并行。

    加法

    我们不需要复杂的代码来搞懂这个机制。看下面的 add 函数,功能就是对一组数字求和。

    代码1

    36 func add(numbers []int) int {
    37     var v int
    38     for _, n := range numbers {
    39         v += n
    40     }
    41     return v
    42 }
    

    在代码1的第36行,声明了一个add函数,作用是接受一个整数集合并返回集合的总和。37行声明了一个变量v用来记录总和。然后在第38行,函数线性地遍历集合,并且每个数字被添加到第39行的当前总和。最后在第41行,函数将最终的总和返回给调用者。

    问题:这个add函数适合无序执行吗?我相信答案是肯定的,这个整数集合可以被分解为小的列表,并且可以同时处理这些列表。一旦所有的小列表相加,这个结果的集合加在一起,就可以产生和顺序执行相同的答案。

    但是,还有另一个问题。应该独立创建和处理多少个较小的列表以获得最佳吞吐量?要回答此问题,你必须知道add所属的工作负载类型。该add函数正在执行CPU-Bound,因为该算法正在执行纯数学运算,并且它不会导致goroutine进入等待状态。这意味着每个OS/硬件线程使用一个Goroutine就可以获得良好的吞吐量。

    下面的代码2是我的并发版本add

    注意:编写并发版本的add时,可以采用多种方法和选项。暂时不要过分关注我的特定实现。如果你有一个更易读的版本,表现相同或更好,我希望你能分享它。

    代码2

    44 func addConcurrent(goroutines int, numbers []int) int {
    45     var v int64
    46     totalNumbers := len(numbers)
    47     lastGoroutine := goroutines - 1
    48     stride := totalNumbers / goroutines
    49
    50     var wg sync.WaitGroup
    51     wg.Add(goroutines)
    52
    53     for g := 0; g < goroutines; g++ {
    54         go func(g int) {
    55             start := g * stride
    56             end := start + stride
    57             if g == lastGoroutine {
    58                 end = totalNumbers
    59             }
    60
    61             var lv int
    62             for _, n := range numbers[start:end] {
    63                 lv += n
    64             }
    65
    66             atomic.AddInt64(&v, int64(lv))
    67             wg.Done()
    68         }(g)
    69     }
    70
    71     wg.Wait()
    72
    73     return int(v)
    74 }
    

    在代码2中,addConcurrent方法就是add的并发版本,对比非并发版的5行代码并发版使用了26行代码,这里代码较多我只选几行重要的来解释一下。

    第48行:每个Goroutine都会得到他们自己唯一的但更小的数字列表。列表的大小是通过获取集合的大小除以Goroutines的数量来计算的。

    第53行:创建Goroutines池以执行添加工作。

    第57-59行:最后一个Goroutine将添加剩余的数字列表,这些数字可能比其他Goroutines更大。

    第66行:将较小列表的总和加在一起作为最终总和。

    并发版本肯定比顺序版本更复杂,但复杂性值得吗?回答这个问题的最好方法是创建一个基准。对于这些基准测试,我使用了一个1000万个数字的集合,并关闭了GC。这里有一个使用该add函数的顺序版本和使用该函数的并发版本addConcurrent。

    代码3

    func BenchmarkSequential(b *testing.B) {
        for i := 0; i < b.N; i++ {
            add(numbers)
        }
    }
    
    func BenchmarkConcurrent(b *testing.B) {
        for i := 0; i < b.N; i++ {
            addConcurrent(runtime.NumCPU(), numbers)
        }
    }
    

    以下是所有Goroutines只有一个OS /硬件线程可用的结果。顺序版本使用1个 Goroutine,并发版本runtime.NumCPU()或8个 Goroutines。在这种情况下,并发版本正在利用没有并行性的并发。

    代码4

    10 Million Numbers using 8 goroutines with 1 core
    2.9 GHz Intel 4 Core i7
    Concurrency WITHOUT Parallelism
    -----------------------------------------------------------------------------
    $ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
    goos: darwin
    goarch: amd64
    pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
    BenchmarkSequential             1000       5720764 ns/op : ~10% Faster
    BenchmarkConcurrent             1000       6387344 ns/op
    BenchmarkSequentialAgain        1000       5614666 ns/op : ~13% Faster
    BenchmarkConcurrentAgain        1000       6482612 ns/op
    

    注意:在本地计算机上运行基准测试很复杂。有许多因素可能导致您的基准测试不准确。确保您的机器尽可能空闲并多运行基准测试几次。你希望确保在结果中看到一致性。通过测试工具运行两次基准测试,可以为此基准测试提供最一致的结果。

    清单4中的基准测试表明,当只有一个OS /硬件线程可供所有Goroutines使用时,Sequential版本比Concurrent快约10%到13%。这与我的预期一致,因为并发版本具有单个OS线程上的上下文切换和Goroutines管理的开销。

    下面的是每个Goroutine可用的单独OS /硬件线程的结果。顺序版本使用1个 Goroutine,并发版本使用runtime.NumCPU()或8个 Goroutines。在这种情况下,并发版本正在利用并行性。

    代码5

    10 Million Numbers using 8 goroutines with 8 cores
    2.9 GHz Intel 4 Core i7
    Concurrency WITH Parallelism
    -----------------------------------------------------------------------------
    $ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
    goos: darwin
    goarch: amd64
    pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
    BenchmarkSequential-8               1000       5910799 ns/op
    BenchmarkConcurrent-8               2000       3362643 ns/op : ~43% Faster
    BenchmarkSequentialAgain-8          1000       5933444 ns/op
    BenchmarkConcurrentAgain-8          2000       3477253 ns/op : ~41% Faster
    

    清单5中的基准测试表明,当为每个Goroutine提供单独的OS /硬件线程时,并发版本比顺序版本快大约41%到43%。这与我的预期相符,因为所有Goroutines现在并行运行,八个Goroutines同时执行他们的同时工作。

    排序

    要知道并非所有CPU-Bound工作都适合并发。尤其是在把工作拆分再把结果合并所带来的开销很大时。使用冒号排序的排序算法可以看到这方面的一个例子。以下是在Go中实现冒泡排序的代码。

    代码6

    01 package main
    02
    03 import "fmt"
    04
    05 func bubbleSort(numbers []int) {
    06     n := len(numbers)
    07     for i := 0; i < n; i++ {
    08         if !sweep(numbers, i) {
    09             return
    10         }
    11     }
    12 }
    13
    14 func sweep(numbers []int, currentPass int) bool {
    15     var idx int
    16     idxNext := idx + 1
    17     n := len(numbers)
    18     var swap bool
    19
    20     for idxNext < (n - currentPass) {
    21         a := numbers[idx]
    22         b := numbers[idxNext]
    23         if a > b {
    24             numbers[idx] = b
    25             numbers[idxNext] = a
    26             swap = true
    27         }
    28         idx++
    29         idxNext = idx + 1
    30     }
    31     return swap
    32 }
    33
    34 func main() {
    35     org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
    36     fmt.Println(org)
    37
    38     bubbleSort(org)
    39     fmt.Println(org)
    40 }
    

    代码6中,是一段用Go写的冒泡排序的例子,这种排序算法每次传递时会扫描交换值的整数集合。根据列表的顺序,在对所有内容进行排序之前,可能需要多次遍历集合。

    问题:该bubbleSort函数是一个适合乱序执行的工作吗?我相信答案是否定的。整数集合可以分解为较小的列表,并且可以同时对这些列表进行排序。但是,在完成所有并发工作之后,没有有效的方法将较小的列表排序在一起。以下是冒泡排序的并发版本的示例。

    代码8

    01 func bubbleSortConcurrent(goroutines int, numbers []int) {
    02     totalNumbers := len(numbers)
    03     lastGoroutine := goroutines - 1
    04     stride := totalNumbers / goroutines
    05
    06     var wg sync.WaitGroup
    07     wg.Add(goroutines)
    08
    09     for g := 0; g < goroutines; g++ {
    10         go func(g int) {
    11             start := g * stride
    12             end := start + stride
    13             if g == lastGoroutine {
    14                 end = totalNumbers
    15             }
    16
    17             bubbleSort(numbers[start:end])
    18             wg.Done()
    19         }(g)
    20     }
    21
    22     wg.Wait()
    23
    24     // Ugh, we have to sort the entire list again.
    25     bubbleSort(numbers)
    26 }
    

    在代码8中,显示了bubbleSortConcurrent函数,它是并发版本的bubbleSort。它使用多个Goroutines同时对列表的某些部分进行排序。但是,你剩下的是以块为单位的已排序值列表。给定一个36个数组的列表,分成每组12个数字。如果整个列表在25行没有再次排序,这将是结果列表。

    代码9

    Before:
      25 51 15 57 87 10 10 85 90 32 98 53
      91 82 84 97 67 37 71 94 26  2 81 79
      66 70 93 86 19 81 52 75 85 10 87 49
    
    After:
      10 10 15 25 32 51 53 57 85 87 90 98
       2 26 37 67 71 79 81 82 84 91 94 97
      10 19 49 52 66 70 75 81 85 86 87 93
    

    由于冒泡排序的本质是扫描列表,因此bubbleSort对第25行的调用将抵消使用并发性带来的任何潜在收益。对于冒泡排序,使用并发性没有性能提升。

    文件读取

    前面展示两种CPU-Bound类型的工作,那么IO-Bound是怎么样的?当Goroutines自然地进出等待状态时,语义是否不同?来看一个IO-Bound的例子,读取文件并执行文本搜索。

    第一个版本是顺序执行的版本find函数

    代码10

    42 func find(topic string, docs []string) int {
    43     var found int
    44     for _, doc := range docs {
    45         items, err := read(doc)
    46         if err != nil {
    47             continue
    48         }
    49         for _, item := range items {
    50             if strings.Contains(item.Description, topic) {
    51                 found++
    52             }
    53         }
    54     }
    55     return found
    56 }
    
    

    在代码10中的第43行,声明了一个名为found的变量,以保存在给定文档中找到topic的次数。然后在44行迭代文档并在45行读取文档。最后在第49-53行,strings包中的Contains函数用于检查是否可以在从文档中读取的项集合中找到topic。如果找到topic,则found变量加1。

    下面是在find中调用的read函数

    代码11

    33 func read(doc string) ([]item, error) {
    34     time.Sleep(time.Millisecond) // Simulate blocking disk read.
    35     var d document
    36     if err := xml.Unmarshal([]byte(file), &d); err != nil {
    37         return nil, err
    38     }
    39     return d.Channel.Items, nil
    40 }
    

    read方法在开始的地方滴啊用了time.Sleep方法延时1毫秒。此调用用于模拟在我们执行实际系统调用以从磁盘读取文档时可能产生的延迟。此延迟的一致性对于准确测量顺版find与并发版本的性能对比非常重要。在35到39行,将存储在全局变量file中的模拟xml文档解析为struct进行处理。最后在39行将这组数据返回给调用方。

    有了顺序版本,自然就有并发版本。

    注意:编写并发版本的find时,可以采用多种方法和选项。不要过分关注我的特定实现。如果你有一个更易读的版本,表现相同或更好,我希望你能分享它。

    代码12

    58 func findConcurrent(goroutines int, topic string, docs []string) int {
    59     var found int64
    60
    61     ch := make(chan string, len(docs))
    62     for _, doc := range docs {
    63         ch <- doc
    64     }
    65     close(ch)
    66
    67     var wg sync.WaitGroup
    68     wg.Add(goroutines)
    69
    70     for g := 0; g < goroutines; g++ {
    71         go func() {
    72             var lFound int64
    73             for doc := range ch {
    74                 items, err := read(doc)
    75                 if err != nil {
    76                     continue
    77                 }
    78                 for _, item := range items {
    79                     if strings.Contains(item.Description, topic) {
    80                         lFound++
    81                     }
    82                 }
    83             }
    84             atomic.AddInt64(&found, lFound)
    85             wg.Done()
    86         }()
    87     }
    88
    89     wg.Wait()
    90
    91     return int(found)
    92 }
    

    在代码12中,对比非并发版本的13行代码,这里使用了30行代码。我实现并发版本的目标是控制用于处理未知数量文档的Goroutine的数量,我选择通过一个通道来控制Goroutines池。

    61-64行,创建一个通道并放入全部需要处理的文档。

    65行,当所有文档处理完毕,关闭通道这样Goroutines池可以自然进入terminate状态

    70行,创建Goroutine池

    73-83行,每个Goroutine从channel接收到一个文档,读取文档进内存并检查内容是否有topic,当匹配成功,found变量累加。

    84行,每个独立的Goroutine的统计结果最终一起求和。

    同样进行基准测试进行对比,我使用了一个1000个文件的集合,并关闭了GC。

    代码13

    func BenchmarkSequential(b *testing.B) {
        for i := 0; i < b.N; i++ {
            find("test", docs)
        }
    }
    
    func BenchmarkConcurrent(b *testing.B) {
        for i := 0; i < b.N; i++ {
            findConcurrent(runtime.NumCPU(), "test", docs)
        }
    }
    

    以下是所有Goroutines只有一个OS /硬件线程可用的结果。顺序版本使用1个 Goroutine,并发版本runtime.NumCPU()或8个 Goroutines。在这种情况下,并发版本正在利用没有并行性的并发。

    代码14

    10 Thousand Documents using 8 goroutines with 1 core
    2.9 GHz Intel 4 Core i7
    Concurrency WITHOUT Parallelism
    -----------------------------------------------------------------------------
    $ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
    goos: darwin
    goarch: amd64
    pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
    BenchmarkSequential                3    1483458120 ns/op
    BenchmarkConcurrent               20     188941855 ns/op : ~87% Faster
    BenchmarkSequentialAgain           2    1502682536 ns/op
    BenchmarkConcurrentAgain          20     184037843 ns/op : ~88% Faster
    

    测试结果表明,并发版本大约要快百分之87到88,与我期望的相符,Goroutines高效的共享了单线程,每个Goroutine在单个OS /硬件线程上自然的上下文切换让它执行了更多的工作。

    下面的是每个Goroutine可用的单独OS /硬件线程的结果。顺序版本使用1个 Goroutine,并发版本使用runtime.NumCPU()或8个 Goroutines。在这种情况下,并发版本正在利用并行性。

    代码15

    10 Thousand Documents using 8 goroutines with 1 core
    2.9 GHz Intel 4 Core i7
    Concurrency WITH Parallelism
    -----------------------------------------------------------------------------
    $ GOGC=off go test -run none -bench . -benchtime 3s
    goos: darwin
    goarch: amd64
    pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
    BenchmarkSequential-8                  3    1490947198 ns/op
    BenchmarkConcurrent-8                 20     187382200 ns/op : ~88% Faster
    BenchmarkSequentialAgain-8             3    1416126029 ns/op
    BenchmarkConcurrentAgain-8            20     185965460 ns/op : ~87% Faster
    

    这项测试表明,额外的OS/硬件线程并没有取得更好的表现。

    结论

    这篇文章的目的是提供有关必须考虑的语义的指导,以确定工作负载是否适合使用并发。我尝试提供不同类型的算法和工作负载的示例,以便你可以看到语义上的差异以及需要考虑的不同工程决策。

    可以清楚地看到,使用IO-Bound工作负载并不需要并行性来获得性能的大幅提升。这与你在CPU-Bound工作中看到的相反。当涉及像冒泡排序这样的算法时,并发性的使用会增加复杂性,而不会带来任何实际的性能优势。

    相关文章

      网友评论

          本文标题:【译】goroutine调度器,三:并发

          本文链接:https://www.haomeiwen.com/subject/bmtodqtx.html