Channel 是什么?
channel,通道,本质上是一个通信对象,goroutine 之间可以使用它来通信。从技术上讲,通道是一个数据传输管道,可以向通道写入或从中读取数据。
定义一个channel
零值通道Go 规定: 使用
chan
关键字来创建通道,使用make
关键字初始化通道,一个通道只能传输一种类型的数据。
上面的程序定义一个通道c
变量,它可以传输类型为int的数据。上面的程序打印 <nil>, 是因为通道的零值是 nil。但是 nil 通道不能传输数据。因此,我们必须使用 make 函数来创建一个可用的通道。
我们使用简写语法 :=
和 make
函数创建通道。上述程序产生以下结果:
type of `c` is chan int
value of `c` is 0xc0420160c0
注意通道的值 c
,看起来它是一个内存地址。
通道是指针类型。某些场景下,goroutine 之间通信时,会将通道作为参数传递给函数或方法,当程序接收该通道作为参数时,无需取消引用它即可从该通道推送或拉取数据。
通道数据读写
Go规定:使用左箭头语法
<-
从通道读取和写入数据。
- 写入数据
c <- data
上面的示例表示:将data发送到通道 c 中。看箭头的方向,它从data指向c, 因此可以想象”我们正在尝试将data推送到 c”。
- 读取数据
<- c
上述示例表示:从通道 c 读取数据。看箭头方向,从 c 通道开始。
该语句不会将数据推存到任何内容中,但它仍然是一个有效的语句。
如果您有一个变量data,可以保存来自通道的数据,则可以使用以下语法:
var data int
data = <- c
现在来自通道 c 的 int 类型的数据可以存储到变量data中, data 必须也是int类型。
上面的语法可以使用简写语法重写,如下所示
data := <- c
golang 自动识别c中的数据类型,并将data设置为同样的类型
以上所有通道操作都是阻塞的
在上一课中,我们使用 time.Sleep 阻塞了goroutine,从而调度了其它的goroutine。 而通道操作本质上也是阻塞的,当向通道写入数据时,当前goroutine 会被阻塞,直到其它goroutine 从该通道读取数据。我们在并发章节中看到,当前的goroutine阻塞后, 调度器会调度其它空闲的goroutine继续工作,从而保证程序不会永远阻塞,这是用channel来做的。通道的这个特性在 goroutines 通信中非常有用,因为它可以防止我们编写手动锁和 hack 来使它们彼此协同工作。
channel 实践
goroutine 和 channel下面我们一步一步的讲讲解上面程序的执行过程:
- 首先声明了greet函数,它接受字符串类型的通道c。greeter这个函数从通道 c 读取数据,并打印到控制台。
- 在 main 函数中,第一条语句: 打印 "main started" 到控制台
- main函数第二条语句:使用 make 初始化字符串类型的通道 c
- main 函数第三条语句中: 将通道c 传递给 greet 函数,但使用 go 关键字将其作为 goroutine 执行。
- 此时,进程有 2 个 goroutine,而活动goroutine 是
main goroutine
(查看上一课就知道它是什么)。然后控制权转到下一行代码。 - main 第四行语句:将字符串值 "John" 发送到通道 c。此时,goroutine 被阻塞,直到某个 goroutine 读取它。 Go调度器调度
greet goroutine
,它按照第一点中提到的那样执行。 - 然后
main goroutine
激活并执行最后的语句,打印 "main stopped"。
deadlock
前面我们说了“通道本质上是阻塞的”,当在通道写入或读取数据时,该 goroutine 会阻塞,而且会一直阻塞,控制权被传递给其他可用的 goroutine。如果没有其他可用的 goroutine 怎么办,程序无法向下执行了,这就产生死锁错误,导致整个程序崩溃。
当尝试从某个通道读取数据,但通道没有可用的值时,会期望其它 goroutine 推送值到该通道, 而阻塞当前goroutine,交出控制权,因此,此读取操作将是阻塞的。同样,如果要向通道发送数据,会交出控制权到其它goroutine,直到某个 goroutine 从中读取数据。因此,此发送操作将被阻塞。
死锁的一个简单示例是只有主 goroutine 执行一些通道操作。
死锁上面的程序将在运行时死锁了, 抛出以下错误:
main() started
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
program.Go:10 +0xfd
exit status 2
fatal error: all goroutines are asleep - deadlock!
。似乎所有 goroutine 都处于睡眠状态,或者根本没有其他 goroutine 可用于调度。
关闭channel
一个通道可以关闭,这样就不能再通过它发送数据了。接收器 goroutine 可以使用 val, ok := <- c
语法找出通道的状态,如果通道打开或可以执行读取操作,则 ok 为真,如果通道关闭且无法执行更多读取操作,则为 false 。
可以使用
close(channel)
的close
内置函数关闭通道。
让我们看一个简单的例子。
关闭channel只是为了帮助你理解阻塞的概念,首先发送操作
c <- "John"
是阻塞的,一些 goroutine 必须从通道读取数据,因此greet goroutine
是由 Go调度器调度的。然后第一次读取操作<-c
是非阻塞的,因为数据存在于通道 c 中以供读取。第二次读取操作<-c
将被阻塞,因为通道 c 没有任何数据可供读取,因此 Go 调度器激活main goroutine
并且程序从close(c)
函数开始执行。
从上面的错误中,是由尝试在关闭的通道上发送数据引起的。为了更好地理解关闭通道的可用性,让我们看看 for 循环。
for loop
for{} 的无限循环语法可用于读取通过通道发送的多个值。
for循环读取channel在上面的例子中,我们正在创建 goroutine squares,它一个一个地返回从 0 到 9 的数字的平方。在main goroutine 中,我们正在无限循环中读取这些数字。
在无限 for 循环中,由于我们需要一个条件来在某个时刻中断循环,因此我们使用语法 val, ok := <-c
从通道读取值。在这里,当通道关闭时,ok 会给我们额外的信息。因此,在 squares 协程中,在写完所有数据后,我们使用语法 close(c)
关闭通道。当 ok 为真时,程序打印 val 中的值和通道状态 ok。当它为假时,我们使用 break 关键字跳出循环。因此,上述程序产生以下结果:
main() started
0 true
1 true
4 true
9 true
16 true
25 true
36 true
49 true
64 true
81 true
0 false <-- loop broke!
main() stopped
当通道关闭时,goroutine 读取的值为通道数据类型的零值。在这种情况下,由于 channel 正在传输 int 数据类型,因此可以从结果中看到它是 0。与向通道读取或写入值不同,关闭通道不会阻塞当前的 goroutine。
为了避免手动检查通道关闭条件的痛苦,Go 提供了更简单的
for range
循环,它会在通道关闭时自动关闭。
让我们修改我们之前的上述程序。
range读取channel在上面的程序中,我们使用了 for val := range c
而不是 for{}
。 range 将一次从通道读取一个值,直到它关闭。因此,上述程序产生以下结果
main() started
0
1
4
9
16
25
36
49
64
81
main() stopped
如果不关闭 for range 循环中的通道,程序将在运行时抛出死锁致命错误。所以,数据发送完成时,要记得关闭通道;还可以使用select,default来避免这个问题。
缓冲区通道和通道容量
正如我们所见,每一个通道写入操作都会阻塞当前goroutine。但是到目前位置,在创建channel时,我们都没有给make第二个参数。这第二个参数就是通道容量,或缓冲区。默认为0的通道成为无缓冲区通道。
当通道的缓冲区大于0时,缓冲区被填满之前, goroutine不会被阻塞。缓冲区满后, 只有当通道的最后数据被读取,新的值才能被添加。有一个问题是通道读取是饥渴操作,这表示读取一旦开始,只要缓冲区有数据, 读取就会一直进行。技术上来讲,缓冲区为空时,读取操作才是阻塞的。
我们使用下面的语法定义带缓冲的通道:
c := make(chan Type, n)
上面会创建一个数据类型为 Type 且缓冲区大小为 n 的通道。直到 channel 收到 n+1 个发送操作,它才会阻塞当前的 goroutine。
buffered channel在上面的程序中,通道 c 的缓冲区容量为 3。这意味着它可以容纳 3 个值,在第20行,由于缓冲区没有溢出(因为我们没有推送任何新值),main goroutine 不会阻塞,运行后退出,并不会调度到squares goroutine。
让我们在发送一个额外的值:
buffer channel2如我们前面讨论, 通道 c <- 4 发送操作超出了channel的容量,阻塞了main goroutine, squares goroutine 获得控制权,并读取通道内的所有值。
一个通道的长度和容量怎么计算呢
length and capacity of channel与切片类似,缓冲通道具有长度和容量。通道的长度是通道缓冲区中值的数量,而通道的容量是缓冲区大小,创建时n的值。计算长度,我们使用len函数,而找出容量,我们使用cap函数,就像切片一样
如果你想知道为什么上面的程序运行良好并且没有抛出死锁错误。这是因为,由于通道容量为 3 并且缓冲区中只有 2 个值可用,Go 没有尝试通过阻止主 goroutine 执行来调度另一个 goroutine。如果需要,您可以简单地在main goroutine 中读取这些值,因为即使缓冲区未满,也不会阻止您从通道读取值。
另外一个例子:
example使用多个 goroutine
下面我们创建2个goroutines, 一个计算整数的平方, 一个计算整数的立方
https://play.golang.org/p/6wdhWYpRfrX下面分析一下程序的执行过程:
- 首先创建了两个函数,
square
和cube
, 两个函数都使用c chan int
channel 作为参数, 函数从c
中读取整数, 计算完成后,写回c
中 - 在main goroutinue 中,我们创建了两个int 类型的 channel :
squareChan
和cubeChan
- 使用
go
关键字, 以goroutine的方式 square 和 cube - 此时控制权还在 main goroutine中, 我们个变量
testNum
一个值3 - 此时我们把
testNum
发送到channelsquareChan
和cubeChan
, main goroutine 将被阻塞,直到这些channel的数据被读取。一旦chanel中的数据被读取,main goroutine 将继续执行。 - 此时在main goroutine中, 尝试从
squareChan
和cubeChan
读取数据, 这依然是阻塞操作, 直到这些channel在他们各自的goroutine被写入数据, main gorounine 才能继续执行
上图中程序的执行结果如下:
[main] main() started
[main] sent testNum to squareChan
[square] reading
[main] resuming
[main] sent testNum to cubeChan
[cube] reading
[main] resuming
[main] reading from channels
[main] sum of square and cube of 3 is 36
[main] main() stopped
单向通道
至此, 我们所操作的channel都是双向的, 既能读取,也能写入。我们也可以创建单向操作的channel, 只读channel:只能读取数据;只写chanel:只能写入数据。
单向chanel创建依然使用make
函数,只是额外添加了单向箭头(<-
)语法:
roc := make(<-chan int) // read-only chan
soc := make(chan<- int) // send-only chan
在上面的程序中, roc是只读channel, make函数中的箭头方向是远离chan(<-chan
); soc是只写channel, make函数中箭头方向,指向chan(chan<-
), 他们是两个不同的类型
但是单向信道有什么用呢?使用单向通道增加了程序的类型安全性。可减少程序出错概率。
假如有如下场景: 假如你有一个goroutine, 你只需要在其中读取channel中的数据, 但是main goroutine需要在同一个channle读取和写入数据,该怎么做呢?
幸运的是go 提供了简单的语法, 把双向的channle,改为单向
https://play.golang.org/p/k3B3gCelrGv如上述示例所示, 我们只需要在 greet 函数中, 将接收参数修改为单向channel即可,现在我们在greet中,对channel的操作,只能读了, 任何写造作都会导致 fatal 错误 "invalid operation: roc <- "some text" (send to receive-only type <-chan string)"。
匿名goroutine
在 goroutines 章节中,我们学习了匿名 goroutines。我们也可以与他们一起实施渠道。让我们修改前面的简单示例,在匿名 goroutine 中实现 channel。
这是我们之前的例子
https://play.golang.org/p/c5erdHX1gwR下面是一个修改后的例子,我们将 greet goroutine 变成了一个匿名 goroutine。
https://play.golang.org/p/cM5nFgRha7cchannel 作为 channel 的数据类型
如标题所示, channel 作为 golang中类型中国的一等公民, 可以像其他值一样在任何地方使用: 作为结构的元素, 函数参数,返回值,甚至是另外一个通道的类型。下面的例子中,我们使用一个通道作为另外一个通道的数据类型。
https://play.golang.org/p/xVQvvb8O4Deselect
select 就像没有任何输入参数的 switch 一样,但它只用于通道操作。 select 语句用于仅对多个通道中的一个执行操作,由 case 块有条件地选择。
我们先看一个例子:
https://play.golang.org/p/ar5dZUQ2ArH从上面的程序中,可以看到select语句就像switch一样,但不是布尔操作,而是通道操作。 select 语句是阻塞的,除非它有default项。 一旦满足条件之一, 它将解除阻塞。 那么它什么时候满足条件呢?
如果所有 case 语句(通道操作)都被阻塞,则 select 语句将等待,直到其中一个 case 语句(其通道操作)解除阻塞,然后执行该 case。如果部分或全部通道操作是非阻塞的,则将随机选择非阻塞情况之一并立即执行。
为了解释上面的程序,我们启动了 2 个具有独立通道的 goroutine。然后启动了 2 个case的 select 语句。一种情况从 chan1 读取值,另一种情况从 chan2 读取值。由于这些通道是无缓冲的,读操作将被阻塞(写操作也是如此)。所以这两种选择的情况都是阻塞的。因此 select 将等待,直到其中一种情况变为非阻塞。
当程序运行到 select
代码段时, main goroutine 会阻塞, 然后它将调度 select 语句中存在的所有 goroutine, 每次一个,这个例子里面是service1
和 service2
对应的goroutine,service1
将会等待3s,然后, 写入一条数据到 chan1 解除阻塞, service2
等待5s,写入一条数据到chan2, 然后解除阻塞。由于 service1 比 service2 更早解除阻塞,case 1 将首先解除阻塞,因此将执行该 case,而其他 case(此处为 case 2)将被忽略。完成案例执行后,主函数的执行将继续进行。
上面的程序模拟了真实世界的 Web 服务,其中负载均衡器收到数百万个请求,并且必须从可用服务之一返回响应。使用 goroutines、channels 和 select,我们可以向多个服务请求响应,并且可以使用快速响应的服务。
为了模拟所有情况何时都阻塞并且响应几乎同时可用,我们可以简单地删除 Sleep 调用。
https://play.golang.org/p/giSkkqt8XHb上述程序产生以下结果(您可能会得到不同的结果):
main() started 0s
service2() started 481µs
Response from service 2 Hello from service 2 981.1µs
main() stopped 981.1µs
有时候也可能是:
main() started 0s
service1() started 484.8µs
Response from service 1 Hello from service 1 984µs
main() stopped 984µs
发生这种情况是因为 chan1 和 chan2 操作几乎同时发生,但执行和调度仍然存在一些时间差。
default case
和 switch 语句一样,select 语句也有 default 项。 default 是非阻塞的:default case 使得 select 语句总是非阻塞的。这意味着,任何通道(缓冲或非缓冲)上的发送和接收操作始终是非阻塞的。
如果某个值在任何通道上可用,则 select 将执行该情况。如果没有,它将立即执行默认情况。
https://play.golang.org/p/rFMpc80EuT3在上面的程序中,由于通道是无缓冲的,并且两个通道操作的值都不是立即可用的,因此将执行默认情况。如果上面的 select 语句没有 default case,select 就会阻塞并且响应会有所不同。
由于有default的情况下select 是非阻塞的,main goroutine 不会阻塞,调度程序也不会调用其他 goroutine, service1 和 service2 并不会执行。但是我们可以手动调用 time.Sleep 来阻塞 main goroutine。这样,所有其他的 goroutine 都会执行并死亡,将控制权返回给 main goroutine,它会在一段时间后唤醒。当main gorountine唤醒时,通道将立即有可用的值。
上述程序的执行结果:
main() started 0s
service1() started 0s
service2() started 0s
Response from service 1 Hello from service 1 3.0001805s
main() stopped 3.0001805s
也可能是:
main() started 0s
service1() started 0s
service2() started 0s
Response from service 2 Hello from service 2 3.0000957s
main() stopped 3.0000957s
死锁
当没有通道可用于发送或接收数据时,default case
很有用。为了避免死锁,我们可以使用 default case。这是可能的,因为默认情况下的所有通道操作都是非阻塞的,如果数据不是立即可用,Go 不会安排任何其他 goroutine 将数据发送到通道。
与接收类似,在发送操作中,如果其他 goroutine 处于休眠状态(未准备好接收值),则执行 default case。
nil channel
众所周知,通道的默认值为 nil。因此我们不能在 nil 通道上执行发送或接收操作。一旦在 select 语句中使用 nil 通道时,它会抛出以下错误之一或两个错误。
https://play.golang.org/p/uhraFubcF4S从上面的结果我们可以看出,select(no cases)意味着select语句实际上是空的,因为忽略了带有nil channel的cases。但是由于空的 select{} 语句阻塞了主 goroutine 并且 service goroutine 被安排在它的位置,nil 通道上的通道操作会抛出 chan send (nil chan) 错误。为了避免这种情况,我们default情况。
https://play.golang.org/p/upLsz52_CrE上面的程序不仅忽略了 case 块,而且立即执行了 default 语句。因此调度程序没有时间来调度 service goroutine。但这真是糟糕的设计。应该始终检查通道的 nil 值。
添加超时
上面的程序不是很有用,因为只执行default case。但有时,我们想要的是任何可用的服务都应该在理想的时间内做出响应,如果没有,则应该执行 default case。这可以通过使用在定义的时间后解除阻塞的通道操作的情况来完成。此通道操作由时间包的 After 函数提供。让我们看一个例子。
https://play.golang.org/p/mda2t2IQK__X上面的程序,在 2 秒后产生以下结果。
main() started 0s
No response received 2.0010958s
main() stopped 2.0010958s
在上面的程序中, <-time.After(2 * time.Second)
2s 后解除阻塞,并返回时间,但在这里,我们对其返回值不感兴趣。由于它也像一个 goroutine,我们有 3 个 goroutine, time.After 是第一个解除阻塞的channel。因此,对应于该 goroutine 操作的 case 被执行。
这很有用,因为您不想等待来自可用服务的响应太长时间,因为用户必须等待很长时间才能从服务中获取任何信息。如果我们在上面的例子中添加 10 * time.Second,来自 service1 的响应将被打印出来,我想现在很明显了。
empty select
与 for{} 空循环一样,空的 select{} 语法也是有效的,但有一个问题。{}正如我们所知,select 语句会被阻塞,直到其中一个 case 解除阻塞,并且由于没有可用的 case 语句来解除阻塞,主 goroutine 将永远阻塞,从而导致死锁。
https://play.golang.org/p/-pBd-BLMFOu在上面的程序中,我们知道 select 会阻塞 main goroutine,调度器会调度另一个可用的 goroutine,即 service。但在那之后,它会死掉,调度必须调度另一个可用的 goroutine,但由于主例程被阻塞,没有其他 goroutine 可用,导致死锁。
main() started
Hello from service!
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
program.Go:16 +0xba
exit status 2
waitgroup
让我们想象一种情况,您需要知道所有 goroutine 是否都完成了它们的工作。这与 select 你只需要一个条件为真的情况有点相反,但在这里你需要所有条件都为真才能解除main goroutine 的阻塞。这里的条件是通道操作成功。
WaitGroup 是一个带有计数器值的结构体,它跟踪产生了多少 goroutine 以及有多少已经完成了它们的工作。当这个计数器达到零时,意味着所有的 goroutine 都完成了他们的工作。
让我们深入研究一个示例:
https://play.golang.org/p/8qrAD9ceOfJ在上面的程序中,创建了一个类型为 sync.WaitGroup
的空结构(带有零值字段)wg。 WaitGroup 结构体有禁止导出的字段,例如 noCopy、state1 和 sema,我们不需要知道它们的内部实现。这个结构有三个方法,即 Add
, Wait
和 Done
。
Add
方法需要一个 int 参数,它是 WaitGroup 计数器的增加量。计数器只不过是一个默认值为 0 的整数。它保存了正在运行的 goroutine 的数量。当 WaitGroup 创建时,它的计数器值为 0,我们可以通过使用 Add 方法传递 delta 作为参数来增加它。请记住,当 goroutine 启动时,计数器不会递增,因此我们需要手动递增它。
Wait
方法用于阻塞当前的gorountine。一旦计数器达到 0,该 goroutine 将解除阻塞。因此,我们需要一些东西来减少计数器。
Done
方法就是用来递减计数器的。它不接受任何参数,因此它只将计数器减 1。
在上面的程序中,创建 wg 后,我们运行了 3 次 for 循环。在每一轮中,我们启动了 1 个 goroutine 并将计数器加 1。这意味着,现在我们有 3 个 goroutine 等待执行,WaitGroup 计数器为 3。请注意,我们在 goroutine 中传递了一个指向 wg 的指针。这是因为在 goroutine 中,一旦我们完成了 goroutine 应该做的任何事情,我们需要调用 Done 方法来递减计数器。如果 wg 作为值传递,则 main 中的 wg 不会递减。这是很明显的。
在 for 循环执行完毕后,我们仍然没有将控制权交给其他 goroutine。这是通过调用wg 的 Wait方法来完成的。这将阻塞 main goroutine,直到计数器达到 0。一旦计数器达到 0,因为从 3 个协程开始,我们在 wg 上调用了 Done 方法 3 次,主协程将解除阻塞并开始执行进一步的代码。
上面的程序将产生如下结果:
main() started
Service called on instance 2
Service called on instance 3
Service called on instance 1
main() stopped
以上结果对你们来说可能不同,因为 goroutine 的执行顺序可能会有所不同。
添加方法接受 int 类型,这意味着 delta 也可以是负数。要了解更多信息,请查看官方文档[https://golang.org/pkg/sync/#WaitGroup.Add]。
worker pool
顾名思义,worker pool
:即工作池,是并发模式协同完成同样工作一些列 goroutine 的集合。在前面 WaitGroup 中,我们看到一组 goroutines 并发工作,但它们没有特定的工作内容。一旦将channel加入其中,让它们有相同的工作去完成,这些goroutines就会成为一个工作池。
因此,worker pool
背后的概念是维护一个 worker goroutines 池,它接收一些任务并返回结果。一旦他们都完成了他们的工作,我们就会收集结果。所有这些 goroutine 都出于各自的目的使用相同的通道。
让我们看一个带有两个通道的简单示例,即 tasks 和 results
https://play.golang.org/p/IYiMV1I4lCj这个程序发生了什么呢?
-
sqrWorker
是一个工作函数, 它接收三个参数tasks
channel,results
channel 和id
, 这个gorountine的任务是接收tasks
中的数据,计算平方,并把结果发送到results
中。
- 在 main 函数中, 创建了缓冲容量为10的
tasks
和results
channel,因此,在tasks
缓冲满之前, 发送操作都是非阻塞的。因此,设置大的缓冲值是一个是一个好主意。
- 然后我们生成多个
sqrWorker
goroutines实例,把前面建立的两个channel 和 id 作为参数,其中id是作为标记,标识是哪个gorountine正在执行任务。
- 然后我们将 5 个job传递给非阻塞的
tasks
channel。
- 完成了发送job到
tasks
后,我们关闭了它。这并不是必需的,但是如果出现一些错误,它将在将来节省大量时间。
- 然后使用 for, 循环 5 次,我们从
results
channel中提取数据。由于对空缓冲区的读取操作是阻塞的,因此将从工作池中调度一个 goroutine。在 goroutine 返回一些结果之前,main goroutine 将被阻塞。
- 因为在 worker goroutine 中模拟阻塞操作,调度程序将调用另一个可用的 goroutine,直到work gorountine变得可用时,它会将计算结果写入
results
channel。由于在缓冲区满之前,写入通道是非阻塞的,因此在此处写入results
通道是非阻塞的。此外,虽然当前的 worker goroutine 不可用,但执行了多个其他 worker goroutines,消耗了tasks
缓冲区中的值。在所有worker goroutine 消耗完tasks
后,tasks
通道缓冲区为空,for range 循环结束。当tasks
通道关闭时,它不会抛出死锁错误。
- 有时,所有工作协程都可能处于休眠状态,因此主协程将唤醒并工作,直到结果通道缓冲区再次为空。
- 在所有工作 goroutine 死后,main goroutine 将重新获得控制权并从结果通道打印剩余的结果并继续执行。
上面的例子解释了多个 goroutines 如何可以在同一个通道上提供数据并优雅地完成工作。当worker被阻塞时,goroutines 很灵活的解决了这个问题。如果删除 time.Sleep() 的调用,则只有一个 goroutine 独自完成该作业,因为在 for range 循环完成且 goroutine 终止之前不会调度其他 goroutine。
运行系统速度不同, 您可能得到与上述例子不同的结果,因为如果所有的gorountine即使是被阻塞很短的时间, main gorountine也会被唤醒执行程序。
现在,让我们使用sync.WaitGroup 实现相同的效果,但更优雅。
https://play.golang.org/p/0rRfchn7sL1上面的结果看起来很整洁,因为main goroutine 中results
channel 上的读取操作是非阻塞的,而results
channel 开始读取前已经完成结果填充,而main goroutine 被 wg.Wait() 调用阻塞。使用 waitGroup,我们可以防止大量(不必要的)上下文切换(调度),这里是 7,而前面的例子是 9。但是有一个牺牲,因为你必须等到所有的工作都完成。
metux
Mutex【锁】 是 Go 中最简单的概念之一。但在解释之前,让我们先了解什么是竞态条件。 goroutines 有它们独立的堆栈,因此它们之间不共享任何数据。但是可能存在堆中的某些数据在多个 goroutine 之间共享的情况。在这种情况下,多个 goroutine 试图在同一内存位置操作数据,从而导致意外结果。下面展示一个简单的例子:
https://play.golang.org/p/MQNepChxiEa在上面的程序中,我们生成了 1000 个 goroutines,它们增加了初始为 0 的全局变量 i 的值。由于我们正在实现 WaitGroup,我们希望所有 1000 个 goroutines 一一增加 i 的值,从而得到 i 的最终值为 1000。当主 goroutine 在 wg.Wait() 调用后再次开始执行时,我们正在打印 i。让我们看看最终的结果。
value of i after 1000 operations is 937
什么?为什么我们不到1000?看起来有些 goroutines 不起作用。但实际上,我们的程序存在竞争条件。让我们看看可能发生了什么。
i = i + 1 calculation has 3 steps
-(1) 获取 i 当前值
-(2) 将i的值增加 1
-(3) 使用新值替换 i
让我们想象一个场景,在这些步骤之间安排了不同的 goroutine。例如,让我们考虑 1000 个 goroutine 池中的 2 个 goroutine,即:G1 和 G2。
当 i 为 0 时 G1 首先启动,运行前 2 个步骤,现在 i 现在为 1。但在 G1 更新步骤 3 中 i 的值之前,新的 goroutine G2 被调度并运行所有步骤。但是在 G2 的情况下,i 的值仍然是 0,因此在它执行第 3 步之后,i 将是 1。现在 G1 再次被安排完成第 3 步并从第 2 步更新 i 的值为 1。在 goroutines 的完美世界中在完成所有 3 个步骤后安排,2 个 goroutines 的成功操作会产生 i 的值是 2 但这里不是这种情况。因此,我们几乎可以推测为什么我们的程序没有将 i 的值变为 1000。
到目前为止,我们了解到 goroutine 是协作调度的。除非一个 goroutine 在并发课程中提到的条件之一阻塞,否则另一个 goroutine 不会取代它。既然 i = i + 1 没有阻塞,为什么 Go 调度器会调度另一个 goroutine?
你绝对应该在stackoverflow上查看这个答案。在任何情况下,您都不应该依赖 Go 的调度算法并实现自己的逻辑来同步不同的 goroutine。
确保一次只有一个 goroutine 完成上述所有 3 个步骤的一种方法是实现互斥锁。 Mutex(互斥)是编程中的一个概念,其中一次只有一个例程(线程)可以执行多个操作。这是通过一个例程获取对值的锁定,对它必须执行的值进行任何操作,然后释放锁定来完成的。当值被锁定时,没有其他例程可以读取或写入它。
在 Go 中,互斥锁数据结构(本质上是个map)由sync包提供的。在 Go 中,在对可能导致竞争条件的值执行任何操作之前,我们使用 mutex.Lock() 方法获取锁,然后是操作代码。一旦我们完成了操作,在上面的程序 i = i + 1 中,我们使用 mutext.Unlock() 方法解锁它。当任何其他 goroutine 在锁存在时尝试读取或写入 i 的值时,该 goroutine 将阻塞,直到操作从第一个 goroutine 解锁。因此只有 1 个 goroutine 可以读取或写入 i 的值,避免竞争条件。请记住,在整个操作被解锁之前,锁定和解锁之间的操作中存在的任何变量将不可用于其他 goroutine。
让我们用互斥锁修改前面的例子。
https://play.golang.org/p/xVFAX_0Uig8在上面的程序中,我们创建了一个互斥锁 m 并将指向它的指针传递给所有生成的 goroutine。在开始对 i 进行操作之前,我们使用 m.Lock() 语法获取了互斥锁 m 上的锁,并且在操作之后,我们使用 m.Unlock() 语法将其解锁。以上程序产生以下结果
value of i after 1000 operations is 1000
从上面的结果可以看出,互斥锁帮助我们解决了竞态条件。但是第一条规则是避免 goroutine 之间共享资源。
您可以在运行 Go run -race program.Go 之类的程序时使用种族标志测试 Go 中的竞争条件。在此处阅读有关比赛检测器的更多信息。
Concurrency Patterns 【并发模式】
通俗来讲,就是日常使用的常用范式。
以下是一些可以使程序更快更可靠的概念和方法。
- Generator 【生成器模式】
使用通道,可以实现更好实现生成器。
比如斐波那契数列,计算上开销很大, 我们可以提前计算好结果,并放入channel中, 等待程序执行到此,直接取结果即可, 而不必等待。
此图中,调用fib 函数,返回了一个channel,通过循环channel,可以接收到的计算好的数据。在 fib 函数内部,必须返回一个只接收通道,我们创建一个有buffer的通道,并在函数最后返回它。fib的返回值会将这个双向通道转换为单向只接收通道。在匿名 goroutine 中,使用 for 循环将斐波那契数推送到此通道,完成后,关闭此通道。在主协程中,使用 fib 函数调用的范围,我们可以直接访问这个通道。
- fan-in & fan-out 【多路复用】
fan-in 是一种多路复用策略,将过个输入通道组合,以产生输出通道。fan-out 是将单个通道拆分为多个通道的解复用策略。
package main
import (
"fmt"
"sync"
)
// return channel for input numbers
func getInputChan() <-chan int {
// make return channel
input := make(chan int, 100)
// sample numbers
numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
// run goroutine
go func() {
for num := range numbers {
input <- num
}
// close channel once all numbers are sent to channel
close(input)
}()
return input
}
// returns a channel which returns square of numbers
func getSquareChan(input <-chan int) <-chan int {
// make return channel
output := make(chan int, 100)
// run goroutine
go func() {
// push squares until input channel closes
for num := range input {
output <- num * num
}
// close output channel once for loop finishesh
close(output)
}()
return output
}
// returns a merged channel of `outputsChan` channels
// this produce fan-in channel
// this is veriadic function
func merge(outputsChan ...<-chan int) <-chan int {
// create a WaitGroup
var wg sync.WaitGroup
// make return channel
merged := make(chan int, 100)
// increase counter to number of channels `len(outputsChan)`
// as we will spawn number of goroutines equal to number of channels received to merge
wg.Add(len(outputsChan))
// function that accept a channel (which sends square numbers)
// to push numbers to merged channel
output := func(sc <-chan int) {
// run until channel (square numbers sender) closes
for sqr := range sc {
merged <- sqr
}
// once channel (square numbers sender) closes,
// call `Done` on `WaitGroup` to decrement counter
wg.Done()
}
// run above `output` function as groutines, `n` number of times
// where n is equal to number of channels received as argument the function
// here we are using `for range` loop on `outputsChan` hence no need to manually tell `n`
for _, optChan := range outputsChan {
go output(optChan)
}
// run goroutine to close merged channel once done
go func() {
// wait until WaitGroup finishesh
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// step 1: get input numbers channel
// by calling `getInputChan` function, it runs a goroutine which sends number to returned channel
chanInputNums := getInputChan()
// step 2: `fan-out` square operations to multiple goroutines
// this can be done by calling `getSquareChan` function multiple times where individual function call returns a channel which sends square of numbers provided by `chanInputNums` channel
// `getSquareChan` function runs goroutines internally where squaring operation is ran concurrently
chanOptSqr1 := getSquareChan(chanInputNums)
chanOptSqr2 := getSquareChan(chanInputNums)
// step 3: fan-in (combine) `chanOptSqr1` and `chanOptSqr2` output to merged channel
// this is achieved by calling `merge` function which takes multiple channels as arguments
// and using `WaitGroup` and multiple goroutines to receive square number, we can send square numbers
// to `merged` channel and close it
chanMergedSqr := merge(chanOptSqr1, chanOptSqr2)
// step 4: let's sum all the squares from 0 to 9 which should be about `285`
// this is done by using `for range` loop on `chanMergedSqr`
sqrSum := 0
// run until `chanMergedSqr` or merged channel closes
// that happens in `merge` function when all goroutines pushing to merged channel finishes
// check line no. 86 and 87
for num := range chanMergedSqr {
sqrSum += num
}
// step 5: print sum when above `for loop` is done executing which is after `chanMergedSqr` channel closes
fmt.Println("Sum of squares between 0-9 is", sqrSum)
}
完
网友评论