美文网首页
Dubbo使用了CompletableFuture,实现了真异步

Dubbo使用了CompletableFuture,实现了真异步

作者: 九点半的马拉 | 来源:发表于2021-03-01 16:29 被阅读0次

    Dubbo在服务调用时支持同步调用和异步调用等方式。

    在Dubbo2.6版本及之前的版本在实现异步调用时存在一定的缺点,实际上是一种假异步。

    下面列举一个异步案例。

    // 此方法应该返回Foo,但异步后会立刻返回NULL
    fooService.findFoo(fooId);
    // 立刻得到当前调用的Future实例,当发生新的调用时这个东西将会被覆盖
    Future<Foo> fooFuture = RpcContext.getContext().getFuture();
    
    // 调用另一个服务的方法
    barService.findBar(barId);
    // 立刻得到当前调用的Future
    Future<Bar> barFuture = RpcContext.getContext().getFuture();
     
    // 此时,两个服务的方法在并发执行
    // 等待第一个调用完成,线程会进入Sleep状态,当调用完成后被唤醒。
    Foo foo = fooFuture.get();
    // 同上
    Bar bar = barFuture.get();
    // 假如第一个调用需要等待5秒,第二个等待6秒,则整个调用过程完成的时间是6秒。
    

    当调用服务方法后,Dubbo会创建一个DefaultFuture,并将该Future存放到RpcContext中,在用户线程中,如果用户想获取调用结果时,会从RpcContext中获取该Future,并调用get方法,但是如果此时该服务仍没有处理完毕,则会出现阻塞,直到结果返回或调用超时为止。发生阻塞时,该方法的后续步骤则得不到执行。对于异步来说,这显然是不合理的。理想中的异步是如果服务没有处理好,会继续执行用户线程的后续方法,不会阻塞等待。

    从Dubbo2.7开始,Dubbo的异步调用开始以CompletableFuture为基础进行实现

    DubboInvoker是一个执行体,通过它可以发起远程调用。
    在Dubbo2.6的远程调用中,部分代码如下所示(只保留了部分代码):

    DubboInvoker类
    protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            //忽略部分代码
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            //忽略部分代码
            //单向调用,无返回值
            if (isOneway) {
               boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
               currentClient.send(inv, isSent);
               RpcContext.getContext().setFuture(null);
               return new RpcResult();
            // 异步调用
            } else if (isAsync) {
               ResponseFuture future = currentClient.request(inv, timeout);
               RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
               return new RpcResult();
            // 同步调用
            } else {
               RpcContext.getContext().setFuture(null);
               return (Result) currentClient.request(inv, timeout).get();
            }     
    }
    

    在Dubbo2.6版本及之前的版本中,不管同步调用还是异步调用,都会调用HeaderExchangeClient.request方法,返回一个DefaultFuture对象,不同的点是:异步调用会将该future存放到RpcContext中,并先返回一个空的RpcResult结果。而同步调用不会将该future存放到RpcContext中,而是直接调用该future的get方法,阻塞等待调用结果。

    HeaderExchangeChannel类 
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout); 
            channel.send(req);
            //忽略了部分代码
            return future;
    }
    
    DefaultFuture类(忽略了部分代码)
    public Object get(int timeout) throws RemotingException {
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
            }
            return returnFromResponse();
    }
    
    

    当服务端处理完信息后,HeaderExchangeHandler会处理发送过来的Response,根据requestId获取对应的DefaultFuture对象,最终调用doReceived方法对结果赋值。利用AQS的条件锁机制,唤醒阻塞线程。

    DefaultFuture类
    private void doReceived(Response res) {
            lock.lock();
            try {
                response = res;
                if (done != null) {
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
            if (callback != null) {
                invokeCallback(callback);
            }
    }
    

    在Dubbo2.7版本中,对异步调用进行了改良,使用了CompletableFuture

    Dubbo2.7异步调用的一个样例:

    // 此调用会立即返回null
    asyncService.sayHello("world");
    // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
    CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture();
    // 为Future添加回调
    helloFuture.whenComplete((retValue, exception) -> {
        if (exception == null) {
            System.out.println(retValue);
        } else {
            exception.printStackTrace();
        }
    });
    

    同样是DubboInvoker发起远程调用,在doInvoke方法中进行了改进:

    DubboInvoker2.7.9版本
    protected Result doInvoke(final Invocation invocation) throws Throwable {
         RpcInvocation inv = (RpcInvocation) invocation;
         final String methodName = RpcUtils.getMethodName(invocation);
         boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
         //单向调用
         if (isOneway) {
             boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
             currentClient.send(inv, isSent);
             return AsyncRpcResult.newDefaultAsyncResult(invocation);
          //同步调用和异步调用
          } else {
             ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                    CompletableFuture<AppResponse> appResponseFuture =
                            currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);           FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        } 
    }
    

    在Dubbo2.7版本中,DubboInvolnvoker对同步调用和异步调用进行了统一处理,封装成CompletableFuture,并以 AsyncRpcResult返回。

    Dubbo2.7版本下HeaderExchangeChannel.request方法与2.6版本相差不大,只是DeafultFuture对象有一点不同,即后续版本继承了 CompletableFuture类。

    对于同步调用和异步调用的处理交给AsyncToSyncInvoker类处理。

    public Result invoke(Invocation invocation) throws RpcException {
            // 调用DubboInvoker等Invoker返回的调用结果
            Result asyncResult = invoker.invoke(invocation);
            try {
                // 如果是同步调用
                if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                    // 不能使用CompletableFuture#get()方法,否则性能会出现严重下降。
                    asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                }
            }
            //忽略了部分代码
            return asyncResult;
        }
    

    不同与Dubbo2.6版本,Dubbo2.7在处理服务端返回结果时放弃了AQS的条件锁机制,改用CompletableFuture类的complete方法去实现。

    DefaultFuture类
    private void doReceived(Response res) {
            //忽略部分代码
            if (res.getStatus() == Response.OK) {
                // 对CompletableFuture赋值结果
                this.complete(res.getResult());
            } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
            } else {
                this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
            }
          //忽略部分代码
    }
    

    对于上述的Result接口,有两个实现对象,我们在这里进行简单对比分析下。

    AsyncRpcResult

    此类表示未完成的RPC调用,它将保留此调用的一些上下文信息,例如RpcContext和Invocation,因此,当调用完成并且返回结果时,它可以确保与调用时相同地恢复所有上下文, 是在调用任何回调之前进行的。

    当Result实现CompletionStage时,AsyncRpcResult允许您轻松构建异步过滤器链,其状态将完全由基础RPC调用的状态驱动。

    AsyncRpcResult不包含任何具体值(由CompletableFuture带来的基础值除外),请将其视为状态传输节点。#getValue()#getException()都是从Result接口继承的,主要实现它们出于兼容性考虑。 因为许多旧式Filter实现很可能直接调用getValue。

    AppResponse

    Duboo3.0.0中引入了AsyncRpcResult来替换RpcResult,并且RpcResult被替换为AppResponse:AsyncRpcResult是在调用链中实际传递的对象,
    AppResponse仅代表业务结果。

    AsyncRpcResult是表示未完成的RPC调用的未来,而AppResponse是此调用的实际返回类型。
    从理论上讲,AppResponse不必实现Result接口,这主要是出于兼容性目的。

    在Dubbo服务暴露中,ProtocolFilterWrapper会构建拦截器链Filter,在调用实际的DubboInvoker之前,会先调用一些构造的Filter,比如ExecuteLimitFilter,限制每个服务中每个方法的最大并发数。下面是Dubbo2.6构建拦截器器链的逻辑:

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
           for (int i = filters.size() - 1; i >= 0; i--) {
               final Filter filter = filters.get(i);
               final Invoker<T> next = last;
               ast = new Invoker<T>() {
                     //忽略部分代码
                     @Override
                     public Result invoke(Invocation invocation) {
                            return filter.invoke(next, invocation);
                     }
                    };
                }
            }
            return last;
    }
    

    但是在Dubbo2.6版本进行异步调用中,会出现一些问题,因为Dubbo2.6在进行异步调用时,会先返回一个空的RpcResult对象,当某些Filter需要对返回的结果进行处理时,显然在该情景下无法处理结果。Dubbo2.7对这种情况进行了改进。

    Dubbo2.7构建拦截器链的逻辑如下所示:

    ProtocolFilterWrapper类
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
                for (Filter filter : filters) {
                    last = new FilterNode<T>(invoker, last, filter);
                }
            }
            return last;
    }
    

    然后解释下在FilterNode中的invoke方法:

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
          Result asyncResult;
          asyncResult = filter.invoke(next, invocation);
          //忽略部分代码
          return asyncResult.whenCompleteWithContext((r, t) -> {
                //忽略部分代码
                } else if (filter instanceof Filter.Listener) {
                    Filter.Listener listener = (Filter.Listener) filter;
                    if (t == null) {
                        listener.onResponse(r, invoker, invocation);
                    } else {
                        listener.onError(t, invoker, invocation);
                    }
                }
            });
    }
    

    当异步调用时,以AsyncRpcResult对象传递,通过CompletableFuture#whenComplete实现异步下的逻辑处理。

    public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn){
         // 是CompletableFuture类
         this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
                beforeContext.accept(v, t);
                fn.accept(v, t);
                afterContext.accept(v, t);
          });
          return this;
    }
    

    Dubbo异步分析到这里就结束了,感谢大家的阅读。

    相关文章

      网友评论

          本文标题:Dubbo使用了CompletableFuture,实现了真异步

          本文链接:https://www.haomeiwen.com/subject/hspafltx.html