并发和并行
Go是并发语言,而不是并行语言。(Go is a concurrent language and not a parallel one.)
并发(Concurrency)的关键在于你有处理多个任务的能力,不一定要同时
并行(Parallellism)的关键是你有同时处理多个任务的能力
最关键的点就是:是否是『同时』
下面举个例子来说明:
假设我们正在编写一个web浏览器。web浏览器有各种组件。其中两个是web页面呈现区域和下载文件从internet下载的下载器。假设我们以这样的方式构建了浏览器的代码,这样每个组件都可以独立地执行。
当这个浏览器运行在单个核处理器中时,处理器将在浏览器的两个组件之间进行上下文切换。它可能会下载一个文件一段时间,然后它可能会切换到呈现用户请求的网页的html。这就是所谓的并发性。并发进程从不同的时间点开始,它们的执行周期重叠。在这种情况下,下载和呈现从不同的时间点开始,它们的执行重叠。
假设同一浏览器运行在多核处理器上。在这种情况下,文件下载组件和HTML呈现组件可能同时在不同的内核中运行。这就是所谓的并行性。
需要说明的是:并行性Parallelism不会总是导致更快的执行时间,这是因为并行运行的组件可能需要相互通信。
进程,线程和协程
-
进程(Process)
进程是可并发执行的程序在某个数据集合上的一次计算活动,也是操作系统进行资源分配和调度的基本单位。进程一般由程序、数据集、进程控制块三部分组成。进程的局限是创建、撤销和切换的开销比较大。 -
线程(Thread)
线程是操作系统进程中能够并发执行的实体,是处理器调度和分派的基本单位。每个进程内可包含多个可并发执行的线程。线程的优点是减小了程序并发执行时的开销,提高了操作系统的并发性能,缺点是线程自己基本不拥有系统资源,只拥有少量必不可少的资源:程序计数器、一组寄存器、栈。同属一个进程的线程共享进程所拥有的主存空间和资源。 -
协程(Coroutine)
协程是一种用户态的轻量级线程,又称微线程,英文名Coroutine,协程的调度完全由用户控制。人们通常将协程和子程序(函数)比较着理解。 子程序调用总是一个入口,一次返回,一旦退出即完成了子程序的执行。Go语言对于并发的实现是靠协程:Goroutine。
Go的并发调度:G-P-M模型
在操作系统提供的内核线程之上,Go搭建了一个特有的两级线程模型。goroutine机制实现了M : N的线程模型,goroutine机制是协程(coroutine)的一种实现,golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。
1. 调度器如何工作
// 用go关键字加上一个函数(这里用了匿名函数)
// 调用就做到了在一个新的“线程”并发执行任务
go func() {
// do something in one new goroutine
}()
调度器的实现主要包括4个结构:M,P,G,Sched,前三个定义在runtime.h中,Sched定义在proc.c中。
* Sched结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。
* M结构是Machine,系统线程,它由操作系统管理的,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。
* P结构是Processor,处理器,它的主要用途就是用来执行goroutine的,它维护了一个goroutine队列,即runqueue。Processor是让我们从N:1调度到M:N调度的重要部分。
* G是goroutine实现的核心结构,它包含了栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel。
说明:Processor的数量是在启动时被设置为环境变量GOMAXPROCS的值,也可以通过运行时调用函数runtime.GOMAXPROCS()进行设置。Processor数量固定意味着任意时刻只有GOMAXPROCS个线程在运行go代码。
在单核处理器的场景下,所有goroutine运行在同一个M系统线程中,每一个M系统线程维护一个Processor,任何时刻,一个Processor中只有一个goroutine,其他goroutine在runqueue中等待。一个goroutine运行完自己的时间片后,让出上下文,回到runqueue中。 多核处理器的场景下,为了运行goroutines,每个M系统线程会持有一个Processor。
2. 线程阻塞
正常情况下,Go调度器会按照上面的流程进行调度,但当发生阻塞时,比如Goroutine进行系统调用,Go调度器会再创建一个线程(或者从线程池取),当前的M线程放弃了它的Processor,P转到新的线程中去运行。
3. runqueue执行完成
当其中一个Processor的runqueue为空,没有goroutine可以调度。它会从另外一个上下文偷取一半的goroutine。
Go原生支持并发:Go调度器负责将并发任务分配到不同的内核线程上运行,然后内核调度器接管内核线程在CPU上的执行与调度。
共享资源安全问题
如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时 读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。如果这种竞争状态处理不当,可能会出现安全问题。
package main
import (
"fmt"
"runtime"
"sync"
)
var (
count int32
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go incCount()
go incCount()
wg.Wait()
fmt.Println(count)
}
func incCount() {
defer wg.Done()
for i := 0; i < 2; i++ {
value := count
runtime.Gosched()
value++
count = value
}
}
//输出:2
竞争状态下程序行为的分析
锁住共享资源
- 原子函数
原子函数能够以很底层的加锁机制来同步访问整型变量和指针。
sync/atomic - 互斥锁
互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码。
package main
import (
"fmt"
"runtime"
"sync"
)
var (
count int32
wg sync.WaitGroup//声明互斥锁
mutex sync.Mutex
)
func main() {
wg.Add(2)
go incCount()
go incCount()
wg.Wait()
fmt.Println(count)
}
func incCount() {
defer wg.Done()
for i := 0; i < 2; i++ {
mutex.Lock()//临界区上锁
value := count
runtime.Gosched()
value++
count = value
mutex.Unlock()//解锁
}
}
Channel
使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做同步。声明通道时,需要指定将要被共享的数据的类型。通道分为有缓冲通道和无缓冲通道。
// 无缓冲的整型通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)
//定义单向管道
var send chan<- int //只能发送
var receive <-chan int //只能接收
// 有缓冲的字符串通道
buffered := make(chan string, 10)
// 通过通道发送一个字符串
buffered <- "Gopher"
// 从通道接收一个字符串
value := <-buffered
对通道的操作行为总结:
操作 | unbuffered channel | closed channel | not-closed buffered channel |
---|---|---|---|
close | 成功close | panic | 成功 close |
写 ch <- | 阻塞 | panic | 通道满:阻塞;通道未满:成功写入数据 |
读 <- ch | 阻塞 | 通道内有数据可读:成功读取数据;通道内没有数据可读:返回对应类型的零值 | 通道空:阻塞;通道非空:成功读取数据 |
读取一个已关闭的 channel 时,总是能读取到对应类型的零值,为了和读取非空未关闭 channel 的行为区别,可以使用两个接收值来加以判断:
// ok is false when ch is closed
v, ok := <-ch
优雅的关闭Channel
关闭channel应遵循以下准则:
- 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
- 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
- 如果只有一个写入端,可以在这个写入端放心关闭 channel ;
具体分析:
- 一写多读
这种场景下这个唯一的写入端可以关闭 channel 用来通知读取端所有数据都已经写入完成了。读取端只需要用 for range 把 channel 中数据遍历完就可以了,当� channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
send := func() {
for i := 0; i < 100; i++ {
ch <- i
}
// signal sending finish
close(ch)
}
recv := func(id int) {
defer wg.Done()
for i := range ch {
fmt.Printf("receiver #%d get %d\n", id, i)
}
fmt.Printf("receiver #%d exit\n", id)
}
wg.Add(3)
go recv(0)
go recv(1)
go recv(2)
send()
wg.Wait()
}
- 多写一读
这种场景下虽然可以用 sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func() {
count := 0
for i := range ch {
fmt.Printf("receiver get %d\n", i)
count++
if count >= 1000 {
// signal recving finish
close(done)
return
}
}
}
wg.Add(3)
go send(0)
go send(1)
go send(2)
recv()
wg.Wait()
}
- 多写多读
这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func(id int) {
defer wg.Done()
for {
select {
case <-done:
// get exit signal
fmt.Printf("receiver #%d exit\n", id)
return
case i := <-ch:
fmt.Printf("receiver #%d get %d\n", id, i)
time.Sleep(time.Millisecond)
}
}
}
wg.Add(6)
go send(0)
go send(1)
go send(2)
go recv(0)
go recv(1)
go recv(2)
time.Sleep(time.Second)
// signal finish
close(done)
// wait all sender and receiver exit
wg.Wait()
}
网友评论