美文网首页
go 的并发机制

go 的并发机制

作者: OOM_Killer | 来源:发表于2019-06-19 23:58 被阅读0次

并发与并行的概念

① 多线程程序在单核cpu上运行就是并发;
② 多线程程序在多核cpu上运行就是并行;

goroutine 特点

  • 它是一个协程,是一个轻量级的线程。
  • 非抢占式多任务处理,由协程主动交出控制权。(下面会解释)
  • 编译器/解释器/虚拟机层面的多任务。
  • 多个协程可能运行在多个线程之上,这个是由调度器去决定的。

协程是非抢占式的,由协程主动交出控制权,而线程是抢占式的,也就是由操作系统主动停掉一个线程让给其他线程执行。

非抢占式多任务

非抢占式的含义就是其他任务不会抢我的cpu,而是等待我主动让出去。别的goroutine才可以执行。而io等操作会主动让出去goroutine的执行权。

func main() {
    var a [10]int
    for i := 0;i< 10;i++{
        go func(i int) {
            for {
                a[i]++
            }
        }(i)
    }
    time.Sleep(time.Second)
    fmt.Println(a)
}

这段代码可能让人感觉 1s后就会退出阿,因为main函数会退出阿。其实不会的。这个程序会卡住。而其实main函数也是一个goroutine。而程序中对一个变量++ 是不会交出控制权的(io操作可以交出控制权,如fmt.println),所以main也得不到运行了。。可以看到这段程序死机了(cpu使用率400%,我是4核机器)

进程   USER      PR  NI    VIRT    RES    SHR   %CPU %MEM     TIME+ COMMAND                                                                   
20990 kiosk     20   0  102708   1440   1052 S 390.2  0.0   0:36.46 /tmp/___go_build_main_go  

为了手动交出控制权,可以添加一行 runtime.Gosched()
普通函数是协程的一个特例。因为协程中,main和dowork是可以相互交互的。
调度器会在合适的点进行切换,不需要人来介入。


goroutine的可能切换点

  • I/O select
  • channel
  • 函数调用
  • runtime.Gosched()
  • 等待锁
  • 其他
多个协程可能运行于多个线程之上。

还是以刚才的例子。观察goroutine 运行占cpu 362。他开启了6个线程,但实质上只运行了4个(看下图的pidstat输出),因为我的机器是4核。他运行在4核CPU上。go的调度器是很智能的。

pidstat -t -p 21312 1
Linux 4.15.0-50-generic (Prometheus)    2019年06月05日     _x86_64_    (4 CPU)

23时34分45秒   UID      TGID       TID    %usr %system  %guest   %wait    %CPU   CPU  Command
23时34分46秒  1000     21312         -  100.00    1.00    0.00    0.00  100.00     3  ___go_build_mai
23时34分46秒  1000         -     21312    0.00    0.00    0.00    0.00    0.00     3  |_____go_build_mai
23时34分46秒  1000         -     21313    0.00    0.00    0.00    0.00    0.00     2  |_____go_build_mai
23时34分46秒  1000         -     21314   98.00    0.00    0.00    2.00   98.00     2  |_____go_build_mai
23时34分46秒  1000         -     21315  100.00    0.00    0.00    0.00  100.00     1  |_____go_build_mai
23时34分46秒  1000         -     21316   96.00    0.00    0.00    4.00   96.00     3  |_____go_build_mai
23时34分46秒  1000         -     21317   98.00    0.00    0.00    2.00   98.00     0  |_____go_build_mai

M,P,G

Go 的调度器内部有三个十分重要的结构,M,P,G。(M>P 如上个例子的 6 个M,4个P)

  • M 表示真正的内核OS线程,和POSIX里的thread差不多,真正干活的人。
  • P 表示调度等上下文,可以把他看做一个局部的调度器,使go代码在一个线程上跑,它是实现从N:1 (多个用户线程在一个内核线程上跑)到 N:M 映射的关键。
  • G 代表一个 goroutine,它有自己的栈,用于调度。
    M,P,G
    上图表示有两个物理线程M,每个M都拥有一个context(P),每一个P上又拥有一个正在运行的G和很多等待运行的G。
    P 的总数量可以通过 GOMAXPROCS() 设置。它表示真正的并发量,即有多少个goroutine可以同时运行。
    上面等待的(灰色)goroutine处于ready的就绪态。而每个P都维护着一个队列(runqueue)
M,P,G

当一个M(线程)阻塞了,P(调度器)可以转而投奔另一个OS线程,当一个OS线程M0 阻塞,P转而在OS线程M1 运行。调度器保证有足够的线程来运行所有的P。(如之前看到的4个P,6个M)
当M0返回时,它必须尝试取得一个 context P 调度器,一般情况下,会从其他的OS 线程上偷steal一个P过来。
如果没有偷到的话,他就把goroutine放到global runqueue 中,自己睡眠(放回线程池)。P 也会周期性检查global runqueue。


M,P,G

另一种情况就是某一个P所分配的任务G很快被执行完了(分配不均),这就导致一个context P 闲着,如果 global runqueue 上没有 G 了,那么它会偷其他P 的G。一般偷的话会偷一半。确保每个OS线程都能得到充分的使用。

这段参考知乎 Golang 的 goroutine 是如何实现的? Yi Wang 的回答

CSP 模型 channel

看下面的一段代码。这里的使用函数式编程,以一个函数创建一个通道。

func CreateWorker(id int) chan<- int {     // 返回一个只允许往里送数据的chan。
    c := make(chan int)
    go func() {
        for {
            n, ok := <-c    // 自己在goroutine 里收数据 
            if !ok {break}
            fmt.Printf("Worker %d received %d \n",id,n)
        }
    }()
    return c
}

func main() {
    var c [5]chan<- int
    for i := 0;i < 5; i++{
        c[i] = CreateWorker(i)      //创建5个没有buffer的通道 返回值是只允许发数据的chan
    }

    for j := 0;j < 5; j++ {
        c[j] <- j                       // 通道里写值来确保一个任务结束
        close(c[j])             // close 不是说必须的, 但是关闭通道的最好是发送方!!!
    }
}

执行结果:
Worker 0 received 0 
Worker 1 received 1 
Worker 2 received 2 
Worker 3 received 3 

这个函数有个问题啊。为什么打印出的是4个不是5个呢,在最后一个打印时,gorounting结束了,但是最后一个还没来得及打印。
如下改造, 加一个 done,当 done 中的数据被取出来后,打印的动作肯定也就完成了。

type Worker struct {
    In      chan    int
    Done    chan    bool
}

func doWorker(id int,w Worker) {
    for n := range w.In {
        fmt.Printf("Worker %d received %c \n",id,n)  // 先打印再down
        w.Done <- true
    }
}

func CreateWorker(id int) Worker {
    w := Worker{
        In: make(chan int),
        Done: make(chan bool),
    }
    go doWorker(id,w)
    return w
}

func main() {
    var workers [5]Worker
    for i := 0;i < 5; i++{
        workers[i] = CreateWorker(i)
    }

    for j := 0;j < 5; j++ {
        workers[j].In <- 'a'+j
        <- workers[j].Done   // 将数据送进去之后,等待work打印完成(等待down)
    }
}

执行结果
Worker 0 received a 
Worker 1 received b 
Worker 2 received c 
Worker 3 received d 
Worker 4 received e 

close 了的channel还可以接受数据吗?

通道被关闭,是还可以接着收数据的。如下面的代码 (呼应了通道的关闭最好是发送方!!)

close 了的channel 关闭了就不能再发送数据了,这里就不做解释了。

func worker(c chan int) {
    for {
        fmt.Printf("Worker recived %d\n",<-c)
    }
}

func main() {
    c := make(chan int)
    go worker(c)
    c <- 'a'
    c <- 'b'
    c <- 'd'
    close(c)

    time.Sleep(50 * time.Microsecond)
}

执行结果
Worker recived 97
Worker recived 98
Worker recived 100
Worker recived 0
Worker recived 0
Worker recived 0
Worker recived 0
....

当一个channel 被关闭了,就会一直收到 0。怎么避免呢?

  • 方法一: ok-parten 模式
    所有的 channel 接收者都会在 channel 关闭时,立立刻从阻塞等待中返回且 ok 值为 false。
func worker( c chan int) {
    for {
        if n,ok := <-c ;ok {
            fmt.Printf("Worker recived %d\n",n)
        }
    }
}
  • 方法二 :range 模式
func worker( c chan int) {
    for n := range c{
        fmt.Printf("Worker recived %d\n",n)
    }
}
打印怎么是顺序的?

这里 还有一个问题,打印的数据是按顺序的,这和直接按顺序打印没有区别了。(因为这里每打印一次就要等待一个down,down了才能开始下一次)
将done 和 打印分开就可以了

type Worker struct {
    In      chan    int
    Done    chan    bool
}

func doWorker(id int,w Worker) {
    for n := range w.In {
        fmt.Printf("Worker %d received %c \n",id,n)  // 先打印再down
        go func(w Worker) {
            w.Done <- true
        }(w)

    }
}

func CreateWorker(id int) Worker {
    w := Worker{
        In: make(chan int),
        Done: make(chan bool),
    }
    go doWorker(id,w)
    return w
}

func main() {
    var workers [20]Worker
    for i := 0;i < 20; i++{
        workers[i] = CreateWorker(i)
    }

    for i,worker := range workers {
        worker.In <- 'a'+i   // 专心往里送数据,然后打印
    }

    for _,worker := range workers {
        <- worker.Done    // 等待完成这件事放到最后面
    }
}

执行结果:
Worker 0 received a 
Worker 3 received d 
Worker 5 received f 
Worker 2 received c 
Worker 1 received b 
Worker 4 received e 
Worker 6 received g 
...

上述的方法还是不够优雅,看下面的

WaitGroup 去并发任务(确保任务都执行)

引入WaitGroup,当所有的任务都完成才退出。还有waitgroup确保一个任务被执行。

type Worker struct {
    In      chan    int
    Done    func()         // 函数式编程,Done 去调用 wg.Done
}

func doWorker(id int,w Worker) {
    for n := range w.In {
        fmt.Printf("Worker %d received %c \n",id,n)  // 先打印再down
        w.Done()
    }
}

func CreateWorker(id int,wg *sync.WaitGroup) Worker {   // 这里的wg必须是指针
    w := Worker{
        In: make(chan int),
        Done: func() {
            wg.Done()                      // 函数式编程,将wg.Done() 放在函数中
        },
    }
    go doWorker(id,w)
    return w
}

func main() {
    var workers [20]Worker
    var wg sync.WaitGroup
    for i := 0;i < 20; i++{
        wg.Add(1)
        workers[i] = CreateWorker(i,&wg)
    }

    for i,worker := range workers {
        worker.In <- 'a'+i
    }

    wg.Wait()
}

执行结果:
乱序打印

锁争抢

看以下代码。下面的代码是一段有问题的代码。

func main() {
    cnt := 0
    var wg  sync.WaitGroup
    for i := 0; i < 500 ; i++{
        wg.Add(1)
        go func(wg *sync.WaitGroup) {
            cnt++
            wg.Done()
        }(&wg)
    }
    wg.Wait()
    fmt.Println(cnt)
}

执行结果:
498

使用 go run -race 去检测一下。

$ go run -race goroutine.go 
==================
WARNING: DATA RACE
Read at 0x00c0000a6010 by goroutine 7:
  main.main.func1()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:45 +0x38

Previous write at 0x00c0000a6010 by goroutine 6:
  main.main.func1()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:45 +0x4e

Goroutine 7 (running) created at:
  main.main()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:44 +0xe4

Goroutine 6 (finished) created at:
  main.main()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:44 +0xe4
==================
496
Found 1 data race(s)

上面显示 cnt++ 即在读又在写

为了解决这种线程不安全,就需要加锁。经过改造得。这下的结果就正确了,结果为500。

func main() {
    cnt := 0
    var wg  sync.WaitGroup
    var lock sync.Mutex
    for i := 0; i < 500 ; i++{
        wg.Add(1)
        go func(wg *sync.WaitGroup) {
            lock.Lock()
            defer lock.Unlock()
            cnt++
            wg.Done()
        }(&wg)
    }
    wg.Wait()
    fmt.Println(cnt)
}

执行结果:
500
$ go run -race goroutine.go 
500

本例子是对一个变量进行++ 操作。这是线程不安全的。所以这种操作还是加锁比较安全

select 多路选择和超时

当select 中没有default的话,哪个个case收到值了,就执行并返回,否则一直阻塞等待。

func createWorker() <-chan int {
    ch := make(chan int)
    go func() {
        randInt :=  rand.New(rand.NewSource(time.Now().Unix())).Intn(10)   //产生随机数
        time.Sleep(time.Duration(randInt)*time.Second)  // 睡眠随机秒
        ch <- randInt     // 扔进去一个channel
    }()
    return ch
}

func main() {
    ch1 := createWorker()            
    ch2 := createWorker()

    select {                       // 2个都不返回的话就阻塞,一个返回就彻底运行结束
    case n := <- ch1:
        fmt.Println("ch1 get ",n)
    case n := <- ch2:
        fmt.Println("ch2 get ",n)
    }
}

执行结果:
ch2 get 6

那么select 有什么用呢?select 和 select,poll,epoll 类似。就是监听 IO 操作,当IO 操作发生时,就触发相应的动作,否则就阻塞。

  • 用处一 超时等待
    加上一个 time.After 。如果5s 还没有数据返回,就不阻塞了。
   ch1 := createWorker()
   ch2 := createWorker()

   select {
   case n := <- ch1:
       fmt.Println("ch1 get ",n)
   case n := <- ch2:
       fmt.Println("ch2 get ",n)
   case <- time.After(5*time.Second):
       fmt.Println("get value timeout ")
   }
  • 用处二 判断channel是否满或空
    因为ch1 和 ch2 都是空,所以就执行到 default,那么就可以判断所有的通道是否为空了。
 ch1 := make (chan int, 1)
ch2 := make (chan int, 1)

select {
case <-ch1:
    fmt.Println("ch1 pop one element")
case <-ch2:
    fmt.Println("ch2 pop one element")
default:
    fmt.Println("default")
}

相关文章

网友评论

      本文标题:go 的并发机制

      本文链接:https://www.haomeiwen.com/subject/nykdxctx.html