美文网首页Java技术程序员码农的世界
深度学习Java Future (二)

深度学习Java Future (二)

作者: 一字马胡 | 来源:发表于2017-12-08 18:32 被阅读263次

    作者: 一字马胡
    转载标志 【2017-12-08】

    更新日志

    日期 更新内容 备注
    2017-12-08 学习Future的总结 关于Future的深入学习内容

    导入

    深度学习Java Future 系列:

    第一篇文章基于FutureTask的Future基本实现来分析了Java Future的基本原理,FutureTask只是Future接口的一个基本实现,并且是作为一个Task对象存在的,FutureTask本身并不管理执行线程池相关的内容,我们生成一个FutureTask对象的动机是我们希望将我们的task包装成一个FutureTask对象,使得我们可以借助FutureTask的特性来控制我们的任务。虽然FutureTask较为简单,但是可以从FutureTask的具体实现中学习一些Future的知识,至少对于Future的定位应该是更进一步的,在进行接下来的内容之前,需要再次重申的是,Future是一个可以代表异步计算结果的对象,并且Future提供了一些方法来让调用者控制任务,比如可以取消任务的执行(当然可能取消会失败),或者设置超时时间来取得我们的任务的运行结果。本文是深度学习Java Future 系列的第二篇文章,和第一篇文章借助FutureTask的具体实现来学习一样,本文也将借助一个具体的Future实现来分析总结,因为CompletableFuture在平时的开发中使用的频率较高,所以本文将选择使用CompletableFuture的具体实现来继续分析Future,试图通过分析CompletableFuture的某些方法的实现来学习关于Future更为深层次的知识。

    下面的图片展示了CompletableFuture的类图关系:

    可以看到,CompletableFuture同时实现了两个接口,分别为Future和CompletionStage,CompletionStage是CompletableFuture提供的一些非常丰富的接口,可以借助这些接口来实现非常复杂的异步计算工作,基于本文的主题是Future,所以本文不会过多的分析关于CompletionStage的内容,如果想要了解CompletableFuture中关于CompletionStage的一些细节内容,可以参考文章Java CompletableFuture,该文章详细完整的描述了CompletableFuture中关于CompletionStage接口的实现情况。

    CompletableFuture

    首先来分析一下CompletableFuture的get方法的实现细节,CompletableFuture实现了Future的所有接口,包括两个get方法,一个是不带参数的get方法,一个是可以设置等待时间的get方法,首先来看一下CompletableFuture中不带参数的get方法的具体实现:

    
        public T get() throws InterruptedException, ExecutionException {
            Object r;
            return reportGet((r = result) == null ? waitingGet(true) : r);
        }
    
    
    

    result字段代表任务的执行结果,所以首先判断是否为null,为null则表示任务还没有执行结束,那么就会调用waitingGet方法来等待任务执行完成,如果result不为null,那么说明任务已经成功执行结束了,那么就调用reportGet来返回结果,下面先来看一下waitingGet方法的具体实现细节:

    
       /**
         * Returns raw result after waiting, or null if interruptible and
         * interrupted.
         */
        private Object waitingGet(boolean interruptible) {
            Signaller q = null;
            boolean queued = false;
            int spins = -1;
            Object r;
            while ((r = result) == null) {
                if (spins < 0)
                    spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                        1 << 8 : 0; // Use brief spin-wait on multiprocessors
                else if (spins > 0) {
                    if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                        --spins;
                }
                else if (q == null)
                    q = new Signaller(interruptible, 0L, 0L);
                else if (!queued)
                    queued = tryPushStack(q);
                else if (interruptible && q.interruptControl < 0) {
                    q.thread = null;
                    cleanStack();
                    return null;
                }
                else if (q.thread != null && result == null) {
                    try {
                        ForkJoinPool.managedBlock(q);
                    } catch (InterruptedException ie) {
                        q.interruptControl = -1;
                    }
                }
            }
            if (q != null) {
                q.thread = null;
                if (q.interruptControl < 0) {
                    if (interruptible)
                        r = null; // report interruption
                    else
                        Thread.currentThread().interrupt();
                }
            }
            postComplete();
            return r;
        }
    
    

    这个方法的实现时比较复杂的,方法中有几个地方需要特别注意,下面先来看一下spins是做什么的,根据注释,可以知道spins是用来在多核心环境下的自旋操作的,所谓自旋就是不断循环等待判断,从代码可以看出在多核心环境下,spins会被初始化为1 << 8,然后在自旋的过程中如果发现spins大于0,那么就通过一个关键方法ThreadLocalRandom.nextSecondarySeed()来进行spins的更新操作,如果ThreadLocalRandom.nextSecondarySeed()返回的结果大于0,那么spins就减1,否则不更新spins。ThreadLocalRandom.nextSecondarySeed()方法其实是一个类似于并发环境下的random,是线程安全的。

    接下来还需要注意的一个点是Signaller,从Signaller的实现上可以发现,Signaller实现了ForkJoinPool.ManagedBlocker,下面是ForkJoinPool.ManagedBlocker的接口定义:

    
        public static interface ManagedBlocker {
            /**
             * Possibly blocks the current thread, for example waiting for
             * a lock or condition.
             *
             * @return {@code true} if no additional blocking is necessary
             * (i.e., if isReleasable would return true)
             * @throws InterruptedException if interrupted while waiting
             * (the method is not required to do so, but is allowed to)
             */
            boolean block() throws InterruptedException;
    
            /**
             * Returns {@code true} if blocking is unnecessary.
             * @return {@code true} if blocking is unnecessary
             */
            boolean isReleasable();
        }
    
    

    ForkJoinPool.ManagedBlocker的目的是为了保证ForkJoinPool的并行性,具体分析还需要更为深入的学习Fork/Join框架。继续回到waitingGet方法中,在自旋过程中会调用ForkJoinPool.managedBlock(ForkJoinPool.ManagedBlocker)来进行阻塞工作,实际的效果就是让线程等任务执行完成,CompletableFuture中与Fork/Join的交叉部分内容不再本文的描述范围,日后再进行分析总结。总得看起来,waitingGet实现的功能就是等待任务执行完成,执行完成返回结果并做一些收尾工作。

    现在来看reportGet方法的实现细节,在判断任务执行完成之后,get方法会调用reportGet方法来获取结果:

    
        /**
         * Reports result using Future.get conventions.
         */
        private static <T> T reportGet(Object r)
            throws InterruptedException, ExecutionException {
            if (r == null) // by convention below, null means interrupted
                throw new InterruptedException();
            if (r instanceof AltResult) {
                Throwable x, cause;
                if ((x = ((AltResult)r).ex) == null)
                    return null;
                if (x instanceof CancellationException)
                    throw (CancellationException)x;
                if ((x instanceof CompletionException) &&
                    (cause = x.getCause()) != null)
                    x = cause;
                throw new ExecutionException(x);
            }
            @SuppressWarnings("unchecked") T t = (T) r;
            return t;
        }
    
    

    如果result为null,说明任务时被中断的,抛出中断异常,如果result类型为AltResult,代表执行过程中出现异常了,那么就抛出相应的异常,否则,返回result。

    分析完了不带参数的get方法(阻塞等待)之后,现在来分析一下带超时参数的get方法的具体实现细节:

    
        public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            Object r;
            long nanos = unit.toNanos(timeout);
            return reportGet((r = result) == null ? timedGet(nanos) : r);
        }
    
    

    和不带参数的get方法一样,还是会判断任务是否已经执行完成了,如果完成了会调用reportGet方法来返回最终的执行结果(或者抛出异常),否则,会调用timedGet来进行超时等待,timedGet会等待一段时间,然后抛出超时异常(或者执行结束返回正常结果),下面是timedGet方法的具体细节:

    
        private Object timedGet(long nanos) throws TimeoutException {
            if (Thread.interrupted())
                return null;
            if (nanos <= 0L)
                throw new TimeoutException();
            long d = System.nanoTime() + nanos;
            Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
            boolean queued = false;
            Object r;
            // We intentionally don't spin here (as waitingGet does) because
            // the call to nanoTime() above acts much like a spin.
            while ((r = result) == null) {
                if (!queued)
                    queued = tryPushStack(q);
                else if (q.interruptControl < 0 || q.nanos <= 0L) {
                    q.thread = null;
                    cleanStack();
                    if (q.interruptControl < 0)
                        return null;
                    throw new TimeoutException();
                }
                else if (q.thread != null && result == null) {
                    try {
                        ForkJoinPool.managedBlock(q);
                    } catch (InterruptedException ie) {
                        q.interruptControl = -1;
                    }
                }
            }
            if (q.interruptControl < 0)
                r = null;
            q.thread = null;
            postComplete();
            return r;
        }
    
    
    

    在timedGet中不再使用spins来进行自旋,因为现在可以确定需要等待多少时间了。timedGet的逻辑和waitingGet的逻辑类似,毕竟都是在等待任务的执行结果。

    除了两个get方法之前,CompletableFuture还提供了一个方法getNow,代表需要立刻返回不进行阻塞等待,下面是getNow的实现细节:

    
        public T getNow(T valueIfAbsent) {
            Object r;
            return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
        }
    
    

    getNow很简单,判断result是否为null,如果不为null则直接返回,否则返回参数中传递的默认值。

    分析完了get部分的内容,下面开始分析CompletableFuture最为重要的一个部分,就是如何开始一个任务的执行。下文中将分析supplyAsync的具体执行流程,supplyAsync有两个版本,一个是不带Executor的,还有一个是指定Executor的,下面首先分析一下不指定Executor的supplyAsync版本的具体实现流程:

    
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return asyncSupplyStage(asyncPool, supplier);
        }
    
    
        static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                         Supplier<U> f) {
            if (f == null) throw new NullPointerException();
            CompletableFuture<U> d = new CompletableFuture<U>();
            e.execute(new AsyncSupply<U>(d, f));
            return d;
        }
    

    可以看到supplyAsync会调用asyncSupplyStage,并且指定一个默认的asyncPool来执行任务,CompletableFuture是管理执行任务的线程池的,这一点是和FutureTask的区别,FutureTask只是一个可以被执行的task,而CompletableFuture本身就管理者线程池,可以由CompletableFuture本身来管理任务的执行。这个默认的线程池是什么?

    
        private static final boolean useCommonPool =
            (ForkJoinPool.getCommonPoolParallelism() > 1);
    
        /**
         * Default executor -- ForkJoinPool.commonPool() unless it cannot
         * support parallelism.
         */
        private static final Executor asyncPool = useCommonPool ?
            ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
    
    

    首先会做一个判断,如果条件满足就使用ForkJoinPool的commonPool作为默认的Executor,否则会使用一个ThreadPerTaskExecutor来作为CompletableFuture来做默认的Executor。

    接着看asyncSupplyStage,我们提交的任务会被包装成一个AsyncSupply对象,然后交给CompletableFuture发现的Executor来执行,那AsyncSupply是什么呢?

    
       static final class AsyncSupply<T> extends ForkJoinTask<Void>
                implements Runnable, AsynchronousCompletionTask {
            CompletableFuture<T> dep; Supplier<T> fn;
            AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
                this.dep = dep; this.fn = fn;
            }
    
            public final Void getRawResult() { return null; }
            public final void setRawResult(Void v) {}
            public final boolean exec() { run(); return true; }
    
            public void run() {
                CompletableFuture<T> d; Supplier<T> f;
                if ((d = dep) != null && (f = fn) != null) {
                    dep = null; fn = null;
                    if (d.result == null) {
                        try {
                            d.completeValue(f.get());
                        } catch (Throwable ex) {
                            d.completeThrowable(ex);
                        }
                    }
                    d.postComplete();
                }
            }
        }
    
    

    观察到AsyncSupply实现了Runnable,而Executor会执行Runnable的run方法来获得结构,所以主要看AsyncSupply的run方法的具体细节,可以看到,run方法中会试图去获取任务的结果,如果不抛出异常,那么会调用CompletableFuture的completeValue方法,否则会调用CompletableFuture的completeThrowable方法,最后会调用CompletableFuture的postComplete方法来做一些收尾工作,主要来看前两个方法的细节,首先是completeValue方法:

    
        /** Completes with a non-exceptional result, unless already completed. */
        final boolean completeValue(T t) {
            return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                               (t == null) ? NIL : t);
        }
    
    

    completeValue方法会调用UNSAFE.compareAndSwapObject来讲任务的结果设置到CompletableFuture的result字段中去。如果在执行任务的时候抛出异常,会调用completeThrowable方法,下面是completeThrowable方法的细节:

    
        /** Completes with an exceptional result, unless already completed. */
        final boolean completeThrowable(Throwable x) {
            return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                               encodeThrowable(x));
        }
    
    

    指定Executor的supplyAsync方法和没有指定Executor参数的supplyAsync方法的唯一区别就是执行任务的Executor,所以不再赘述。

    到这里,可以知道Executor实际执行的代码到底是什么了,回到asyncSupplyStage方法,接着就会执行Executor.execute方法来执行任务,需要注意的是,asyncSupplyStage方法返回的是一个CompletableFuture,并且立刻返回的,具体的任务处理逻辑是有Executor来执行的,当任务处理完成的时候,Executor中负责处理的线程会将任务的执行结果设置到CompletableFuture的result字段中去。

    本文的内容到此也就结束了,上文中提到,CompletableFuture提供了大量实用的方法来支持我们的异步任务,具体提供的方法可以参考上文中提供的链接,或者直接参考jdk源码、javadoc来获取更为详细的内容,本文的目的是解析CompletableFuture的任务处理流程,并且试图分析Future在CompletableFuture中的使用,以更深入的理解Future,结合第一篇深度学习Java Future系列的文章,希望可以更加深入的理解Future,并且知道Future在java并发编程、异步计算中的重要作用。

    相关文章

      网友评论

        本文标题:深度学习Java Future (二)

        本文链接:https://www.haomeiwen.com/subject/wwcyixtx.html