Go并发

作者: 普朗tong | 来源:发表于2020-04-20 00:54 被阅读0次

并发和并行

Go是并发语言,而不是并行语言。(Go is a concurrent language and not a parallel one.)

并发(Concurrency)的关键在于你有处理多个任务的能力,不一定要同时
并行(Parallellism)的关键是你有同时处理多个任务的能力
最关键的点就是:是否是『同时』

下面举个例子来说明:
假设我们正在编写一个web浏览器。web浏览器有各种组件。其中两个是web页面呈现区域和下载文件从internet下载的下载器。假设我们以这样的方式构建了浏览器的代码,这样每个组件都可以独立地执行。
当这个浏览器运行在单个核处理器中时,处理器将在浏览器的两个组件之间进行上下文切换。它可能会下载一个文件一段时间,然后它可能会切换到呈现用户请求的网页的html。这就是所谓的并发性。并发进程从不同的时间点开始,它们的执行周期重叠。在这种情况下,下载和呈现从不同的时间点开始,它们的执行重叠。
假设同一浏览器运行在多核处理器上。在这种情况下,文件下载组件和HTML呈现组件可能同时在不同的内核中运行。这就是所谓的并行性

并发与并行
需要说明的是:并行性Parallelism不会总是导致更快的执行时间,这是因为并行运行的组件可能需要相互通信。

进程,线程和协程

  1. 进程(Process)
    进程是可并发执行的程序在某个数据集合上的一次计算活动,也是操作系统进行资源分配和调度的基本单位。进程一般由程序、数据集、进程控制块三部分组成。进程的局限是创建、撤销和切换的开销比较大。

  2. 线程(Thread)
    线程是操作系统进程中能够并发执行的实体,是处理器调度和分派的基本单位。每个进程内可包含多个可并发执行的线程。线程的优点是减小了程序并发执行时的开销,提高了操作系统的并发性能,缺点是线程自己基本不拥有系统资源,只拥有少量必不可少的资源:程序计数器、一组寄存器、栈。同属一个进程的线程共享进程所拥有的主存空间和资源。

  3. 协程(Coroutine)
    协程是一种用户态的轻量级线程,又称微线程,英文名Coroutine,协程的调度完全由用户控制。人们通常将协程和子程序(函数)比较着理解。 子程序调用总是一个入口,一次返回,一旦退出即完成了子程序的执行。

    Go语言对于并发的实现是靠协程:Goroutine。

Go的并发调度:G-P-M模型

在操作系统提供的内核线程之上,Go搭建了一个特有的两级线程模型。goroutine机制实现了M : N的线程模型,goroutine机制是协程(coroutine)的一种实现,golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。

1. 调度器如何工作

// 用go关键字加上一个函数(这里用了匿名函数)
// 调用就做到了在一个新的“线程”并发执行任务
go func() { 
// do something in one new goroutine
}()

调度器的实现主要包括4个结构:M,P,G,Sched,前三个定义在runtime.h中,Sched定义在proc.c中。

* Sched结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。
* M结构是Machine,系统线程,它由操作系统管理的,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。
* P结构是Processor,处理器,它的主要用途就是用来执行goroutine的,它维护了一个goroutine队列,即runqueue。Processor是让我们从N:1调度到M:N调度的重要部分。
* G是goroutine实现的核心结构,它包含了栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel。

说明:Processor的数量是在启动时被设置为环境变量GOMAXPROCS的值,也可以通过运行时调用函数runtime.GOMAXPROCS()进行设置。Processor数量固定意味着任意时刻只有GOMAXPROCS个线程在运行go代码。

在单核处理器的场景下,所有goroutine运行在同一个M系统线程中,每一个M系统线程维护一个Processor,任何时刻,一个Processor中只有一个goroutine,其他goroutine在runqueue中等待。一个goroutine运行完自己的时间片后,让出上下文,回到runqueue中。 多核处理器的场景下,为了运行goroutines,每个M系统线程会持有一个Processor。


2. 线程阻塞
正常情况下,Go调度器会按照上面的流程进行调度,但当发生阻塞时,比如Goroutine进行系统调用,Go调度器会再创建一个线程(或者从线程池取),当前的M线程放弃了它的Processor,P转到新的线程中去运行。

3. runqueue执行完成
当其中一个Processor的runqueue为空,没有goroutine可以调度。它会从另外一个上下文偷取一半的goroutine。

Go原生支持并发:Go调度器负责将并发任务分配到不同的内核线程上运行,然后内核调度器接管内核线程在CPU上的执行与调度。


共享资源安全问题

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时 读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。如果这种竞争状态处理不当,可能会出现安全问题。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    count int32
    wg    sync.WaitGroup
)

func main() {
    wg.Add(2)
    go incCount()
    go incCount()
    wg.Wait()
    fmt.Println(count)
}

func incCount() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
        value := count
        runtime.Gosched()
        value++
        count = value
    }
}
//输出:2
竞争状态下程序行为的分析

锁住共享资源

  1. 原子函数
    原子函数能够以很底层的加锁机制来同步访问整型变量和指针。
    sync/atomic
  2. 互斥锁
    互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码。
package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    count int32
    wg    sync.WaitGroup//声明互斥锁
    mutex sync.Mutex
)

func main() {
    wg.Add(2)
    go incCount()
    go incCount()
    wg.Wait()
    fmt.Println(count)
}

func incCount() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
        mutex.Lock()//临界区上锁
        value := count
        runtime.Gosched()
        value++
        count = value
        mutex.Unlock()//解锁
    }
}

Channel

使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做同步。声明通道时,需要指定将要被共享的数据的类型。通道分为有缓冲通道和无缓冲通道。

// 无缓冲的整型通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)
//定义单向管道
var send chan<- int //只能发送
var receive <-chan int //只能接收
// 有缓冲的字符串通道
buffered := make(chan string, 10)
// 通过通道发送一个字符串 
buffered <- "Gopher"
// 从通道接收一个字符串
value := <-buffered

对通道的操作行为总结:

操作 unbuffered channel closed channel not-closed buffered channel
close 成功close panic 成功 close
写 ch <- 阻塞 panic 通道满:阻塞;通道未满:成功写入数据
读 <- ch 阻塞 通道内有数据可读:成功读取数据;通道内没有数据可读:返回对应类型的零值 通道空:阻塞;通道非空:成功读取数据

读取一个已关闭的 channel 时,总是能读取到对应类型的零值,为了和读取非空未关闭 channel 的行为区别,可以使用两个接收值来加以判断:

// ok is false when ch is closed
v, ok := <-ch

优雅的关闭Channel

关闭channel应遵循以下准则:

  • 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
  • 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
  • 如果只有一个写入端,可以在这个写入端放心关闭 channel ;

具体分析:

  1. 一写多读
    这种场景下这个唯一的写入端可以关闭 channel 用来通知读取端所有数据都已经写入完成了。读取端只需要用 for range 把 channel 中数据遍历完就可以了,当� channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:
package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int, 100)

    send := func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        // signal sending finish
        close(ch)
    }

    recv := func(id int) {
        defer wg.Done()
        for i := range ch {
            fmt.Printf("receiver #%d get %d\n", id, i)
        }
        fmt.Printf("receiver #%d exit\n", id)
    }

    wg.Add(3)
    go recv(0)
    go recv(1)
    go recv(2)
    send()

    wg.Wait()
}
  1. 多写一读
    这种场景下虽然可以用 sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:
package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int, 100)
    done := make(chan struct{})

    send := func(id int) {
        defer wg.Done()
        for i := 0; ; i++ {
            select {
            case <-done:
                // get exit signal
                fmt.Printf("sender #%d exit\n", id)
                return
            case ch <- id*1000 + i:
            }
        }
    }

    recv := func() {
        count := 0
        for i := range ch {
            fmt.Printf("receiver get %d\n", i)
            count++
            if count >= 1000 {
                // signal recving finish
                close(done)
                return
            }
        }
    }

    wg.Add(3)
    go send(0)
    go send(1)
    go send(2)
    recv()

    wg.Wait()
}
  1. 多写多读
    这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int, 100)
    done := make(chan struct{})

    send := func(id int) {
        defer wg.Done()
        for i := 0; ; i++ {
            select {
            case <-done:
                // get exit signal
                fmt.Printf("sender #%d exit\n", id)
                return
            case ch <- id*1000 + i:
            }
        }
    }

    recv := func(id int) {
        defer wg.Done()
        for {
            select {
            case <-done:
                // get exit signal
                fmt.Printf("receiver #%d exit\n", id)
                return
            case i := <-ch:
                fmt.Printf("receiver #%d get %d\n", id, i)
                time.Sleep(time.Millisecond)
            }
        }
    }

    wg.Add(6)
    go send(0)
    go send(1)
    go send(2)
    go recv(0)
    go recv(1)
    go recv(2)

    time.Sleep(time.Second)
    // signal finish
    close(done)
    // wait all sender and receiver exit
    wg.Wait()
}

参考:
Golang并发
并发模型的一些实例和详细分析
Golang channel使用总结

相关文章

  • Go语言并发

    Go语言并发 Go语言级别支持协程,叫做goroutine Go 语言从语言层面支持并发和并行的开发操作 Go并发...

  • Go基础语法(九)

    Go语言并发 Go 是并发式语言,而不是并行式语言。 并发是指立即处理多个任务的能力。 Go 编程语言原生支持并发...

  • Go并发模型:并发协程chan的优雅退出

    Go并发模型:并发协程chan的优雅退出 go chan的使用

  • Go并发

    并发和并行 Go是并发语言,而不是并行语言。(Go is a concurrent language and no...

  • 第14章-并发性Concurrency

    并发性Concurrency 1.1 什么是并发 Go是并发语言,而不是并行语言。在讨论如何在Go中进行并发处理之...

  • Golang(十四) 并发性Concurrency

    并发性Concurrency 1.1 什么是并发 Go是并发语言,而不是并行语言。在讨论如何在Go中进行并发处理之...

  • GO语言初级学习之代码案例13 (QQ群聊)

    @(go语言 黑马)[GO语言] 并发聊天室 题目:利用Go语言高并发的特性,编写一个类似QQ群聊功能的并发聊天服...

  • Learn Golang in Days - Day 16-Go

    Learn Golang in Days - Day 16-Go并发 简介 go语言支持并发,只需要使用go关键字...

  • day08-go.GPM

    当别人到go为什么支持高并发,或者问为什么go本身对并发编程友好?以及go与Java对比的并发对比 正确回答: 在...

  • Go Goroutine

    协程并发 Go并发 什么是goroutine

网友评论

      本文标题:Go并发

      本文链接:https://www.haomeiwen.com/subject/vxxgbhtx.html