第八章 并发

作者: 牧码人爱跑马 | 来源:发表于2018-07-20 08:59 被阅读15次

    1. 并发和并行的区别

    并行:多核cpu在同一时间片内并行处理多个任务。
    并发:如单核cpu在多个任务间进行时间片切换,并非同一时间片执行多个任务,只是上下文切换时间很短,看似多个任务并行。

    多线程和多线程是并行的基本前提条件,单线程也可用协程做到并发。

    在golang中是通过goroutine来实现并发的,goroutine并不能简单的归纳为协程,其运行时会创建多个线程来实现并发任务,且任务单元可被调度到其他线程并行执行。所以goroutine更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。

    goroutine

    关键字go并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适的系统线程去获取执行权。
    当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行顺序。

    每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB级别的线程栈,goroutine自定义栈仅需2KB,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时仅需扩容,最大能到GB规模。

    与defer一样,goroutine也会因延迟执行而立即计算并复制执行参数。

    var c int
    func counter()int{
        c++
        return c
    }
    func main() {
        a:=100
        go func(x,y int) {
            time.Sleep(time.Second)
            fmt.Println("go:",x,y)
        }(a,counter())
        a+=100
        fmt.Println("main:",a,counter())
        time.Sleep(time.Second*3)
    }
    

    输出:

    main: 200 2
    go: 100 1
    

    wait

    进程退出时不会等待并发任务结束,可用channel阻塞,然后发出退出信号。

    func main() {
        exit:=make(chan interface{})  //创建通道。因为仅是通知,此处channel可为任何类型。
        go func() {
            time.Sleep(time.Second)
            fmt.Println("goroutine done")
            close(exit)                      //关闭通道,发出信号。
        }()
    
        fmt.Println("main...")
        <-exit                             //通道关闭则立即解除。
        fmt.Println("main exit")
    }
    

    输出:

    main...
    goroutine done
    main exit
    

    除了关闭通道外,向通道内写入数据也可解除阻塞。channel的更多信息,后面再做详述。

    如要等待多个任务结束,推荐使用sync.WaitGruop。通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。

    func main() {
        var wg sync.WaitGroup
    
        for i:=0;i<10;i++{
            wg.Add(1)         //累加计数
            go func(id int) {
                defer wg.Done()        //递减计数
                time.Sleep(time.Second)
                fmt.Println("goroutine",id,"done")
    
            }(i)
        }
    
        fmt.Println("main...")
        wg.Wait()                   //阻塞,直到计数归零
        fmt.Println("main exit")
    }
    

    尽管WaitGroup.Add实现了原子操作,但建议在goroutine外累加计数器,以免Add尚未执行,Wait以及推出。

    func main() {
        var wg sync.WaitGroup
        go func() {
            wg.Add(1)           //可以运行试一下,不是每次都能设置上
            defer wg.Done() //递减计数
            fmt.Println("goroutine", "done")
    
        }()
    
        fmt.Println("main...")
        wg.Wait() //阻塞,直到计数归零
        fmt.Println("main exit")
    }
    

    可在多处用Wait阻塞,他们都能接收到通知。上栗就可在go func前加wg.Wait().

    GOMAXPROCS

    运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与CPU核数相等,可用runtime.GOMAXPROCS函数(或环境变量)修改。


    如参数小于1,GOMAXPROCS仅返回当前设置值,不做任何调整。


    import (
        "math"
        "fmt"
        "sync"
        "runtime"
    )
    
    //测试目标函数
    func count(){
        x:=0
        for i:=0;i<math.MaxUint32;i++{
            x+=i
        }
        fmt.Println(x)
    }
    
    //循环执行
    func test(n int){
        for i:=0;i<n;i++{
            count()
        }
    }
    
    //并发执行
    func test2(n int){
        var wg sync.WaitGroup
        wg.Add(n)
        for i:=0;i<n;i++{
            go func() {
                count()
                wg.Done()
            }()
        }
        wg.Wait()
    }
    
    func main() {
        n:=runtime.GOMAXPROCS(0)
        n1:=runtime.NumCPU()
        fmt.Println(n1)
        test(n)
    }
    
        n:=runtime.GOMAXPROCS(0)
        n1:=runtime.NumCPU()
    

    上述两个都可用来获取当前系统的cpu核数。

    Local Storage

    与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。但除优先级外,其他功能都很容易实现。

    func main() {
        var wg sync.WaitGroup
        var gs [5]struct{             //用于实现类似TLS功能
            id  int  //编号
            result  int //返回值
        }
        for i:=0;i<len(gs);i++{
            wg.Add(1)
            go func(id int) {   //使用参数避免参数闭包延迟求值
                defer wg.Done()
                gs[id].id = id
                gs[id].result=(id + 1) * 100
            }(i)
        }
        wg.Wait()
        fmt.Printf("%+v\n",gs)
    }
    

    输出:

    [{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}]

    **如使用map作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查。

    Gosched

    暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行。

    func main() {
        runtime.GOMAXPROCS(1)
        exit:=make(chan struct{})
    
        go func() {    //任务a
            defer close(exit)
    
            go func() {   //任务b,放在此处是为了确保a先执行。
                fmt.Println("b")
            }()
            for i:=0;i<4;i++{
                fmt.Println("a:", i)
                if i==1{
                    runtime.Gosched()
                }
            }
        }()
    
        <-exit
    }
    

    输出:

    a: 0
    a: 1
    b
    a: 2
    a: 3
    

    该函数很少被使用,因为运行时会主动向长时间运行(10ms)的任务发出抢占调度。

    Goexit

    Goexit立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic,自然也就无法捕获。

    func main() {
        exit:=make(chan struct{})
    
        go func() {
            defer close(exit)
            defer println("a")
            func(){
                defer func() {
                    println("b",recover() ==nil)   //执行recover返回nil
                }()
                func(){                                 //在多层调用中执行Goexit
                    println("c")
                    runtime.Goexit()                    //立即终止整个调用堆栈
                    println("c done.")            //不会执行
                }()
                println("b done.") //不会被执行
            }()
            println("a done.")  //不会执行
        }()
    
        <- exit
        println("main exit.")
    }
    

    输出:

    c
    b true
    a
    main exit.
    

    如果在main.main里调用Goexit,它会等待其他任务结束,然后让进程直接崩溃。


    无论身处哪一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。 标准库函数os.Exit可终止进程,但不会执行延迟调用。


    2. 通道

    Go并未实现严格的并发安全。
    允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致性和完整性。Go鼓励使用CSP通道,以通信代替内存共享,实现并发安全。
    通过消息来避免竟态的模型除了CSP,还有Actor。但两者区别较大。
    作为CSP核心,通道是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端未准备妥当,或消息未能及时处理时,会阻塞当前端。
    相比起来,Actor是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。

    从底层实现上来说,通道只是一个队列。同步模式下,发送和接收双方配对,然后直接赋值数据给对方。如配对失败,则置入等待队列,直到另一方出现后才被唤醒。异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入等待队列,直到有另一方写入数据或腾出空槽后被唤醒。
    除传递消息外,通道还被用作时间通知。

    func main() {
        done:=make(chan struct{})
        c:=make(chan string)
        go func() {
            s:=<-c
            fmt.Println(s)
            close(done)
        }()
    
        c<-"hello!"
        <-done   //阻塞,直到有数据或通道关闭
    }
    

    同步模式必须有配对操作的goroutine出现,否则会一直阻塞。而异步模式在缓冲区未满或数据未读完前,不会阻塞。

    func main() {
        c:=make(chan int,3)     //创建带三个缓冲槽的异步通道
        c<-1                    //缓冲区未满,不会阻塞
        c<-2
    
        println(<-c)
        println(<-c)
    }
    输出:
    1
    2
    

    多数时候,异步通道有助于提升性能,减少排队阻塞。

    缓冲区大小仅仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或nil。

    func main(){
        var a,b chan int = make(chan int,3),make(chan int)
        var c chan bool
        fmt.Println(a==b)
        fmt.Println(c==nil)
    
        fmt.Printf("%p,%d\n",a,unsafe.Sizeof(a))
    }
    输出:
    false
    true
    0xc04207a000,8
    

    虽然可传递指针来避免数据复制,但须额外注意数据并发安全。
    内置函数cap和len返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0,据此可判断通道是同步还是异步。

    func main(){
        a,b:=make(chan int),make(chan int,3)
        b<-1
        b<-2
        println("a",len(a),cap(a))
        println("b",len(b),cap(b))
    }
    

    输出:

    a 0 0
    b 2 3
    

    收发

    除使用简单的发送和接收操作符外,还可用ok-idom或range模式处理数据。

    func main() {
        done :=make(chan struct{})
        c:=make(chan int)
    
        go func() {
            defer close(done)
            for{
                x,ok:=<-c
                if !ok{               //据此判断通道是否关闭
                    return
                }
                fmt.Println(x)
            }
        }()
    
        c<-1
        c<-2
        c<-3
        close(c)
        <-done
    
    }
    

    输出:1,2,3

    对于循环接收数据,range模式更简洁一些。

    [...]
        go func() {
            defer close(done)
            for x:=range c{       //循环获取消息,直到通道被关闭。
                println(x)
            }
        }()
    
    [...]
    

    及时用close函数关闭通道引发结束通知,否则可能会导致死锁。
    通知可以是群体性的。也未必就是通知结束,可以是任何需要表达的事件。

    一次性事件用close效率更好,没有多余开销。连续或多样性事件,可传递不同数据标志实现。还可使用sync.Cond实现单播或广播事件。
    对于closed或nil通道,发送或接收操作都有相应规则:

    • 向已关闭通道发送数据,引发panic。
    • 从已关闭通道接收数据,返回已缓冲数据或零值。
    • 无论收发,nil通道都会阻塞。
    func main(){
        c:=make(chan int,3)
        
        c<-10
        c<-20
        close(c)
    
        for i:=0;i<cap(c)+1;i++{
            x,ok:=<-c
            println(i,":",ok,x)
        }
    }
    

    输出:

    0 : true 10
    1 : true 20
    2 : false 0
    3 : false 0
    

    重复关闭,或关闭nil通道都会引发panic错误。

    单向

    通道默认都是双向的,并不区分发送和接收端。但某些时候,我们可限制收发操作的方向来获得更严谨的操作逻辑。

    尽管可用make创建单向通道,但那没有任何意义。通常使用类型转换来获取单向通道,并分别赋予操作双方。

    func main() {
        var wg sync.WaitGroup
        wg.Add(2)
    
        c:=make(chan int)
        var send chan<- int =c
        var recv <-chan int =c
    
        go func() {
            defer wg.Done()
            for x:=range recv{
                println(x)
            }
        }()
    
        go func() {
            defer wg.Done()
            defer close(c)
            for i:=0;i<3;i++{
                send<-i
            }
        }()
    
        wg.Wait()
    
    }
    

    不能在单向通道上做逆向操作。close也不能用于接收端。也无法将单向通道重新转换回去。

    选择

    如要同时处理多个通道,可选用Select语句。它会随机选择一个可用通道做收发操作。

    func main() {
        var wg sync.WaitGroup
        wg.Add(2)
        a, b := make(chan int), make(chan int)
    
        go func() {             //接收端
            defer wg.Done()
            for {
                var (
                    name string
                    x    int
                    ok   bool
                )
                select {
                case x, ok = <-a:            //随机选择可用channel接收数据
                    name = "a"
                case x, ok = <-b:
                    name = "b"
                }
                if !ok {                     //如果任一通道关闭则终止接收。
                    return
                }
                println(name, x)  //输出接收的数据信息
    
            }
        }()
    
        go func() {         //发送端
            defer wg.Done()
            defer close(a)
            defer close(b)
    
            for i := 0; i < 10; i++ {
                select {
                case a <- i:
                case b <- i * 10:
    
                }
            }
    
        }()
    
        wg.Wait()
    }
    

    输出:

    b 0
    a 1
    a 2
    b 30
    a 4
    b 50
    a 6
    a 7
    a 8
    b 90
    

    如果要等全部的通道消息处理结束,可将已完成的通道设置为nil,这样她就会被阻塞,而不再被Select选中。
    以下示例是两个独立的通道,逻辑是等两个通道都结束了收发才最终close,哪个先完成哪个阻塞住在那等待。

    func main() {
        var wg sync.WaitGroup
        wg.Add(3)
        a, b := make(chan int), make(chan int)
    
        go func() { //接收端
            defer wg.Done()
            for {
                select {
                case x, ok := <-a:
                    if !ok {
                        a = nil
                        break
                    }
                    println("a",x)
                case x, ok := <-b:
                    if !ok {
                        b = nil
                        break
                    }
                    println("b", x)
                }
    
    
                if a == nil && b == nil {
                    return
                }
    
            }
        }()
    
        go func() {
            defer wg.Done()
            defer close(a)
    
            for i := 0; i < 10; i++ {
                a <- i
            }
    
        }()
    
        go func() {
            defer wg.Done()
            defer close(b)
    
            for i := 100; i < 105; i++ {
                b <- i
            }
    
        }()
    
        wg.Wait()
    }
    

    输出:

    a 0
    a 1
    b 100
    a 2
    a 3
    a 4
    b 101
    b 102
    b 103
    a 5
    b 104
    a 6
    a 7
    a 8
    a 9
    

    即使是同一通道,也会随机选择case执行。

    func main() {
        var wg sync.WaitGroup
        wg.Add(2)
        c := make(chan int)
    
        go func() { //接收端
            defer wg.Done()
            for {
                var x int
                var ok bool
                select {
                case x, ok = <-c:
                    println("a1",x)
                case x, ok = <-c:
                    println("a2", x)
                }
                if !ok {
                    return
                }
    
            }
        }()
    
        go func() {
            defer wg.Done()
            defer close(c)
    
            for i := 0; i < 10; i++ {
                select {
                case c<-i:
                case c<-i*10:
                }
            }
        }()
        wg.Wait()
    }
    

    输出:

    a1 0
    a1 1
    a2 2
    a2 3
    a1 4
    a1 50
    a1 60
    a1 7
    a2 80
    a2 90
    a2 0
    

    当所有通道都不可用时,Select会执行default语句。如此可避开Select阻塞,但须注意处理外层循环,以免陷入空耗。

    func main() {
        c:=make(chan int)
        done:=make(chan bool)
    
        go func() {
            defer close(done)
            for {
    
                select {
                case x,ok:=<-c:
                    if !ok{
                        return
                    }
                    fmt.Println("data:",x)
                default:                               //避免Select阻塞
    
                }
    
                fmt.Println(time.Now())
                time.Sleep(time.Second)
            }
        }()
        time.Sleep(5*time.Second)
        c<-100
        close(c)
        <-done
    }
    

    输出:略

    也可以用default处理一些默认逻辑。

    func main() {
        done := make(chan struct{})
        data := []chan int{           //数据缓冲区
            make(chan int, 3),
        }
    
        go func() { //生产数据
            defer close(done)
            for i := 0; i < 10; i++ {
                select {
                case data[len(data)-1] <- i:   //生产数据
                default: //数据通道已满则新建chan
                    data = append(data, make(chan int, 3))
                }
            }
    
        }()
        <-done
    
        for x := 0; x < len(data); x++ {
            c := data[x]
            close(c)  //关闭通道后也能从中读取数据
            for i := range (c) {
                fmt.Println(i)
            }
        }
    
    }
    

    输出:

    0
    1
    2
    4
    5
    6
    8
    9
    

    可以看到,channel缓存满了后的第一个数据会被丢弃,直接走default创建新的通道了。

    模式

    通常使用工厂方法将goroutine和通道绑定。

    type receiver struct {
        wg   sync.WaitGroup
        data chan int
    }
    
    func newReceiver() *receiver {
        r := &receiver{
            data: make(chan int),
        }
        r.wg.Add(1)
        go func() {
            defer r.wg.Done()
            for x := range r.data { //接收消息,直到通道关闭
                println("recv:", x)
            }
        }()
        return r
    }
    
    func main() {
    
        r := newReceiver()
        r.data <- 1
        r.data <- 2
        close(r.data)  //关闭通道,发出结束通知
        r.wg.Wait()   //等待接收者处理结束
    
    }
    

    输出:

    recv: 1
    recv: 2
    

    鉴于通道本身就是一个并发安全的队列,可用作ID generator、Pool等用途。

    type pool chan []byte
    
    func newPool(cap int)pool{
        return make(chan []byte,cap)
    }
    
    func (p pool)get() []byte{
        var v []byte
        select {
        case v=<-p:                       //返回
        default:
            v=make([]byte,10)    //返回失败,新建
        }
        return v
    }
    
    func (p pool)put(b []byte){
        select {
        case p<-b:                //放回
        default:            //放回失败,新建
    
        }
    }
    

    用通道实现信号量(semaphore)。

    func main() {
        runtime.GOMAXPROCS(4)
        var wg sync.WaitGroup
    
        sem:=make(chan struct{}, 2)        //最多允许两个并发同时执行
        for i:=0;i<5;i++{
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                sem<- struct{}{}           // acquire: 获取信号
                defer func() {<-sem}()     //release: 释放信号
                time.Sleep(time.Second * 2)
                fmt.Println(id,time.Now())
            }(i)
        }
        wg.Wait()
    }
    

    标准库time提供了timeout和tick channel实现。

    package main
    
    import (
        "time"
        "fmt"
        "os"
    )
    
    func main() {
        go func() {
            for{
                select {
                case <-time.After(time.Second*5):
                    fmt.Println("timeout...")
                    os.Exit(0)
                }
            }
        }()
    
        go func() {
            tick:=time.Tick(time.Second)
            //for _=range tick{
            //  fmt.Println(time.Now(),"test")
            //}
            for {
                select {
                case <-tick:
                    fmt.Println(time.Now())
                }
            }
        }()
    
        <-(chan struct {})(nil)      //直接用nil channel阻塞进程
    }
    

    捕获INT、TERM信号,顺便实现一个简易的atexit函数。

    atexit函数是一个特殊的函数,它是在正常程序退出时调用的函数,我们把他叫为登记函数(函数原型:int atexit (void (*)(void))):
    ⼀个进程可以登记若⼲个(具体⾃⼰验证⼀下)个函数,这些函数由exit⾃动调⽤,这些函数被称为终⽌处理函数, atexit函数可以登记这些函数。 exit调⽤终⽌处理函数的顺序和atexit登记的顺序相反(网上很多说造成顺序相反的原因是参数压栈造成的,参数的压栈是先进后出,和函数的栈帧相同),如果⼀个函数被多次登记,也会被多次调⽤。
    python中有专门的atexit模块,简介如下:
    从模块的名字也可以看出来,atexit模块主要的作用就是在程序即将结束之前执行的代码,atexit模块使用register函数用于注册程序退出时的回调函数,然后在回调函数中做一些资源清理的操作。
    注意:
    1,如果程序是非正常crash,或通过os._exit()退出,注册的回调函数将不会被调用。
    2,也可以通过sys.exitfunc来注册回调,但通过它只能注册一个回调,而且还不支持参数。
    3,建议使用atexit来注册回调函数。

    import (
        "sync"
        "os"
        "os/signal"
        "syscall"
        "fmt"
    )
    
    //type atexits struct {
    //  sync.WaitGroup
    //  signal chan os.Signal
    //  funcs []func()
    //}
    var exits=&struct {
        sync.RWMutex
        signals chan os.Signal
        funcs []func()
    }{}
    
    func atexit(f func()){
        exits.Lock()
        defer exits.Unlock()
        exits.funcs=append(exits.funcs,f)
    }
    
    func waitExit(){
        if exits.signals==nil{
            exits.signals = make(chan os.Signal)
            signal.Notify(exits.signals,syscall.SIGINT,syscall.SIGTERM)
            fmt.Println("test")
        }
        exits.RLock()
        for _,f:=range exits.funcs{
            defer f()      //延迟调用函数采用FILO顺序执行。即便某些函数panic,延迟调用也能确保后续函数执行。
        }
        fmt.Println("after range exits.funcs")
        exits.RUnlock()
        fmt.Println("after exits.Runlock")
        <-exits.signals
    }
    
    func main() {
        atexit(func() {
            println("exit1...")
        })
        atexit(func() {
            println("exit2...")
        })
        fmt.Println("befor exit")
    
        waitExit()
    }
    

    性能

    将发往通道的数据打包,减少传输次数,可有效提升性能。从实现上来说,通道队列依旧使用锁同步机制,单次获取更多数据(批处理),可改善因频繁加锁造成的性能问题。

    const (
        max     = 500000 //数据统计上限
        block   = 500    //数据块大小
        bufsize = 100    //缓冲区大小
    )
    
    func test() { //普通模式,每次传递一个整数
        done := make(chan struct{})
        c := make(chan int, bufsize)
    
        go func() {
            count := 0
            for x := range c {
                count += x
            }
            close(done)
        }()
    
        for i := 0; i < max; i++ {
            c <- i
        }
        close(c)
        <-done
    }
    
    func testBlock() { //块模式:每次将500个数字打包成块传输
        done := make(chan struct{})
        c:=make(chan [block]int,bufsize)
    
        go func() {
            count:=0
            for a:=range c{
                for _,x:=range a{  //a 是[block]int数组
                    count +=x
                }
            }
            fmt.Println(count)
            close(done)
        }()
    
        for i:=0;i<max;i+=block{
            var b [block]int       //使用数组对数据打包
            for n:=0; n<block;n++{
                b[n] = i+n
                if i+n == max -1{
                    break
                }
            }
    
            c <- b
        }
    
        close(c)
        <-done
    }
    
    

    BenchmarkTest
    虽然单次消耗更多内存,但性能提升非常明显。如将数组改成切片会造成更多内存分配次数。

    资源泄漏

    通道可能会引发goroutine leak,确切地说,是指goroutine处于发送或接受阻塞状态,但一直未被唤醒。垃圾回收器并不手机此类资源,导致它们会在等待队列里长久休眠,形成资源泄漏。

    3. 同步

    通道并不是用来取代锁的,它们有各自不同的应用场景。通道倾向于解决逻辑层次的并发处理架构,而锁则用来保护局部范围内的数据安全。
    标准库sync提供了互斥和读写锁,另有原子操作等。mutex、rwmutex的使用并不复杂,只有几个地方需要注意。
    将Mutex作为匿名字段时,相关方法必须实现为pointer-receiver,否则会因复制导致锁机制失效。

    type data struct {
        sync.Mutex
    }
    
    func (d data)test(s string){
        d.Lock()
        defer d.Unlock()
        for i:=0;i<5;i++{
            fmt.Println(s, i)
            time.Sleep(time.Second)
        }
    }
    func main() {
        var wg sync.WaitGroup
        var d data
        wg.Add(2)
    
        go func() {
            defer wg.Done()
            d.test("Read")
        }()
    
        go func() {
            defer wg.Done()
            d.test("write")
        }()
    
        wg.Wait()
    
    }
    

    上述代码运行后会发现锁机制已失效,解决方案是将data 改为data.
    也可用嵌入
    Mutex来避免复制问题,但那需要专门初始化。

    应将Mutex锁粒度控制在最小范围内,及早释放。

    Mutex不支持递归锁,即锁里面不允许有锁,否则即使在同一goroutine下也会导致死锁。
    在设计并发安全类型时,千万注意此类问题。

    import "sync"
    
    type cache struct {
        sync.Mutex
        data []int
    }
    
    func (c *cache)count()int{
        c.Lock()
        n:=len(c.data)
        c.Unlock()
        return n
    }
    
    func (c *cache)get() int{
        c.Lock()
        defer c.Unlock()
        var d int
        if n:=c.count();n>0{    //锁中套锁
            d=c.data[0]
            c.data=c.data[1:]
        }
        return d
    }
    
    func main() {
    
        c:=cache{data:[]int{1,2,3,4}}
        c.get()
    }
    

    输出:

    fatal error: all goroutines are asleep - deadlock!
    

    相关建议:

    • 对性能要求较高时,应避免使用defer Unlock.
    • 读写并发时,用RWMutex性能会更好一些。
    • 对单个数据读写保护,可尝试用原子操作。
    • 执行严格测试,尽可能打开数据竞争检查。

    相关文章

      网友评论

        本文标题:第八章 并发

        本文链接:https://www.haomeiwen.com/subject/usnnpftx.html