Go并发

作者: 普朗tong | 来源:发表于2020-04-20 00:54 被阅读0次

    并发和并行

    Go是并发语言,而不是并行语言。(Go is a concurrent language and not a parallel one.)

    并发(Concurrency)的关键在于你有处理多个任务的能力,不一定要同时
    并行(Parallellism)的关键是你有同时处理多个任务的能力
    最关键的点就是:是否是『同时』

    下面举个例子来说明:
    假设我们正在编写一个web浏览器。web浏览器有各种组件。其中两个是web页面呈现区域和下载文件从internet下载的下载器。假设我们以这样的方式构建了浏览器的代码,这样每个组件都可以独立地执行。
    当这个浏览器运行在单个核处理器中时,处理器将在浏览器的两个组件之间进行上下文切换。它可能会下载一个文件一段时间,然后它可能会切换到呈现用户请求的网页的html。这就是所谓的并发性。并发进程从不同的时间点开始,它们的执行周期重叠。在这种情况下,下载和呈现从不同的时间点开始,它们的执行重叠。
    假设同一浏览器运行在多核处理器上。在这种情况下,文件下载组件和HTML呈现组件可能同时在不同的内核中运行。这就是所谓的并行性

    并发与并行
    需要说明的是:并行性Parallelism不会总是导致更快的执行时间,这是因为并行运行的组件可能需要相互通信。

    进程,线程和协程

    1. 进程(Process)
      进程是可并发执行的程序在某个数据集合上的一次计算活动,也是操作系统进行资源分配和调度的基本单位。进程一般由程序、数据集、进程控制块三部分组成。进程的局限是创建、撤销和切换的开销比较大。

    2. 线程(Thread)
      线程是操作系统进程中能够并发执行的实体,是处理器调度和分派的基本单位。每个进程内可包含多个可并发执行的线程。线程的优点是减小了程序并发执行时的开销,提高了操作系统的并发性能,缺点是线程自己基本不拥有系统资源,只拥有少量必不可少的资源:程序计数器、一组寄存器、栈。同属一个进程的线程共享进程所拥有的主存空间和资源。

    3. 协程(Coroutine)
      协程是一种用户态的轻量级线程,又称微线程,英文名Coroutine,协程的调度完全由用户控制。人们通常将协程和子程序(函数)比较着理解。 子程序调用总是一个入口,一次返回,一旦退出即完成了子程序的执行。

      Go语言对于并发的实现是靠协程:Goroutine。

    Go的并发调度:G-P-M模型

    在操作系统提供的内核线程之上,Go搭建了一个特有的两级线程模型。goroutine机制实现了M : N的线程模型,goroutine机制是协程(coroutine)的一种实现,golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。

    1. 调度器如何工作

    // 用go关键字加上一个函数(这里用了匿名函数)
    // 调用就做到了在一个新的“线程”并发执行任务
    go func() { 
    // do something in one new goroutine
    }()
    

    调度器的实现主要包括4个结构:M,P,G,Sched,前三个定义在runtime.h中,Sched定义在proc.c中。

    * Sched结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。
    * M结构是Machine,系统线程,它由操作系统管理的,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。
    * P结构是Processor,处理器,它的主要用途就是用来执行goroutine的,它维护了一个goroutine队列,即runqueue。Processor是让我们从N:1调度到M:N调度的重要部分。
    * G是goroutine实现的核心结构,它包含了栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel。
    

    说明:Processor的数量是在启动时被设置为环境变量GOMAXPROCS的值,也可以通过运行时调用函数runtime.GOMAXPROCS()进行设置。Processor数量固定意味着任意时刻只有GOMAXPROCS个线程在运行go代码。

    在单核处理器的场景下,所有goroutine运行在同一个M系统线程中,每一个M系统线程维护一个Processor,任何时刻,一个Processor中只有一个goroutine,其他goroutine在runqueue中等待。一个goroutine运行完自己的时间片后,让出上下文,回到runqueue中。 多核处理器的场景下,为了运行goroutines,每个M系统线程会持有一个Processor。


    2. 线程阻塞
    正常情况下,Go调度器会按照上面的流程进行调度,但当发生阻塞时,比如Goroutine进行系统调用,Go调度器会再创建一个线程(或者从线程池取),当前的M线程放弃了它的Processor,P转到新的线程中去运行。

    3. runqueue执行完成
    当其中一个Processor的runqueue为空,没有goroutine可以调度。它会从另外一个上下文偷取一半的goroutine。

    Go原生支持并发:Go调度器负责将并发任务分配到不同的内核线程上运行,然后内核调度器接管内核线程在CPU上的执行与调度。


    共享资源安全问题

    如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时 读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。如果这种竞争状态处理不当,可能会出现安全问题。

    package main
    
    import (
        "fmt"
        "runtime"
        "sync"
    )
    
    var (
        count int32
        wg    sync.WaitGroup
    )
    
    func main() {
        wg.Add(2)
        go incCount()
        go incCount()
        wg.Wait()
        fmt.Println(count)
    }
    
    func incCount() {
        defer wg.Done()
        for i := 0; i < 2; i++ {
            value := count
            runtime.Gosched()
            value++
            count = value
        }
    }
    //输出:2
    
    竞争状态下程序行为的分析

    锁住共享资源

    1. 原子函数
      原子函数能够以很底层的加锁机制来同步访问整型变量和指针。
      sync/atomic
    2. 互斥锁
      互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码。
    package main
    
    import (
        "fmt"
        "runtime"
        "sync"
    )
    
    var (
        count int32
        wg    sync.WaitGroup//声明互斥锁
        mutex sync.Mutex
    )
    
    func main() {
        wg.Add(2)
        go incCount()
        go incCount()
        wg.Wait()
        fmt.Println(count)
    }
    
    func incCount() {
        defer wg.Done()
        for i := 0; i < 2; i++ {
            mutex.Lock()//临界区上锁
            value := count
            runtime.Gosched()
            value++
            count = value
            mutex.Unlock()//解锁
        }
    }
    

    Channel

    使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做同步。声明通道时,需要指定将要被共享的数据的类型。通道分为有缓冲通道和无缓冲通道。

    // 无缓冲的整型通道
    unbuffered := make(chan int)
    // 有缓冲的字符串通道
    buffered := make(chan string, 10)
    //定义单向管道
    var send chan<- int //只能发送
    var receive <-chan int //只能接收
    // 有缓冲的字符串通道
    buffered := make(chan string, 10)
    // 通过通道发送一个字符串 
    buffered <- "Gopher"
    // 从通道接收一个字符串
    value := <-buffered
    

    对通道的操作行为总结:

    操作 unbuffered channel closed channel not-closed buffered channel
    close 成功close panic 成功 close
    写 ch <- 阻塞 panic 通道满:阻塞;通道未满:成功写入数据
    读 <- ch 阻塞 通道内有数据可读:成功读取数据;通道内没有数据可读:返回对应类型的零值 通道空:阻塞;通道非空:成功读取数据

    读取一个已关闭的 channel 时,总是能读取到对应类型的零值,为了和读取非空未关闭 channel 的行为区别,可以使用两个接收值来加以判断:

    // ok is false when ch is closed
    v, ok := <-ch
    

    优雅的关闭Channel

    关闭channel应遵循以下准则:

    • 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
    • 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
    • 如果只有一个写入端,可以在这个写入端放心关闭 channel ;

    具体分析:

    1. 一写多读
      这种场景下这个唯一的写入端可以关闭 channel 用来通知读取端所有数据都已经写入完成了。读取端只需要用 for range 把 channel 中数据遍历完就可以了,当� channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:
    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        wg := &sync.WaitGroup{}
        ch := make(chan int, 100)
    
        send := func() {
            for i := 0; i < 100; i++ {
                ch <- i
            }
            // signal sending finish
            close(ch)
        }
    
        recv := func(id int) {
            defer wg.Done()
            for i := range ch {
                fmt.Printf("receiver #%d get %d\n", id, i)
            }
            fmt.Printf("receiver #%d exit\n", id)
        }
    
        wg.Add(3)
        go recv(0)
        go recv(1)
        go recv(2)
        send()
    
        wg.Wait()
    }
    
    1. 多写一读
      这种场景下虽然可以用 sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:
    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        wg := &sync.WaitGroup{}
        ch := make(chan int, 100)
        done := make(chan struct{})
    
        send := func(id int) {
            defer wg.Done()
            for i := 0; ; i++ {
                select {
                case <-done:
                    // get exit signal
                    fmt.Printf("sender #%d exit\n", id)
                    return
                case ch <- id*1000 + i:
                }
            }
        }
    
        recv := func() {
            count := 0
            for i := range ch {
                fmt.Printf("receiver get %d\n", i)
                count++
                if count >= 1000 {
                    // signal recving finish
                    close(done)
                    return
                }
            }
        }
    
        wg.Add(3)
        go send(0)
        go send(1)
        go send(2)
        recv()
    
        wg.Wait()
    }
    
    1. 多写多读
      这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:
    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        wg := &sync.WaitGroup{}
        ch := make(chan int, 100)
        done := make(chan struct{})
    
        send := func(id int) {
            defer wg.Done()
            for i := 0; ; i++ {
                select {
                case <-done:
                    // get exit signal
                    fmt.Printf("sender #%d exit\n", id)
                    return
                case ch <- id*1000 + i:
                }
            }
        }
    
        recv := func(id int) {
            defer wg.Done()
            for {
                select {
                case <-done:
                    // get exit signal
                    fmt.Printf("receiver #%d exit\n", id)
                    return
                case i := <-ch:
                    fmt.Printf("receiver #%d get %d\n", id, i)
                    time.Sleep(time.Millisecond)
                }
            }
        }
    
        wg.Add(6)
        go send(0)
        go send(1)
        go send(2)
        go recv(0)
        go recv(1)
        go recv(2)
    
        time.Sleep(time.Second)
        // signal finish
        close(done)
        // wait all sender and receiver exit
        wg.Wait()
    }
    

    参考:
    Golang并发
    并发模型的一些实例和详细分析
    Golang channel使用总结

    相关文章

      网友评论

          本文标题:Go并发

          本文链接:https://www.haomeiwen.com/subject/vxxgbhtx.html