参考
《快学 Go 语言》第 11 课 —— 千军万马跑协程
《快学 Go 语言》第 12 课 —— 通道
let's GoLang(三): Goroutine&Channel
知识回顾
并发编程基础知识一 并发和并行
并发编程基础知识二 线程和进程
并发编程基础知识三 异步,非阻塞和 IO 复用
协程和通道是 Go 语言作为并发编程语言最为重要的特色之一,初学者可以完全将协程理解为线程,但是用起来比线程更加简单,占用的资源也更少。通常在一个进程里启动上万个线程就已经不堪重负,但是 Go 语言允许你启动百万协程也可以轻松应付。如果把协程比喻成小岛,那通道就是岛屿之间的交流桥梁,数据搭乘通道从一个协程流转到另一个协程。通道是并发安全的数据结构,它类似于内存消息队列,允许很多的协程并发对通道进行读写。
image.png
Go 语言里面的协程称之为 goroutine,通道称之为 channel。
一、协程
1.协程的启动
Go 语言里创建一个协程非常简单,使用 go 关键词加上一个函数调用就可以了。Go 语言会启动一个新的协程,函数调用将成为这个协程的入口。
package main
import "fmt"
import "time"
func main() {
fmt.Println("run in main goroutine")
go func() {
fmt.Println("run in child goroutine")
go func() {
fmt.Println("run in grand child goroutine")
go func() {
fmt.Println("run in grand grand child goroutine")
}()
}()
}()
time.Sleep(time.Second)
fmt.Println("main goroutine will quit")
}
-------
run in main goroutine
run in child goroutine
run in grand child goroutine
run in grand grand child goroutine
main goroutine will quit
main 函数运行在主协程(main goroutine)里面,上面的例子中我们在主协程里面启动了一个子协程,子协程又启动了一个孙子协程,孙子协程又启动了一个曾孙子协程。这些协程之间似乎形成了父子、子孙、关系,但是实际上协程之间并不存在这么多的层级关系,在 Go 语言里只有一个主协程,其它都是它的子协程,子协程之间是平行关系。
值得注意的是这里的 go 关键字语法和前面的 defer 关键字语法是一样的,它后面跟了一个匿名函数,然后还要带上一对(),表示对匿名函数的调用。
上面的代码中主协程睡眠了 1s,等待子协程们执行完毕。如果将睡眠的这行代码去掉,将会看不到子协程运行的痕迹
-------------
run in main goroutine
main goroutine will quit
这是因为主协程运行结束,其它协程就会立即消亡,不管它们是否已经开始运行。
2.协程的本质
一个进程内部可以运行多个线程,而每个线程又可以运行很多协程。线程要负责对协程进行调度,保证每个协程都有机会得到执行。当一个协程睡眠时,它要将线程的运行权让给其它的协程来运行,而不能持续霸占这个线程。同一个线程内部最多只会有一个协程正在运行。
线程的调度是由操作系统负责的,调度算法运行在内核态,而协程的调用是由 Go 语言的运行时负责的,调度算法运行在用户态。
协程可以简化为三个状态,运行态、就绪态和休眠态。同一个线程中最多只会存在一个处于运行态的协程,就绪态的协程是指那些具备了运行能力但是还没有得到运行机会的协程,它们随时会被调度到运行态,休眠态的协程还不具备运行能力,它们是在等待某些条件的发生,比如 IO 操作的完成、睡眠时间的结束等。
image.png
操作系统对线程的调度是抢占式的,也就是说单个线程的死循环不会影响其它线程的执行,每个线程的连续运行受到时间片的限制。
Go 语言运行时对协程的调度并不是抢占式的。如果单个协程通过死循环霸占了线程的执行权,那这个线程就没有机会去运行其它协程了,你可以说这个线程假死了。不过一个进程内部往往有多个线程,假死了一个线程没事,全部假死了才会导致整个进程卡死。
每个线程都会包含多个就绪态的协程形成了一个就绪队列,如果这个线程因为某个别协程死循环导致假死,那这个队列上所有的就绪态协程是不是就没有机会得到运行了呢?Go 语言运行时调度器采用了 work-stealing 算法,当某个线程空闲时,也就是该线程上所有的协程都在休眠(或者一个协程都没有),它就会去其它线程的就绪队列上去偷一些协程来运行。也就是说这些线程会主动找活干,在正常情况下,运行时会尽量平均分配工作任务。
3.设置线程数
默认情况下,Go 运行时会将线程数会被设置为机器 CPU 逻辑核心数。同时它内置的 runtime 包提供了 GOMAXPROCS(n int) 函数允许我们动态调整线程数,注意这个函数名字是全大写,Go 语言的设计者就是这么任性,该函数会返回修改前的线程数,如果参数 n <=0 ,就不会产生修改效果,等价于读操作。
package main
import "fmt"
import "runtime"
func main() {
// 读取默认的线程数
fmt.Println(runtime.GOMAXPROCS(0))
// 设置线程数为 10
runtime.GOMAXPROCS(10)
// 读取当前的线程数
fmt.Println(runtime.GOMAXPROCS(0))
}
--------
4
10
获取当前的协程数量可以使用 runtime 包提供的 NumGoroutine() 方法
package main
import "fmt"
import "time"
import "runtime"
func main() {
fmt.Println(runtime.NumGoroutine())
for i:=0;i<10;i++ {
go func(){
for {
time.Sleep(time.Second)
}
}()
}
fmt.Println(runtime.NumGoroutine())
}
------
1
11
二、使用atmoic包和互斥锁mutex解决竞争状态
摘自《go语言实战》P132
1.atmoic包
如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。竞争状态的存在是让并发程序变得复杂的地方,十分容易引起潜在问题。对一个共享资源的读和写操作必须是原子化的,换句话说,同一时刻只能有一个 goroutine 对共享资源进行读和写操作。
01 // 这个示例程序展示如何在程序里造成竞争状态
02 // 实际上不希望出现这种情况
03 package main
04
05 import (
06 "fmt"
07 "runtime"
08 "sync"
09 )
10
11 var (
12 // counter 是所有 goroutine 都要增加其值的变量
13 counter int
14
15 // wg 用来等待程序结束
16 wg sync.WaitGroup
17 )
18
19 // main 是所有 Go 程序的入口
20 func main() {
21 // 计数加 2,表示要等待两个 goroutine
22 wg.Add(2)
23
24 // 创建两个 goroutine
25 go incCounter(1)
26 go incCounter(2)
27
28 // 等待 goroutine 结束
29 wg.Wait()
30 fmt.Println("Final Counter:", counter)
31 }
32
33 // incCounter 增加包里 counter 变量的值
34 func incCounter(id int) {
35 // 在函数退出时调用 Done 来通知 main 函数工作已经完成
36 defer wg.Done()
37
38 for count := 0; count < 2; count++ {
39 // 捕获 counter 的值
40 value := counter
41
42 // 当前 goroutine 从线程退出,并放回到队列
43 runtime.Gosched()
44
45 // 增加本地 value 变量的值
46 value++
47
48 // 将该值保存回 counter
49 counter = value
50 }
51 }
变量 counter会进行 4次读和写操作,每个 goroutine 执行两次。但是,程序终止时,counter变量的值为 2。每个 goroutine 都会覆盖另一个 goroutine 的工作。这种覆盖发生在 goroutine 切换的时候。
每个 goroutine 创造了一个 counter 变量的副本,之后就切换到另一个goroutine。当这个 goroutine再次运行的时候, counter 变量的值已经改变了,但是 goroutine 并没有更新自己的那个副本的值,而是继续使用这个副本的值,用这个值递增,并存回 counter 变量,结果覆盖了另一个goroutine 完成的工作。
原子函数能够以很底层的加锁机制来同步访问整型变量和指针。我们可以用原子函数来修正代码清单 6-9 中创建的竞争状态
41 for count := 0; count < 2; count++ {
42 // 安全地对 counter 加 1
43 atomic.AddInt64(&counter, 1)
44
45 // 当前 goroutine 从线程退出,并放回到队列
46 runtime.Gosched()
47 }
现在,程序的第 43 行使用了 atmoic 包的 AddInt64 函数。这个函数会同步整型值的加法,方法是强制同一时刻只能有一个 goroutine 运行并完成这个加法操作。当 goroutine 试图去调用任何原子函数时,这些 goroutine都会自动根据所引用的变量做同步处理。现在我们得到了正确的值 4。
另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式。代码清单 6-15 中的示例程序使用 LoadInt64 和 StoreInt64 来创建一个同步标志,这个标志可以向程序里多个 goroutine 通知某个特殊状态。后续略……
2.互斥锁
// This sample program demonstrates how to use a mutex
// to define critical sections of code that need synchronous
// access.
package main
import (
"fmt"
"runtime"
"sync"
)
var (
// counter is a variable incremented by all goroutines.
counter int
// wg is used to wait for the program to finish.
wg sync.WaitGroup
// mutex is used to define a critical section of code.
mutex sync.Mutex
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go incCounter(1)
go incCounter(2)
// Wait for the goroutines to finish.
wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}
// incCounter increments the package level Counter variable
// using the Mutex to synchronize and provide safe access.
func incCounter(id int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for count := 0; count < 2; count++ {
// Only allow one goroutine through this
// critical section at a time.
mutex.Lock()
{
// Capture the value of counter.
value := counter
// Yield the thread and be placed back in queue.
runtime.Gosched()
// Increment our local value of counter.
value++
// Store the value back into counter.
counter = value
}
mutex.Unlock()
// Release the lock and allow any
// waiting goroutine through.
}
}
对 counter 变量的操作在第 46 行和第 60 行的 Lock()和 Unlock()函数调用定义的临界区里被保护起来。使用大括号只是为了让临界区看起来更清晰,并不是必需的。同一时刻只有一个 goroutine 可以进入临界区。之后,直到调用 Unlock()函数之后,其他 goroutine 才能进入临界区。当第 52 行强制将当前 goroutine 退出当前线程后,调度器会再次分配这个 goroutine 继续运行。当程序结束时,我们得到正确的值 4,竞争状态不再存在。
三、通道
原子函数和互斥锁都能工作,但是依靠它们都不会让编写并发程序变得更简单,更不容易出错,或者更有趣。在 Go 语言里,你不仅可以使用原子函数和互斥锁来保证对共享资源的安全访问以及消除竞争状态,还可以使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做同步。
不同的并行协程之间交流的方式有两种,一种是通过共享变量,另一种是通过队列。Go 语言鼓励使用队列的形式来交流,它单独为协程之间的队列数据交流定制了特殊的语法 —— 通道。
通道是协程的输入和输出。作为协程的输出,通道是一个容器,它可以容纳数据。作为协程的输入,通道是一个生产者,它可以向协程提供数据。通道作为容器是有限定大小的,满了就写不进去,空了就读不出来。通道还有它自己的类型,它可以限定进入通道的数据的类型。
image.png
1.创建通道
创建通道只有一种语法,那就是 make 全局函数,提供第一个类型参数限定通道可以容纳的数据类型,再提供第二个整数参数作为通道的容器大小。大小参数是可选的,如果不填,那这个通道的容量为零,叫着「非缓冲型通道」,非缓冲型通道必须确保有协程正在尝试读取当前通道,否则写操作就会阻塞直到有其它协程来从通道中读东西。
如果两个 goroutine没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。非缓冲型通道总是处于既满又空的状态。
与之对应的有限定大小的通道就是缓冲型通道。在 Go 语言里不存在无界通道,每个通道都是有限定最大容量的。
// 缓冲型通道,里面只能放整数
var bufferedChannel = make(chan int, 1024)
// 非缓冲型通道
var unbufferedChannel = make(chan int)
2.读写通道
Go 语言为通道的读写设计了特殊的箭头语法糖 <-,让我们使用通道时非常方便。可以把箭头的方向理解为数据的流向。把箭头写在通道变量的右边就是写通道,把箭头写在通道的左边就是读通道。一次只能读写一个元素。
package main
import "fmt"
func main() {
var ch chan int = make(chan int, 4)
for i:=0; i<cap(ch); i++ {
ch <- i // 写通道
}
for len(ch) > 0 {
var value int = <- ch // 读通道
fmt.Println(value)
}
}
通道作为容器,它可以像切片一样,使用 cap() 和 len() 全局函数获得通道的容量和当前内部的元素个数。通道一般作为不同的协程交流的媒介,在同一个协程里它也是可以使用的。
3.读写阻塞
通道满了,写操作就会阻塞,协程就会进入休眠,直到有其它协程读通道挪出了空间,协程才会被唤醒。如果有多个协程的写操作都阻塞了,一个读操作只会唤醒一个协程。
通道空了,读操作就会阻塞,协程也会进入睡眠,直到有其它协程写通道装进了数据才会被唤醒。如果有多个协程的读操作阻塞了,一个写操作也只会唤醒一个协程。
package main
import "fmt"
import "time"
import "math/rand"
func send(ch chan int) {
for {
var value = rand.Intn(100)
ch <- value
fmt.Printf("send %d\n", value)
}
}
func recv(ch chan int) {
for {
value := <- ch
fmt.Printf("recv %d\n", value)
time.Sleep(time.Second)
}
}
func main() {
var ch = make(chan int, 1)
// 子协程循环读
go recv(ch)
// 主协程循环写
send(ch)
}
--------
send 81
send 87
recv 81
recv 87
send 47
recv 47
send 59
4.关闭通道
Go 语言的通道有点像文件,不但支持读写操作, 还支持关闭。读取一个已经关闭的通道会立即返回通道类型的「零值」,而写一个已经关闭的通道会抛异常。如果通道里的元素是整型的,读操作是不能通过返回值来确定通道是否关闭的。
package main
import "fmt"
func main() {
var ch = make(chan int, 4)
ch <- 1
ch <- 2
close(ch)
value := <- ch
fmt.Println(value)
value = <- ch
fmt.Println(value)
value = <- ch
fmt.Println(value)
}
-------
1
2
0
这时候就需要引入一个新的知识点 —— 使用 for range 语法糖来遍历通道
for range 语法我们已经见了很多次了,它是多功能的,除了可以遍历数组、切片、字典,还可以遍历通道,取代箭头操作符。当通道空了,循环会暂停阻塞,当通道关闭时,阻塞停止,循环也跟着结束了。当循环结束时,我们就知道通道已经关闭了。
package main
import "fmt"
func main() {
var ch = make(chan int, 4)
ch <- 1
ch <- 2
close(ch)
// for range 遍历通道
for value := range ch {
fmt.Println(value)
}
}
------
1
2
通道如果没有显式关闭,当它不再被程序使用的时候,会自动关闭被垃圾回收掉。不过优雅的程序应该将通道看成资源,显式关闭每个不再使用的资源是一种良好的习惯。
5.通道写安全
上面提到向一个已经关闭的通道执行写操作会抛出异常,这意味着我们在写通道时一定要确保通道没有被关闭。
package main
import "fmt"
func send(ch chan int) {
i := 0
for {
i++
ch <- i
}
}
func recv(ch chan int) {
value := <- ch
fmt.Println(value)
value = <- ch
fmt.Println(value)
close(ch)
}
func main() {
var ch = make(chan int, 4)
go recv(ch)
send(ch)
}
---------
1
2
panic: send on closed channel
goroutine 1 [running]:
main.send(0xc42008a000)
/Users/qianwp/go/src/github.com/pyloque/practice/main.go:9 +0x44
main.main()
/Users/qianwp/go/src/github.com/pyloque/practice/main.go:24 +0x66
exit status 2
那如何确保呢?Go 语言并不存在一个内置函数可以判断出通道是否已经被关闭。即使存在这样一个函数,当你判断时通道没有关闭,并不意味着当你往通道里写数据时它就一定没有被关闭,并发环境下,它是可能被其它协程随时关闭的。
确保通道写安全的最好方式是由负责写通道的协程自己来关闭通道,读通道的协程不要去关闭通道。
package main
import "fmt"
func send(ch chan int) {
ch <- 1
ch <- 2
ch <- 3
ch <- 4
close(ch)
}
func recv(ch chan int) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
var ch = make(chan int, 1)
go send(ch)
recv(ch)
}
-----------
1
2
3
4
这个方法确实可以解决单写多读的场景,可要是遇上了多写单读的场合该怎么办呢?任意一个读写通道的协程都不可以随意关闭通道,否则会导致其它写通道协程抛出异常。这时候就必须让其它不相干的协程来干这件事,这个协程需要等待所有的写通道协程都结束运行后才能关闭通道。那其它协程要如何才能知道所有的写通道已经结束运行了呢?这个就需要使用到内置 sync 包提供的 WaitGroup 对象,它使用计数来等待指定事件完成。
package main
import "fmt"
import "time"
import "sync"
func send(ch chan int, wg *sync.WaitGroup) {
defer wg.Done() // 计数值减一
i := 0
for i < 4 {
i++
ch <- i
}
}
func recv(ch chan int) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
var ch = make(chan int, 4)
var wg = new(sync.WaitGroup)
wg.Add(2) // 增加计数值
go send(ch, wg) // 写
go send(ch, wg) // 写
go recv(ch)
// Wait() 阻塞等待所有的写通道协程结束
// 待计数值变成零,Wait() 才会返回
wg.Wait()
// 关闭通道
close(ch)
time.Sleep(time.Second)
}
---------
1
2
3
4
1
2
3
4
6.多路通道
这里可以先参考并发编程基础知识三 异步,非阻塞和 IO 复用
在真实的世界中,还有一种消息传递场景,那就是消费者有多个消费来源,只要有一个来源生产了数据,消费者就可以读这个数据进行消费。这时候可以将多个来源通道的数据汇聚到目标通道,然后统一在目标通道进行消费。
package main
import "fmt"
import "time"
// 每隔一会生产一个数
func send(ch chan int, gap time.Duration) {
i := 0
for {
i++
ch <- i
time.Sleep(gap)
}
}
// 将多个原通道内容拷贝到单一的目标通道
func collect(source chan int, target chan int) {
for v := range source {
target <- v
}
}
// 从目标通道消费数据
func recv(ch chan int) {
for v := range ch {
fmt.Printf("receive %d\n", v)
}
}
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
var ch3 = make(chan int)
go send(ch1, time.Second)
go send(ch2, 2 * time.Second)
go collect(ch1, ch3)
go collect(ch2, ch3)
recv(ch3)
}
---------
receive 1
receive 1
receive 2
receive 2
receive 3
receive 4
receive 3
receive 5
receive 6
receive 4
receive 7
receive 8
receive 5
receive 9
....
但是上面这种形式比较繁琐,需要为每一种消费来源都单独启动一个汇聚协程。Go 语言为这种使用场景带来了「多路复用」语法糖,也就是下面要讲的 select 语句,它可以同时管理多个通道读写,如果所有通道都不能读写,它就整体阻塞,只要有一个通道可以读写,它就会继续。下面我们使用 select 语句来简化上面的逻辑
package main
import "fmt"
import "time"
func send(ch chan int, gap time.Duration) {
i := 0
for {
i++
ch <- i
time.Sleep(gap)
}
}
func recv(ch1 chan int, ch2 chan int) {
for {
select {
case v := <- ch1:
fmt.Printf("recv %d from ch1\n", v)
case v := <- ch2:
fmt.Printf("recv %d from ch2\n", v)
}
}
}
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
go send(ch1, time.Second)
go send(ch2, 2 * time.Second)
recv(ch1, ch2)
}
------------
recv 1 from ch2
recv 1 from ch1
recv 2 from ch1
recv 3 from ch1
recv 2 from ch2
recv 4 from ch1
recv 3 from ch2
recv 5 from ch1
上面是多路复用 select 语句的读通道形式,下面是它的写通道形式,只要有一个通道能写进去,它就会打破阻塞。
select {
case ch1 <- v:
fmt.Println("send to ch1")
case ch2 <- v:
fmt.Println("send to ch2")
}
7.非阻塞读写
前面我们讲的读写都是阻塞读写,Go 语言还提供了通道的非阻塞读写。当通道空时,读操作不会阻塞,当通道满时,写操作也不会阻塞。非阻塞读写需要依靠 select 语句的 default 分支。当 select 语句所有通道都不可读写时,如果定义了 default 分支,那就会执行 default 分支逻辑,这样就起到了不阻塞的效果。下面我们演示一个单生产者多消费者的场景。生产者同时向两个通道写数据,写不进去就丢弃。
package main
import "fmt"
import "time"
func send(ch1 chan int, ch2 chan int) {
i := 0
for {
i++
select {
case ch1 <- i:
fmt.Printf("send ch1 %d\n", i)
case ch2 <- i:
fmt.Printf("send ch2 %d\n", i)
default:
}
}
}
func recv(ch chan int, gap time.Duration, name string) {
for v := range ch {
fmt.Printf("receive %s %d\n", name, v)
time.Sleep(gap)
}
}
func main() {
// 无缓冲通道
var ch1 = make(chan int)
var ch2 = make(chan int)
// 两个消费者的休眠时间不一样,名称不一样
go recv(ch1, time.Second, "ch1")
go recv(ch2, 2 * time.Second, "ch2")
send(ch1, ch2)
}
------------
send ch1 27
send ch2 28
receive ch1 27
receive ch2 28
send ch1 6708984
receive ch1 6708984
send ch2 13347544
send ch1 13347775
receive ch2 13347544
receive ch1 13347775
send ch1 20101642
receive ch1 20101642
send ch2 26775795
receive ch2 26775795
...
从输出中可以明显看出有很多的数据都丢弃了,消费者读到的数据是不连续的。如果将 select 语句里面的 default 分支干掉,再运行一次,结果如下
send ch2 1
send ch1 2
receive ch1 2
receive ch2 1
receive ch1 3
send ch1 3
receive ch2 4
send ch2 4
send ch1 5
receive ch1 5
receive ch1 6
send ch1 6
receive ch1 7
可以看到消费者读到的数据都连续了,但是每个数据只给了一个消费者。select 语句的 default 分支非常关键,它是决定通道读写操作阻塞与否的关键。
8.通道内部结构
Go 语言的通道内部结构是一个循环数组,通过读写偏移量来控制元素发送和接受。它为了保证线程安全,内部会有一个全局锁来控制并发。对于发送和接受操作都会有一个队列来容纳处于阻塞状态的协程。
image.png
type hchan struct {
qcount uint // 通道有效元素个数
dataqsize uint // 通道容量,循环数组总长度
buf unsafe.Pointer // 数组地址
elemsize uint16 // 内部元素的大小
closed uint32 // 是否已关闭 0或者1
elemtype *_type // 内部元素类型信息
sendx uint // 循环数组的写偏移量
recvx uint // 循环数组的读偏移量
recvq waitq // 阻塞在读操作上的协程队列
sendq waitq // 阻塞在写操作上的协程队列
lock mutex // 全局锁
}
这个循环队列和 Java 语言内置的 ArrayBlockingQueue 结构如出一辙。从这个数据结构中我们也可以得出结论:队列在本质上是使用共享变量加锁的方式来实现的,共享变量才是并行交流的本质。
class ArrayBlockingQueue extends AbstractQueue {
Object[] items;
int takeIndex;
int putIndex;
int count;
ReentrantLock lock;
...
}
所以读者请不要认为 Go 语言的通道很神奇,Go 语言只是对通道设计了一套便于使用的语法糖,让这套数据结构显的平易近人。它在内部实现上和其它语言的并发队列大同小异。
网友评论