美文网首页
Future、CompletableFuture与dubbo异步

Future、CompletableFuture与dubbo异步

作者: 北交吴志炜 | 来源:发表于2019-07-28 21:31 被阅读0次

    Future

    在java 8之前,我们可以使用Callable+Future来异步执行任务和获取结果,比如

    ExecutorService service = new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
    Future<String> f = service.submit(()->{
                        Thread.sleep(200);
                        return "helloWorld";
                    }
            );
            System.out.println(f.get(300,TimeUnit.MILLISECONDS));
    

    其获取结果,get方法实现本质是轮询校验结果状态积,阻塞实现依赖的是LockSupport.park()方法。
    那么在dubbo交给Apache进行孵化之前的版本中,比如2.6.1版本中,其异步调用机制ResponseFuture的实现就借鉴了jdk的Future的模式,以DubboInvoker#doInvoke方法为例

    if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
    

    可以看到,同步与异步的本质区别就是调用get()方法的时机不同,同步调用的话,请求的同时由dubbo线程直接调用get方法阻塞,获取结果;而异步调用,dubbo直接返回RpcResult,后续由业务线程再来调用get方法获取结果。

    dubbo虽然借鉴了jdk的Future,但是代码全部是自己写的,以DefaultFuture#get()为例

    public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            return returnFromResponse();
        }
    

    可以看到,dubbo的DefaultFuture实现,主要依赖lock+condition的模式,不是jdk Future的LockSupport.park()模式。
    这种模式的缺点有很多,最大的缺点就是结果获取是阻塞的。

    CompletableFuture

    在java 8之后,jdk引入了CompletableFuture类,可以看到其实现了Future和CompletionStage,所以我们可以继续像使用Future一样使用CompletableFuture。

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 
    
    

    那么CompletionStage 是做什么的呢,用类文件注释的第一句话说,其代表一种异步阶段,执行一些行为或者计算,执行完毕后,会触发其他CompletionStage的执行。

    A stage of a possibly asynchronous computation, that performs an
     action or computes a value when another CompletionStage completes.
    

    相较于Future,CompletableFuture提供的很多新特性都依赖与这个CompletionStage,这里主要介绍其在dubbo异步调用中的应用,其他特性不多介绍,重点介绍下其回调机制,先看用法

    CompletableFuture<String> f = new CompletableFuture();
            try {
                f.whenComplete((v,t)->{
                    if(t!=null){
                        System.out.println("Exception");
    
                    }else{
                        System.out.println(v);
                    }
    
                });
                f.complete("HelloWorld");
    

    当CompletableFuture拿到结果的时候,会回调whenComplete方法注册的回调逻辑,其核心实现见CompletableFuture#postComplete, 用注释的话说,每一步,这个stack会pop and run。回调也是基于此实现(Doug Lea大神的作品不是简单能说明白的,后续再开一文研究)

    /**
         * Pops and tries to trigger all reachable dependents.  Call only
         * when known to be done.
         */
        final void postComplete() {
            /*
             * On each step, variable f holds current dependents to pop
             * and run.  It is extended along only one path at a time,
             * pushing others to avoid unbounded recursion.
             */
            CompletableFuture<?> f = this; Completion h;
            while ((h = f.stack) != null ||
                   (f != this && (h = (f = this).stack) != null)) {
                CompletableFuture<?> d; Completion t;
                if (f.casStack(h, t = h.next)) {
                    if (t != null) {
                        if (f != this) {
                            pushStack(h);
                            continue;
                        }
                        h.next = null;    // detach
                    }
                    f = (d = h.tryFire(NESTED)) == null ? this : d;
                }
            }
        }
    

    那么dubbo的异步调用是怎么利用这个回调机制的呢?见DubboInvoker#doInvoke (2.7.3版本)

     if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else {
                    AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                    CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                    asyncRpcResult.subscribeTo(responseFuture);
                    // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                    FutureContext.getContext().setCompatibleFuture(responseFuture);
                    return asyncRpcResult;
                }
    

    之前2.6.1版本中,同步异步的区别是谁来调get()方法,那么在2.7.3版本,DubboInvoker对同步异步调用的处理直接统一了,都会返回一个AsyncRpcResult, 这个AsyncRpcResult本身就继承自CompletableFuture,同时其会subscribe一个响应的CompletableFuture,这里就有了两个CompletableFuture;那么subscribe做了什么呢?

    public void subscribeTo(CompletableFuture<?> future) {
            future.whenComplete((obj, t) -> {
                if (t != null) {
                    this.completeExceptionally(t);
                } else {
                    this.complete((Result) obj);
                }
            });
        }
    

    subscribe会对响应CompletableFuture注册了一个回调,响应完成时,触发这个回调;这个回调逻辑就是执行AsyncRpcResult自身的complete方法,那么如果AsyncRpcResult也有注册回调,此时就会被链式触发。
    新版本的dubbo既然在DubboInvoker这里对于同步异步的处理是一样的,都是直接返回一个AsyncRpcResult,那么对于我们使用者来说,怎么来区别同步和异步呢?其实关键就在于怎么用这个AsyncRpcResult。如果我们拿到AsyncRpcResult直接get,可以认为这就是同步调用,如果我们拿到AsyncRpcResult,不去调用get,而是去注册一个回调函数,等待链式触发,用回调的方式拿结果,那么这就是异步。

    总结:老版本dubbo的异步调用可以认为是假异步,因为结果的获取是阻塞的,新版本随着jdk引入CompletableFuture,由于回调机制的存在,我们业务代码使用dubbo时候,也可以注册回调,实现真正的异步非阻塞。

    相关文章

      网友评论

          本文标题:Future、CompletableFuture与dubbo异步

          本文链接:https://www.haomeiwen.com/subject/krcmrctx.html