美文网首页
Go语言基础6 - 并发

Go语言基础6 - 并发

作者: 张云飞Vir | 来源:发表于2020-03-12 20:28 被阅读0次

    概述

    我们将用几节来学习Go语言基础,本文结构如下:

    1. 并发
      通过通信共享内存
      Go程
      信道
      信道中的信道
      并行化
      可能泄露的缓冲区
    2. 错误
      Panic
      恢复
    

    1. 并发

    1.1 通过通信共享内存

    在并发编程中,为实现对共享变量的正确访问需要精确的控制,这在多数环境下都很困难。

    实际上,多个独立执行的线程从不会主动共享。Go语言另辟蹊径,它将共享的值通过信道传递, 在任意给定的时间点,只有一个Go程能够访问该值,数据竞争从设计上就被杜绝了。

    例如,引用计数通过为整数变量添加互斥锁来很好地实现。 取而代之的是,通过信道来控制访问能够让你写出更简洁的程序。

    Go将它简化为一句口号:

    不要通过共享内存来通信,而应通过通信来共享内存。

    1.2 Go程

    Go程具有简单的模型:

    • 它是与其它Go程并发运行在同一地址空间的函数。
    • 它是轻量级的, 消耗几乎就只有栈空间的分配。
    • 而且栈最开始是非常小的,所以它们很廉价, 仅在需要时才会随着堆空间的分配(和释放)而变化。

    Go程在多线程操作系统上可实现多路复用,因此若一个线程阻塞,比如说等待I/O, 那么其它的线程就会运行。
    Go程的设计隐藏了线程创建和管理的诸多复杂性。

    在函数或方法前添加 go 关键字能够在新的Go程中调用它。当调用完成后, 该Go程也会安静地退出,示例:

    go list.Sort()  // 并发运行 list.Sort,无需等它结束。
    

    函数字面在Go程调用中非常有用。
    备注:可理解 为匿名函数的调用。下面的方法先声明了一个匿名方法,然后立即调用。

    func Announce(message string, delay time.Duration) {
          go func() {
            time.Sleep(delay)
            fmt.Println(message)
        }()  // 注意括号 - 必须调用该函数。
    }
    

    在Go中,函数字面都是闭包:其实现在保证了函数内引用变量的生命周期与函数的活动时间相同。

    1.3 信道( chan )

    1.3.1 格式: make(chan int)

    信道与映射一样,也需要通过 make 来分配内存,make 后的返回值是对底层数据结构的引用。
    若提供了一个可选的整数形参,它就会为该信道设置缓冲区大小。
    缓冲区大小的默认值是零,表示不带缓冲的或同步的信道。

    示例:

    ci := make(chan int)            // 整数类型的无缓冲信道
    cj := make(chan int, 0)         // 整数类型的无缓冲信道
    cs := make(chan *os.File, 100)  // 指向文件指针的带缓冲信道
    

    无缓冲信道在通信时会同步交换数据,它能确保(两个Go程的)计算处于确定状态。

    1.3.2 阻塞等待Go程( 无缓冲区的示例 )

    示例:使用 go 程,在后台启动了排序操作,等待排序完成。

    c := make(chan int)  // 分配一个信道
    // 在Go程中启动排序。当它完成后,在信道上发送信号。
    go func() {
        list.Sort()
        c <- 1  // 发送信号,什么值无所谓。
    }()
    doSomethingForAWhile()
    <-c   // 等待排序结束,丢弃发来的值。
    
    • 接收者在收到数据前会一直阻塞。
    • 若信道是不带缓冲的,那么在接收者收到值前, 发送者会一直阻塞;
    • 若信道是带缓冲的,则发送者仅在值被复制到缓冲区前阻塞;
    • 若缓冲区已满,发送者会一直等待直到某个接收者取出一个值为止。

    1.3.3 控制吞吐量的例子( 带缓冲的示例 )

    带缓冲的信道可被用作信号量。例如限制吞吐量。

    示例:
    var sem = make(chan int, MaxOutstanding)

      func handle(r *Request) {               
        sem <- 1 // 等待活动队列清空。            #2  占据,阻塞
        process(r)  // 可能需要很长时间。
        <-sem    // 完成;使下一个请求可以运行。    #3  解除占据
      }
    
      func Serve(queue chan *Request) {
        for {
            req := <-queue
            go handle(req)  // 无需等待 handle 结束。 #1 每个请求对应一个 Go程
        }
      }
    

    上面的例子中:

    • 进入的请求 req 会被传递给 handle。
    • handle 中 #2 等待一个信号继续(当缓冲区满时)
    • handle 中 #3 后,发送信号,使得 被阻塞的另一个 go程 开始进入到process
    • 信道缓冲区的容量决定了同时调用 process 的数量上限

    备注:
    这个示例一次开始了全部多个go程,然后根据缓冲区大小阻塞等待,当缓冲区可以进入时继续进行。

    1.3.4 继续改良的例子( 采用匿名方法 )

    若请求来得很快, 上面的程序就会无限地消耗资源。为了弥补这种不足,我们可以通过修改 Serve 来限制创建Go程:

      func Serve(queue chan *Request) {
        for req := range queue {
            sem <- 1
            go func() {
                process(req) // 这儿有Bug,解释见下。
                <-sem
            }()
        }
      }
    

    Bug出现在Go的 for 循环中,该循环变量在每次迭代时会被重用,因此 req 变量会在所有的Go程间共享,这不是我们想要的。我们需要确保 req 对于每个Go程来说都是唯一的。
    有一种方法能够做到,就是将 req 的值作为实参传入到该Go程的闭包中:

    func Serve(queue chan *Request) {
    for req := range queue {
    sem <- 1
    go func(req *Request) {
    process(req)
    <-sem
    }(req)
    }
    }

    闭包的处理
    比较前后两个版本,观察该闭包声明和运行中的差别。 另一种解决方案就是以相同的名字创建新的变量,如例中所示:

    func Serve(queue chan *Request) {
        for req := range queue {
            req := req // 为该Go程创建 req 的新实例。
            sem <- 1
            go func() {
                process(req)
                <-sem
            }()
        }
    }
    

    它的写法看起来有点奇怪

    req := req
    

    但在Go中这样做是合法且惯用的。你用相同的名字获得了该变量的一个新的版本, 以此来局部地刻意屏蔽循环变量,使它对每个Go程保持唯一。

    1.3.5 固定数据的go程,同时读取

    另一种管理资源的好方法:

    • 启动固定数量的 handle Go程,一起从请求信道中读取数据。

    • Go程的数量限制了同时调用 process 的数量。

    • Serve 同样会接收一个通知退出的信道, 在启动所有Go程后,它将阻塞并暂停从信道中接收消息。

      func handle(queue chan *Request) {
      for r := range queue {
      process(r)
      }
      }

      func Serve(clientRequests chan *Request, quit chan bool) {
      // 启动处理程序,固定数量
      for i := 0; i < MaxOutstanding; i++ {
      go handle(clientRequests)
      }
      <-quit // 等待通知退出。
      }

    1.4 信道中的信道

    这种特性通常被用来实现安全、并行的多路分解。

    在上一节的例子中,handle 是个非常理想化的请求处理程序, 但我们并未定义它所处理的请求类型。若该类型包含一个可用于回复的信道, 那么每一个客户端都能为其回应提供自己的路径。以下为 Request 类型的大概定义。

    type Request struct {
        args        []int
        f           func([]int) int
        resultChan  chan int
    }
    

    客户端提供了一个函数及其实参,此外在请求对象中还有个接收应答的信道。

    func sum(a []int) (s int) {
        for _, v := range a {
            s += v
        }
        return
    }
    
    request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
    // 发送请求
    clientRequests <- request
    // 等待回应
    fmt.Printf("answer: %d\n", <-request.resultChan)
    

    服务端的处理
    On the server side, the handler function is the only thing that changes.

    func handle(queue chan *Request) {
        for req := range queue {
            req.resultChan <- req.f(req.args)
        }
    }
    

    1.5 并行化

    这些设计的另一个应用是在多CPU核心上实现并行计算。如果计算过程能够被分为几块 可独立执行的过程,它就可以在每块计算结束时向信道发送信号,从而实现并行处理。

    1.6 可能泄露的缓冲区

    --

    2. 错误

    Go语言具有多值返回特性, 使得它可以在返回常规的值,和详细的错误描述。

    按照约定,错误的类型通常为 error,这是一个内建的简单接口。

    type error interface {
        Error() string
    }
    

    库的编写者通过更丰富的底层模型可以轻松实现这个接口,这样不仅能看见错误, 还能提供一些上下文。

    例如,os.Open 可返回一个 os.PathError。

    /* 定义结构体 */
    // PathError 记录一个错误以及产生该错误的路径和操作。
    type PathError struct {
        Op string    // "open"、"unlink" 等等。
        Path string  // 相关联的文件。
        Err error    // 由系统调用返回。
    }
    
    /* 实现 Error接口 */
    func (e *PathError) Error() string {
        return e.Op + " " + e.Path + ": " + e.Err.Error()
    }
    

    这样,PathError的 Error 会生成如下错误信息:

    open /etc/passwx: no such file or directory
    

    错误字符串应尽可能地指明它们的来源,解释清楚错误的情况。

    若调用者想知道更多细节,可使用类型选择或者类型断言来查看特定错误,和处理。

    for try := 0; try < 2; try++ {
        file, err = os.Create(filename)
        if err == nil {
            return
        }
        if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
            deleteTempFiles()  // 恢复一些空间。
            continue
        }
        return
    }
    

    上面的第5行,即第2条 if 是另一种类型断言。
    若它失败, ok 将为 false,而 e 则为nil. 若它成功, ok 将为 true

    2.1 Panic

    有时程序就是不能继续运行。为此,可以使用内建的 panic 函数,它会产生一个运行时错误并终止程序。

    该函数接受一个任意类型的实参(一般为字符串),并在程序终止时打印输出。格式:

    Panic( 字符串 )
    

    实际使用中,库函数应避免 panic。若问题可以被屏蔽或解决, 最好就是让程序继续运行,而不是终止。

    一个反例的情况就是初始化中: 若某个库真的不能让自己工作,那就触发Panic 吧,比如

    var user = os.Getenv("USER")
    
        func init() {
          if user == "" {
          panic("no value for $USER")
        }
    }
    

    2.2 恢复

    当 panic 被调用后, 程序将立刻终止当前函数的执行,并开始回溯Go程的栈,运行任何被推迟的函数。 若回溯到达Go程栈的顶端,程序就会终止。

     //  我自己画的不太严谨的图例,帮助理解。
     //  假如在 main 函数里调用了 方法1,在 方法1 里又调用了 方法2
      |                              |
      |                              |
      | #4           方法2            |        // 假如在这里触发了 Panic
      | #3           方法2的defer     |    //在 defer 时,仍然有机会调用 recover函数来恢复
      | #2      方法1                |
      | #1   main                    |    //到这里就程序终止了
      -------------------------------
    

    不过我们可以用内建的 recover 函数来 取回Go程的控制权限 并使其恢复正常执行。

    调用 recover函数 将停止回溯过程,它的返回值是错误信息(实际是调用 panic 函数时的参数)。

    由于在回溯时,只有被推迟的函数( defer )在运行,因此 recover 只能在被推迟(defer)的函数中才有效。

    在 Go程 内通过 recover 来终止失败的Go程,而无需让整个程序崩溃。

    先看示例代码:

    func server(workChan <-chan *Work) {
        for work := range workChan {
            go safelyDo(work)
        }
    }
    
    func safelyDo(work *Work) {
        defer func() {
            if err := recover(); err != nil {
                log.Println("work failed:", err)
            }
        }()
        do(work)
    }
    

    在此例中,若 do(work) 触发了Panic,其结果就会被记录(打印输出), 而该Go程会被干净利落地结束,不会干扰到其它Go程。我们无需在推迟的闭包中做任何事情, recover 会处理好这一切。

    相关文章

      网友评论

          本文标题:Go语言基础6 - 并发

          本文链接:https://www.haomeiwen.com/subject/lnxljhtx.html