并发进阶

作者: 帅气的昵称都有人用了 | 来源:发表于2019-04-09 16:57 被阅读3次

    前面讲述了一些Go语言中并发的基础内容,今天来讲一下Go语言并发的进阶内容。

    多核并行化

    我们在执行并行计算的时候,虽然我们使用了多个goroutine,但这并不意味着执行的时间就会降低,这是因为当前版本的Go语言编译器还不能很智能的去发现和利用多核的优势,我们所设定的多个goroutine,他们都是在一个CPU上执行的,在一个goroutine得到时间片执行的时候,其他goroutine都会处于等待状态。
    但是我们可以自己进行设置环境变量GOMAXPROCS来控制使用多少个CPU核心,具体代码如下:

    runtime.GOMAXPROCS(16)
    

    但我们到底该设置多少个核心呢,其实在runtime包中还有一个NumCPU()函数可以帮助我们获取核心CPU的数量。


    协程同步

    通道是可以被关闭的,但他们并不是每次在执行时都需要关闭,只有发送者才需要关闭通道,接受者永远都不会需要。

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        ch := make(chan string)
        go sendData(ch)
        go getData(ch)
        time.Sleep(1e9)
    }
    
    func sendData(ch chan string) {
        ch <- "New York"
        ch <- "Beijing"
        ch <- "Tokyo"
        ch <- "Paris"
        ch <- "London"
    }
    
    func getData(ch chan string) {
        var input string
        for {
            input = <- ch
            fmt.Printf("%s ", input)
        }
    }
    /** The result is:
    New York Beijing Tokyo Paris London 
     */
    

    那么问题是如何在通道的sendData()完成时发送一个信号,getData()又如何检测到通道是否关闭或阻塞?
    第一种方法是使用close(ch)函数:

    ch := make(chan float64)
    defer close(ch)  
    

    第二种方法可以使用逗号,ok的操作符:

    v, ok := <-ch  // 如果v接收到一个值,ok的值则为true
    

    通常和if语句一同使用

    if v, ok := <-ch; ok {
      process(v)
    }
    

    或者是当关闭或者阻塞的时候使用break

    v, ok := <-ch
    if !ok {
      break
    }
    process(v)
    

    当然,我们还可以使用_ = ch <- v来实现非阻塞发送,因为空标识符获取到了发送给ch的所有内容。

    根据以上的几种方法,我们对前面的程序进行一定的修改:

    package main
    
    import (
        "fmt"
    )
    
    func main() {
        ch := make(chan string)
        go sendData(ch)
        getData(ch)
    }
    
    func sendData(ch chan string) {
        ch <- "New York"
        ch <- "Beijing"
        ch <- "Tokyo"
        ch <- "Paris"
        ch <- "London"
        close(ch)
    }
    
    func getData(ch chan string) {
        for {
            input, open := <- ch
            if !open {
                break
            }
            fmt.Printf("%s ", input)
        }
    }
    /** The result is:
    New York Beijing Tokyo Paris London
     */
    

    输出的结果肯定是和前面是一样的,但是我们对原有程序做了如下修改:
    1)现在只有sendData()是协程,getData()main()在同一线程中
    2)在sendData()函数的最后,我们通过close(ch)关闭了通道
    3)在for循环的getData()中,在每次接受通道的数据之前都使用if !open来进行检测
    当然,使用for range语句来读取通道是一个更好的办法,因为这会自动检测通道是否已经关闭:

    for input := range ch {
      process(input)
    }
    

    阻塞和生产者-消费者模式:在通道迭代中,两个协程经常是一个阻塞另外一个。如果程序工作在多核的机器上,大部分时间只用到一个处理器。可以通过使用带缓冲的通道来进行完善。


    协程的恢复

    在协程工作的过程中,如果其中的一个协程因为失败而停止了,但他并不会影响其他协程的工作。

    func server(workchan <-chan *Work) {
        for work := range workchan {
            go safelyDo(work) // goroutine开始工作
        }
    } 
    func safelyDo(work *Work) {
        defer func() {
            if err := recover(); err != nil {
                log.Printf("%s failed in %v", err, work)
            }
        }()
        do(work)
    }
    

    比如在这个程序当中,如果do(work)发生了panic,错误就会被记录且协程会退出并释放,而其他协程不受影响。而且这段代码中safelyDo(work *Work)函数中加入了恢复模式,函数do可以通过调用panic来摆脱不好的情况。但是恢复是在panic的协程内部的,它并不能被另一个协程恢复。

    相关文章

      网友评论

        本文标题:并发进阶

        本文链接:https://www.haomeiwen.com/subject/lpqiiqtx.html