FutureTask 源码分析

作者: jijs | 来源:发表于2017-07-04 22:48 被阅读836次

    FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。
    如:

    1. 取消任务执行
    2. 查询任务是否执行完成
    3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。
      注意:一旦任务执行完成或取消任务,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)

    FutureTask实现了Runnable接口和Future接口,因此FutureTask可以传递到线程对象Thread或Excutor(线程池)来执行。

    如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来需要时,就可以通过FutureTask对象获得后台作业的计算结果或者执行状态。

    示例

    public class FutureTaskDemo {
    
        public static void main(String[] args) throws InterruptedException{
            FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int num = new Random().nextInt(10);
                    TimeUnit.SECONDS.sleep(num);
                    return num;
                }
            });
            Thread t = new Thread(ft);
            t.start();
            //这里可以做一些其它的事情,跟futureTask任务并行,等需要futureTask的运行结果时,可以调用get方法获取。
            try {
                //等待任务执行完成,获取返回值
                Integer num = ft.get();
                System.out.println(num);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    FutureTask 源码分析

    JDK1.7及之前,FutureTask 通过使用内部类Sync继承AQS来实现。
    内部使用的AQS的共享锁。
    AQS具体实现可参考 AbstractQueuedSynchronizer 源码分析

    JDK1.8没有使用AQS,而是自己实现了一个同步等待队列,在结果返回之前,所有的线程都被阻塞,存放到等待队列中。

    下面我们来分析下JDK1.8的FutureTask 源码

    FutureTask 类结构

    public class FutureTask<V> implements RunnableFuture<V> {
     
       /**
         * 当前任务的运行状态。
         *
         * 可能存在的状态转换
         * NEW -> COMPLETING -> NORMAL(有正常结果)
         * NEW -> COMPLETING -> EXCEPTIONAL(结果为异常)
         * NEW -> CANCELLED(无结果)
         * NEW -> INTERRUPTING -> INTERRUPTED(无结果)
         */
        private volatile int state;
        private static final int NEW          = 0;  //初始状态
        private static final int COMPLETING   = 1;  //结果计算完成或响应中断到赋值给返回值之间的状态。
        private static final int NORMAL       = 2;  //任务正常完成,结果被set
        private static final int EXCEPTIONAL  = 3;  //任务抛出异常
        private static final int CANCELLED    = 4;  //任务已被取消
        private static final int INTERRUPTING = 5;  //线程中断状态被设置ture,但线程未响应中断
        private static final int INTERRUPTED  = 6;  //线程已被中断
    
        //将要执行的任务
        private Callable<V> callable;
        //用于get()返回的结果,也可能是用于get()方法抛出的异常
        private Object outcome; // non-volatile, protected by state reads/writes
        //执行callable的线程,调用FutureTask.run()方法通过CAS设置
        private volatile Thread runner;
        //栈结构的等待队列,该节点是栈中的最顶层节点。
        private volatile WaitNode waiters;
        ....
    

    FutureTask实现的接口信息如下:

    RunnableFuture 接口

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    

    RunnableFuture 接口基础了Runnable和Future接口

    Future 接口

    public interface Future<V> {
        //取消任务
        boolean cancel(boolean mayInterruptIfRunning);
        //判断任务是否已经取消
        boolean isCancelled();
        //判断任务是否结束(执行完成或取消)
        boolean isDone();
        //阻塞式获取任务执行结果
        V get() throws InterruptedException, ExecutionException;
        //支持超时获取任务执行结果
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    run 方法

    public void run() {
        //保证callable任务只被运行一次
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    1.如果state状态不为New或者设置运行线程runner失败则直接返回false,说明线程已经启动过,保证任务在同一时刻只被一个线程执行。
    2.调用callable.call()方法,如果调用成功则执行set(result)方法,将state状态设置成NORMAL。如果调用失败抛出异常则执行setException(ex)方法,将state状态设置成EXCEPTIONAL,唤醒所有在get()方法上等待的线程。
    3.如果当前状态为INTERRUPTING(步骤2已CAS失败),则一直调用Thread.yield()直至状态不为INTERRUPTING

    set方法
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    1. 首先通过CAS把state的NEW状态修改成COMPLETING状态。
    2. 修改成功则把v值赋给outcome变量。然后再把state状态修改成NORMAL,表示现在可以获取返回值。
    3. 最后调用finishCompletion()方法,唤醒等待队列中的所有节点。

    setException 方法

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    

    同上 set(V v) 方法

    1. 首先通过CAS把state的NEW状态修改成COMPLETING状态。
    2. 修改成功则把v值赋给outcome变量。然后再把state状态修改成EXCEPTIONAL,表示待返回的异常信息设置成功。
    3. 最后调用finishCompletion()方法,唤醒等待队列中的所有节点。

    handlePossibleCancellationInterrupt方法

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    }
    

    该方法是如果正在响应中断(EXCEPTIONAL),则等待响应中断结束(INTERRUPTED)。

    finishCompletion方法

    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            //通过CAS把栈顶的元素置为null,相当于弹出栈顶元素
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }
    

    把栈中的元素一个一个弹出,并通过 LockSupport.unpark(t)唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能cancel,异常等)

    runAndReset 方法 (可被子类重写,外部无法直接调用)

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
    

    该方法和run方法的区别是,run方法只能被运行一次任务,而该方法可以多次运行任务。而runAndReset这个方法不会设置任务的执行结果值,如果该任务成功执行完成后,不修改state的状态,还是可运行(NEW)状态,如果取消任务或出现异常,则不会再次执行。
    而只是执行任务完之后,

    get方法

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    

    如果state状态小于等于COMPLETING,说明任务还没开始执行或还未执行完成,然后调用awaitDone方法阻塞该调用线程。如果state的状态大于COMPLETING,则说明任务执行完成,或发生异常、中断、取消状态。直接通过report方法返回执行结果。

    get(long timeout, TimeUnit unit) 方法

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    

    同上面get方法,该get方法支持阻塞等待多长时间,如果超时直接抛出TimeoutException异常。

    report方法

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    

    如果state的状态为NORMAL,说明任务正确执行完成,直接返回计算后的值。
    如果state的状态大于等于CANCELLED,说明任务被成功取消执行、或响应中断,直接返回CancellationException异常
    否则返回ExecutionException异常。

    awaitDone 方法

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            //如果state状态大于COMPLETING 则说明任务执行完成,或取消
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //构建节点
            else if (q == null)
                q = new WaitNode();
            //把当前节点入栈
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            //如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间
            //如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                //阻塞当前线程
                else
                    LockSupport.park(this);
            }
    }
    

    构建栈链表的节点元素,并将该节点入站,同时阻塞当前线程等待运行主任务的线程唤醒该节点。
    JDK1.7版本是使用AQS的双向链表队列实现的。

    removeWaiter 方法

    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    

    移除栈中的节点元素,需要使用CAS自旋来保障移除成功。

    cancel方法

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    
    1. 根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
    2. 如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
    3. 唤醒所有在get()方法等待的线程

    总结:状态为NEW时,cancel和run方法才可以被运行。

    1. 任务开始运行后,不能在次运行,保证只运行一次(runAndReset 方法除外)
    2. 任务还未开始,或者任务已被运行,但未结束,这两种情况下都可以取消; 如果任务已经结束,则不可以被取消 。

    想了解更多精彩内容请关注我的公众号

    相关文章

      网友评论

        本文标题:FutureTask 源码分析

        本文链接:https://www.haomeiwen.com/subject/kwsecxtx.html