美文网首页Golang
golang并发三板斧系列之三:context用于退出

golang并发三板斧系列之三:context用于退出

作者: 白想519 | 来源:发表于2019-05-29 09:38 被阅读3次

    这是本系列文章的第三篇,第一篇在此golang并发三板斧系列之一:channel用于通信和同步,第二篇在此golang并发三板斧系列之二:goroutine池用于并发

    前文描述了手工作坊的时代和工业时代,现在我们进入信息时代。

    万恶的资本主义

    前文描述的工业时代其实是资本主义,来到世间的每个毛孔都在滴血。不信你看前文的代码,gen之类的函数创建了一堆任务之后就扔给下游的works处理了,也不管他们要处理多久,是不是加班到深夜,是不是996ICU。

    func BenchmarkCPUPool(b *testing.B) {
        channum := 100
        gonum := runtime.NumCPU()
        for i := 0; i < b.N; i++ {
            f := func(w *Work) {
                if v, ok := w.input.(float64); ok {
                    cpubound(v)
                }
            }
            list := benchmarkList()
            c := genPoolChanBuffer(list)
    
            p := InitPool(channum, gonum, f)
            p.RunWorker()
            p.FeedWorker(c)
            p.Wait()
        }
    }
    

    人民当家作主

    感谢毛主席,解放之后我们进入了社会主义,我们有了工会这个关爱人民的组织。人民的工会爱人民,人民的工会力量大,工会可以在合适的时候给大家放假,让大家休息,对应到程序中就是按下了神圣的ctrl+c

    如下是最粗暴的模型,工会一喊放假,大家都放下手上的工作开心的玩耍了。但是有的工作做了一半就放弃了,这确实是很没有职业操守的:

    var WorkPool chan work
    
    type work struct {
        sth string
    }
    
    func (w work) Working() {
        log.Printf("start %s", w.sth)
    
        time.Sleep(2 * time.Second)
    
        log.Printf("end %s", w.sth)
    }
    
    func RunWorkerSimple() {
        workers := 2
        for i := 0; i < workers; i++ {
            go HandleWorkerSimple()
        }
    }
    
    func HandleWorkerSimple() {
        for {
            select {
            case work := <-WorkPool:
                work.Working()
            }
        }
    
        return
    }
    
    func TestWorkerSimple(t *testing.T) {
        WorkPool = make(chan work)
        go RunWorkerSimple()
    
        go func() {
            list := benchmarkList()
            for _, l := range list {
                WorkPool <- work{fmt.Sprint(l)}
            }
        }()
    
        select {}
    }
    

    很明显从结果可以看出,有工作编号为2和3的没有完成就被扔掉了,其余没有启动的工作都放弃了。

    C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerSimple
    2019/04/08 22:56:11 start 1
    2019/04/08 22:56:11 start 0
    2019/04/08 22:56:13 end 0
    2019/04/08 22:56:13 start 2
    2019/04/08 22:56:13 end 1
    2019/04/08 22:56:13 start 3
    ^Csignal: interrupt
    FAIL    _/Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency    2.790s
    

    要对得起这份工作

    人民的工人为人民,所以即便工会保障了工人的权益,该做好的工作还是要认真做完啊。对应到程序中,收到ctrl+c中断后,每个worker应该完成手上正在做的工作,并且由BOSS(主goroutine)把剩余队列中的工作保存起来(比如写数据库或者文件),留待明天上班继续做。

    代码写起来就复杂多了,要监控系统的中断信号,要等待所有worker处理完手上的事情,最后再把剩余的事情保存起来。需要用两个chan来通信,stopChan 用于通知workers下班啦,stoppedChan 用于所有worker处理完之后告知BOSS,再由BOSS保存剩余的工作。

    func (w work) Saving() {
        log.Printf("save %s", w.sth)
    }
    
    func RunWorkerHold(stop, stopped chan struct{}) {
        var wg sync.WaitGroup
        workers := 2
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go HandleWorkerHold(stop, &wg)
        }
        wg.Wait()
        stopped <- struct{}{}
    }
    
    func HandleWorkerHold(stop chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
    
        for {
            select {
            case work := <-WorkPool:
                work.Working()
            case <-stop:
                log.Println("worker: caller has told us to stop")
                return
            }
        }
    
        return
    }
    
    func TestWorkerHold(t *testing.T) {
        WorkPool = make(chan work)
        stopChan := make(chan struct{})
        stoppedChan := make(chan struct{})
        go RunWorkerHold(stopChan, stoppedChan)
    
        go func() {
            list := benchmarkList()
            for _, l := range list {
                WorkPool <- work{fmt.Sprint(l)}
            }
        }()
    
        // listen for C-c
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt)
        <-c
        log.Println("main: received C-c - shutting down")
    
        // tell the goroutine to stop
        log.Println("main: telling workers to stop")
        close(stopChan)
        // and wait for them to reply back
        <-stoppedChan
        log.Println("main: workers has told us they've finished")
    
        for work := range WorkPool {
            work.Saving()
        }
    
        return
    }
    

    这里没有把打印贴完,最终的结果是所有工作都save好了的:

    C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerHold
    2019/04/08 23:32:02 start 1
    2019/04/08 23:32:02 start 0
    2019/04/08 23:32:04 end 1
    2019/04/08 23:32:04 start 2
    2019/04/08 23:32:04 end 0
    2019/04/08 23:32:04 start 3
    ^C2019/04/08 23:32:05 main: received C-c - shutting down
    2019/04/08 23:32:05 main: telling workers to stop
    2019/04/08 23:32:06 end 2
    2019/04/08 23:32:06 worker: caller has told us to stop
    2019/04/08 23:32:06 end 3
    2019/04/08 23:32:06 start 4
    2019/04/08 23:32:08 end 4
    2019/04/08 23:32:08 start 5
    2019/04/08 23:32:10 end 5
    2019/04/08 23:32:10 worker: caller has told us to stop
    2019/04/08 23:32:10 main: workers has told us they've finished
    2019/04/08 23:32:10 save 6
    2019/04/08 23:32:10 save 7
    2019/04/08 23:32:10 save 8
    2019/04/08 23:32:10 save 9
    2019/04/08 23:32:10 save 10
    2019/04/08 23:32:10 save 11
    2019/04/08 23:32:10 save 12
    2019/04/08 23:32:10 save 13
    2019/04/08 23:32:10 save 14
    2019/04/08 23:32:10 save 15
    。。。
    

    但是值得注意的是,不是close(stopChan)一执行,马上所有的worker都能结束工作了。如上其中一个worker在接着完成work4和work5之后才走了退出流程,是因为对于select来讲,如果多个chan都准备好了的话,是随机选择其中一个,所以会有概率一直接着work的。

    更进一步

    上面的模式还是有缺陷,如果worker下面还有徒弟怎么办(又新开了goroutine)?最后BOSS在做剩余工作的保存时也不想耽误太久怎么办?保存工作写数据库也想受控制(比如database/sql包提供了相应支持)怎么办?

    终于我们的主角登场了,golang提供了context模式用于解决goroutine的高效且安全退出问题,教程在网上很多了,不用细讲,只贴一下主要函数:

    // 带cancel返回值的Context,一旦cancel被调用,即取消该创建的context

    func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

    // 带有效期cancel返回值的Context,即必须到达指定时间点调用的cancel方法才会被执行

    func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

    // 带超时时间cancel返回值的Context,类似Deadline,前者是时间点,后者为时间间隔
    // 相当于WithDeadline(parent, time.Now().Add(timeout)).

    func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

    最后进化到我们的代码,注意SavingDB()方法只是一个伪代码示意:

    import "database/sql"
    
    //fake, just a example
    func (w work) SavingDB(ctx context.Context) {
        log.Printf("save %s", w.sth)
    
        stmt := "select name from db.table"
        db := sql.DB{}
        conn, _ := db.Conn(ctx)
        rows, err := conn.QueryContext(ctx, stmt)
        if err != nil {
            if err == context.DeadlineExceeded {
                // context canceled
            }
            return
        }
    
        var name string
        for rows.Next() {
            if err := rows.Scan(&name); err != nil {
                if err == context.DeadlineExceeded {
                    log.Println("scan canceled")
                }
            }
        }
    }
    
    func RunWorkerContext(ctx context.Context, stopped chan struct{}) {
        var wg sync.WaitGroup
        workers := 2
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go HandleWorkerContext(ctx, &wg)
        }
        wg.Wait()
        stopped <- struct{}{}
    }
    
    func HandleWorkerContext(ctx context.Context, wg *sync.WaitGroup) {
        defer wg.Done()
    
        for {
            select {
            case work := <-WorkPool:
                work.Working()
            case <-ctx.Done():
                log.Println("worker: caller has told us to stop")
                return
            }
        }
    
        return
    }
    
    func TestWorkerContext(t *testing.T) {
        WorkPool = make(chan work)
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        stoppedChan := make(chan struct{})
        go RunWorkerContext(ctx, stoppedChan)
    
        go func() {
            list := benchmarkList()
            for _, l := range list {
                WorkPool <- work{fmt.Sprint(l)}
            }
        }()
    
        // listen for C-c
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt)
        <-c
        log.Println("main: received C-c - shutting down")
    
        // tell the goroutine to stop
        log.Println("main: telling workers to stop")
        cancel()
        // and wait for them to reply back
        <-stoppedChan
        log.Println("main: workers has told us they've finished")
    
        ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 100*time.Microsecond)
        defer cancelTimeout()
        for {
            select {
            case work := <-WorkPool:
                work.SavingDB(ctxTimeout)
            case <-ctxTimeout.Done():
                log.Println("main: cann't wait any more")
                return
            }
        }
    
        return
    }
    

    通过cancel()方法通知该context.Context其下的所有goroutine进入退出流程,并可以启动带timeout的ctx开始保存工作流程,所有流程都是受控的。


    这像不像是现代企业的扁平化管理,BOSS直接控制所有员工,提升所有的工作效率?


    所有代码都在https://github.com/baixiaoustc/go_concurrency/blob/master/third_post_test.go中能找到。

    相关文章

      网友评论

        本文标题:golang并发三板斧系列之三:context用于退出

        本文链接:https://www.haomeiwen.com/subject/azqetctx.html