美文网首页Go
Go语言基础(五)—— 并发编程

Go语言基础(五)—— 并发编程

作者: 齐舞647 | 来源:发表于2020-02-26 21:28 被阅读0次

    前言:
    本专题用于记录自己(647)在Go语言方向的学习和积累。
    系列内容比较偏基础,推荐给想要入门Go语言开发者们阅读。

    目录如下:
    Go语言基础(一)—— 简介、环境配置、HelloWorld
    Go语言基础(二)—— 基本常用语法
    Go语言基础(三)—— 面向对象编程
    Go语言基础(四)—— 优质的容错处理
    Go语言基础(五)—— 并发编程
    Go语言基础(六)—— 测试、反射、Unsafe
    Go语言基础(七)—— 架构 & 常见任务
    Go语言基础(八)—— 性能调优


    本篇将介绍如下内容:
    1.协程机制(Groutine
    2.共享内存并发机制(协程安全)
    3.CSP并发机制(channel
    4.多路选择和超时控制(select
    5.channel的关闭和广播(channel
    6.任务的取消
    7.Context与关联任务取消
    8.常见并发任务(实战)

    一、协程机制

    相信大家肯定都知道 “线程”“进程” 的概念。

    而在Go语言中,“协程”可以理解为更轻量级的线程。
    通过调度“协程”就可以把系统Kernel的效率发挥到极致。

    通过一张表格,我们来对比一下协程与线程的区别。

    • Thread vs. Groutine:
    \ 默认栈大小(创建时) KSE对应关系(Kernel Space Entity)
    线程 Thread 1M 1 : 1
    协程 Groutine 2K M : N

    协程vs.线程的优势在于:

    • 线程之间的切换会牵扯到内核中系统线程(kernel entity)的切换,这会造成较大的成本。
    • 而多个协程在同一个系统线程(kernel entity)下切换,就能降低切换系统线程(kernel entity)的成本。(如上图所示)

    协程的使用:

    语法:go + func

    func TestGroutine(t *testing.T) {
        for i := 0; i < 10; i++ {
            go func(i int) {
                fmt.Println(i) // 正确案例,值传递。各个协程无竞争关系。
            }(i)
    
            // go func() {
            //  fmt.Println(i) // 错误案例,共享变量。各个协程有竞争关系
            // }()
        }
        time.Sleep(time.Millisecond * 50)
    }
    

    二、共享内存并发机制(协程安全)

    说到协程安全,我们第一个会想到的就是加锁(lock)。
    通过加锁来保证协程安全。

    在Go语言中也是如此,我们来看个例子。

    • 协程并发,导致的协程不安全:
    // 协程不安全demo
    func TestCounterThreadUnsafe(t *testing.T) {
        counter := 0
        for i := 0; i < 5000; i++ {
            go func() {
                counter++
            }()
        }
        time.Sleep(1 * time.Second)
        t.Logf("counter = %d", counter)
    }
    

    结果如下:

    === RUN   TestCounterThreadUnsafe
    --- PASS: TestCounterThreadUnsafe (1.00s)
        share_mem_test.go:18: counter = 4765
    

    这时就会发现,计算错误,因为并发导致了漏值。

    • 解决方式一:
      普通加锁,并加延迟等待协程执行完毕(不推荐)
    // 协程等待demo(停1秒,不推荐)
    func TestCounterThreadSafe(t *testing.T) {
        var mut sync.Mutex
        counter := 0
        for i := 0; i < 5000; i++ {
            go func() {
                defer func() {
                    mut.Unlock() //函数调用完成后:解锁,保证协程安全
                }()
                mut.Lock() // 函数将要调用前:加锁,保证协程安全
                counter++
            }()
        }
        time.Sleep(1 * time.Second) // 等待一秒,等协程全部执行完
        t.Logf("counter = %d", counter)
    }
    

    结果如下:

    === RUN   TestCounterThreadSafe
    --- PASS: TestCounterThreadSafe (1.01s)
        share_mem_test.go:35: counter = 5000
    

    结果正确,但是有一个问题。因为这里有个1秒的延迟等待,保证协程运行完毕再调用结果。因此,有没有更好的处理方式呢?接下来我们再优化一下。

    • 解决方式二:
      推荐! 使用同步等待队列(WaitGroup)保证顺序执行。
    // 协程安全Demo
    func TestCounterWaitGroup(t *testing.T) {
        var mut sync.Mutex    // 互斥锁
        var wg sync.WaitGroup // 等待队列
        counter := 0
        for i := 0; i < 5000; i++ {
            wg.Add(1) // 加个任务
            go func() {
                defer func() {
                    mut.Unlock() //函数调用完成后:解锁,保证协程安全
                }()
                mut.Lock() // 函数将要调用前:加锁,保证协程安全
                counter++
                wg.Done() // 做完任务
            }()
        }
        wg.Wait() //等待所有任务执行完毕
        t.Logf("counter = %d", counter)
    }
    

    运行结果如下:

    === RUN   TestCounterWaitGroup
    --- PASS: TestCounterWaitGroup (0.00s)
        share_mem_test.go:55: counter = 5000
    

    这样的话,可以看出:互斥锁Mutex和等待队列WaitGroup不仅保证了协程的安全,还避免了提前打印结果。(✔️)


    三、CSP并发机制

    1. CSP

    CSP(Communicating sequential processes):通信顺序进程(管道通信)。
    简单来说,CSP是通过Channel(管道)来通信的。

    Go 中的Channel(管道)有容量限制并且独立于处理Groutine(协程)。

    2. Channel

    Go中常见的Channel有两种,分别对应为ChannelBuffer Channel

    • 第一种:Channel(无缓冲)

    首先,发送者与接受者必须同时站在Channel的两端才进行交互。
    如果一方不在,另一方就会阻塞在一端,直到两端都在才进行交互。

    创建语法:make(chan [type])

    retChannel := make(chan string) // 创建无缓冲channel,并指明channel中的数据为string,双端等待
    

    输入语法:channel <-

    channel <- object // channel输入
    

    获取语法:<- channel

    object <- channel // channel输出
    
    • 第二种:Buffer Channel(有缓冲)

    这是一种稍微高级一点的Channel方式,(更加松耦合)。

    首先,给Channel设置一个容量大小,并且不要求发送者与接受者同时站在两端。
    然后,发送者会以Buffer的形式,不断往Channel里发送消息。
    直到Channel的容量满了才阻塞。
    这时,只要接受方接收了消息(即Channel有剩余容量了),发送者就会继续发送消息。

    创建语法:make(chan [type], Int)

    retChannel := make(chan string, 1) // 创建有缓冲channel,并指明channel中的数据为string
    

    输入语法:channel <-

    channel <- object // channel输入
    

    获取语法:<- channel

    object <- channel // channel输出
    

    Demo:模拟了一个网络请求的方法调用过程,通过Channel来控制当前协程在网络请求的等待过程中,去执行别的任务。

    // 模拟网络请求
    func serviceTask() string {
        fmt.Println("- start working on service task.")
        time.Sleep(time.Millisecond * 50)
        return "- service task is Done."
    }
    
    // 别的任务
    func otherTask() {
        fmt.Println("start working on something else.")
        time.Sleep(time.Millisecond * 100)
        fmt.Println("other task is Done.")
    }
    
    // csp异步管道
    func AsyncService() chan string {
        retChannel := make(chan string) // 无缓冲channel,创建并指明channel中的数据为string,双端等待
        // retChannel := make(chan string, 1) // 有缓冲channel,创建并指明channel中的数据为string
        go func() {
            ret := serviceTask()
            fmt.Println("returned result.")
            retChannel <- ret // channel输入
            fmt.Println("service exited.")
        }()
        return retChannel
    }
    
    func TestAsyncService(t *testing.T) {
        retCh := AsyncService()
        otherTask()
        fmt.Println(<-retCh) // channel输出
        time.Sleep(time.Second * 1)
    }
    

    四、多路选择和超时控制

    使用select关键字,完成“多路选择”与“超时控制”。

    • 多路选择:
      当返回的channel可能有多个时,可以使用select来处理多路的响应事件。

    注意:这里与switch有点像,但是要注意的是,它并不是顺序判断的。也就是如果channel1channel2同时满足时,可能走的是channel1、也可能是channel2,并不像switch一样做顺序的判断。

    Demo:

        select {
        case ret := <-channel1: 
            t.Log(ret)
        case ret:= <- channel2:
            t.Log(ret)
        case default:
            t.Error("No one returned.")
        }
    
    • 超时控制:

    同时,我们也可以设置一个超时等待的一个分路,当channel超时还未返回时,可以执行相应的代码。

    Demo:

        select {
        case ret := <-AsyncService(): //正常返回
            t.Log(ret)
        case <-time.After(time.Millisecond * 100): // 超时等待
            t.Error("time out")
        }
    

    五、channel的关闭和广播

    要点如下:

    1. 向已经closechannel发消息,会导致程序panic
    2. v, ok <- channel
      其中,okbool值,
      ok==true时,表示channel处于open状态。
      ok==false时,表示channel处于close状态。
    3. 所有channel接收者在channel关闭时,都会立刻从阻塞等待中返回,且ok值为false。(PS:广播机制,通常被利用向多个订阅者同时发送信号。如,退出信号。)

    Demo:

    // 消息生产者
    func dataProducer(ch chan int, wg *sync.WaitGroup) {
        go func() {
            for i := 0; i < 10; i++ {
                ch <- i
            }
    
            fmt.Println("channel close.")
            close(ch) // 关闭channel
    
            wg.Done()
        }()
    }
    
    // 消息接收者
    func dataReceiver(ch chan int, wg *sync.WaitGroup) {
        go func() {
            for {
                if data, ok := <-ch; ok { // 有消息就打印,直到channel被close。
                    fmt.Println(data)
                } else {
                    fmt.Println("Receiver close.")
                    break // channel被close
                }
            }
            wg.Done()
        }()
    }
    
    func TestCloseChannel(t *testing.T) {
        var wg sync.WaitGroup
        ch := make(chan int)
        wg.Add(1)
        dataProducer(ch, &wg) // 开启生产者
        wg.Add(1)
        dataReceiver(ch, &wg) // 开启消费者
        wg.Wait()
    }
    

    六、任务的取消

    通过上面的close channel(广播机制),我们可以延伸一下,通过close channel通知所有channel取消当前的任务。

    Demo如下:

    func isCancelled(cancelChan chan struct{}) bool {
        select {
        case <-cancelChan:
            return true
        default:
            return false
        }
    }
    
    // 只能取消单个channel
    func cancel_1(cancelChan chan struct{}) {
        cancelChan <- struct{}{}
    }
    
    // 所有channel全部取消
    func cancel_2(cancelChan chan struct{}) {
        close(cancelChan)
    }
    
    func TestCancel(t *testing.T) {
        cancelChan := make(chan struct{}, 0) // 创建了一个channal,通过它来控制事件取消
        for i := 0; i < 5; i++ {             // 开启5个协程
            go func(i int, chanclCh chan struct{}) { // 每个协程里面都有一个死循环,去等待取消消息
                for {
                    if isCancelled(cancelChan) {
                        break
                    }
                    time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
                }
                fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
            }(i, cancelChan)
        }
        cancel_2(cancelChan) // 通知所有channel关闭。
        time.Sleep(time.Second * 1)
    }
    

    七、Context与关联任务取消

    刚才我们通过close channel来取消任务,但会有些问题。
    比如,当一个任务被取消后,它所关联的子任务也应该被立即取消。

    为了解决这个问题,go 1.9.0之后,golang加入了context,来保证关联任务的取消。

    1. Context

    context就是用于管理相关任务的上下文,包含了共享值的传递,超时,取消通知。

    结构体如下:

    type Context interface {
        Deadline() (deadline time.Time, ok bool)
        Done() <-chan struct{}
        Err() error
        Value(key interface{}) interface{}
    }
    
    1. Deadline会返回一个超时时间,Goroutine获得了超时时间后,例如可以对某些io操作设定超时时间。
    2. Done方法返回一个信道(channel),当Context被撤销或过期时,该信道是关闭的,即它是一个表示Context是否已关闭的信号。
    3. Done信道关闭后,Err方法表明Context被撤的原因。
    4. Value可以让Goroutine共享一些数据,当然获得数据是协程安全的。但使用这些数据的时候要注意同步,比如返回了一个map,而这个map的读写则要加锁。

    要点:

    • 根Context:通过context.Background()创建。
    • 子Context:通过context.WithCancel(parentContext)创建。
    • 当前Context被取消时,基于它的子context都会被取消。
    • 接收取消通知: <-ctx.Done

    2. 关联任务取消

    我们把刚才的例子稍加调整,通过context来取消所有关联的任务。

    • 首先,创建一个context
    ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
    
    • 编写一个取消方法,把context作为参数。
    func isCancelled(ctx context.Context) bool {
        select {
        case <-ctx.Done():
            return true
        default:
            return false
        }
    }
    
    • 开五个协程死循环,每个协程里面都有一个死循环,等待取消任务消息。再调用cancel方法。
    for i := 0; i < 5; i++ {                                // 开启5个协程
            go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
                for {
                    if isCancelled(ctx) {
                        break
                    }
                    time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
                }
                fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
            }(i, ctx)
        }
        cancel() // 取消ctx
    

    完整示例代码如下:

    func isCancelled(ctx context.Context) bool {
        select {
        case <-ctx.Done():
            return true
        default:
            return false
        }
    }
    
    func TestCancel(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
        for i := 0; i < 5; i++ {                                // 开启5个协程
            go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
                for {
                    if isCancelled(ctx) {
                        break
                    }
                    time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
                }
                fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
            }(i, ctx)
        }
        cancel() // 取消ctx
        time.Sleep(time.Second * 1)
    }
    

    八、常见并发任务(实战)

    1. 只执行一次(单例模式)

    场景:在多协程的情况下,保证某段代码只执行一次。

    type Singleton struct {
        data string
    }
    
    var singleInstance *Singleton
    var once sync.Once
    
    func GetSingletonObj() *Singleton {
        once.Do(func() {
            fmt.Println("Create Obj")
            singleInstance = new(Singleton)
        })
        return singleInstance
    }
    
    func TestGetSingletonObj(t *testing.T) {
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func() {
                obj := GetSingletonObj()
                fmt.Printf("%p\n", obj)
                wg.Done()
            }()
        }
        wg.Wait()
    }
    

    2. 仅需任意任务完成

    利用channel管道通信的机制,我们可以再任何一个协程完成任务时,就给对象发消息。

    func runTask(id int) string {
        time.Sleep(10 * time.Millisecond)
        return fmt.Sprintf("The result is from %d", id)
    }
    
    func firstResponse() string {
        numOfRunner := 10
        ch := make(chan string, numOfRunner) // 创建bufferChannel。(如果用channel会导致协程泄漏,剩下9个channel会一直阻塞在系统中。)
        for i := 0; i < numOfRunner; i++ { // 开了10个协程
            go func(i int) {
                ret := runTask(i) // 每个协程去执行任务
                ch <- ret
            }(i)
        }
        return <-ch // 返回channel里的第一个Response。(因为channel是一个先进先出的管道)
    }
    
    func TestFirstResponse(t *testing.T) {
        t.Log(firstResponse()) // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
    }
    

    3. 所有任务完成

    刚才,我们介绍了first response,接下来我们看一下all response该怎么做。思路是一样的,只要接收到所有channel返回的数据,再返回即可。

    func runTask(id int) string {
        time.Sleep(10 * time.Millisecond)
        return fmt.Sprintf("The result is from %d", id)
    }
    
    func allResponse() string {
        numOfRunner := 10
        ch := make(chan string, numOfRunner) // 创建bufferChannel。
        for i := 0; i < numOfRunner; i++ {   // 开了10个协程
            go func(i int) {
                ret := runTask(i) // 每个协程去执行任务
                ch <- ret
            }(i)
        }
        finalRet := ""
        for j := 0; j < numOfRunner; j++ {
            finalRet += <-ch + "\n"
        }
        return finalRet // 返回channel里的所有的Response。(因为channel是一个先进先出的管道)
    }
    
    func TestAllResponse(t *testing.T) {
        t.Log("Before:", runtime.NumGoroutine()) // 打印一下当前的协程数量
        t.Log(allResponse())                     // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
        t.Log("After:", runtime.NumGoroutine()) // 再打印一下当前的协程数量
    }
    

    4. 对象池

    我们可以用buffer channel的管道特性来做一个对象池。

    Demo:

    type ReusableObj struct {
    }
    
    type ObjPool struct {
        bufChan chan *ReusableObj // 用于缓冲可重用对象
    }
    
    // 生产指定数量对象的对象池
    func NewObjPool(numOfObj int) *ObjPool {
        ObjPool := ObjPool{}
        ObjPool.bufChan = make(chan *ReusableObj, numOfObj)
        for i := 0; i < numOfObj; i++ {
            ObjPool.bufChan <- &ReusableObj{}
        }
        return &ObjPool
    }
    
    // 从对象池中获得对象
    func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
        select {
        case ret := <-p.bufChan:
            return ret, nil
        case <-time.After(timeout): // 超时控制
            return nil, errors.New("time out")
        }
    }
    
    // 释放对象池里的对象
    func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
        select {
        case p.bufChan <- obj:
            return nil
        default:
            return errors.New("overflow")
        }
    }
    
    func TestObjPool(t *testing.T) {
        pool := NewObjPool(10) // 生产一个10容量大小的对象池
        for i := 0; i < 10; i++ {
            if v, err := pool.GetObj(time.Second * 1); err != nil { // 获取obj
                t.Error(err)
            } else {
                fmt.Printf("%T\n", v)                      // 获取成功,答应日志。
                if err := pool.ReleaseObj(v); err != nil { // 释放obj
                    t.Error(err)
                }
            }
        }
        fmt.Println("Done.")
    }
    

    5. sync.pool对象缓存

    我们可以通过sync.pool做对象缓存(创建、获取、缓存的策略)。

    对象获取策略:
    1. 首先,尝试从私有对象获取。

    2. 其次,如果私有对象不存在,就尝试从当前Process的共享池获取。

    3. 如果当前Process的共享池是空的,就尝试从其他Process的共享池获取。

    4. 如果所有Process的共享池都是空的,就从sync.pool指定的New方法中“New”一个新的对象返回。

    sync.pool缓存对象的生命周期:
    • 每一次GC(垃圾回收)都会清除sync.pool的缓存对象。

    • 因此,对象缓存的有效期为下一次GC之前。

    基本使用:

    func TestSyncPool(t *testing.T) {
        pool := &sync.Pool{
            New: func() interface{} { // 创建一个新的对象
                fmt.Println("Create a new object.")
                return 100
            },
        }
    
        v := pool.Get().(int) // 获取对象
        fmt.Println(v)
        pool.Put(3) // 放回对象
        // runtime.GC() // 触发GC,会清除sync.pool中缓存的对象
        v1, _ := pool.Get().(int)
        fmt.Println(v1)
    }
    

    多协程下的使用:

    func TestSyncPoolInMultiGroutine(t *testing.T) {
        pool := &sync.Pool{
            New: func() interface{} {
                fmt.Println("Create a new object.")
                return 10
            },
        }
    
        pool.Put(100)
        pool.Put(100)
        pool.Put(100)
    
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {// 创建10个协程
            wg.Add(1) 
            go func(id int) {
                fmt.Println(pool.Get()) // 获取对象
                wg.Done() 
            }(i)
        }
        wg.Wait()
    }
    
    sync.pool的优点与问题:
    • 优点:通过sync.pool降低复杂对象的创建和GC代价。

    • 问题:sync.pool会被GC回收,并且在并发使用中需要考虑加锁。因此,在程序中要做好取舍。(考虑是创建一个对象的代价大?还是用sync.pool加锁缓存复用的代价大?)


    最后,本系列我是在蔡超老师的技术分享下总结、实战完成的,
    感谢蔡超老师的技术分享

    PS:另附上,分享链接:《Go语言从入门到实战》
    祝大家学有所成,工作顺利。谢谢!

    相关文章

      网友评论

        本文标题:Go语言基础(五)—— 并发编程

        本文链接:https://www.haomeiwen.com/subject/fpzhxhtx.html