美文网首页
27.Dubbo全链路异步

27.Dubbo全链路异步

作者: 山海树 | 来源:发表于2020-09-18 08:14 被阅读0次

    异步调用流程图

    image.png

    在消费端使用get()生成invoker的时候,如果开启了async=true,则会在DubboInvoker中设置RpcContext.getContext的future为AdapterFuture


    image.png

    通过查看AdapterFuture的构造方法

     public FutureAdapter(ResponseFuture future) {
            this.future = future;
            this.resultFuture = new CompletableFuture<>();
            future.setCallback(new ResponseCallback() {
                @Override
                public void done(Object response) {
                    Result result = (Result) response;
                    FutureAdapter.this.resultFuture.complete(result);
                    V value = null;
                    try {
                        value = (V) result.recreate();
                    } catch (Throwable t) {
                        FutureAdapter.this.completeExceptionally(t);
                    }
                    FutureAdapter.this.complete(value);
                }
    
                @Override
                public void caught(Throwable exception) {
                    FutureAdapter.this.completeExceptionally(exception);
                }
            });
        }
    

    可以看到实际上还是通过一个ResponseFuture 完成数据的回调赋值给CompleteFuture;


    image.png
    image.png
    image.png

    从以上三张图可以看到,生成Invoker的时候设置的ResponseFuture,其实是一个DefaultFuture,在初始化DefaultFuture的时候回根据请求id存储一份到DefaultFuture对象中,此时由于是异步调用,因此迅速返回,当有结果返回的时候,


    image.png

    消费端reveived方法接到数据,总终调用到DefaultFuture的reveived方法,此时根据requestId拿到response,后调invokeCallback()触发回调方法。

    服务端

    void handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
            Response res = new Response(req.getId(), req.getVersion());
            Object data;
            if (req.isBroken()) {
                data = req.getData();
                String msg;
                if (data == null) {
                    msg = null;
                } else if (data instanceof Throwable) {
                    msg = StringUtils.toString((Throwable)data);
                } else {
                    msg = data.toString();
                }
    
                res.setErrorMessage("Fail to decode request due to: " + msg);
                res.setStatus((byte)40);
                channel.send(res);
            } else {
                data = req.getData();
    
                try {
                    CompletableFuture<Object> future = this.handler.reply(channel, data);
                    if (future.isDone()) {
                        res.setStatus((byte)20);
                        res.setResult(future.get());
                        channel.send(res);
                        return;
                    }
    
                    future.whenComplete((result, t) -> {
                        try {
                            try {
                                if (t == null) {
                                    res.setStatus((byte)20);
                                    res.setResult(result);
                                } else {
                                    res.setStatus((byte)70);
                                    res.setErrorMessage(StringUtils.toString(t));
                                }
    
                                channel.send(res);
                            } catch (RemotingException var8) {
                                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var8);
                            }
    
                        } finally {
                            ;
                        }
                    });
                } catch (Throwable var6) {
                    res.setStatus((byte)70);
                    res.setErrorMessage(StringUtils.toString(var6));
                    channel.send(res);
                }
    
            }
        }
    

    dubbo当方法有返回值的时候回调用HeaderExchangeHandler处理,在HeaderExchangeHandler中reveived最终调用到handleRequest()来处理请求。
    通过调用dubbo的reply()方法


    image.png

    该方法中将对方法的执行是否开启异步做出判断并进一步处理


    image.png
    此方法将对返回值做出判断
    image.png
    然后在HeaderExchangeHandler中对返回是否完成做判断,并发送数据。

    相关文章

      网友评论

          本文标题:27.Dubbo全链路异步

          本文链接:https://www.haomeiwen.com/subject/fnspektx.html