美文网首页
FutureTask源码解析

FutureTask源码解析

作者: anyoptional | 来源:发表于2021-04-18 01:10 被阅读0次

    概述

    future-task.png

       FutureTask实现了RunnableFuture接口,它既可以作为Runnable被提交给Executor去执行,又可以作为Future获取异步任务的执行结果,或者取消异步任务。一句话定义,FuntureTask代表的是一个可取消的异步任务。

    FutureTask的状态机模型

      JDK1.8中,FutureTask的实现采用了状态机模型,并且不同状态的数值是经过精心设计的。FutureTask内部维护了一个状态变量,用来表示任务执行的各个阶段。

        // 使用volatile禁用线程、cpu缓存
        // 以便其它线程能立即观测到状态的改变
        private volatile int state;
    
        // 初始状态
        private static final int NEW          = 0;
        // 等待完成中
        private static final int COMPLETING   = 1;
        // 正常完成
        private static final int NORMAL       = 2;
        // 异常结束
        private static final int EXCEPTIONAL  = 3;
        // 已取消
        private static final int CANCELLED    = 4;
        // 响应打断中
        private static final int INTERRUPTING = 5;
        // 已打断
        private static final int INTERRUPTED  = 6;
    

    其中,COMPLETINGINTERRUPTING是中间状态,NORMALEXCEPTIONALCANCELLEDINTERRUPTED是终态,一旦到达终态后续就不能再发生状态转换了。同时状态的转换并不是任意的,比如不能从COMPLETING -> INTERRUPTING,可能的状态转换是以下几种情形之一:

        /**
         * Possible state transitions:
         *
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
    

    FutureTask的实例变量

        /** 实际被执行的Callable,因为Runnable不能返回结果 */
        private Callable<V> callable;
        /** 任务的执行结果,或者执行过程中出现的异常 */
        private Object outcome;
        /** 执行Callable的线程 */
        private volatile Thread runner;
        /** 
         * Treiber stack构成的等待线程栈,
         * 因为可能会有多个线程调用Future#get()
         * 来获取任务执行结果
         */
        private volatile WaitNode waiters;
    
            /** 单链表实现的Treiber stack */
        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    

      Treiber stack这个词在整个java.util.concurrent包中出现的次数特别多,那么它到底是怎样的一个数据结构呢?简单来说,Treiber stack是利用细粒度并发原语(CAS)实现的一种无锁结构。Treiber stack内部维护链表头结点,在pushpop时通过CAS来保证线程安全,下面是一个简单的Treiber stack实现。

    public class TreiberStack<E> {
    
        /**
         * 使用AtomicReference维护栈顶元素,用于后续CAS操作
         */
        private final AtomicReference<Node<E>> top = new AtomicReference<>();
    
        /**
         * 入栈时首先检查栈顶元素,只有在确定其它线程
         * 没有修改时才能入栈成功,否则进入重试
         */
        public void push(E e) {
            Node<E> old;
            Node<E> cur = new Node<>(e);
            do {
                old = top.get();
                cur.next = old;
            } while (!top.compareAndSet(old, cur));
        }
    
        /**
         * 出栈时也需要检查栈顶元素,其它线程没有修改时
         * 才能出栈,否则进入重试
         */
        public E pop() {
            Node<E> old;
            Node<E> cur;
            do {
                old = top.get();
                if (old == null) {
                    return null;
                }
                cur = old.next;
            } while (!top.compareAndSet(old, cur));
            return old.value;
        }
    
        private static class Node<E> {
            final E value;
            Node<E> next;
            Node(E value) {
                this.value = value;
            }
        }
    }
    

    构造函数

        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
    
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

      FutureTask实际上是通过将Runnable包装成Callable来获取异步任务的执行结果,而显示初始化state则是利用了Happens-Before中的程序次序规则和volatile变量规则来保证Callable的线程可见性:

    • 程序次序规则:在一个线程内,按照控制流顺序,书写在前面的操作Happens-Before于书写在后面的操作。
    • volatile变量规则:对一个volatile变量的写操作Happens-Before于后面对这个变量的读操作,这里的"后面"指时间上的先后顺序。

    核心方法

    run
    /**
     * run方法保证任务只会被执行一次
     */
    public void run() {
        // 1. 任务必须处于初始状态NEW
        // 2. 当前没有分配线程来执行底层的callable
        // 只有同时满足这两个条件,才能启动任务
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            // callable会在任务执行完成之后置空
            // state可能在#runAndReset()中恢复到初始状态
            // 这一步检查也是为了确保确实有必要继续执行
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    // 正常执行完成
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    // 执行过程中遇到异常
                    ran = false;
                    // 转换状态到EXCEPTIONAL
                    setException(ex);
                }
                // 如果正常执行完成
                if (ran)
                    // 转换状态到NORMAL
                    set(result);
            }
        } finally {
            // 整个任务执行期间,runner都是不为null的
            // 保证了run()方法不会被多个线程同时执行
            runner = null;
            // run()方法结束之后,state必须到达终态
            // 如果在执行过程中遭遇打断,此时状态可能
            // 还是INTERRUPTING,没有到达INTERRUPTED
            int s = state;
            if (s >= INTERRUPTING)
                // 自旋等待state到达终态INTERRUPTED
                handlePossibleCancellationInterrupt(s);
        }
    }
    
       /**
        * #run()正常结束
        */
         protected void set(V v) {
            // CAS切换到中间状态COMPLETING
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 设置返回值
                outcome = v;
                // 设置为正常结束
                // 到达终态后不能再继续转换,因此可以使用lazySet
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                // 唤醒执行期间因#get()阻塞的线程
                finishCompletion();
            }
        }
    
        /**
         * #run()异常结束
         */
        protected void setException(Throwable t) {
            // CAS切换到中间状态COMPLETING
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 设置异常信息
                outcome = t;
                // 设置为异常结束
                // 到达终态后不能再继续转换,因此可以使用lazySet
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                // 唤醒执行期间因#get()阻塞的线程
                finishCompletion();
            }
        }
    
        /**
         * 公共的完成逻辑:清除并唤醒执行期间因#get()阻塞的线程
         */
            private void finishCompletion() {
            // assert state > COMPLETING;
            // 获取链表头节点
            for (WaitNode q; (q = waiters) != null;) {
                // 清除表头,除了局部变量q再没有其它引用指向它了
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    // 循环处理每一个链表节点
                    for (;;) {
                        // 唤醒因#get()阻塞的线程
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        // 检查到达尾节点
                        WaitNode next = q.next;
                        if (next == null)
                            // 是则表示已完成
                            break;
                        // 断掉引用
                        // 这样GC时可以及时发现并清理
                        q.next = null; // unlink to help gc
                        // 转到下一个节点
                        q = next;
                    }
                    break;
                }
            }
            
                    // 调用钩子方法
            done();
            // callable在结束时置空
            callable = null;        // to reduce footprint
            // 方法退出时,waiters实例变量为null,局部变量q超出作用域
            // 这样就完成了对整个Treiber stack的清理
        }
    
        /**
         * 确保在#run()退出时state到达终态
         */
        private void handlePossibleCancellationInterrupt(int s) {
                // 如果确实处于中间状态INTERRUPTING
            if (s == INTERRUPTING)
                // 那么就自旋等待
                while (state == INTERRUPTING)
                    // 告诉调度器出让cpu时间
                    Thread.yield(); // wait out pending interrupt
                    // 按照约定,INTERRUPTING之后必然是INTERRUPTED
            // assert state == INTERRUPTED;
        }
    
    cancel
        /**
         * 取消任务
         * @param mayInterruptIfRunning 如果已经开始执行,是否需要打断执行中的任务
         */
            public boolean cancel(boolean mayInterruptIfRunning) {
            // 只有未达到终态的任务才能取消
            if (!(state == NEW &&
                  // 如果已经开始执行,根据参数决定是否打断任务的执行
                  // 当然,这里也是使用CAS控制并发
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                // 如果需要打断的话
                if (mayInterruptIfRunning) {
                    try {
                        // 就打断它
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        // 打断完成进入终态
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                // 同样,取消成功以后需要进行清理
                finishCompletion();
            }
            return true;
        }
    
    get
        /**
         * 获取任务执行结果
         */
            public V get() throws InterruptedException, ExecutionException {
            int s = state;
            // 未开始或执行中的任务才有可能获取到结果
            if (s <= COMPLETING)
                // 阻塞当前线程,直到终态
                s = awaitDone(false, 0L);
            // 根据状态获取结果
            return report(s);
        }
    
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            // 未开始或执行中的任务才有可能获取到结果
            if (s <= COMPLETING &&
                // 等待结束后,如果仍未到达终态,则为超时
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            // 根据状态获取结果
            return report(s);
        }
    
        /**
         * 等待任务产生执行结果(阻塞式),或者等待过程中被打断的话,抛出异常
         * @param timed 是否是带有时间限制的等待
         * @param nanos 最长等待时间
         */
            private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            // 计算等待的截止时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            // 根据LockSupport.parkXXX系列方法的文档
            // park系列方法会因为
            //  1. 其它线程调用unpark()
            //  2. 到达parkXXX指定时间
            //  3. 线程被打断
            //  4. 无理由
            // 以上4种情况返回,因此park系列方法需要保证在循环中,在方法返回时再次进行条件检测
            for (;;) {
                    // 检测是否因为线程打断返回
                if (Thread.interrupted()) {
                    // 是的话进行节点清理
                    removeWaiter(q);
                    // 抛出异常
                    throw new InterruptedException();
                }
                            // 读取当前状态
                int s = state;
                // 状态机的值是经过精心设计的
                // COMPLETING之后的状态要么是终态
                // 要么是打断,而打断已经处理过了
                // 因此大于COMPLETING也就是到达了终态
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                // 处于COMPLETING状态,说明任务很快就会结束了
                // 这里的处理是不进行阻塞等待,而是调用Thread.yield()
                // 出让cpu时间,等待任务执行完成,从而由上一步s > COMPLETING处理
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                // 来到这里,说明s == NEW,这是真正需要进行阻塞等待的状态
                // 如果还没有节点,新建之
                else if (q == null)
                    q = new WaitNode();
                // 还没有入栈的话,入栈之
                // 这两步就是一个普通的Treiber stack入栈操作了
                // 经过这两步之后,调用#get()的线程就成功加入等待队列了
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    // 计算需要等待的时间
                    nanos = deadline - System.nanoTime();
                    // 小于0表示等待已经结束了
                    if (nanos <= 0L) {
                        // 清理节点
                        // 对应Treiber stack出栈操作
                        removeWaiter(q);
                        return state;
                    }
                    // 带有时间限制的等待
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    // 无限期等待
                    LockSupport.park(this);
            }
        }
    
            /**
             * 报告任务的执行结果
             */
            private V report(int s) throws ExecutionException {
            Object x = outcome;
            // NORMAL表示正常执行完成
            if (s == NORMAL)
                // outcome携带的是callable的返回值
                return (V)x;
            // 已取消的话,抛出异常
            if (s >= CANCELLED)
                throw new CancellationException();
            // 剩下的就是执行异常的情况了
            throw new ExecutionException((Throwable)x);
        }
    
    isCancelled/isDone
        public boolean isCancelled() {
            // 比CANCELLED大的只有INTERRUPTING和INTERRUPTED
            // 而INTERRUPTING和INTERRUPTED同样是取消的状态之一
            // 它表示任务已经开始执行的情况下的强制取消
            return state >= CANCELLED;
        }
    
        public boolean isDone() {
            return state != NEW;
        }
    

      得益于状态机数值的精心设计,isCancelledisDone可以根据状态简单的判定。注意,isDone并不仅仅表示任务正常执行结束或者执行遇到异常,任务被取消或者被打断同样也计入isDoneFutureTask的源码就分析到这里了,下次再分享一下线程池的源码分析吧。

    相关文章

      网友评论

          本文标题:FutureTask源码解析

          本文链接:https://www.haomeiwen.com/subject/pqrmlltx.html