美文网首页Golang干货
Go语言学习笔记七(并发编程)

Go语言学习笔记七(并发编程)

作者: Jabir_Zhang | 来源:发表于2019-08-10 23:45 被阅读0次

    协程机制

    Golang 线程和协程的区别

    备注:需要区分进程、线程(内核级线程)、协程(用户级线程)三个概念。

    进程、线程 和 协程 之间概念的区别

    对于进程、线程,都是有内核进行调度,有CPU时间片的概念,进行抢占式调度(有多种调度算法)

    对于协程(用户级线程),这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的,因为是由用户程序自己控制,那么就很难像抢占式调度那样做到强制的 CPU 控制权切换到其他进程/线程,通常只能进行协作式调度,需要协程自己主动把控制权转让出去之后,其他协程才能被执行到。

    goroutine 和协程区别

    本质上,goroutine 就是协程。 不同的是,Golang 在 runtime、系统调用等多方面对 goroutine 调度进行了封装和处理,当遇到长时间执行或者进行系统调用时,会主动把当前 goroutine 的CPU (P) 转让出去,让其他 goroutine 能被调度并执行,也就是 Golang 从语言层面支持了协程。Golang 的一大特色就是从语言层面原生支持协程,在函数或者方法前面加 go关键字就可创建一个协程。

    func TestGroutine(t *testing.T) {
        for i := 0; i < 10; i++ {
            go func(i int) {
                fmt.Println(i)
            }(i)
        }
        time.Sleep(time.Millisecond * 50)
    }
    

    这里面额外在内部func里传了i,注意这里如果不传i,直接用的话是不可行的,因为使用go创建协程,但是需要注意的是:协程函数的par作为参数是外部i的数据拷贝。

    其他方面的比较
    1. 内存消耗方面
    • 每个 goroutine (协程) 默认占用内存远比 Java 、C 的线程少。
      goroutine:2KB
      线程:8MB
    1. 线程和 goroutine 切换调度开销方面
    • 线程/goroutine 切换开销方面,goroutine 远比线程小
    • 线程:涉及模式切换(从用户态切换到内核态)、16个寄存器、PC、SP...等寄存器的刷新等。
    • goroutine:只有三个寄存器的值修改 - PC / SP / DX.

    共享内存并发机制

    先来看这样一段代码

    func TestCounter(t *testing.T) {
        counter := 0
        for i := 0; i < 5000; i++ {
            go func() {
                counter++
            }()
        }
        time.Sleep(1 * time.Second)
        t.Logf("counter = %d", counter)
    
    }
    

    正常来说counter应该最后的结果是5000,但是实际结果每次都不相同且都小于5000。其中存在两个问题,第一个协程不安全,需要对协程加锁,因为每个协程都在修改counter;第二个因为是异步,主协程并不会等待所有子协程的结束,因此不能保证打印结果的时候,counter加到5000。
    那如何保证最后的结果输出是5000呢?可以做以下两个操作

    Lock(锁)

    很明显保证线程安全就需要锁,看一下Go的锁如何使用:

    func TestCounterThreadSafe(t *testing.T) {
        var mut sync.Mutex
        counter := 0
        for i := 0; i < 5000; i++ {
            go func() {
                defer func() {
                    mut.Unlock()
                }()
                mut.Lock()
                counter++
            }()
        }
        time.Sleep(1 * time.Second)
        t.Logf("counter = %d", counter)
    
    }
    
    WaitGroup

    多个协程等待全部执行完毕,可以用WaitGroup:

    func TestCounterWaitGroup(t *testing.T) {
        var mut sync.Mutex
        var wg sync.WaitGroup
        counter := 0
        for i := 0; i < 5000; i++ {
            wg.Add(1)
            go func() {
                defer func() {
                    mut.Unlock()
                }()
                mut.Lock()
                counter++
                wg.Done()
            }()
        }
        wg.Wait()
        t.Logf("counter = %d", counter)
    
    }
    

    CSP并发机制

    CSP vs. Actor

    传统的并发模型主要分为 Actor 模型和CSP模型,CSP模型全称为 communicating sequential processes,CSP 模型由并发执行实体(进程,线程或协程),和消息通道组成,实体之间通过消息通道发送消息进行通信。

    • 和Actor的直接通讯不同,CSP模式则是通过channel进⾏通讯的,channel作为中间者,更松耦合⼀些。
    • Go中channel是有容量限制并且独⽴于处理Groutine,⽽如Erlang,Actor模式 中的mailbox容量是⽆限的,接收进程也总是被动地处理消息。
    channel分为两种:unbuffered channelbuffered channel
    unbuffered channel
    如图unbuffered channel中,通信双方同时都在Channel上,否则协程会阻塞,直到双方完成通信。
    buffered channel
    如图buffered channel中,我们给channel设置一个容量,只要容量未满,发送消息者就可以往里面放消息,相反对于接收方,只要里面有消息,他就可以取。放容量满了以后,发送方只能等接收方取走一条消息后才可以发送,接收方也只能等里面有消息后才会去取。
    channel 的基本操作

    先讲一下channel的基本操作:

    //创建channel
    ch := make(chan int)
    
    // 写入channel
    ch <- x
    
    // 从channel读取
    x <- ch
    
    // another way to read
    x = <- ch
    
    //关闭channel
    close(ch)
    

    channel 一定要初始化后才能进行读写操作,否则会永久阻塞。

    func service() string {
        time.Sleep(time.Millisecond * 50)
        return "Done"
    }
    
    func otherTask() {
        fmt.Println("working on something else")
        time.Sleep(time.Millisecond * 100)
        fmt.Println("Task is done.")
    }
    
    func AsyncService() chan string {
        retCh := make(chan string)
        go func() {
            ret := service()
            fmt.Println("returned result.")
            retCh <- ret
            fmt.Println("service exited.")
        }()
        return retCh
    }
    
    func TestAsynService(t *testing.T) {
        fmt.Println(service())
        otherTask()
        retCh := AsyncService()
        otherTask()
        fmt.Println(<-retCh)
        time.Sleep(time.Second * 1)
    }
    =======打印结果=======
    Done
    working on something else
    Task is done.
    working on something else
    returned result.
    Task is done.
    Done
    service exited.
    

    TestAsynService方法中前两行是串行的,大家可见打印结果即使有延时,也是按顺序打印。AsyncService的方法返回了一个channel,此channel是unbuffered channel,因此会协程阻塞,因此打印“service exited”会一直等到打印“Done”之后。

    func AsyncService() chan string {
        retCh := make(chan string, 1)
        go func() {
            ret := service()
            fmt.Println("returned result.")
            retCh <- ret
            fmt.Println("service exited.")
        }()
        return retCh
    }
    =======打印结果=======
    Done
    working on something else
    Task is done.
    working on something else
    returned result.
    service exited.
    Task is done.
    Done
    

    我把AsyncService里面初始化的channel变为buffered channel,可见打印“service exited.”就不会被阻塞,打印在了“Done”前。

    channel的关闭

    有关 channel 的关闭,你需要注意以下事项:

    • 关闭一个未初始化(nil) 的 channel 会产生 panic
    • 重复关闭同一个 channel 会产生 panic
    • 向一个已关闭的 channel 中发送消息会产生 panic
    • 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel 中还未被读取的消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息永远不会阻塞。
      v, ok <-ch; ok 为 bool 值,true 表示正常接受,false 表示通道关闭
    • 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息
    func dataProducer(ch chan int, wg *sync.WaitGroup) {
        go func() {
            for i := 0; i < 10; i++ {
                ch <- i
            }
            close(ch)
            wg.Done()
        }()
    }
    
    func dataReceiver(ch chan int, wg *sync.WaitGroup) {
        go func() {
            for {
                if data, ok := <-ch; ok {
                    fmt.Println(data)
                } else {
                    break
                }
            }
            wg.Done()
        }()
    }
    
    func TestCloseChannel(t *testing.T) {
        var wg sync.WaitGroup
        ch := make(chan int)
        wg.Add(1)
        dataProducer(ch, &wg)
        wg.Add(1)
        dataReceiver(ch, &wg)
        wg.Wait()
    }
    

    多路选择和超时

    select是go语言中常用的一个关键字,官方解释:select语句用来选择哪个case中的发送或接收操作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O操作。

    多路选择

    对于select的理解有以下几点:

    • 每个case都必须是一个通信
    • 所有channel表达式都会被求值
    • 所有被发送的表达式都会被求值
    • 如果任意某个通信可以进行,它就执行;其他被忽略。
    • 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。否则:
      1. 如果有default子句,则执行该语句。
      2. 如果没有default子句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
    //channel
    func AsyncService() chan string {
        retCh := make(chan string)
        go func() {
            time.Sleep(time.Millisecond * 50)
            retCh <- "Done"
        }()
        return retCh
    }
    
    //测试select含有default
    func TestSelectDefault(t *testing.T) {
        select {
        case ret := <- AsyncService():
            t.Logf("result: %s", ret)
        default:
            t.Error("No one returned")
        }
    }
    ---------------------------------------------------------
    打印结果:No one returned
    ---------------------------------------------------------
    
    //测试select不含有default
    func TestSelect(t *testing.T) {
        select {
        case ret := <- AsyncService():
            t.Logf("result: %s", ret)
        }
    }
    ---------------------------------------------------------
    打印结果:result: Done
    ---------------------------------------------------------
    

    打印结果发现有default子句,执行default;如果没有default子句,select将阻塞,直到channel返回值。

    超时

    select可以设置超时,具体代码如下

    func AsyncService() chan string {
        retCh := make(chan string)
        go func() {
            time.Sleep(time.Millisecond * 500)
            retCh <- "Done"
        }()
        return retCh
    }
    
    func TestTimeOut(t *testing.T) {
        select {
        case ret := <- AsyncService():
            t.Logf("result: %s", ret)
        case <- time.After(time.Millisecond * 100):
            t.Error("time out")
        }
    }
    ---------------------------------------------------------
    打印结果:time out
    ---------------------------------------------------------
    

    因为channel延迟了500毫秒,因此超时了,所以走到了超时的case中;如果把上面延迟500毫秒改成50毫秒,则正常走到了打印channel的case中。
    另外,select是可以使用break,case中使用了break后,走到此case中执行到break后就不执行break之后的代码。

    Context 与任务取消

    树状图
    如何取消中间任务handle(Req1),并且同时取消子任务,这时可以用到Context
    func isCancelled(ctx context.Context) bool {
        select {
        case <-ctx.Done():
            return true
        default:
            return false
        }
    }
    
    func TestCancel(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background())
        for i := 0; i < 5; i++ {
            go func(i int, ctx context.Context) {
                for {
                    if isCancelled(ctx) {
                        break
                    }
                    time.Sleep(time.Millisecond * 5)
                }
                fmt.Println(i, "Cancelled")
            }(i, ctx)
        }
        cancel()
        time.Sleep(time.Second * 1)
    }
    ---------------------------------------------------------
    1 Cancelled
    4 Cancelled
    3 Cancelled
    2 Cancelled
    0 Cancelled
    ---------------------------------------------------------
    
    • 根 Context:通过 context.Background () 创建
    • 子 Context:context.WithCancel(parentContext) 创建
    • ctx, cancel := context.WithCancel(context.Background())
    • 当前 Context 被取消时,基于他的⼦ context 都会被取消
    • 接收取消通知 <-ctx.Done()

    参考

    Golang 之协程详解
    由浅入深剖析 go channel

    相关文章

      网友评论

        本文标题:Go语言学习笔记七(并发编程)

        本文链接:https://www.haomeiwen.com/subject/gqfljctx.html