美文网首页程序员
Golang并发编程套路

Golang并发编程套路

作者: 小餐包 | 来源:发表于2022-06-30 11:56 被阅读0次

    同步机制

    由于并发编程没法直接保证协程的执行顺序,因此需要一个同步的机制来进行同步通信,以确保各个协程中的任务处于特定的状态再进行特定的后续操作。

    go中实现同步的方案大体有两种,一种是使用channel机制,另一种则是使用sync.WaitGroup对象进行计数。

    Channel同步

    使用无缓冲channel:

    func main() {
        done := make(chan int)
    
        go func(){
            println("你好, 世界")
            done <- 1
        }()
    
        <-done
    }
    

    无缓存的Channel上的发送操作总在对应的接收操作完成前发生。

    因此无缓冲区的channel又叫同步channel,它会保证读写是个同步操作。

    使用有缓冲channel的写法:

    func main() {
        done := make(chan int, 1)
    
        go func(){
            println("你好, 世界")
            done <- 1
        }()
    
        <-done
    }
    

    使用有缓冲的channel时必须注意空channel只会阻塞读操作,不会阻塞写操作,比如下面的写法就是无效的:

    func main() {
        done := make(chan int)
    
        go func(){
            println("你好, 世界")
            <-done
        }()
    
        done <- 1
    }
    

    sync.WaitGroup同步

    另一种常用方式是使用sync包提供的WaitGroup对象,如下:

    package main
    
    import "sync"
    
    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func() {
                println("你好, 世界")
                wg.Done()
            }()
        }
        wg.Wait()
    }
    

    生产者-消费者模式

    并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度,一个多生产者和消费者的例子:

    package main
    
    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    
    var wg sync.WaitGroup
    var producerWg sync.WaitGroup
    var mu sync.Mutex
    
    type Info struct {
        count int32
    }
    
    func countNums() map[int]*Info {
        ch := make(chan int, 1000)
        res := map[int]*Info{}
        for i := 0; i < 100; i++ {
            producerWg.Add(1)
            go func(out chan int, n int) {
                defer producerWg.Done()
                for i := 0; i < n; i++ {
                    out <- rand.Int() % 10
                }
            }(ch, 10000)
        }
    
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(in chan int) {
                defer wg.Done()
                for key := range in {
                    mu.Lock()
                    if _, ok := res[key]; ok {
                        res[key].count++
                    } else {
                        res[key] = &Info{
                            count: 1,
                        }
                    }
                    mu.Unlock()
                } 
            }(ch)
        }
        producerWg.Wait()
        close(ch)
        wg.Wait()
        return res
    }
    
    func main() {
        start := time.Now()
        res := countNums()
        fmt.Println(time.Since(start))
        for k, v := range res {
            fmt.Printf("%d: %d\n", k, v.count)
        }
    }
    
    

    安全的退出

    如果是持续运行的任务希望通过Ctrl+C这种中断信号来退出的话可以用下面的方式实现:

    // Ctrl+C 退出
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        fmt.Printf("quit (%v)\n", <-sig)
    

    线程安全

    先看一个线程不安全的累加器实现:

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    var x int32
    var wg sync.WaitGroup
    
    func UnsafeAdd(n int) {
        for i := 0; i < n; i++ {
            x += 1
        }
        wg.Done()
    }
    
    func AddWrapper(adder func(int)) {
        start := time.Now()
        wg.Add(2)
        go adder(1000000)
        go adder(1000000)
        wg.Wait()
        fmt.Println(x)
        fmt.Println(time.Since(start))
    }
    
    func main() {
        AddWrapper(UnsafeAdd)
    }
    
    

    如果直接运行程序,可能看不出问题。但是在运行时带上-race参数,就会发现如下错误:

    go run -race .\play
    ==================
    WARNING: DATA RACE
    Read at 0x00000065111c by goroutine 8:
      main.UnsafeAdd()
          E:/Dev/gogogo/play/play.go:14 +0x4e       
    
    Previous write at 0x00000065111c by goroutine 7:
      main.UnsafeAdd()
          E:/Dev/gogogo/play/play.go:14 +0x68       
    
    Goroutine 8 (running) created at:
      main.AddWrapper()
          E:/Dev/gogogo/play/play.go:30 +0x41
    
    Goroutine 7 (running) created at:
      main.AddWrapper()
          E:/Dev/gogogo/play/play.go:22 +0x95
      main.main()
          E:/Dev/gogogo/play/play.go:30 +0x41
    ==================
    

    意思并发存在数据竞争,不是线程安全的实现,那么要实现线程安全,我们有什么办法呢?

    sync.Mutex锁机制

    常见的思路是通过加锁来保证同一时刻只有一个线程能对数据进行操作,sync.Mutex提供了简单的锁机制(如果读频繁的场景可以使用读写锁RWMutex,这里的场景是写频繁场景,因此直接使用Mutex),代码实现如下:

    func MutexAdd(n int) {
        for i := 0; i < n; i++ {
            mu.Lock()
            x += 1
            mu.Unlock()
        }
        wg.Done()
    }
    

    sync/atomic包的原子操作

    上面的线程安全都是通过加锁的机制来保证的,实际上对于简单的数据的读写,Go还提供了一种原子操作包来保证线程安全,比如上述例子的实现可以这样:

    func AtomicAdd(n int) {
        for i := 0; i < n; i++ {
            atomic.AddInt32(&x, 1)
        }
        wg.Done()
    }
    

    所谓的原子操作就是并发编程中“最小的且不可并行化”的操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。并且,由于原子操作低层是通过CPU指令的硬件层面实现,相比使用锁实现的线程安全机制开销更低。

    比较两种实现的运行时间,输出如下,可以看到原子操作确实较锁实现更加高效:

    func main() {
        // AddWrapper(UnsafeAdd)
        AddWrapper(MutexAdd)
        AddWrapper(AtomicAdd)
    }
    
    // output
    2000000
    36.0276ms
    4000000
    24.003ms
    

    附完整代码:

    package main
    
    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )
    
    var x int32
    var wg sync.WaitGroup
    var mu sync.Mutex
    
    func UnsafeAdd(n int) {
        for i := 0; i < n; i++ {
            x += 1
        }
        wg.Done()
    }
    
    func MutexAdd(n int) {
        for i := 0; i < n; i++ {
            mu.Lock()
            x += 1
            mu.Unlock()
        }
        wg.Done()
    }
    
    func AtomicAdd(n int) {
        for i := 0; i < n; i++ {
            atomic.AddInt32(&x, 1)
        }
        wg.Done()
    }
    
    func AddWrapper(adder func(int)) {
        start := time.Now()
        wg.Add(2)
        go adder(1000000)
        go adder(1000000)
        wg.Wait()
        fmt.Println(x)
        fmt.Println(time.Since(start))
    }
    
    func main() {
        // AddWrapper(UnsafeAdd)
        AddWrapper(MutexAdd)
        AddWrapper(AtomicAdd)
    }
    
    

    相关文章

      网友评论

        本文标题:Golang并发编程套路

        本文链接:https://www.haomeiwen.com/subject/klyfbrtx.html