美文网首页Go 并发编程
Go 并发编程:通道应用范式之管道模式

Go 并发编程:通道应用范式之管道模式

作者: GoFuncChan | 来源:发表于2020-03-05 00:20 被阅读0次

    管道模式

    一、类Unix串行管道:使用通道实现串行管道功能

    我们在使用类Unix系统时常常用到管道命令,如"ls |grep 'path/to' ",它可以让数据在多个命令操作中串行处理。Go的通道也可以做到如此,利用通道通信的特性我们可以创建多个连续通道,让一个函数的输出作为另一个函数的输入,而另一个函数的输出也可以作为其他函数的输入。

    Go标准库中的io.Pipe()可以创建类Unix风格管道,它适合纯粹的IO系统原语的管道操作。然而go语言原语中的通道也可以做到类似的操作,以下是Go并发编程的范式之一,可以普适到更多的应用场景。

    // 管道过滤器范式
    func Demo() {
        a(b(c(source("source1", "source2", "source3"))))
    }
    
    func a(in <-chan string) {
        for i := range in {
            fmt.Println("a" + i)
        }
    }
    
    func b(in <-chan string) <-chan string {
        out := make(chan string, cap(in))
        go func() {
            defer close(out)
            for i := range in {
                out <- "b" + i
            }
        }()
        return out
    }
    
    func c(in <-chan string) <-chan string {
        out := make(chan string, cap(in))
        go func() {
            defer close(out)
            for i := range in {
                out <- "c" + i
            }
        }()
        return out
    }
    
    // 管道输入源
    func source(inputs ...string) <-chan string {
        out := make(chan string, len(inputs))
    
        go func() {
            defer close(out)
            for _, item := range inputs {
                out <- item
                fmt.Println("source input:", item)
            }
        }()
        return out
    }
    

    运行输出:

    === RUN   TestDemo21
    source input: source1
    source input: source2
    source input: source3
    abcsource1
    abcsource2
    abcsource3
    --- PASS: TestDemo21 (0.00s)
    PASS
    

    二、构建管道最佳实践

    以上为最简单的模拟管道,但仔细一看还是存在问题的,我们在《防止Goroutine泄露》中讨论过,管道每个阶段都有可能出现协程泄露的风险,我们可以引入done管道解决这个问题,下面看一个比较有实质性的例子:通过一个生成器产生一些值并进入管道运算,最后输出值。

    // 生成器
    generator := func(done <-chan interface{}, integers ...int) <-chan int {
        intStream := make(chan int)
        go func() {
            defer close(intStream)
            for _, i := range integers {
                select {
                case <-done:
                    return
                case intStream <- i:
                }
            }
        }()
        return intStream
    }
    
    // 乘法阶段
    multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
        multipliedStream := make(chan int)
        go func() {
            defer close(multipliedStream)
            for i := range intStream {
                select {
                case <-done:
                    return
                case multipliedStream <- i * multiplier:
                }
            }
        }()
    
        return multipliedStream
    }
    
    // 加法阶段
    add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
        addedStream := make(chan int)
        go func() {
            defer close(addedStream)
            for i := range intStream {
                select {
                case <-done:
                    return
                case addedStream <- i + additive:
                }
            }
        }()
        return addedStream
    }
    
    // 防止协程泄露的done管道
    done := make(chan interface{})
    defer close(done)
    
    // 生成一些值,并进行管道运算
    intStream := generator(done, 1, 2, 3, 4)
    pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
    
    // 输出管道运算结果
    for v := range pipeline {
        fmt.Println(v)
    }
    

    在这个管道模式中,我们看到两件事:

    • 在管道的末尾,可以使用range语句来提取值;
    • 在每个阶段可以安全地并发执行,因为输入和输出在并发上下文中是安全的。

    看到关闭done通道是如何影响到管道的了么?这是通过管道每个阶段的两件事情实现的:

    • 对传入的频道进行遍历。当输入通道关闭时,遍历操作将退出。
    • 发送操作与done通道共享select语句。

    无论流水线阶段处于等待数据通道的状态,还是处在等待发送通道关闭的状态,都会强制管道各阶段终止。这里有一个复发关系。在管道开始时,我们已经确定必须将传入的切片值转换为通道。在这个过程中有两点必须是可抢占的:

    • 在生成器通道上创建值。
    • 在其频道上发送离散值。

    在管道开始和结束之间,代码总是在一个通道上遍历,并在包含done通道的select语句内的另一个通道上发送。如果某个阶段在传入通道检索到值时被阻塞,则该通道关闭时它将变为未阻塞状态。 如果某个阶段在发送值时被阻塞,则由于select语句而可抢占。因此,整个管道始终可以通过关闭done通道来抢占。

    三、生成器模式:

    在上面的generator中我们看到一个简单的生成器,咋一看还比较死板,我们可以利用通道构建一个可获取特定重复值的生成器。

    以下函数会重复你传给它的值,直到你告诉它停止:

    var repeat = func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
    
        valueStream := make(chan interface{})
        go func() {
            defer close(valueStream)
            for {
                for _, v := range values {
                    select {
                    case <-done:
                        return
                    case valueStream <- v:
                    }
                }
            }
        }()
        return valueStream
    }
    

    以下函数会从其传入的valueStream中取出第一个元素然后退出:

    var take = func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {
    
        takeStream := make(chan interface{})
        go func() {
            defer close(takeStream)
            for i := 0; i < num; i++ {
                select {
                case <-done:
                    return
                case takeStream <- <-valueStream:
                }
            }
        }()
        return takeStream
    }
    

    OK,让我们组合它们使用,看一个用例:

    func  Demo() {
        done := make(chan interface{})
        defer close(done)
    
        for num := range take(done, repeat(done, 1), 10) {
            fmt.Printf("%v ", num)
        }
    }
    

    在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。

    除了生成特定的固定数量的值,我们还可以扩展一下,如果把repeat扩展成repeatFn,我们可以生成任何数据:

    var repeatFn = func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
    
        valueStream := make(chan interface{})
        go func() {
            defer close(valueStream)
            for {
                select {
                case <-done:
                    return
                case valueStream <- fn():
                }
            }
        }()
        return valueStream
    }
    

    继续看用例:

    func Demo() {
        done := make(chan interface{})
        defer close(done)
        rand := func() interface{} {
            return rand.Int()
        }
        for num := range take(done, repeatFn(done, rand), 10) {
            fmt.Println(num)
        }
    }
    

    相关文章

      网友评论

        本文标题:Go 并发编程:通道应用范式之管道模式

        本文链接:https://www.haomeiwen.com/subject/jgonkhtx.html