基于dubbo实现异步调用(1)

作者: shysheng | 来源:发表于2018-12-02 21:43 被阅读1300次

    1.前言

    Java中常见的实现异步调用的方式:
    1.ThreadPool
    2.CompletableFuture
    3.MQ
    4.BlockingQueue
    5.Fork/Join

    那么作为一款优秀的RPC框架,dubbo是如何实现异步调用的呢?本文将介绍2.6.x版本以来dubbo异步调用方式的演进。
    1.增加consumer配置
    2.参数回调(2.7.0已废弃,本文将不展开)
    3.事件通知
    4.直接定义返回CompletableFuture的服务接口
    5.利用AsyncFor注解实现客户端的同步转异步
    6.利用RpcContext.startAsync()实现服务端的同步转异步
    其中前面3种方式在2.6.x版本中就已支持,但参数回调在2.7.0版本中已废弃,后面3种则是在2.7.0版本中新增的方式。

    2.基于dubbo实现异步调用

    2.1 增加consumer配置

    这种方式很简单,只需要在服务引用时增加<dubbo:method>配置即可,如下所示,其中name为需要异步调用的方法名,async表示是否启用异步调用。

    <dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880">
        <dubbo:method name="sayHello" async="true" />
    </dubbo:reference>
    

    此时consumer端有3种调用方式:

    • 由于配置了异步调用,因此此时直接调用将返回null:
    String result = asyncService.sayHello("world");
    
    • 通过RpcContext获取Future对象,调用get方法时阻塞知道返回结果:
    asyncService.sayHello("world");
    Future<String> future = RpcContext.getContext().getFuture();
    String result = future.get();
    
    • 通过ResponseFuture设置回调,执行完成会回调done方法,抛异常则会回调caught方法:
    asyncService.sayHello("world");
    ResponseFuture responseFuture = ((FutureAdapter)RpcContext.getContext().getFuture()).getFuture();
    responseFuture.setCallback(new ResponseCallback() {
        @Override
        public void done(Object response) {
            System.out.println("done");
        }
    
        @Override
        public void caught(Throwable exception) {
            System.out.println("caught");
        }
    });
    
    try {
        System.out.println("result = " + responseFuture.get());
    } catch (RemotingException e) {
        e.printStackTrace();
    }
    

    如果只想异步调用,不需要返回值,则可以配置 return="false",这样可以避免Future对象的创建,此时RpcContext.getContext().getFuture()将返回null;

    2.2 直接定义返回CompletableFuture的服务接口

    在上述方式中,想获取异步调用的结果,需要从RpcContext中获取,使用起来不是很方便。基于java 8中引入的CompletableFuture,dubbo在2.7.0版本中也增加了对CompletableFuture的支持,我们可以直接定义一个返回CompletableFuture类型的接口。

    public interface AsyncService {
    
        String sayHello(String name);
    
        CompletableFuture<String> sayHelloAsync(String name);
    }
    

    服务端实现如下:

    public class AsyncServiceImpl implements AsyncService {
    
        @Override
        public String sayHello(String name) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return name;
        }
    
    
        @Override
        public CompletableFuture<String> sayHelloAsync(String name) {
            return CompletableFuture.supplyAsync(() -> name);
        }
    }
    

    如此一来,我们就实现了服务端的异步,客户端直接调用接口即可,不需要再从RpcContext中获取返回值:

    CompletableFuture<String> completableFuture = asyncService.sayHelloAsync("async");
    String result = completableFuture.get();
    
    2.3 事件通知

    dubbo允许consumer 端在调用之前、调用之后或出现异常时,触发 oninvoke、onreturn、onthrow 三个事件。类似于Spring中的前置增强、后置增强和异常抛出增强。只需要在服务引用时,增加以下配置指定事件通知的方法即可:

    <dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880">
        <dubbo:method name="sayHello" 
                      oninvoke="notifyServiceImpl.onInvoke" 
                      onreturn="notifyServiceImpl.onReturn" 
                      onthrow="notifyServiceImpl.onThrow" />
    </dubbo:reference>
    

    事件通知服务如下:

    public class NotifyServiceImpl implements NotifyService {
    
        // 方法参数与调用方法参数相同
        @Override
        public void onInvoke(String name) {
            System.out.println("onInvoke: " + name);
        }
    
        // 第一个参数为调用方法的返回值,其余为调用方法的参数
        @Override
        public void onReturn(String retName, String name) {
            System.out.println("onReturn: " + name);
        }
    
        // 第一个参数为调用异常,其余为调用方法的参数
        @Override
        public void onThrow(Throwable ex, String name) {
            System.out.println("onThrow: " + name);
        }
    }
    

    与Spring增强不同的是,dubbo中的事件通知也可以是异步,只需要将调用方法配置为async="true"即可,但oninvoke方法无法异步执行。

    2.4 异步调用源码分析

    dubbo中的异步调用实际上是通过引入一个FutureFilter来实现的,关键源码如下。

    2.4.1 调用前获取方法信息
    @Activate(group = Constants.CONSUMER)
    public class FutureFilter implements PostProcessFilter {
    
        protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
    
        @Override
        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
            fireInvokeCallback(invoker, invocation);
            // need to configure if there's return value before the invocation in order to help invoker to judge if it's
            // necessary to return future.
            return postProcessResult(invoker.invoke(invocation), invoker, invocation);
        }
        ...
    }
    

    在fireInvokeCallback()方法中,会首先调用getAsyncMethodInfo()获取目标方法的方法信息,看是否有配置事件通知:

    private ConsumerMethodModel.AsyncMethodInfo getAsyncMethodInfo(Invoker<?> invoker, Invocation invocation) {
        // 首先获取消费者信息
        final ConsumerModel consumerModel = ApplicationModel.getConsumerModel(invoker.getUrl().getServiceKey());
        if (consumerModel == null) {
            return null;
        }
    
        // 获取消费者对应的方法信息
        ConsumerMethodModel methodModel = consumerModel.getMethodModel(invocation.getMethodName());
        if (methodModel == null) {
            return null;
        }
    
        // 获取消费者对应方法的事件信息,即是否有配置事件通知
        final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = methodModel.getAsyncInfo();
        if (asyncMethodInfo == null) {
            return null;
        }
        return asyncMethodInfo;
    }
    
    2.4.2 同步触发oninvoke事件

    获取到调用方法对应的信息后,回到fireInvokeCallback()方法:

    private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
        final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
        if (asyncMethodInfo == null) {
            return;
        }
    
        // 获取事件配置信息
        final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();
        final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();
    
        if (onInvokeMethod == null && onInvokeInst == null) {
            return;
        }
        if (onInvokeMethod == null || onInvokeInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onInvokeMethod.isAccessible()) {
            onInvokeMethod.setAccessible(true);
        }
    
        // 获取方法参数
        Object[] params = invocation.getArguments();
        try {
            // 触发oninvoke事件
            onInvokeMethod.invoke(onInvokeInst, params);
        } catch (InvocationTargetException e) {
            // 触发onthrow事件
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }
    
    2.4.3 调用结果处理

    方法调用完成后,会回到postProcessResult()方法:

    @Override
    public Result postProcessResult(Result result, Invoker<?> invoker, Invocation invocation) {
    
        // 如果是异步调用,返回结果会被封装成AsyncRpcResult类型的对象,具体在哪里封装的,后面会讲到
        if (result instanceof AsyncRpcResult) {
            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
            asyncResult.thenApplyWithContext(r -> {
                asyncCallback(invoker, invocation, r);
                return r;
            });
            return asyncResult;
        } else {
            syncCallback(invoker, invocation, result);
            return result;
        }
    }
    

    syncCallback和asyncCallback里面的逻辑比较简单,就是根据方法是正常返回还是抛异常,触发对应的事件。可以看到,如果被调用方法是同步的,则这两个事件也是同步的,反之亦然。

    2.4.4 方法调用核心过程

    在postProcessResult()方法中,第一个参数是invoker.invoke(invocation),这里就会走到下一个Filter链完成filter链的处理,最终调到原始服务,走到DubboInvoker#doInvoke方法:

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        ...
        try {
            // 读取async配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // 读取future_generated/future_returntype配置,还没搞明白是干啥的
            boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
            // 读取return配置
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                // 如果配置return="true",future对象就直接设置为null
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                // 如果配置async="true",构建future对象
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);
    
                // 同时将返回结果包装为AsyncResult对象
                Result result;
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the asyn result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                // 否则就是同步调用,future当然也是null
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        }
        ...
    }
    

    通过这个过程不难发现,不管是同步调用还是异步调用,最终都会走到ExchangeClient#send方法,再往下会走到HeaderExchangeChannel#request方法,这个一个异步方法,返回ResponseFuture对象。

        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
    

    看到这里我才恍然大悟,原来dubbo中同步调用也是通过异步调用来实现,只是同步调用发起后,直接调用future#get的方法来同步等待结果的返回,而异步调用只返回Future Response,在用户需要关心其结果时才调用get方法。

    参考:
    http://dubbo.apache.org/zh-cn/blog/dubbo-invoke.html
    http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html
    https://mp.weixin.qq.com/s?__biz=MzUzNTY4NTYxMA==&mid=2247484959&idx=5&sn=654b4ae76e76ac1a436f2ceb1774f4a6&chksm=fa80f69acdf77f8c5371d4a929557d6ec7fba3020c3bbb1490f0835218e8aef7af285e91d097&scene=21#wechat_redirect

    相关文章

      网友评论

        本文标题:基于dubbo实现异步调用(1)

        本文链接:https://www.haomeiwen.com/subject/rcvscqtx.html