深度学习Java Future (一)

作者: 一字马胡 | 来源:发表于2017-12-07 23:47 被阅读1559次

    作者: 一字马胡
    转载标志 【2017-12-07】

    更新日志

    日期 更新内容 备注
    2017-12-07 学习Future的总结 关于Future的深入学习内容
    2017-12-14 深度学习Java Future (二) 补充相关内容

    Future

    
     * A {@code Future} represents the result of an asynchronous
     * computation.  Methods are provided to check if the computation is
     * complete, to wait for its completion, and to retrieve the result of
     * the computation.  The result can only be retrieved using method
     * {@code get} when the computation has completed, blocking if
     * necessary until it is ready.  Cancellation is performed by the
     * {@code cancel} method.  Additional methods are provided to
     * determine if the task completed normally or was cancelled. Once a
     * computation has completed, the computation cannot be cancelled.
     * If you would like to use a {@code Future} for the sake
     * of cancellability but not provide a usable result, you can
     * declare types of the form {@code Future<?>} and
     * return {@code null} as a result of the underlying task.
    
    

    上面这段文字已经说明了Future的本质,一个Future代表一个异步计算的结果,并且它提供一些方法来让调用者检测异步过程是否完成,或者取得异步计算的结果,或者取消正在执行的异步任务。本文将分析总结Future的一些实现细节,希望可以弄明白Future的原理。

    为了有一定的目标性,本文将选取Future的一个基本实现FutureTask来进行分析总结,其他更为复杂丰富的Future实现日后再进行分析总结。下面的图片展示了FutureTask的类关系图,下文会对FutureTask进行详细的分析:

    从类图可以看出,FutureTask实现了Runnable接口和Future接口,下面的图片展示了FutureTask提供的一些接口,下文中将对其中的一些接口做详细分析。

    FutureTask

    首先来看一下FutureTask的一些关键字段,第一个需要注意的是state字段,看下面的代码:

    
        /*
         * Possible state transitions:
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
    

    state代表当前任务的状态,NEW代表当前以及获取到任务,准备开始执行任务。COMPLETING状态代表正在进行任务处理中,NORMAL表示任务执行结束,并且任务处理结果正常,没有异常出现,EXCEPTIONAL则表示执行任务的过程中出现了异常,CANCELLED表示任务被取消了,INTERRUPTING表示任务在执行过程中被中断,是一个中间状态,INTERRUPTED表示中断结束。这些状态的可能转换关系在上面的注释中可以看到,可以发现总共只有四种状态转移路径,在下文的某些方法分析中还会提到state。

    第二个需要关注的字段是callable字段,这就是实际需要执行的任务,而结果将被设置到outcome字段中去,runner字段代表了运行任务的线程。waiters字段则代表阻塞在该Future上的线程链表,可以看一下waiters的数据结构:

    
        /**
         * Simple linked list nodes to record waiting threads in a Treiber
         * stack.  See other classes such as Phaser and SynchronousQueue
         * for more detailed explanation.
         */
        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    
    

    可以看到是一个非常简单的单链表数据结构。下面来看一下FutureTask的构造函数,FutureTask有两个构造函数,分别如下:

    
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
    
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    可以看到,第一个构造函数传递了一个Callable类型的参数,构造函数将该Callable参数初始化给类的callback字段,并且初始化state为NEW。第二个构造函数传递了两个参数,一个是Runnable类型的runnable,代表需要执行的任务,以及任务的返回结果,其实还是使用这两个参数来构造出一个callback,并且执行和第一个构造函数一样的逻辑,下面是如何使用Runnable和result来构造出一个callback的剩下细节:

    
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

    下面来看一下Future的一个核心方法get的实现细节,下面是get方法的具体代码:

    
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
    

    首先获取到任务的当前状态,如果状态小于等于COMPLETING,那么根据最开始的定义,可以知道目前的状态只可能是NEW或者COMPLETING,也就是任务还没有开始执行,或者还在继续执行没有结束,那么就调用awaitDone方法来进行等待任务执行完成,否则,也就是说,当任务的当前状态大于COMPLETING的时候,那么当前状态可能为:

    • NORMAL 正常结束
    • EXCEPTIONAL 异常结束
    • CANCELLED 被取消
    • INTERRUPTING 正在中断
    • INTERRUPTED 中断结束

    先来看第一条分支,也就是调用awaitDone的具体流程。

    
       private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    
    

    这个方法还是很复杂的,下面根据分支来分析一下这个方法都在做什么事情:

    • 如果当前线程被中断,那么就将所有等待在该Future上的线程都从阻塞链表移除
    • 如果发现任务的状态变为某种final态的时候,也就是state大于COMPLETING的时候,说明任务执行已经结束了(无论是怎么结束的都不再运行中),那么返回任务的状态,并且清除等待在该Future上的线程
    • 如果发现任务的当前状态为COMPLETING,那么说明任务正在执行过程中,需要等待一下
    • 否则,就需要等待了,如果配置了不等待,那么不会将当前线程添加到等待链表中,否则将当前线程添加到等待链表中去
    • 如果配置了等待时间,那么就需要判断是否超时了,如果超时了,那么就阻塞等待,如果没有设置超时时间,那么就会一直阻塞等待下去直到任务处理完成(不管怎么完成)

    上面分析完了当状态小于等于COMPLETING的时候的处理流程,下面来看一下当任务的状态大于COMPLETING的时候的处理流程:

    
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    
    

    需要判断任务的当前状态,其实现在可以知道目前的状态可能为什么,任务肯定已经不再运行了,要么正常结束,要么异常结束,要么被中断,根据不同的情况进行不同的处理,比如当发现状态为NORMAL的时候,就判断为任务正常结束,处理结果应该保存在outcome中,所以返回outcom就可以了,当发现任务时被取消的时候,get操作会抛出异常,其他情况也会抛出异常来告知调用者发生的情况。Future除了提供不带参数的get方法外,还提供了一个带参数的get方法,也就是可以设置等待时间,超时会抛出异常,下面是带参数的get方法的细节:

    
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    
    

    该方法的实现细节与不带参数的get方法一样,只是增加了超时机制,等待时间超过了设定的时间之后就会抛出TimeoutException异常。该方法和不带参数的get方法是共用一个awaitDone方法来实现任务结果的等待获取的,所以就不再往下赘述了。

    下面再来看一个Future的比较重要的方法cancel,也就是取消任务的执行,它的具体实现细节看下面的代码:

    
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    
    

    首先判断任务的当前状态,如果不为NEW或者试图将任务的状态设置为NEW失败之后,就会返回false告诉用户cancel失败了,否则,就调用负责执行任务的线程的interrupt方法来结束任务的运行,并且会更新任务的状态。在finally中会调用一个方法finishCompletion,下面是这个方法的具体实现细节:

    
    
        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    
    

    这个方法做的事情就是告诉所有等待在该Future上的线程,让他们别等了,任务已经被cancel了,再等下去也不会有结果了。

    文章开头给出了FutureTask的类关系图,并且知道了FutureTask继承了Runnable,我们在创建了一个FutureTask之后,会使用线程池来执行这个FutureTask,最后会执行FutureTask的run方法,所以最为重要的就是FutureTask的run方法,下面开始分析FutureTask的run方法的具体实现细节:

    
       public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    

    首选,如果任务的当前状态不是NEW,或者试图将任务的状态变为NEW失败的时候,或者试图设置runner字段为当前的线程的时候遇到失败,也就是获取到了执行任务的具体Thread,但是设置字段失败,run方法都将直接返回,这也就说明了为什么说Future是不可逆的,只能执行一次。接着,run方法获取到了具体的任务,并且再次判断该任务的状态是否为NEW,以及判断任务是否为null,如果这些判断都通过的话,那么就可以执行任务了,具体的执行就是调用Callable的call方法来获取结果,如果在执行过程中抛出异常,那么就需要调用setException来设置具体的异常,否则调用set方法来设置任务的执行结果,下面先来看setException的具体细节:

    
        protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    
    

    这个方法会设置任务的状态为EXCEPTIONAL,并且调用finishCompletion来做一些任务的收尾工作。下面来看一下正常结束时候的set细节:

    
        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    
    

    和setException一样,只是将任务的状态设置为了NORMAL而不是EXCEPTIONAL,这样调用线程在进行get方法调用的时候就可以获取到正常的结果了。

    到此,本文分析了Future的基本实现,并且基于FutureTask来进行具体的分析,思路更加清晰,再次需要说明的是,一个Future代表一个异步计算的结果,我们可以取消任务,可以等待任务,并且可以设置一个超时时间以控制等待时间,当然,本文的目的是初步理解Future的原理,为了深刻理解Future的原理,需要做更为复杂丰富的分析总结,下一步可以借助CompletableFuture来深入学习Future,关于CompletableFuture的一些基础知识,可以参考文章Java CompletableFuture,对于CompletableFuture的更为深入的学习总结将在未来适宜的时候进行。

    相关文章

      网友评论

      • 哼哼菜:if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
        }
        当前线程中断后为什么要把所有等待在该Future上的线程都从阻塞链表移除?
      • a072fa038320:2)另外文中 提到 Future是不可逆的? 这个不可逆是啥意思? 意思是 /** The thread running the callable; CASed during run() */
        private volatile Thread runner;
        尝试执行一次这个CAS,来将当前的ThreadPool中对应的Worker的thread设置为当前的FutureTask的 runner失败,这次就直接放弃执行了,是这个意思吗?? 如果是这样的话,如果竞争,FutureTask的执行岂不是没有任何保证?
        一字马胡:文章中提到的不可逆是针对Future的state来说的,Future的state只能走文章中列出的几个路径,并且只会向前发展,这和线程的状态转换是一样的,只是线程的状态会循环发展,但是Future的状态不会循环转化,但是本质上,都可以说状态都是不可逆的,当然这只是我为了学习Future的一种辅助思考方式,因人而异~
      • a072fa038320:楼主 两个疑问: 1) 所有调用cancell的方法,如果符合条件,会调用thread.interrupt()方法中断线程,但是我们知道,如果要响应interrupt必须对线程的中断状态进行注定判断,比如whiel(thread.isInter()){break;}类似的。 FutureTask中对调用了interrupt方法的调用没有看到这样类似的处理? 我觉得这点没解释清除。
        一字马胡:跟着自己的思路,然后抓住重点,你的这个疑问很好,你觉得应该是这样做的,找一下代码里面到底是怎么实现的,为什么不使用你思考的这个方案等等问题,当你去分析源码的时候,如果分析过程中源码中的实现和你自己的思考方案一致的时候,你就厉害了~文章确实没有描述到这个细节,但是是可以看出来的,好好找找就能看出来~
      • a072fa038320:接着 顶 文章挨个看下来。
      • i_3e70:用的myeclipse吗?
        一字马胡:@i_3e70 IDEA
      • AWeiLoveAndroid:楼主试试rxjava
        一字马胡:@阿韦爱Android 正有此意,估计也就这一阵会陆续发出
      • Antz_H碎碎念:我总感觉缺了点什么,future的实际应用啊,事件驱动的实现机理啊
        Antz_H碎碎念:@一字马胡 嘿嘿
        一字马胡:@我是小蚂蚁 嗯嗯,这才是第一篇,以后会继续更新这个内容的,不断学习,不断总结

      本文标题:深度学习Java Future (一)

      本文链接:https://www.haomeiwen.com/subject/tgdsixtx.html