美文网首页
(转)Concurrency in Go 3--Channels

(转)Concurrency in Go 3--Channels

作者: one_zheng | 来源:发表于2018-06-26 10:33 被阅读8次

    通道也可以声明为仅支持单向数据流——即你可以定义仅支持发送或仅支持接收数据的通道。我将在本节的末尾解释单向数据流的重要性。

    要声明一个只能被读取的单向通道,你可以简单的使用 <- 符号,如下所示:

    var dataStream <-chan interface{}
    dataStream := make(<-chan interface{})
    

    与之相对应,要声明一个只能被发送的单向通道,把 <-放在 chan关键字的右侧即可:

    var dataStream chan<- interface{}
    dataStream := make(chan<- interface{})
    

    你不会经常看到实例化的单向通道,但你会经常看到它们被用作函数参数和返回类型,这是非常有用的,因为Go可以在需要时将双向通道隐式转换为单向通道,比如这样:

    var receiveChan <-chan interface{}
    var sendChan chan<- interface{}
    dataStream := make(chan interface{})
    
    // 这样做是有效的
    receiveChan = dataStream
    sendChan = dataStream
    

    尝试将值写入只读通道或从只写通道读取值都是错误的。如果我们尝试编译下面的例子,Go的编译器会报错:

    writeStream := make(chan<- interface{})
    readStream := make(<-chan interface{})
    
    <-writeStream
    readStream <- struct{}{}
    

    这会输出:

    invalid operation: <-writeStream (receive from send-only type chan<- interface {})
    invalid operation: readStream <- struct {} literal (send to receive-only type <-chan interface {})
    

    < - 运算符的接收形式也可以选择返回两个值,如下所示:

    stringStream := make(chan string)
    go func() {
        stringStream <- "Hello channels!"
    }()
    salutation, ok := <-stringStream //1
    fmt.Printf("(%v): %v", ok, salutation)
    

    我们在这里接收一个字符串salutation和一个布尔值ok。
    这会输出:

    (true): Hello channels!
    

    布尔值代表读取操作的一个标识,用于指示读取的通道是由过程中其他位置的写入生成的值,还是由已关闭通道生成的默认值。等一下; 一个已关闭的通道,那是什么?

    在程序中,能够指示还有没有更多值将通过通道发送是非常有用的。 这有助于下游流程知道何时移动,退出,或在新的通道上重新开启通信等。我们可以通过为每种类型提供特殊的标识符来完成此操作,但这会开发人员的工作产生巨大的重复性,如果能够内置将产生极大的便利,因此关闭通道就像是一个万能的哨兵,它说:“嘿,上游不会写更多的数据啦,做你想做的事吧。”要关闭频道,我们使用close关键字,就像这样:

    valueStream := make(chan interface{})
    close(valueStream)
    

    有趣的是,我们也可以从已关闭的通道读取。 看这个例子:

    intStream := make(chan int)
    close(intStream)
    integer, ok := <- intStream // 1
    fmt.Printf("(%v): %v", ok, integer)
    

    1.这里我们从已关闭的通道读取。
    这会输出:

    (false): 0
    

    注意我们在关闭通道前并没有把任何值放入通道。即便如此我们依然可以执行读取操作,而且尽管通道处在关闭状态,我们依然可以无限期地在此通道上执行读取操作。这是为了支持单个通道的上游写入器可以被多个下游读取器读取(在第四章我们会看到这是一种常见的情况)。第二个返回值——即布尔值ok——表明收到的值是int的零值,而非被放入流中传递过来。

    这为我们开辟了一些新的模式。首先是通道的range操作。与for语句一起使用的range关键字支持将通道作为参数,并且在通道关闭时自动结束循环。这允许对通道上的值进行简洁的迭代。 我们来看一个例子:

    intStream := make(chan int)
    go func() {
        defer close(intStream) // 1
        for i := 1; i <= 5; i++ {
            intStream <- i
        }
    }()
    
    for integer := range intStream { // 2
        fmt.Printf("%v ", integer)
    }
    

    在这里我们在通道退出之前保证正常关闭。这是一种很常见的Go惯用法。
    这里对intStream进行迭代。
    正如你所看到的,所有的值被打印后程序退出:

    1 2 3 4 5
    

    缓冲通道

    无缓冲的通道也可以按缓冲通道定义:无缓冲的通道可以视作一个容量为0的缓冲通道。就像下面这样:

    a := make(chan int)
    b := make(chan int, 0)
    

    这两个通道都是int“类型”的。请记住我们在讨论“阻塞”时所代表的含义,我们说向一个已满的通道写入,会出现阻塞,从一个已空的通道读取,也会出现阻塞。这里的“满”和“空”是针对容量或缓冲区大小而言的。无缓冲的通道所拥有的容量为0,所以任何写入行为之后它都会是满的。一个容量为4的缓冲通道在4次写入后会是满的,并且会在第5次写入时出现阻塞,因为它已经没有其他位置可以放置第5个元素,这时它表现出的行为与无缓冲通道一样:由此可见,缓冲通道和无缓冲通道的区别在于,通道为空和满的前提条件是不同的。通过这种方式,缓冲通道可以在内存中构建用于并发进程通信的FIFO队列。

    下表列举了通道上的操作对应状态的通道会发生什么。

    注意:表中的用词都很简短,为了减少不必要的歧义或混乱,并未对该表进行不必要的翻译,此外,正如上面例子所展现的,该表的操作结果默认都是在main函数下操作。请以批判的眼光审视下表。


    c_3.jpg

    防止goroutine

    正如我们在“Goroutines”一节中介绍的那样,goroutines占用资源较少且易于创建。运行时将多个goroutine复用到任意数量的操作系统线程,以便我们不必担心抽象级别。但是他们会花费成本资源,并且goroutine不会被运行时垃圾收集,所以无论内存占用多少,我们都不想让他们对我们的进程撒谎。 那么我们如何去确保他们被清理干净?

    让我们从头开始,一步一步思考:为什么会有一个goroutine? 在第二章中,我们确定,goroutines代表可能并行或不可以并行运行的工作单元。 该goroutine有几条路径终止:

    当它完成任务。
    当它遇到不可恢复的错误无法继续它的任务。
    当它被告知停止当前任务。
    前两条我们已经知晓,可以通过算法实现。但如何取消当前任务?由于网络效应,这最重要的一点是:如果你已经开始了一个goroutine,那么它很可能以某种有组织的方式与其他几个goroutines合作。我们甚至可以把这种相互连接表现为一张图表,这时该goroutine能否停下来还取决于处在交互的其他goroutines。我们将在下一章中继续关注大规模并发产生的相互依赖关系,但现在让我们考虑如何确保保证单个goroutine得到清理。 让我们从一个简单的goroutine泄漏开始:

    doWork := func(strings <-chan string) <-chan interface{} {
        completed := make(chan interface{})
        go func() {
            defer fmt.Println("doWork exited.")
            defer close(completed)
            for s := range strings {
                fmt.Println(s)
            }
        }()
        return completed
    }
    
    doWork(nil)
    // 这里还有其他任务执行
    fmt.Println("Done.")
    

    我们看到doWork被传递了一个nil通道。所以strings通道永远无法读取到其承载的内容,而且包含doWork的goroutine将在这个过程的整个生命周期中保留在内存中(如果我们在doWork和主goutoutine中加入了goroutine,我们甚至会死锁)。

    在这个例子中,整个进程的生命周期很短,但是在一个真正的程序中,goroutines可以很容易地在一个长期生命的程序开始时启动,导致内存利用率下降。

    解决这种情况的方法是建立一个信号,按照惯例,这个信号通常是一个名为done的只读通道。父例程将该通道传递给子例程,然后在想要取消子例程时关闭该通道。 这是一个例子:

    doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} { //1
        terminated := make(chan interface{})
        go func() {
            defer fmt.Println("doWork exited.")
            defer close(terminated)
            for {
                select {
                case s := <-strings:
                    // Do something interesting
                    fmt.Println(s)
                case <-done: //2
                    return
                }
            }
        }()
        return terminated
    }
    
    done := make(chan interface{})
    terminated := doWork(done, nil)
    
    go func() { //3
        // Cancel the operation after 1 second.
        time.Sleep(1 * time.Second)
        fmt.Println("Canceling doWork goroutine...")
        close(done)
    }()
    
    <-terminated //4
    fmt.Println("Done.")
    
    1. 这里我们传递done通道给doWork函数。作为惯例,这个通道被作为首个参数。
    2. 这里我们看到使用了for-select的使用模式之一。我们的目的是检查done通道有没有发出信号。如果有的话,我们退出当前goroutine。
    3. 在这里我们创建另一个goroutine,一秒后就会取消doWork中产生的goroutine。
    4. 这是我们在main goroutine中调用doWork函数返回结果的地方。
      这会输出:
    Canceling doWork goroutine... 
    doWork exited.
    Done.
    

    你可以看到尽管向doWork传递了nil给strings通道,我们的goroutine依然正常运行至结束。与之前的例子不同,本例中我们把两个goroutine连接在一起之前,我们建立了第三个goroutine以取消doWork中的goroutine,并成功消除了泄漏问题。

    前面的例子很好地处理了在通道上接收goroutine的情况,但是如果我们正在处理相反的情况:在尝试向通道写入值时阻塞goroutine会怎样?

    newRandStream := func() <-chan int {
        randStream := make(chan int)
        go func() {
            defer fmt.Println("newRandStream closure exited.") // 1
            defer close(randStream)
            for {
                randStream <- rand.Int()
            }
        }()
    
        return randStream
    }
    
    randStream := newRandStream()
    fmt.Println("3 random ints:")
    for i := 1; i <= 3; i++ {
        fmt.Printf("%d: %d\n", i, <-randStream)
    }
    

    1.当goroutine成功执行时我们打印一行消息。
    这会输出:

    3 random ints:
    1: 5577006791947779410
    2: 8674665223082153551
    3: 6129484611666145821
    

    你可以看到注释1所在的打印语句并未执行。在循环的第三次迭代之后,我们的goroutine块试图将下一个随机整数发送到不再被读取的通道。我们无法告知它停下来,解决方案是为生产者提供一条通知它退出的通道:

    newRandStream := func(done <-chan interface{}) <-chan int {
        randStream := make(chan int)
        go func() {
            defer fmt.Println("newRandStream closure exited.")
            defer close(randStream)
    
            for {
                select {
                case randStream <- rand.Int():
                case <-done:
                    return
                }
            }
    
        }()
    
        return randStream
    }
    
    done := make(chan interface{})
    randStream := newRandStream(done)
    fmt.Println("3 random ints:")
    for i := 1; i <= 3; i++ {
        fmt.Printf("%d: %d\n", i, <-randStream)
    }
    
    close(done)
    //模拟正在进行的工作
    time.Sleep(1 * time.Second)
    

    这会输出:

    3 random ints:
    1: 5577006791947779410
    2: 8674665223082153551
    3: 6129484611666145821
    newRandStream closure exited.
    

    我们现在看到该goroutine被妥善清理。
    现在我们知道如何确保goroutine不泄漏,我们可以制定一个约定:如果goroutine负责创建goroutine,它也负责确保它可以停止goroutine。

    这个约定有助于确保程序在组合和扩展时可用。我们将在“管道”和“context包”中重新讨论这种技术和规则。我们该如何确保goroutine能够被停止根据goroutine的类型和用途而有所不同,但是它们 所有这些都是建立在传递done通道基础上的。

    相关文章

      网友评论

          本文标题:(转)Concurrency in Go 3--Channels

          本文链接:https://www.haomeiwen.com/subject/grjyyftx.html