美文网首页Go 并发编程
Go 并发编程:防止Goroutine泄露

Go 并发编程:防止Goroutine泄露

作者: GoFuncChan | 来源:发表于2020-03-04 13:39 被阅读0次

    防止Goroutine泄露

    Goroutine开启后一般会一直执行到它终止,也有遇到不可恢复的错误(如协程内部错误或父协程退出)时被迫终止。如果没有一定的手段,父协程是没法控制子协程的,有没有什么方式让父协程控制或感知子协程的运行呢?答案是有的,一般有三种典型方式:

    • 使用done通道控制
    • 使用sync.WaitGroup同步组
    • 使用Context

    一、使用done通道控制

    创建一个监控通道done,使主协程知道其下的工作子协程的完成状态。

    最简单的经典并发写法,使用一个监控通道done,工作协程完成具体任务,主协程只监视工作子协程的完成状况,为了最大化通道的吞吐量,通道的缓冲数等于子协程数。

    func Demo11() {
        JobList := make([]Job, 0, 5)
        JobList = append(JobList, Job{"task1"}, Job{"task2"}, Job{"task3"}, Job{"task4"}, Job{"task5"})
    
        jobs := make(chan Job)
        done := make(chan bool, len(JobList))
    
        // 发送任务协程
        go func() {
            for _, job := range JobList {
                jobs <- job
            }
            // 发送完所有任务后关闭通道
            close(jobs)
        }()
    
        // 处理任务协程
        go func() {
            // 遍历通道直到管道被关闭
            for job := range jobs {
                fmt.Println("Doing ", job.task)
                done <- true
            }
        }()
    
        for i := 0; i < len(JobList); i++ {
            // 阻塞,等待接收任务完成的信号
            <-done
        }
        fmt.Println("All Task Done!")
    }
    

    使用done通道,我们可以在很大程度上防止协程泄露,即某些子协程失去控制以致未能正常关闭的情况。

    二、使用sync.WaitGroup同步组

    以上写法也能用等待组方式处理:
    关于等待组的用法,我们已在《Go基础系列》的《Go并发编程(三): Go并发的传统同步机制》已经简述,这里再做一个示例

    func Demo12() {
        JobList := make([]Job, 0, 5)
        JobList = append(JobList, Job{"task1"}, Job{"task2"}, Job{"task3"}, Job{"task4"}, Job{"task5"})
        jobs := make(chan Job)
    
        wg := sync.WaitGroup{}
    
        wg.Add(1)
        // 发送任务协程
        go func() {
            for _, job := range JobList {
                jobs <- job
            }
            // 发送完所有任务后关闭通道
            close(jobs)
            wg.Done()
        }()
    
        wg.Add(1)
        // 处理任务协程
        go func() {
            // 遍历通道直到通道被关闭
            for job := range jobs {
                fmt.Println("Doing ", job.task)
            }
            wg.Done()
        }()
    
        wg.Wait()
        fmt.Println("All Task Done!")
    }
    

    无论是done通道控制还是等待组,都是最常见的Go并发编程范式了。

    三、Contenxt 上下文

    关于Context,它类似与使用done通道防止协程泄露的方法,不过它的功能更加强大,它出现在Go标准库可见Go团队希望context成为默认的控制多协程的解决方案。我们已经在《Go进阶系列的》的《Go Context 上下文)已有介绍,这里再做一个演示:

    
    func Demo13() {
        JobList := make([]Job, 0, 5)
        JobList = append(JobList, Job{"task1"}, Job{"task2"}, Job{"task3"}, Job{"task4"}, Job{"task5"})
        jobs := make(chan Job)
        var sendCount, doneCount int
        var err error
        // 两种方式关闭子协程:(1)超时(2)执行cancelFunc函数
        ctx, cancelFunc := context.WithTimeout(context.TODO(), 5*time.Second)
        defer func() {
            fmt.Printf("JobCount:%d,SendCount:%d,DoneCount:%d\n", len(JobList), sendCount, doneCount)
            cancelFunc()
        }()
    
        go func() {
            sendCount, err = sendJobs(ctx, jobs, JobList)
            if err != nil {
                fmt.Println("SendJobs Error:", err.Error(), ",SendCount:", sendCount)
            }
        }()
    
        go func() {
            doneCount, err = do(ctx, jobs)
            if err != nil {
                fmt.Println("DoneJobs Error:", err.Error(), ",DoneCount:", doneCount)
            }
        }()
    
        time.Sleep(time.Duration(len(JobList)) * time.Second)
    
    }
    
    // 发送任务
    func sendJobs(ctx context.Context, jobs chan<- Job, jobList []Job) (int, error) {
        var sendCount int
        for _, job := range jobList {
            // 模拟耗时每秒添加一次任务
            time.Sleep(time.Second)
            select {
            case <-ctx.Done():
                return sendCount, ctx.Err()
            case jobs <- job:
                sendCount++
                fmt.Println("Send Job: ", job.task)
            }
        }
        // 发送完所有任务后关闭管道
        close(jobs)
        return sendCount, nil
    }
    
    // 处理任务
    func do(ctx context.Context, jobs <-chan Job) (int, error) {
        var doneCount int
        // 遍历管道直到管道被关闭
        for {
            select {
            case <-ctx.Done():
                return doneCount, ctx.Err()
            case job, ok := <-jobs:
                if !ok {
                    return doneCount, nil
                } else {
                    doneCount++
                    fmt.Println("Done Job: ", job.task)
                }
            }
        }
    }
    
    

    执行结果:

    === RUN   TestDemo13
    Done Job:  task1
    Send Job:  task1
    Send Job:  task2
    Done Job:  task2
    Send Job:  task3
    Done Job:  task3
    Send Job:  task4
    Done Job:  task4
    DoneJobs Error: context deadline exceeded ,DoneCount: 4
    JobCount:5,SendCount:0,DoneCount:4
    --- PASS: TestDemo13 (5.00s)
    PASS
    

    相关文章

      网友评论

        本文标题:Go 并发编程:防止Goroutine泄露

        本文链接:https://www.haomeiwen.com/subject/msvnkhtx.html