美文网首页
27.Dubbo全链路异步

27.Dubbo全链路异步

作者: 山海树 | 来源:发表于2020-09-18 08:14 被阅读0次

异步调用流程图

image.png

在消费端使用get()生成invoker的时候,如果开启了async=true,则会在DubboInvoker中设置RpcContext.getContext的future为AdapterFuture


image.png

通过查看AdapterFuture的构造方法

 public FutureAdapter(ResponseFuture future) {
        this.future = future;
        this.resultFuture = new CompletableFuture<>();
        future.setCallback(new ResponseCallback() {
            @Override
            public void done(Object response) {
                Result result = (Result) response;
                FutureAdapter.this.resultFuture.complete(result);
                V value = null;
                try {
                    value = (V) result.recreate();
                } catch (Throwable t) {
                    FutureAdapter.this.completeExceptionally(t);
                }
                FutureAdapter.this.complete(value);
            }

            @Override
            public void caught(Throwable exception) {
                FutureAdapter.this.completeExceptionally(exception);
            }
        });
    }

可以看到实际上还是通过一个ResponseFuture 完成数据的回调赋值给CompleteFuture;


image.png
image.png
image.png

从以上三张图可以看到,生成Invoker的时候设置的ResponseFuture,其实是一个DefaultFuture,在初始化DefaultFuture的时候回根据请求id存储一份到DefaultFuture对象中,此时由于是异步调用,因此迅速返回,当有结果返回的时候,


image.png

消费端reveived方法接到数据,总终调用到DefaultFuture的reveived方法,此时根据requestId拿到response,后调invokeCallback()触发回调方法。

服务端

void handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        Object data;
        if (req.isBroken()) {
            data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable)data);
            } else {
                msg = data.toString();
            }

            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus((byte)40);
            channel.send(res);
        } else {
            data = req.getData();

            try {
                CompletableFuture<Object> future = this.handler.reply(channel, data);
                if (future.isDone()) {
                    res.setStatus((byte)20);
                    res.setResult(future.get());
                    channel.send(res);
                    return;
                }

                future.whenComplete((result, t) -> {
                    try {
                        try {
                            if (t == null) {
                                res.setStatus((byte)20);
                                res.setResult(result);
                            } else {
                                res.setStatus((byte)70);
                                res.setErrorMessage(StringUtils.toString(t));
                            }

                            channel.send(res);
                        } catch (RemotingException var8) {
                            logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var8);
                        }

                    } finally {
                        ;
                    }
                });
            } catch (Throwable var6) {
                res.setStatus((byte)70);
                res.setErrorMessage(StringUtils.toString(var6));
                channel.send(res);
            }

        }
    }

dubbo当方法有返回值的时候回调用HeaderExchangeHandler处理,在HeaderExchangeHandler中reveived最终调用到handleRequest()来处理请求。
通过调用dubbo的reply()方法


image.png

该方法中将对方法的执行是否开启异步做出判断并进一步处理


image.png
此方法将对返回值做出判断
image.png
然后在HeaderExchangeHandler中对返回是否完成做判断,并发送数据。

相关文章

网友评论

      本文标题:27.Dubbo全链路异步

      本文链接:https://www.haomeiwen.com/subject/fnspektx.html