美文网首页
Dubbo的服务调用(Reponse的同步、异步)

Dubbo的服务调用(Reponse的同步、异步)

作者: 就这些吗 | 来源:发表于2020-07-28 02:14 被阅读0次

    本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。

    Dubbo的线程模型、handler
    我们已经讨论过了当收到请求的时候是怎么从Netty转到Dubbo的逻辑来的,再介绍了Handler的调用链路,并且分析了如何将解码这一IO操作Dubbo的业务线程池做的

    Dubbo的服务调用(心跳、编码、解码)介绍了解码编码的逻辑、以及消息头的内容。值得说明的是,当消费者发送Request到服务端,服务端进行解码还是服务端执行完毕之后将Reponse交给消费者 ,消费者进行解码,这两个操作并无太大区别,在DubboCodec#decodeBody的方法中会对这两者进行判断然后进入不同的if-else块,但两者逻辑极为相像。一个是返回了一个DecodeableRpcInvocation,一个是返回了DecodeableRpcResult

    这一章,我们关注的是,当消费者发送请求后,他是怎么收到这个请求对应的Reponse的。还可分为同步、异步两种。

    文章内容顺序:
    1. 同步接收Reponse
      1.1 DubboInvoker#doInvoke
      1.2 HeaderExchangeChannel#request
      1.3 Future接口及实现
      1.4 DefaultFuture
      1.5 HeaderExchangeHandler#received(DefaultFuture被调用的时机)
    2. 异步(回调)接收Reponse
      2.1 异步调用的XML配置
      2.2 DubboInvoker#doInvok
      2.3 RpcContext
      2.4 FutureFilter
      2.5 那么我们再来回顾下什么时候回调用到这个done方法呢?
      2.6 异步调用的代码例子
    

    1.同步接收Reponse

    1.1DubboInvoker#doInvoke

    先来看看同步的调用是怎么样的。

    image.png
    这里的currentClient#request最后调用的是HeaderExchangeChannel里的方法,具体的调用过程可看我的另一篇文章里的Dubbo创建过程Dubbo中的服务暴露

    1.2 HeaderExchangeChannel#request

    final class HeaderExchangeChannel implements ExchangeChannel {
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request. 创建请求
            Request req = new Request();
            req.setVersion("2.0.0");
            req.setTwoWay(true); // 需要响应
            req.setData(request);
            // 创建 DefaultFuture 对象
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                // 发送请求
                channel.send(req);
            } catch (RemotingException e) { // 发生异常,取消 DefaultFuture
                future.cancel();
                throw e;
            }
            // 返回 DefaultFuture 对象
            return future;
        }
    //省略其他代码
    }
    

    并没有啥特殊的,就是将request填充进了DefaultFuture对象,往下执行发送的链路后,接着就将这个Future返回了。

    1.3 Future接口及实现

    这个Future的接口及实现如下:


    image.png

    1.4 DefaultFuture

    那么就来看看这个DefaultFuture

    public class DefaultFuture implements ResponseFuture {
    
        /**
         * Future 集合
         *
         * key:请求编号
         */
        private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
        /**
         * 请求编号
         */
        // invoke id.
        private final long id;
        /**
         * 请求
         */
        private final Request request;
        /**
         * 超时
         */
        private final int timeout;
        /**
         * 锁
         */
        private final Lock lock = new ReentrantLock();
        /**
         * 完成 Condition
         */
        private final Condition done = lock.newCondition();
        /**
         * 创建开始时间
         */
        private final long start = System.currentTimeMillis();
        /**
         * 发送请求时间
         */
        private volatile long sent;
        /**
         * 响应
         */
        private volatile Response response;
        /**
         * 回调
         */
        private volatile ResponseCallback callback;
    
        public DefaultFuture(Channel channel, Request request, int timeout) {
            this.channel = channel;
            this.request = request;
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // put into waiting map.
            FUTURES.put(id, this);
            CHANNELS.put(id, channel);
        }
        public boolean isDone() {
            return response != null;
        }
    
        public static void received(Channel channel, Response response) {
            try {
                // 移除 FUTURES
                DefaultFuture future = FUTURES.remove(response.getId());
                // 接收结果
                if (future != null) {
                    future.doReceived(response);
                } else {
                    logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                            + " -> " + channel.getRemoteAddress()));
                }
            // 移除 CHANNELS
            } finally {
                CHANNELS.remove(response.getId());
            }
        }
    
        private void doReceived(Response res) {
            // 锁定
            lock.lock();
            try {
                // 设置结果
                response = res;
                // 通知,唤醒等待
                if (done != null) {
                    done.signal();
                }
            } finally {
                // 释放锁定
                lock.unlock();
            }
            // 调用回调
            if (callback != null) {
                invokeCallback(callback);
            }
        }
    
        @Override
        public Object get() throws RemotingException {
            return get(timeout);
        }
    
        @Override
        public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            // 若未完成,等待
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    // 等待完成或超时
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                // 未完成,抛出超时异常 TimeoutException
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            // 返回响应
            return returnFromResponse();
        }
    
        private Object returnFromResponse() throws RemotingException {
            Response res = response;
            if (res == null) {
                throw new IllegalStateException("response cannot be null");
            }
            // 正常,返回结果
            if (res.getStatus() == Response.OK) {
                return res.getResult();
            }
            // 超时,抛出 TimeoutException 异常
            if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
            }
            // 其他,抛出 RemotingException 异常
            throw new RemotingException(channel, res.getErrorMessage());
        }
    //省略其他代码
    }
    
    • 上文提到,new了个DefaultFuture,并且将request填充进来,这里的构造方法我们看到,从request中获得了一个ID,这个ID是在AbstractClusterInvoker#invoke的时候就设置进来的,用来获取当前request对应的线程。

    • 而后调用了get方法,其中的isDone方法仅仅是判断response这一值是否为空,如果为空,说明调用未完成,会将当前线程加锁而后进入while循环,如果此时response不为空了,那么就会跳出这个while,进入Finally解锁后调用returnFromResponse方法返回结果。

    • 当我们收到Respouse的时候,在调用HeaderExchangeHandler#received时候会调用到该received方法,此时通过ID得到对应的future,会将得到的数据赋值给response,此时我们的response就不为空了,再通过LockCondition对线程进行signal,这样我们的get方法就能往下执行了。
      (注意到这里的doReceived方法中还有堆callback的判断,留待我们异步的时候再讲)

    1.5HeaderExchangeHandler#received(DefaultFuture被调用的时机)

    关于这个HeaderExchangeHandler#received还是简单贴一下代码,用以说明什么时候会调用到我们的DefaultFuture#received

    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
        
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // 处理请求,省略
                } else if (message instanceof Response) {
                    // 处理响应
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    // telnet 相关,忽略
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    
        static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                // 继续向下调用
                DefaultFuture.received(channel, response);
            }
        }
    }
    
    

    以上,我们的同步调用简介至此。

    2. 异步(回调)接收Reponse

    接下来来看看异步,这边还是提一下,Dubbo在2.7版本里已经用了更好的异步方式,参照如下:如何基于Dubbo实现全异步调用链

    这边还是仅介绍下2.6版本中的异步方式。
    如官网链接所说的:事件通知,异步的方式需要我们配置 oninvokeonreturnonthrow这些参数。
    在调用之前、调用之后、出现异常时,会触发 oninvokeonreturnonthrow 三个事件,可以配置当事件发生时,通知哪个类的哪个方法。

    2.1 异步调用的XML配置

    XML的配置如下:

    <bean id="callBack" class="com.alibaba.dubbo.demo.CallBackImpl"/>
        <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"  mock="com.alibaba.dubbo.demo.DemoServiceImplMock">
    <dubbo:method name="sayHello" async="true" onreturn="callBack.onReturn"/>
    </dubbo:reference>
    

    callBack是我们实现的回调的方法,dubbo:method中的name表示对引用的方法的配置(比如这个sayHello就是DemoService中实现的方法,我们要对这个方法进行回调)
    async表示是否开启异步,而后onreturn标签则表示在回调的实现里面执行哪个方法。

    有了以上的基础知识我们再来看代码。
    异步调用需要大幅关联FutureFilter的代码,事实上这个过滤器在同步调用(就是asyncfalse的时候,这是默认值)时也会被调用,只是介绍同步时这些并不影响理解,所以放到了异步里。

    2.2 DubboInvoker#doInvok

    在介绍FutureFilter之前,我们这边暂时回到DubboInvoker#doInvok方法

    image.png
    注意这里他是在RpcContext里设置了一个FutureAdapter,并且直接返回了个空的RpcResult。

    2.3 RpcContext

    那这个RpcContext又是什么呢?

    public class RpcContext {
          private Future<?> future;
        private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
            @Override
            protected RpcContext initialValue() {
                return new RpcContext();
            }
    
        };
        public static RpcContext getContext() {
            return LOCAL.get();
        }
    
        public <T> Future<T> getFuture() {
            return (Future<T>) future;
        }
        public void setFuture(Future<?> future) {
            this.future = future;
        }
    //省略其他代码
    }
    

    可以看到这个RpcContext本质上是一个ThreadLocal,往里面存一个Future就是向ThreadLocal存一个Future,只有本线程能拿到。

    2.4 FutureFilter

    @Activate(group = Constants.CONSUMER)
    public class FutureFilter implements Filter {
    
        protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
    
        @Override
        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
            // 获得是否异步调用
            final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
    
            // 触发前置方法
            fireInvokeCallback(invoker, invocation);
            // 调用方法
            Result result = invoker.invoke(invocation);
    
            // 触发回调方法
            if (isAsync) { // 异步回调
                asyncCallback(invoker, invocation);
            } else { // 同步回调
                syncCallback(invoker, invocation, result);
            }
            return result;
        }
        private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
            // 获得 Future 对象
            Future<?> f = RpcContext.getContext().getFuture();
            if (f instanceof FutureAdapter) {
                ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
                // 触发回调
                future.setCallback(new ResponseCallback() {
    
                    /**
                     * 触发正常回调方法
                     *
                     * @param rpcResult RPC 结果
                     */
                    public void done(Object rpcResult) {
                        if (rpcResult == null) {
                            logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                            return;
                        }
                        // must be rpcResult
                        if (!(rpcResult instanceof Result)) {
                            logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                            return;
                        }
                        Result result = (Result) rpcResult;
                        if (result.hasException()) { // 触发异常回调方法
                            fireThrowCallback(invoker, invocation, result.getException());
                        } else { // 触发正常回调方法
                            fireReturnCallback(invoker, invocation, result.getValue());
                        }
                    }
    
                    /**
                     * 触发异常回调方法
                     *
                     * @param exception 异常
                     */
                    public void caught(Throwable exception) {
                        fireThrowCallback(invoker, invocation, exception);
                    }
                });
            }
        }
        /**
         * 触发正常回调方法
         *
         * @param invoker Invoker 对象
         * @param invocation Invocation 对象
         * @param result RPC 结果
         */
        @SuppressWarnings("Duplicates")
        private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
            // 获得 `onreturn` 方法和对象
            final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
            final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
            //not set onreturn callback
            if (onReturnMethod == null && onReturnInst == null) {
                return;
            }
            if (onReturnMethod == null || onReturnInst == null) {
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
            }
            if (!onReturnMethod.isAccessible()) {
                onReturnMethod.setAccessible(true);
            }
    
            // 参数数组
            Object[] args = invocation.getArguments();
            Object[] params;
            Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
            if (rParaTypes.length > 1) {
                if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { // TODO 芋艿,泛化调用
                    params = new Object[2];
                    params[0] = result;
                    params[1] = args;
                } else {
                    params = new Object[args.length + 1];
                    params[0] = result;
                    System.arraycopy(args, 0, params, 1, args.length);
                }
            } else {
                params = new Object[]{result};
            }
    
            // 调用方法
            try {
                onReturnMethod.invoke(onReturnInst, params);
            } catch (InvocationTargetException e) {
                fireThrowCallback(invoker, invocation, e.getTargetException());
            } catch (Throwable e) {
                fireThrowCallback(invoker, invocation, e);
            }
        }
    
    
    
    • 这里就有的说了,可以看到这边FutureFilter#invoke先是往下调用,等返回一个result的时候再进行下面的操作,这个时候DubboInvoker#doInvoke方法返回了一个空的result

    • 进入了FutureFilter#asyncCallback方法,在这个方法里面拿到了我们刚才存的FutureAdapter(这就是为了适配 ResponseFuture 。通过这样的方式,对上层调用方,透明化 ResponseFuture的存在),并且给里面的ResponseFuture设置了一个匿名的ResponseCallback

    • 注意这个ResponseCallback#done方法会调用fireReturnCallback
      这个方法里会调用到我们最初在XML里配置的onrerun的方法。

    2.5 那么我们再来回顾下什么时候回调用到这个done方法呢?

    同样是在DefaultFuture#doReceived ,前面的调用链路与上面同步的调用都是一样的,

        private void doReceived(Response res) {
            // 锁定
            lock.lock();
            try {
                // 设置结果
                response = res;
                // 通知,唤醒等待
                if (done != null) {
                    done.signal();
                }
            } finally {
                // 释放锁定
                lock.unlock();
            }
            // 调用回调
            if (callback != null) {
                invokeCallback(callback);
            }
        }
    
        private void invokeCallback(ResponseCallback c) {
            ResponseCallback callbackCopy = c;
            if (callbackCopy == null) {
                throw new NullPointerException("callback cannot be null.");
            }
            Response res = response;
            if (res == null) {
                throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
            }
    
            // 正常,处理结果
            if (res.getStatus() == Response.OK) {
                try {
                    //这里调用了done方法
                    callbackCopy.done(res.getResult());
                } catch (Exception e) {
                    logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
                }
            // 超时,处理 TimeoutException 异常
            } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                try {
                    TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
                    callbackCopy.caught(te);
                } catch (Exception e) {
                    logger.error("callback invoke error ,url:" + channel.getUrl(), e);
                }
            // 其他,处理 RemotingException 异常
            } else {
                try {
                    RuntimeException re = new RuntimeException(res.getErrorMessage());
                    callbackCopy.caught(re);
                } catch (Exception e) {
                    logger.error("callback invoke error ,url:" + channel.getUrl(), e);
                }
            }
        }
    

    当收到Response后,会唤醒线程,如果设置了callback对象,则会调用这个callbackdone方法(response正常的情况下,异常的话会走异常的链路),通过这个done方法再调用我们在XML中配置的方法。

    2.6 异步调用的代码例子

    这里再提一点:


    image.png
    • 比如Consumer进行如图的远程调用,
      如果没有配置异步调用async默认为falsehello这个值会正常打印
    • 如果设置了异步调用但是没有配置doreturn方法,那么这个hello会打印出null
    • 如果设置了异步调用且配置了doreturn方法,如下图
      image.png
      那么控制台就会打印
      image.png
      上面的null仍然是那个空的RpcResult,由main方法打印出来,下面的helloworld则是服务端的返回值,由CallBackImpl的打印语句打印了出来,而CallBackImplreturn值,似乎没啥用。

    相关文章

      网友评论

          本文标题:Dubbo的服务调用(Reponse的同步、异步)

          本文链接:https://www.haomeiwen.com/subject/ddkwkktx.html