介绍
当我解决问题时,尤其是新问题,我不会一上来就想着能不能使用并发来处理。我会首先想出一个顺序执行的解决方案。然后在可读性和技术评审后,我会开始考虑并发处理能不能让问题得到更快的解决。有时很显然并发是很合适的,但有时候并不那么明显。
第一篇文章,我讲解了系统调度器的机制和原理,我相信这对写一个多线程代码是重要的。第二篇文章我讲解了 Go 语言调度器的机制,对如何 Go 语言写出并发代码是重要的。这篇文章,我会开始把操作系统和 Go 调度器统一起来,提供深入理解下什么并发,什么不是。
这篇文章有两个目的:
- 提供几个在考虑你的服务是否适合用并发来解决时,需要思考的关键点。
- 像你展示不同类型的工作,需要的工程决策也是不同的。
什么是并发
并发意味着不按顺序执行。给定一组指令,可以按顺序执行,也可以找一种方式不按顺序执行,但仍能得到同样的结果。对于你眼前的问题,显然乱序执行能够增加价值。我所说的价值,意思是以复杂性为成本,而得到足够的性能增益。关键还是要看你的问题,它可能无法或甚至无法进行无序执行。
理解并发并不等于并行也是重要的。并行意味着 2 个或者 2 个以上的指令可以在同一时间一起执行。这和并发的概念不同。只有你的机器有至少 2 个 hardware threads 才能使你的程序达到并行效果。
并发和并行上图中,你看到有两个逻辑处理器(P),每个都分配了一个独立的系统线程(M),线程被关联到了独立的 Core 上。你可以看到 2 个 Goroutine 正在并行执行,他们同时在不同的 Core 上执行各自的指令。对于每一个逻辑处理器,3 个 Groutine 正在轮流共享他们的系统线程。所有的 Goroutine 正在并发执行,没有按照特定的顺序执行他们的指令。
这里有个难点,就是有时候在没有并行能力机器上使用并发,实际上会降低你的性能。更有趣的是,有时候利用并行来达并发效果,并不会如你期望的那样得到更好的性能。
工作负荷
首先,你最好是先搞懂你的问题属于哪种负荷类型。是 CPU 密集型的还是 IO 密集型的。之前的文章有描述,这里不再重复。
CPU 密集型的工作你需要通过并行来达到并发。单个 hardware thread 处理多个 Goroutine 并不高效,因为这些 Goroutine 不会进入阻塞状态。使用比机器 hardware thread 数量更多的 Goroutine 会减慢工作,因为把 Goroutine 不断从系统线程上调来调去也是有成本。上下文切换会触发 Stop The World(简称STW)事件,因为在切换过程中你的工作不会被执行。
对于 IO 密集型的工作,你不需要并行来保证并发。单个 hardware thread 可以处理高效的处理多个 Goroutine,因为这些 Goroutine 会自动进出于阻塞态。拥有比 hardware thread 更多的 Goroutine 可以加速工作的完成,因为让 Goroutine 从系统线程上调来调去不会触发 STW 事件。你的工作会自动暂停,这允许其他不同的 Goroutine 使用同一个 hardware thread 来更高效的完成工作,而不是让它处于空闲状态。
你如何能知道,一个 hardware thread 上跑多少个 Goroutine 才能得到最大程度的吞吐量呢?太少的 Goroutine 导致你有太多的空闲资源,太多的 Goroutine 导致你有太多的延迟时间。这是您需要考虑的,但是超出了这篇文章的范围。
是时候该看一些代码了,可以巩固一下你判断工作适不适合并发,是否需要并行的能力。
加法
我们不需要复杂的代码来搞懂这个机制。看下面的 add
函数,功能就是对一组数字求和。
func add(numbers []int) int {
var v int
for _, n := range numbers {
v += n
}
return v
}
问题:这个 add
函数可不可以乱序执行?答案可以的。一个数字集合可以被拆成更小的一些集合,这些小集合是可以并发处理的。一旦小集合求和完了,所有的结果再相加求和,能得到同样的答案。
但是,还有另外一个问题。应该把集合拆成多小,才能让速度最快呢?
为了回答这个问题你就必须知道 add
属于哪种工作。add
方法属于 CPU 密集型,因为算法就是不断执行数学运算,不会导致 Goroutine 进入阻塞态。这意味着每一个 hardware thread 跑一个 Goroutine 会让速度最快。
下面有一个并发版本的 add
。
44 func addConcurrent(goroutines int, numbers []int) int {
45 var v int64
46 totalNumbers := len(numbers)
47 lastGoroutine := goroutines - 1
48 stride := totalNumbers / goroutines
49
50 var wg sync.WaitGroup
51 wg.Add(goroutines)
52
53 for g := 0; g < goroutines; g++ {
54 go func(g int) {
55 start := g * stride
56 end := start + stride
57 if g == lastGoroutine {
58 end = totalNumbers
59 }
60
61 var lv int
62 for _, n := range numbers[start:end] {
63 lv += n
64 }
65
66 atomic.AddInt64(&v, int64(lv))
67 wg.Done()
68 }(g)
69 }
70
71 wg.Wait()
72
73 return int(v)
74 }
上面代码中,addConcurrent
函数就是并发版本的 add
方法。解释下这里面比较重要的几行代码。
48行:每个 Goroutine 获得一个独立的但是更小的数字集合进行相加。每个集合的大小是总集合的大小除以 Goroutine 的数量。
53行: 一堆 Goroutine 开始执行加法运算。
57-59行:最后一个 Goroutine 要把剩下的所有数字相加,这有可能使得它的集合要比其他集合大。
66行: 将小集合的求和结果,加在一起得到最终求和结果。
并发版本的实现,要比顺序版本的更复杂,但这到底值不值呢?最好的办法就是做压测。压测时我使用一千万数字的集合,并关闭掉 GC。
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
add(numbers)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
addConcurrent(runtime.NumCPU(), numbers)
}
}
上面代码就是压测函数。这里我只分配一个 CPU Core 给程序。顺序版本的使用 1 个 Goroutine,并发版本的使用 runtie.NumCPU()
个(也就是 8 个)Goroutine。这种情况下并发版本因为无法使用并行来进行并发。
10 Million Numbers using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential 1000 5720764 ns/op : ~10% Faster
BenchmarkConcurrent 1000 6387344 ns/op
BenchmarkSequentialAgain 1000 5614666 ns/op : ~13% Faster
BenchmarkConcurrentAgain 1000 6482612 ns/op
上面的压测结果显示了,在只有一个 CPU Core 的情况下,顺序版本的要比并发版本的快 10%~13%。这与我估计的一样,因为并发版本的那几个 Goroutine 在同一个 CPU Core 上不断的做切换。
下面我们用多核进行在此测试。顺序版本还是只使用 1 个核,而并发版本使用 8 个核。这种情况下,并发版本可以同步并行来并发。
10 Million Numbers using 8 goroutines with 8 cores
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential-8 1000 5910799 ns/op
BenchmarkConcurrent-8 2000 3362643 ns/op : ~43% Faster
BenchmarkSequentialAgain-8 1000 5933444 ns/op
BenchmarkConcurrentAgain-8 2000 3477253 ns/op : ~41% Faster
上面的压测结果显示了,在多核的情况下并发版本要比顺序版本快 41%~43%。这也是符合预期的,因为所有的 Goroutine 都是并行执行的,8 个 Goroutine 是同时一起工作的。
排序
我们一定要知道,不是所有的 CPU密集型工作都可以使用并发完成的。尤其是在把工作拆分再把结果合并所带来的开销很大时。比如冒泡排序。看看下面冒泡排序的实现。
01 package main
02
03 import "fmt"
04
05 func bubbleSort(numbers []int) {
06 n := len(numbers)
07 for i := 0; i < n; i++ {
08 if !sweep(numbers, i) {
09 return
10 }
11 }
12 }
13
14 func sweep(numbers []int, currentPass int) bool {
15 var idx int
16 idxNext := idx + 1
17 n := len(numbers)
18 var swap bool
19
20 for idxNext < (n - currentPass) {
21 a := numbers[idx]
22 b := numbers[idxNext]
23 if a > b {
24 numbers[idx] = b
25 numbers[idxNext] = a
26 swap = true
27 }
28 idx++
29 idxNext = idx + 1
30 }
31 return swap
32 }
33
34 func main() {
35 org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
36 fmt.Println(org)
37
38 bubbleSort(org)
39 fmt.Println(org)
40 }
冒泡排序通过将数组中数字交换位置来达到排序的效果。在某些情况中,有些数字可能要交换好几次。
问题:bubboSort
函数是否可以乱序执行?答案是不可以。这组数字是可以拆成一堆小数组并并发的执行排序。但是,当所有并发工作完成后,没有一个高效的方法将这些小结果合并成最终的结果。我们来看一下并发版本的冒泡排序。
01 func bubbleSortConcurrent(goroutines int, numbers []int) {
02 totalNumbers := len(numbers)
03 lastGoroutine := goroutines - 1
04 stride := totalNumbers / goroutines
05
06 var wg sync.WaitGroup
07 wg.Add(goroutines)
08
09 for g := 0; g < goroutines; g++ {
10 go func(g int) {
11 start := g * stride
12 end := start + stride
13 if g == lastGoroutine {
14 end = totalNumbers
15 }
16
17 bubbleSort(numbers[start:end])
18 wg.Done()
19 }(g)
20 }
21
22 wg.Wait()
23
24 // Ugh, we have to sort the entire list again.
25 bubbleSort(numbers)
26 }
bubbleSortConcurrent
函数是 bubbleSort
的并发实现。它使用多个 Goroutine 并发的对数组自己进行排序。但是,你最后在结果合并时,相当于把整个数组又重新排了一遍。
读者可以思考下归并排序适不适合并发,最好亲自动手测试一下
读文件
上面举了 2 个 CPU 密集型例子,但是 IO 密集型的呢?在 Goroutine 自然的不断频繁的进出阻塞态时,情况有什么不同?我们看一个 IO 密集的例子,就是读取一些文件然后执行查找操作。
第一个版本是顺序方式实现,函数名是 find
42 func find(topic string, docs []string) int {
43 var found int
44 for _, doc := range docs {
45 items, err := read(doc)
46 if err != nil {
47 continue
48 }
49 for _, item := range items {
50 if strings.Contains(item.Description, topic) {
51 found++
52 }
53 }
54 }
55 return found
56 }
上面代码就是 find
方法的实现。功能就是从一组字符串中找到
下面是 read
函数的实现
33 func read(doc string) ([]item, error) {
34 time.Sleep(time.Millisecond) // Simulate blocking disk read.
35 var d document
36 if err := xml.Unmarshal([]byte(file), &d); err != nil {
37 return nil, err
38 }
39 return d.Channel.Items, nil
40 }
read
函数在使用了 time.Sleep
把自己挂起 1 毫秒。这个主要是为了模拟 IO 操作延迟。因为你实际去读文件,Goroutine 也是一样会被挂起一段时间。这 1 毫秒的延迟对于后面的压测结果至关重要。
下面是并发版本的实现。
58 func findConcurrent(goroutines int, topic string, docs []string) int {
59 var found int64
60
61 ch := make(chan string, len(docs))
62 for _, doc := range docs {
63 ch <- doc
64 }
65 close(ch)
66
67 var wg sync.WaitGroup
68 wg.Add(goroutines)
69
70 for g := 0; g < goroutines; g++ {
71 go func() {
72 var lFound int64
73 for doc := range ch {
74 items, err := read(doc)
75 if err != nil {
76 continue
77 }
78 for _, item := range items {
79 if strings.Contains(item.Description, topic) {
80 lFound++
81 }
82 }
83 }
84 atomic.AddInt64(&found, lFound)
85 wg.Done()
86 }()
87 }
88
89 wg.Wait()
90
91 return int(found)
92 }
上面代码中 findConcurrent
函数就是 find
函数的并发实现版本。并发版本中使用适量的 Goroutine 完成不定数量的文档查询。代码太多,这里只解释几个重要的地方。
61-64行:创建了一个 Channel,用来发送文档。
65行: 关闭了 channel,这样当所有文档都处理完了后,所有的 Goroutine 就都自动退出了。
70行: Goroutine 被创建
73-83行:每个 Goroutine 从 Channel 中获取文档,把文档读到内存并查找 topic。当找到了,将本地变量 lFound
加一。
84行:每个 Goroutine 都将自己查找到的文档个数,加到全局变量found
上。
下面我使用一千个文档进行压测,并关闭 GC。
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
find("test", docs)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
findConcurrent(runtime.NumCPU(), "test", docs)
}
}
上面是压测代码。下面是仅使用 1 个 CPU Core 的压测结果。此时并发版本也只能使用一个 CPU Core 来执行 8 个 Goroutine。
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential 3 1483458120 ns/op
BenchmarkConcurrent 20 188941855 ns/op : ~87% Faster
BenchmarkSequentialAgain 2 1502682536 ns/op
BenchmarkConcurrentAgain 20 184037843 ns/op : ~88% Faster
发现在单核的情况下。并发版本要比顺序版本快 87%~88%。这和我预期一致。因为所有的 Goroutine 共用一个 CPU Core,而 Goroutine 在调用 read
时候会自动进入阻塞态,这时会将 CPU Core 出让给其他 Goroutine 使用,这使得这个 CPU Core 更加繁忙。
下面看下使用多核进行压测。
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential-8 3 1490947198 ns/op
BenchmarkConcurrent-8 20 187382200 ns/op : ~88% Faster
BenchmarkSequentialAgain-8 3 1416126029 ns/op
BenchmarkConcurrentAgain-8 20 185965460 ns/op : ~87% Faster
这个压测结果说明,更多的 CPU Core 并不会对性能有多大的提升。
结论
这篇文章的目标就是用具体的例子像你说明,你一定要考虑你的场景到底适不适合并发。这样才能做出更好的工程决策。
你现在清晰的看到了,对于 IO 密集型的工作,并行对提升性能没有多大的帮助,这与 CPU 密集型的正好相反。但是并不是所有 CPU 密集型的工作都适合并发的,比如冒泡排序。搞明白你所应对的场景的特点非常重要。
网友评论