美文网首页Go 并发编程
Go 并发编程:通道常见应用范式

Go 并发编程:通道常见应用范式

作者: GoFuncChan | 来源:发表于2020-03-04 22:46 被阅读0次

    通道经典应用

    一、闭包实现通道访问限制

    在Go的并发编程中,创建通道和开辟协程是非常方便且容易的,正因如此,有可能会导致开发者滥用。如果在团队开发中没有良好的协商和规范,更可能会导致并发数据不安全。
    例如:

    func Demo() {
        ch := make(chan int, 0)
        go dosomething(ch, 10)
        go dosomething(ch, 20)
        dosomething(ch, 30)
    }
    
    func dosomething(ch chan int, num int) {
        for i := 1; i < num; i++ {
            ch <- i
        }
        close(ch1)
    }
    

    以上是个非常明显的不规范使用通道的例子,你能得到的只有死锁!

    fatal error: all goroutines are asleep - deadlock!
    

    那么能不能限制通道的使用呢,即特定通道只让特定协程使用?
    有几种解决方案:

    • 团队协商,在代码规范上下功夫,显然这是不安全的。
    • 使用同步功能
    • 闭包实现访问受限的通道

    以下我们使用第三种方案:
    闭包实现访问受限的通道,只允许特定协程使用

    func Demo() {
        // 生产者:producter 内部开辟一条协程往里面发送数据,并返回一个只读通道
        producter := func() <-chan int {
            results := make(chan int, 5) // 该通道只作用于特定闭包内的作用域
            go func() {
                defer close(results)
                for i := 0; i <= 5; i++ {
                    results <- i
                }
            }()
            return results
        }
    
        // 消费者:运行这个闭包时需要传入一个只读通道
        consumer := func(results <-chan int) { 
            for result := range results {
                fmt.Printf("Received: %d\n", result)
            }
            fmt.Println("Done receiving!")
        }
            
        consumer(producter())
    }
    

    二、for-select范式:关于多通道操作的整合

    我们知道select语句是go专门为多通道操作提供的原语,单个select语句可以一次性的从多个通道选取一个来读写,只要哪个通道先不处于阻塞状态便选取哪个通道读写。而结合for循环语句构成的for-select结构可以循环不断的从多通道读写数据,直到特定条件退出。

    for-select循环模式如下所示:

    for { // 无限循环或遍历
        select {
        // 对通道进行操作
        }
    }
    

    常见的几种for-select循环的用法:
    a. 在通道上发送迭代变量

    for _, s := range []string{"a", "b", "c"} {
        select {
        case <-done:
            return
        case stringStream <- s:   // slice数据循环迭代写入channel
        }
    }
    

    b. 无限循环等待停止

    // 第一种方式
    for {
        select {
        case <-done: 
            return   // 停止返回
        default:
        }
        // 执行非抢占任务
    }
    
    // 第二种方式
    for {
        select {
        case <-done:
            return 
        default:    
          // 将要执行的任务放入default分支中
          // 执行非抢占任务
        }
    }
    

    通过善用通道,我们可以在许多并发过程中尽量避免使用同步锁,select原语可以集中处理多个通道,大大提高了开发和运行效率。

    三、or-channel :递归多个通道的或读取,只要有一个通道返回即完成

    
    /*
    递归多个通道的或读取,只要有一个通道返回即完成
    */
    func Demo() {
        var or func(channels ...<-chan interface{}) <-chan interface{}
        // 建立了名为or的递归函数,接收数量可变的通道并返回单个通道。
        or = func(channels ...<-chan interface{}) <-chan interface{} 
        // 两个递归终止条件     
        switch len(channels) {
            case 0:  // 如果传入的切片是空的,我们简单的返回一个nil通道
                return nil
            case 1:  // 如果切片只含有一个元素,我们就返回给元素
                return channels[0]
            }
          
            orChannel := make(chan interface{})
            // 建立一个goroutine,以便可以不受阻塞地等待我们通道上的消息
            go func() { 
                defer close(orDone)
    
                switch len(channels) {
                case 2: // 由于我们这里是递归的,每次递归调用将至少有两个通道。作为保持goroutine数量受到限制的优化方法,们在这里为仅使用两个通道的时设置了一个特殊情况。
                    select {
                    case <-channels[0]:
                    case <-channels[1]:
                    }
                default: // 递归地在第三个索引之后,从切片中的所有通道中创建一个or通道,然后从中选择。递归操作会逐层累计直到取到第一个通道元素。我们在其中传递了orChannel通道,这样当该树状结构顶层的goroutines退出时,结构底层的goroutines也会退出。
                    select {
                    case <-channels[0]:
                    case <-channels[1]:
                    case <-channels[2]:
                    case <-or(append(channels[3:], orChannel)...): 
                    }
                }
            }()
            return orDone
        }
    
    
    
        // 下面这个例子将经过一段时间后关闭通道,然后使用or函数将这些通道合并到一个关闭的通道中:
        sig := func(after time.Duration) <-chan interface{} { // 创建了一个通道,当后续时间中指定的时间结束时将关闭该通道
            c := make(chan interface{})
            go func() {
                defer close(c)
                time.Sleep(after)
            }()
            return c
        }
    
        start := time.Now() // 设置追踪自or函数的通道开始阻塞的起始时间
        <-or(sig(2*time.Hour), sig(5*time.Minute), sig(1*time.Second), sig(1*time.Hour), sig(1*time.Minute))
        fmt.Printf("done after %v", time.Since(start)) // 打印阻塞发生的时间
    }
    
    

    这是一种奇妙的做法,你可以将任意数量的通道组合到单个通道中,只要任何作为组件的通道关闭或被写入,整个通道就会关闭。

    四、chRange 封装安全的通道遍历读取

    有时你会与来自系统不同部分的通道交互。与管道不同的是,当你使用的代码通过done通道取消操作时,你无法对通道的行为方式做出判断。也就是说,你不知道正在执行读取操作的goroutine现在是什么状态。出于这个原因,正如我们在“防止Goroutine泄漏”中所阐述的那样,需要用select语句来封装我们的读取操作和done通道。可以简单的写成这样:

    for val := range myChan {
        // 对 val 进行处理
    }
    

    展开后可以写成这样:

    loop:
        for {
            select {
            case <-done:
                break loop
            case maybeVal, ok := <-myChan:
                if ok == false {
                    return // or maybe break from for
                }
                // Do something with val
            }
        }
    

    这样做可以快速退出嵌套循环。继续使用goroutines编写更清晰的并发代码,而不是过早优化的主题,我们可以用一个goroutine来解决这个问题。 我们封装了细节,以便其他人调用更方便:

    /*
    封装一个通用的安全的通道读取器,以便于可安全地for range遍历任意通道
    */
    var chRange = func(done, ch <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        // 使用协程闭包封装安全的读取通道 
        go func() {
            defer close(valStream)
            for {
                select {
                case <-done:
                    return
                case v, ok := <-ch:
                    if ok == false {
                        return
                    }
                    select {
                    case valStream <- v:
                    case <-done:
                    }
                }
            }
        }()
    
        return valStream
    }
    
    

    调用示例:这样对任意通道我们都可以简单安全的读取

    func Demo() {
        done := make(chan interface{})
        defer close(done)
    
        ch := make(chan interface{})
        go func() {
            defer close(ch)
            for i := 0; i < 10; i++ {
                ch <- i
            }
        }()
    
        for val := range chRange(done, ch) {
            fmt.Printf("read %v \n", val)
        }
    }
    

    五、tee-channel 分割通道数据流

    tee-channel类似Linux的tee命令,分割来自通道的值,以便将它们发送到两个独立区域。想象一下:你可能想要在一个通道上接收一系列操作指令,将它们发送给执行者,同时记录操作日志。

    var tee = func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
    
        out1 := make(chan interface{})
        out2 := make(chan interface{})
    
        go func() {
            defer close(out1)
            defer close(out2)
            for val := range chRange(done, in) {
                select {
                case <-done:
                default:
                    out1 <- val
                    out2 <- val
                }
    
            }
        }()
        return out1, out2
    }
    

    注意写入out1和out2是紧密耦合的。 直到out1和out2都被写入,迭代才能继续。 通常这不是问题,因为无论如何,处理来自每个通道的读取流程的吞吐量应该是tee之外的关注点,但值得注意。 这是一个快速调用示例:

    func Demo()  {
        done := make(chan interface{})
        defer close(done)
    
        ch := make(chan interface{})
        go func() {
            for i := 0; i < 10; i++ {
                ch <- i
            }
            defer close(ch)
        }()
    
        out1, out2 := tee(done, ch)
    
        for val1 := range out1 {
            fmt.Printf("out1: %v, out2: %v\n", val1, <-out2)
        }
    
    

    利用这种模式,很容易使用通道作为系统数据的连接点。

    六、bridge-channel

    在某些情况下,你可能会发现自己想要使用一系列通道,即你可能需要从一个通道中获取多个通道的值:

    <-chan <-chan interface{}
    

    这与将某个通道的数据切片合并到一个通道中稍有不同,这种调用方式意味着一系列通道有序的写入操作。从通道读取一系列通道的值 ,类似多通道过独木桥。

    // 通道桥接
    var bridge = func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
    
        valStream := make(chan interface{}) // 1
        go func() {
            defer close(valStream)
            for { // 2
                var stream <-chan interface{}
                select {
                case maybeStream, ok := <-chanStream:
                    if ok == false {
                        return
                    }
                    stream = maybeStream
                case <-done:
                    return
                }
                for val := range chDone(done, stream) { // 3
                    select {
                    case valStream <- val:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }
    

    使用示例:

    func Demo() {
        genVals := func() <-chan <-chan interface{} {
    
            chanStream := make(chan (<-chan interface{}))
    
            go func() {
                defer close(chanStream)
                for i := 0; i < 10; i++ {
                    stream := make(chan interface{}, 1)
                    stream <- i
                    close(stream)
                    chanStream <- stream
                }
            }()
            return chanStream
        }
    
        for v := range bridge(nil, genVals()) {
            fmt.Printf("%v ", v)
        }
    }
    

    相关文章

      网友评论

        本文标题:Go 并发编程:通道常见应用范式

        本文链接:https://www.haomeiwen.com/subject/vznnkhtx.html