美文网首页Go 并发编程
Go 并发编程:Goroutine常见应用范式

Go 并发编程:Goroutine常见应用范式

作者: GoFuncChan | 来源:发表于2020-03-06 00:50 被阅读0次

    一、多独立协程并发——worker分工模式

    并发协程独立运行且互不通信,主协程等待处理独立子协程的结果

    并发编程有一种常见方式就是许多工作子协程都是独立的,互不干扰,但他们又是“同一时间”处理。
    例如http服务器的工作模式就是这种,在go标准包net/http中,http服务启动后,它所接收的每一个请求都是独立协程处理的,每个请求的运行协程之间都互不通信。
    我们知道,go http服务的请求处理协程都是动态创建的,但很多情况下,我们是需要一些固定数量的协程去独立处理任务,所以这里我们先说固定数量协程独立运行的情况。

    范式实现:
    • 创建三个通道,分别处理“任务单元”、“任务结果”、“任务完成状态”三种通信数据;
    • 启动新增任务协程,把任务单元数据发送给jobs通道;
    • 各任务单元分别由独立的worker协程处理,执行独立任务,并把任务处理结果发送给results通道;
    • 启动等待任务结果协程,从done管道接收任务处理完成的数据;
    • 主协程显示处理结果。

    我们知道go有个select原语,专门用来处理阻塞或非阻塞通道数据的,我们可以使用for...select...范式最大效率地不断从通道中读出数据。代码如下:

    func Demo() {
        run("读书", "看报", "撸代码")
    }
    
    type TaskJob struct {
        task    string
        results chan<- Result
    }
    
    func (j *TaskJob) do() {
        fmt.Println("Do Task:", j.task)
        // Do Something...
        j.results <- Result{j.task, 200, "Successful"}
    }
    
    type Result struct {
        task    string
        code    int
        message string
    }
    
    // 逻辑核心数作为并发worker数
    var workers = runtime.NumCPU()
    
    func run(tasks ...string) {
        jobs := make(chan TaskJob, workers)      // 任务通道
        results := make(chan Result, len(tasks)) // 执行结果输出通道
        done := make(chan struct{}, workers)     // 任务完成状态通道
    
        // 子协程添加任务
        go addJob(jobs, tasks, results)
    
        // 使用单独的worker协程执行并发任务
        for i := 0; i < workers; i++ {
            go doJobs(done, jobs)
        }
        // 主协程等待并处理结果
        waitAndProcessResults1(done, results)
    }
    // 添加任务
    func addJob(jobs chan<- TaskJob, tasks []string, results chan<- Result) {
        for _, task := range tasks {
            jobs <- TaskJob{task: task, results: results}
        }
        // 因为此为发送端,新增完任务后在此关闭通道
        close(jobs)
    }
    
    // 执行任务
    func doJobs(done chan<- struct{}, jobs <-chan TaskJob) {
        for job := range jobs {
            job.do()
        }
        done <- struct{}{}
    }
    
    // 等待并处理子协程记过:for...select...范式同时处理results和done通道,合并awaitCompletion()和showResults()
    func waitAndProcessResults1(done <-chan struct{}, results <-chan Result) () {
        // 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done
        for i := workers; i > 0; {
            select {
            case result := <-results:
                fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
            case <-done:
                i--
            }
        }
    
        // 【非阻塞】worker全部做完后:results全部清空后退出
    DONE:
        for {
            select {
            case result := <-results:
                fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
            default:
                break DONE
            }
        }
    }
    

    执行结果:

    === RUN   TestDemo31
    Do Task: 撸代码
    Do Task: 读书
    Task Job :撸代码, Result:200,Successful
    Task Job :读书, Result:200,Successful
    Do Task: 看报
    Task Job :看报, Result:200,Successful
    --- PASS: TestDemo31 (0.00s)
    PASS
    

    可以看到,当有好几个不同的通道需要处理时,使用select原语是非常方便的。在这里我们集中处理results和done通道。

    添加子协程超时处理:

    当对worker任务子协程有超时要求时,也可在select中添加超时操作,对waitAndProcessResults()修改如下:

    // 合并awaitCompletion()和showResults(),并增加超时处理
    func waitAndProcessResults(timeout time.Duration, done <-chan struct{}, results <-chan Result) () {
        // 超时通道
        finish := time.After(time.Duration(timeout))
    
        // 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done,要么超时
        for i := workers; i > 0; {
            select {
            case result := <-results:
                fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
            case <-finish:
                fmt.Println("worker任务执行超时!")
                return
            case <-done:
                i--
            }
        }
    
        // 【非阻塞】worker全部做完后:results全部清空后退出
        for {
            select {
            case result := <-results:
                fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
            case <-finish:
                fmt.Println("worker任务执行超时!")
                return
            default:
                return
            }
        }
    }
    
    

    二、地铁闸机——限流模式

    开启多条子协程, 往有限的通道发送/接收数据,这种情形类似地铁站内的地铁闸机模式,闸机就那么几个,行人分组从闸机通过,通常哪个闸机人少,人们倾向于排哪个闸门的队,或者哪个闸机通行效率高就排哪个闸门的队。

    在这种并发模式中,协程相当于行人,通道相当于地铁闸门,多条通道从有限闸门中通过。
    具体代码如下:

    func Demo() {
        var workers = 100000
        ch0 := make(chan int, 0)
        ch1 := make(chan int, 1)
        ch2 := make(chan int, 2)
        ch3 := make(chan int, 3)
    
        counter := make([]int, 4)
    
        // 开workers条协程,发送消息到四条通道
        for i := 0; i < workers; i++ {
            go func(n int) {
                select {
                case ch0 <- n:
                    send(n, 0)
                case ch1 <- n:
                    send(n, 1)
                case ch2 <- n:
                    send(n, 2)
                case ch3 <- n:
                    send(n, 3)
                }
            }(i)
        }
    
        // 主协程从四条通道接收workers条子协程发送的消息
        for i := 0; i < workers; i++ {
            select {
            case rec := <-ch0:
                receive(rec, 0)
                counter[0]++
            case rec := <-ch1:
                receive(rec, 1)
                counter[1]++
            case rec := <-ch2:
                receive(rec, 2)
                counter[2]++
            case rec := <-ch3:
                receive(rec, 3)
                counter[3]++
            }
        }
        fmt.Println("channel counter:", counter)
    }
    
    func send(i, j int) {
        fmt.Printf("goroutine#%d send %d to ch%d\n", i, i, j)
    }
    
    func receive(routine, ch int) {
        fmt.Printf("receive from goroutine#%d to ch%d channel\n", ch, routine)
    }
    
    

    运行结果:

    ...
    receive from goroutine#92612 to ch2 channel
    receive from goroutine#92426 to ch0 channel
    receive from goroutine#92733 to ch0 channel
    receive from goroutine#92531 to ch0 channel
    goroutine#92531 send 92531 to ch0
    goroutine#92616 send 92616 to ch3
    goroutine#92425 send 92425 to ch2
    goroutine#92612 send 92612 to ch2
    goroutine#92733 send 92733 to ch0
    goroutine#92424 send 92424 to ch2
    receive from goroutine#92423 to ch3 channel
    receive from goroutine#92614 to ch1 channel
    receive from goroutine#92734 to ch1 channel
    goroutine#92617 send 92617 to ch3
    goroutine#92735 send 92735 to ch1
    goroutine#92530 send 92530 to ch3
    ...
    channel counter: [25030 25073 25011 24886]
    

    以上结果表明,多条协程数据比较均衡地从四条通道中通过


    三、扇入扇出——分流模式

    我们通常会遇到许多耗时任务,如一个通道的数据流向一个执行函数时,当前函数执行时长较长,我们可以把该通道的数据流拆分流向多个通道,并给每个通道启动相应的goroutine处理;随后把所有通道汇总到一个通道输出,以加大该耗时任务的执行效率。
    以上过程我们成为扇入扇出过程:

    • 扇出(Fan-out):将一个通道的数据分流给多个通道并启动多个goroutine处理;
    • 扇入(Fan-in):将多个goroutines返回的通道的结果组合到一个通道并输出;

    我们看一个简单示例:模拟管道中某一阶段的耗时任务

    // 模拟耗时任务
    func takeUpTimeTask(done <-chan interface{}, inStream <-chan interface{}) <-chan interface{} {
        outStream := make(chan interface{})
        go func() {
            defer close(outStream)
            for {
                select {
                case <-done:
                    return
                case val ,ok := <-inStream:
                    if !ok {
                        return
                    }
                    select {
                    case <-done:
                        return
                    case outStream <- val:
                        // 模拟耗时任务
                        time.Sleep(time.Second)
                    }
    
                }
            }
        }()
        return outStream
    }
    

    我们来模拟调用运行它

    // 没有扇入扇出处理
    func Demo1() {
        done := make(chan interface{})
        defer close(done)
        inStream := make(chan interface{})
    
        go func() {
            defer close(inStream)
            for i := 0; i < 20; i++ {
                inStream <- i
            }
        }()
    
        // 对数据流执行耗时任务
        resultChan := takeUpTimeTask(done, inStream)
    
        // 使用chRange安全遍历打印
        for val := range chRange(done,resultChan){
            fmt.Printf("%v ", val)
        }
    }
    

    结果输出:处理20个数据花费20s

    === RUN   TestDemo1
    0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 --- PASS: TestDemo81 (20.05s)
    PASS
    

    这种情况很显然是可以用并发对耗时任务进行改进的,使用扇入扇出的思路可以这样做:

    
    // 扇出处理耗时任务
    func fanOut(done <-chan interface{}, chanStream chan interface{}) []<-chan interface{} {
        numFinders := runtime.NumCPU()
        finders := make([]<-chan interface{}, numFinders)
        for i := 0; i < numFinders; i++ {
            // 耗时任务的分流管道
            finders[i] = takeUpTimeTask(done, chanStream)
        }
        return finders
    }
    
    // 扇入汇流结果通道
    func fanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
        var wg sync.WaitGroup
        multiplexedStream := make(chan interface{})
        // 管道汇流处理
        multiplex := func(c <-chan interface{}) {
            defer wg.Done()
            for i := range c {
                select {
                case <-done:
                    return
                case multiplexedStream <- i:
                }
            }
        }
        // 从所有的通道中取数据
        wg.Add(len(channels))
        for _, c := range channels {
            go multiplex(c)
        }
        // 等待所有数据汇总完毕
        go func() {
            wg.Wait()
            close(multiplexedStream)
        }()
        return multiplexedStream
    }
    

    改进之后我们再来调用它们:

    // 扇入扇出处理
    func Demo2() {
        done := make(chan interface{})
        defer close(done)
        inStream := make(chan interface{})
    
        go func() {
            defer close(inStream)
            for i := 0; i < 20; i++ {
                inStream <- i
            }
        }()
    
        // 扇入扇出执行耗时任务
        resultChan := fanIn(done, fanOut(done, inStream)...)
    
        // 使用chRange安全遍历打印
        for val := range chRange(done,resultChan){
            fmt.Printf("%v ", val)
        }
    }
    

    结果输出:四个逻辑单元并发运行耗时5s,效率提升4倍

    === RUN   TestDemo82
    0 1 3 2 4 6 5 7 8 10 9 11 12 13 14 15 16 18 17 19 --- PASS: TestDemo82 (5.02s)
    PASS
    

    四、动态创建协程

    根据需要来动态创建goroutine,并限制可并发的协程数。

    // 设置默认最多开启5子协程
    const maxRoutineNum = 5
    
    func autoRoutine(wg *sync.WaitGroup, inStream <-chan int) {
        for {
            in, ok := <-inStream
            if !ok {
                break
            }
            // 自动开启协程的条件:输入流为偶数
            if in%2 == 0 && runtime.NumGoroutine() < maxRoutineNum{
                wg.Add(1)
                go process(in, func() { wg.Done() })
            } else {
                process(in, nil)
            }
        }
    }
    
    func process(in int, callback func()) {
        if callback != nil {
            defer callback()
            fmt.Printf("sub routine process:%d \n", in)
            time.Sleep(1000000 * time.Microsecond)
        }else{
            fmt.Printf("parent routine process:%d \n", in)
            time.Sleep(100000 * time.Microsecond)
        }
    }
    
    

    调用示例:

    // 动态创建协程
    func Demo() {
        wg := sync.WaitGroup{}
        inStream := make(chan int)
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer close(inStream)
            for i := 1; i < 10; i++ {
                inStream <- i
            }
        }()
        autoRoutine(&wg, inStream)
        wg.Wait()
    }
    

    运行结果:可以看到,程序根据要求最多开不超过5个子协程

    === RUN   TestDemo121
    parent routine process:1 
    sub routine process:2 
    parent routine process:3 
    parent routine process:5 
    sub routine process:4 
    parent routine process:6 
    parent routine process:7 
    parent routine process:8 
    parent routine process:9 
    --- PASS: TestDemo121 (1.21s)
    PASS
    

    以上只是演示该模式的一个简单例子,但它可以扩展到许多应用,如对不定数量的资源进行计算,其中某些资源比较耗时,需要开启协程提高执行效率,但协程数不能无限扩张。一个具体的例子就是使用go标准包的filepath.Walk()对系统特点目录的文件进行哈希计算,显然对某个目录的文件数是事先不确定的,且大文件的哈希计算耗时比较大,这就需要开启子协程计算,而协程又不能无限制扩张,因为系统对打开文件数又有限制。这种场景就非常适合使用这种模式,感兴趣的可自己实现一下。

    相关文章

      网友评论

        本文标题:Go 并发编程:Goroutine常见应用范式

        本文链接:https://www.haomeiwen.com/subject/ntwnkhtx.html