美文网首页
如何把golang的Channel玩出async和await的f

如何把golang的Channel玩出async和await的f

作者: 海之方 | 来源:发表于2020-02-10 23:30 被阅读0次

    引言

    如何优雅的同步化异步代码,一直以来都是各大编程语言致力于优化的点,记得最早是C# 5.0加入了async/await来简化TPL的多线程模型,后来Javascript的Promise也吸取这一语法糖,在ES 6中也加入了async和await.

    那么,被大家一称赞并发性能好、异步模型独树一帜的golang,能否也有async和await呢?

    其实,这对于golang的CSM来说一点也不难!

    核心代码如下:

    done := make(chan struct{})
    go func() {
        // do work asynchronously here
        //
        close(done)
    }()
    <-done
    

    是不是很简单呢? go rountine负责async, channel的负责await, 简直是完美!

    但这个代码看起来还是有点丑,而且这个go func(){}还没有返回值,虽说可以通过闭包来接收返回值,但那个代码就更难维护了。

    Go Promise

    代码难看不要紧,只要Don't repeat yourself (DRY),封装一下不就好了?

    type WorkFunc func() (interface{}, error)
    
    func NewPromise(workFunc WorkFunc) *Promise {
        promise := Promise{done: make(chan struct{})}
        go func() {
            defer close(promise.done)
            promise.res, promise.err = workFunc()
        }()
        return &promise
    }
    
    func (p *Promise) Done() (interface{}, error) {
        <-p.done
        return p.res, p.err
    }
    

    调用的代码如下:

    promise := NewPromise(func() (interface{}, error) {
        // do work asynchronously here
        //
        return res, err
    })
    
    // await
    res, err := promise.Done()
    

    是不是美观了许多呢?

    这个实现和Javascript的Promise的API是有很大差距,使用体验上因为golang没有泛型,也需要转来转去的,但为了不辜负Promise这个名字,怎么能没有then呢?

    type SuccessHandler func(interface{}) (interface{}, error)
    
    type ErrorHandler func(error) interface{}
    
    func (p *Promise) Then(successHandler SuccessHandler, errorHandler ErrorHandler) *Promise {
        newPromise := &Promise{done: make(chan struct{})}
        go func() {
            res, err := p.Done()
            defer close(newPromise.done)
            if err != nil {
                if errorHandler != nil {
                    newPromise.res = errorHandler(err)
                } else {
                    newPromise.err = err
                }
            } else {
                if successHandler != nil {
                    newPromise.res, newPromise.err = successHandler(res)
                } else {
                    newPromise.res = res
                }
            }
        }()
    
        return newPromise
    }
    

    有了then可以chain起来,是不是找到些Promise的感觉呢?

    完整代码请查看 promise.go

    Actor

    本来我的理解也就到些了,然后前段时间(说来也是一月有余了),看了Go并发设计模式之 Active Object这篇文章后, 发现如果有一个常驻协程在异步的处理任务,而且是FIFO的,那么这其实是相当于一个无锁的设计,可以简化对临界资源的操作。

    于是,我照着文章的思路,实现了下面的代码:

    // Creates a new actor
    func NewActor(setActorOptionFuncs ...SetActorOptionFunc) *Actor {
        actor := &Actor{buffer: runtime.NumCPU(), quit: make(chan struct{}), wg: &sync.WaitGroup{}}
        for _, setOptionFunc := range setActorOptionFuncs {
            setOptionFunc(actor)
        }
    
        actor.queue = make(chan request, actor.buffer)
    
        actor.wg.Add(1)
        go actor.schedule()
    
        return actor
    }
    
    // The long live go routine to run.
    func (actor *Actor) schedule() {
    loop:
        for {
            select {
            case request := <-actor.queue:
                request.promise.res, request.promise.err = request.work()
                close(request.promise.done)
            case <-actor.quit:
                break loop
            }
        }
        actor.wg.Done()
    }
    
    // Do a work.
    func (actor *Actor) Do(workFunc WorkFunc) *Promise {
        methodRequest := request{work: workFunc, promise: &Promise{
            done: make(chan struct{}),
        }}
        actor.queue <- methodRequest
        return methodRequest.promise
    }
    
    // Close actor
    func (actor *Actor) Close() {
        close(actor.quit)
        actor.wg.Wait()
    }
    

    一个简单的没啥意义的纯粹为了demo的测试用例如下:

    func TestActorAsQueue(t *testing.T) {
        actor := NewActor()
        defer actor.Close()
    
        i := 0
        workFunc := func() (interface{}, error) {
            time.Sleep(1 * time.Second)
            i++
            return i, nil
        }
    
        promise := actor.Do(workFunc)
        promise2 := actor.Do(workFunc)
    
        res2, _ := promise2.Done()
        res1, _ := promise.Done()
    
        if res1 != 1 {
            t.Fail()
        }
    
        if res2 != 2 {
            t.Fail()
        }
    }
    

    完整代码请查看 actor.go

    总结

    每个语言都有它的独特之处,在我的理解中,玩转golang的CSM模型,channel一定要用的6。

    于是,我创建了Channelx这个repo, 包含了对channel常用场景的封装,欢迎大家审阅,喜欢的就点个star。

    此系列其它文章:

    如何用Golang的channel实现消息的批量处理

    如何把Golang的channel用的如nodejs的stream一样丝滑

    相关文章

      网友评论

          本文标题:如何把golang的Channel玩出async和await的f

          本文链接:https://www.haomeiwen.com/subject/ujqtfhtx.html