美文网首页异步编程
异步编程二:promise模式

异步编程二:promise模式

作者: 青_雉 | 来源:发表于2020-01-18 22:51 被阅读0次

    书接上回,我们聊了为什么要有异步编程模式以及异步编程的魅力。
    本文主要聊一聊异步编程中的promise模式;promise模式广泛应用于前端开发
    ,所以故事先从前端开发讲起。
    因为前端的ajax技术,与服务端通信皆是基于异步编程,需要处理各种回调;
    回调本身并不可怕,但是如果需要对多个回调结果进行compose, 那就比较麻烦了,会出现所谓的回调地狱;在异步编程方面,前端同学先痛起来了,所以,在这个方向上,前端同学确实走的也比较靠前,模式也比较成熟。
    那么对于java而言,先不谈协程的玩法,异步编程可选的模式就不多了,其中一个便是promise模式,以及接下来会进行介绍的reactor模式。

    接下来将按照案例对promise模式进行安利。
    (特别提示,本系列文章皆是采用kotlin编写,写过kotlin之后,真的很难切回java,由俭入奢易,由奢入俭难啊)

    案例一

    有一远程服务名曰“计算器”,提供加法和减法两个http接口

    • 计算 n + m :/add?a=n&b=m
    • 计算 n - m :/sub?a=n&b=m
      基于上面的计算器服务,需要在我们的服务中开发一个接口,计算 m + n - l, 采用异步代码该如何写呢?此处基于vert.x进行示范
     override fun start(startPromise: Promise<Void>?) {
        webClient = WebClient.create(vertx)
    
        var eventBus = vertx.eventBus()
        eventBus.consumer<JsonObject>("calc.promise"){ msg ->
            var msgBody = msg.body()
            var m = msgBody.getInteger("m", 0)
            var n = msgBody.getInteger("n", 0)
            var l = msgBody.getInteger("l", 0)
    
            webClient.get(7777, "pi", "/add?a=$m&b=$n").send{
                if (it.succeeded()) {
                    var `m + n` = it.result().bodyAsString().toInt()
                    // 此处是vertx的webClient,是一个异步的http client
                    // 7777是端口,pi是host(在下部署在树莓派上做的测试),后面是uri
                    webClient.get(7777, "pi", "/sub?a=$`m + n`&b=$l").send{
                        if (it.succeeded()) {
                            var `m + n - l` = it.result().bodyAsString().toInt()
                            replyHandler(`m + n - l`.toString())
                        } else {
                            replyHandler("calc error: m + n - l")
                        }
                    }
                } else {
                    replyHandler("calc error: m + n")
                }
            }
        }
    }
    

    上面是一段基于callback的写法,略微有一些回调地狱迷之缩进
    那么如果采用promise模式,在本案例的场景下,是可以缓解回调地狱迷之缩进,大家感受一下(此处同样也是使用了vert.x里的promise类):

     override fun start(startPromise: Promise<Void>?) {
        webClient = WebClient.create(vertx)
    
        var eventBus = vertx.eventBus()
        eventBus.consumer<JsonObject>("calc.promise"){ msg ->
            var msgBody = msg.body()
            var m = msgBody.getInteger("a", 0)
            var n = msgBody.getInteger("b", 0)
            var l = msgBody.getInteger("c", 0)
    
            add(m, n).future().onSuccess {
                sub(it, l).future().onSuccess {
                    msg.reply(it.toString())
                }
            }
        }
    }
    
    fun add(a: Int, b: Int) : Promise<Int> {
        var promise = Promise.promise<Int>()
    
        webClient.get(7777, "pi", "/add?a=$a&b=$b").send{
            if (it.succeeded()) {
                var addResult = it.result().bodyAsString().toInt()
                promise.complete(addResult)
            } else {
                promise.fail("calc failed $a add $b")
            }
        }
    
        return promise
    }
    
    fun sub(a: Int, b: Int) : Promise<Int> {
        var promise = Promise.promise<Int>()
    
        webClient.get(7777, "pi", "/sub?a=$a&b=$b").send{
            if (it.succeeded()) {
                var addResult = it.result().bodyAsString().toInt()
                promise.complete(addResult)
            } else {
                promise.fail("calc failed $a sub $b")
            }
        }
    
        return promise
    }
    

    上面这种写法是基于promise模式的,虽然有所缓解,但主要功效还是在把一些代码抽离了,核心的计算逻辑,依然还是迷之缩进的,莫着急,后面还有内容。
    此处先仔细看下这两段代码,vertx原生提供的io.vertx.core.Promise类,就是用来做promise模式的, 那么promise相比于callback,改变到底是啥?经过日夜揣摩,在下得出结论:

    当调用一个异步方法时,promise 模式可以把针对该方法的执行结果的处理逻辑从入参(回调函数),变为返回值;这样带来的编程变化是:基于返回值的处理,更方便的支持链式写法,把缩进改为一条链

    这个话题先聊到这里,稍微找一找promise的感觉,咱们接着案例往下聊

    案例二

    基于计算器服务,实现一个接口: a + ((b -c)+ d) -e -f + g
    先看看基于回调怎么写:

    //源码地址:https://github.com/HongkaiWen/vertx-kotlin/blob/promise/src/main/kotlin/com/github/hongkaiwen/reactor/vk/calc/CallbackVerticle.kt
    
    class CallbackVerticle : AbstractVerticle(){
    
        lateinit var webClient: WebClient
    
        override fun start(startPromise: Promise<Void>?) {
            webClient = WebClient.create(vertx)
    
            var eventBus = vertx.eventBus()
            eventBus.consumer<JsonObject>("calc.callback"){
                var msgBody = it.body()
                var a = msgBody.getInteger("a", 0)
                var b = msgBody.getInteger("b", 0)
                var c = msgBody.getInteger("c", 0)
                var d = msgBody.getInteger("d", 0)
                var e = msgBody.getInteger("e", 0)
                var f = msgBody.getInteger("f", 0)
                var g = msgBody.getInteger("g", 0)
                calc(a, b, c, d, e, f, g){reply ->
                    it.reply(reply)
                }
            }
        }
    
        fun calc(a: Int, b: Int, c: Int, d: Int, e: Int, f: Int, g: Int, replyHandler: (String) -> Unit){
            webClient.get(7777, "pi", "/sub?a=$b&b=$c").send{
                if (it.succeeded()) {
                    var `b-c` = it.result().bodyAsString().toInt()
                    webClient.get(7777, "pi", "/add?a=$`b-c`&b=$d").send{
                        if (it.succeeded()) {
                            var `(b-c)+d` = it.result().bodyAsString().toInt()
                            webClient.get(7777, "pi", "/add?a=$a&b=$`(b-c)+d`").send{
                                if (it.succeeded()) {
                                    var `a+(b-c)+d` = it.result().bodyAsString().toInt()
                                    webClient.get(7777, "pi", "/sub?a=$`a+(b-c)+d`&b=$e").send{
                                        if (it.succeeded()) {
                                            var `a+(b-c)+d-e` = it.result().bodyAsString().toInt()
                                            webClient.get(7777, "pi", "/sub?a=$`a+(b-c)+d-e`&b=$f").send{
                                                if (it.succeeded()) {
                                                    var `a+(b-c)+d-e-f` = it.result().bodyAsString().toInt()
                                                    webClient.get(7777, "pi", "/add?a=$`a+(b-c)+d-e-f`&b=$g").send{
                                                        if (it.succeeded()) {
                                                            var `a+(b-c)+d-e-f+g` = it.result().bodyAsString().toInt()
                                                            replyHandler(`a+(b-c)+d-e-f+g`.toString())
                                                        } else {
                                                            replyHandler("calc error: a + (b - c) + d -e -f + g")
                                                        }
                                                    }
                                                } else {
                                                    replyHandler("calc error: a + (b - c) + d -e -f")
                                                }
                                            }
    
                                        } else {
                                            replyHandler("calc error: a + (b - c) + d -e")
                                        }
                                    }
                                } else {
                                    replyHandler("calc error: a + (b - c) + d")
                                }
                            }
                        } else {
                            replyHandler("calc error: (b - c) + d")
                        }
                    }
                } else {
                    replyHandler("calc error: b - c")
                }
            }
        }
    
    }
    

    上面这段代码,实在让人吐血,终于知道啥叫地狱了, 针对上面的方法,基于promise来做改进,效果如何呢:

    //源码地址:https://github.com/HongkaiWen/vertx-kotlin/blob/promise/src/main/kotlin/com/github/hongkaiwen/reactor/vk/calc/PromiseVerticle.kt
    
    class PromiseVerticle : AbstractVerticle(){
    
    
        lateinit var webClient: WebClient
    
        override fun start(startPromise: Promise<Void>?) {
            webClient = WebClient.create(vertx)
    
            var eventBus = vertx.eventBus()
            eventBus.consumer<JsonObject>("calc.promise"){ msg ->
                var msgBody = msg.body()
                var a = msgBody.getInteger("a", 0)
                var b = msgBody.getInteger("b", 0)
                var c = msgBody.getInteger("c", 0)
                var d = msgBody.getInteger("d", 0)
                var e = msgBody.getInteger("e", 0)
                var f = msgBody.getInteger("f", 0)
                var g = msgBody.getInteger("g", 0)
    
                sub(b, c).future().onSuccess {
                    add(it, d).future().onSuccess {
                        add(a, it).future().onSuccess {
                            sub(it, e).future().onSuccess {
                                sub(it, f).future().onSuccess {
                                    add(it, g).future().onSuccess {
                                        msg.reply(it.toString())
                                    }
                                }
                            }
                        }
                    }
                }
    
            }
        }
    
        fun add(a: Int, b: Int) : Promise<Int> {
            var promise = Promise.promise<Int>()
    
            webClient.get(7777, "pi", "/add?a=$a&b=$b").send{
                if (it.succeeded()) {
                    var addResult = it.result().bodyAsString().toInt()
                    promise.complete(addResult)
                } else {
                    promise.fail("calc failed $a add $b")
                }
            }
    
            return promise
        }
    
        fun sub(a: Int, b: Int) : Promise<Int> {
            var promise = Promise.promise<Int>()
    
            webClient.get(7777, "pi", "/sub?a=$a&b=$b").send{
                if (it.succeeded()) {
                    var addResult = it.result().bodyAsString().toInt()
                    promise.complete(addResult)
                } else {
                    promise.fail("calc failed $a sub $b")
                }
            }
    
            return promise
        }
    
    
    }
    

    大地狱和小地狱的区别而已,因此就在思忖,貌似使用姿势不对,基于promise的代码依然是迷之缩进的,除非我们可以把promise的处理逻辑做成链式的

    JavaScript的promise

    感觉上面的路子还是不对,所以我们来参考下前端同学是如何玩promise的;因为在下对JavaScript不感兴趣,所以不求甚解的从大神的博客 摘取一段,如下:

    function multiply(input) {
        return new Promise(function (resolve, reject) {
            log('calculating ' + input + ' x ' + input + '...');
            setTimeout(resolve, 500, input * input);
        });
    }
    
    // 0.5秒后返回input+input的计算结果:
    function add(input) {
        return new Promise(function (resolve, reject) {
            log('calculating ' + input + ' + ' + input + '...');
            setTimeout(resolve, 500, input + input);
        });
    }
    
    var p = new Promise(function (resolve, reject) {
        log('start new Promise...');
        resolve(123);
    });
    
    p.then(multiply)
     .then(add)
     .then(multiply)
     .then(add)
     .then(function (result) {
        log('Got value: ' + result);
    });
    

    发现JavaScript里的promise是链式的(就应该这样嘛),那么上面的例子里的姿势一定是错了,问题在哪呢?
    JavaScript里的用法,promise是通过then函数进行顺序链式条用的,那么这个then函数,输入是一个处理函数,返回的是一个新的promise才行,这样可以针对一环一环的处理作出管道的效果来。
    因为我采用的vertx的promise来做研究,这个类本身处理promise的结果的方法 onSuccess返回的是future本身,并不是一个新的promise的future,所以不好做链式调用:

      @Fluent
      default Future<T> onSuccess(Handler<T> handler) {
        return onComplete(ar -> {
          if (ar.succeeded()) {
            handler.handle(ar.result());
          }
        });
      }
    

    改进方案

    仔细看了一圈io.vertx.core.Promise的相关方法, 发现compose正是我们的解药。

    class PromiseLineVerticle3 : AbstractVerticle(){
    
    
        lateinit var webClient: WebClient
    
        override fun start(startPromise: Promise<Void>?) {
            webClient = WebClient.create(vertx)
    
            var eventBus = vertx.eventBus()
            eventBus.consumer<JsonObject>("calc.promise.line3"){ msg ->
                var msgBody = msg.body()
                var a = msgBody.getInteger("a", 0)
                var b = msgBody.getInteger("b", 0)
                var c = msgBody.getInteger("c", 0)
                var d = msgBody.getInteger("d", 0)
                var e = msgBody.getInteger("e", 0)
                var f = msgBody.getInteger("f", 0)
                var g = msgBody.getInteger("g", 0)
    
                //只要一个onFailure即可
                (b asyncSub c)
                    .compose { it asyncAdd d }
                    .compose { it asyncAdd a }
                    .compose { it asyncSub e }
                    .compose { it asyncSub f }
                    .compose { it asyncAdd g }
                    .onSuccess { msg.reply(it.toString()) }
                    .onFailure{msg.fail(500, it.message)}
            }
        }
    
        infix fun Int.asyncAdd(input : Int) : Future<Int> {
            return calc(this, input, CalcOperator.add)
        }
    
        infix fun Int.asyncSub(input : Int) : Future<Int> {
            return calc(this, input, CalcOperator.sub)
        }
    
        /**
         * 所有异常必须被处理
         */
        fun calc(a: Int, b: Int, operator: CalcOperator) : Future<Int> {
            var promise = Promise.promise<Int>()
    
            webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b").send{
                if (it.succeeded()) {
                    try{
                        var addResult = it.result().bodyAsString().toInt()
                        promise.complete(addResult)
                        println("$a - $b = $addResult")
                    } catch (e: Exception) {
                        promise.fail(e)
                    }
                } else {
                    it.cause().printStackTrace()
                    promise.fail("calc failed $a add $b")
                }
            }
    
            return promise.future()
        }
    }
    
    enum class CalcOperator {
        add, sub
    }
    

    通过compose函数代码清爽了许多。
    特别要提一下的是,上面的写法it asyncAdd d是kotlin的中缀表达式,另外这个地方的it,java程序员一定会问怎么没有声明过就使用?对的,kotlin中的lambda如果只有一个参数,就可以默认用it代替,写法简单而且避免不知道该其起啥变量名。

    基于CompletableFuture实现promise

    因为笔者最近vert.x用的多,各种例子都是基于vert.x的;但其实如果把java和promise一起做搜索的话,得到的结果一般可能是jdk的CompletableFuture实现;而且vert.x社区也有roadmap准备向jdk的CompletionStage靠拢(RFC),所以再写一个CompletableFuture的实现版本:

    class PromiseLineVerticle4 : AbstractVerticle(){
    
    
        lateinit var webClient: WebClient
    
        override fun start(startPromise: Promise<Void>?) {
            webClient = WebClient.create(vertx)
    
            var eventBus = vertx.eventBus()
            eventBus.consumer<JsonObject>("calc.promise.line4"){ msg ->
                var msgBody = msg.body()
                var a = msgBody.getInteger("a", 0)
                var b = msgBody.getInteger("b", 0)
                var c = msgBody.getInteger("c", 0)
                var d = msgBody.getInteger("d", 0)
                var e = msgBody.getInteger("e", 0)
                var f = msgBody.getInteger("f", 0)
                var g = msgBody.getInteger("g", 0)
    
    
                (b asyncSub c)
                    .thenCompose { it asyncAdd d }
                    .thenCompose { it asyncAdd a }
                    .thenCompose { it asyncSub e }
                    .thenCompose { it asyncSub f }
                    .thenCompose { it asyncAdd g }
                    .thenAccept { msg.reply(it.toString()) }
                    .exceptionally {
                        msg.fail(500, it.message)
                        null
                    }
            }
        }
    
    
    
        infix fun Int.asyncAdd(input : Int) : CompletableFuture<Int> {
            return calc(this, input, CalcOperator.add)
        }
    
        infix fun Int.asyncSub(input : Int) : CompletableFuture<Int> {
            return calc(this, input, CalcOperator.sub)
        }
    
        /**
         * 所有异常必须被处理
         */
        fun calc(a: Int, b: Int, operator: CalcOperator) : CompletableFuture<Int> {
            var promise = CompletableFuture<Int>()
    
            webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b")
                .expect(ResponsePredicate.SC_OK).send{
                if (it.succeeded()) {
                    try{
                        var addResult = it.result().bodyAsString().toInt()
                        promise.complete(addResult)
                        println("$a - $b = $addResult")
                    } catch (e: Exception) {
                        promise.completeExceptionally(e)
                    }
                } else {
                    it.cause().printStackTrace()
                    promise.completeExceptionally(RuntimeException("calc failed $a add $b"))
                }
            }
    
            return promise
        }
    }
    

    其实无论是vert.x的promise还是jdk的CompletableFuture, 都是promise模式,写发生也比较类似,代码整洁度上也相似,当然笔者很乐于看到java生态圈能够对此形成标准,就和JS的ES6统一定义了前端的promise一样,免得很多口水。

    实践上升理论

    基于上面的实践,相信对promise是有一点感觉的了。那么需要实践上升理论了,理论参见:
    https://en.wikipedia.org/wiki/Futures_and_promises
    理论说明相关文章很多,笔者理解主要是状态机的维护,笔者就不再献丑了,后续如果得空在写一篇vert.x的promise源码分析的文章。

    promise是异步编程的一个模式,在某些场景下可以解决回调地狱的问题。
    接下来的一篇文章讲来聊聊reactor模式。

    系列文章快速导航:
    异步编程一:异步编程的魅力
    异步编程二:promise模式
    异步编程三:reactor模式
    异步编程四:协程

    相关文章

      网友评论

        本文标题:异步编程二:promise模式

        本文链接:https://www.haomeiwen.com/subject/jtbonctx.html