美文网首页我爱编程程序员
[Java源码][并发J.U.C]---解析FutureTask

[Java源码][并发J.U.C]---解析FutureTask

作者: nicktming | 来源:发表于2018-10-28 18:08 被阅读2次

    前言

    Future接口和实现Future接口的FutureTask类,代表异步计算的结果. 简单点说就是实现有返回结果的task, 实现Runnable接口的线程没有提供获得线程返回的结果, 而FutureTask实现了异步获得计算结果的一种方式, 也就是说可以先让一个线程去执行该task后自己去干其他的事情,等到一段时间后可以来获取该task的执行结果.

    本文源码: 本文源码地址

    例子

    先使用一个例子简单看看FutureTask的使用.

    package com.futuretask;
    
    import java.util.concurrent.Callable;
    
    public class FutureTaskTest02 {
        public static void main(String[] args) throws Exception {
            FutureTask<String> futureTask = new FutureTask< >(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println(Thread.currentThread().getName() + " starts to run.");
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName() + " wakes up.");
                    return "futurecall";
                }
            });
    
            Thread thread = new Thread(futureTask);
    
            thread.start();
    
            System.out.println(Thread.currentThread().getName() + " finished to start thread.");
    
            System.out.println(Thread.currentThread().getName() + "->" + futureTask.get());
        }
    }
    

    由代码中可以看到初始化一个Thread时, 传入一个FutureTask对象, 正常创建一个线程, 要传入一个Runnable对象, 其实FutureTaskRunnable的一个子类. 所以就好理解了, 另外还注意的是FutureTask对象传入了一个Callable实例, 暂时可以理解call方法为Runnable里面的run方法,是线程要执行的实体. 接着启动线程后在主线程中可以获得线程
    FutureTask的结果.

    输出结果如下: 可以看到FutureTask中可以得到线程执行结束后得到的结果.

    main finished to start thread.
    Thread-0 starts to run.
    Thread-0 wakes up.
    main->futurecall
    

    类结构

    futureTask.png

    可以看到FutureTask实现了RunnableFuture接口, 然而RunnableFuture接口继承了Runnable接口和Future接口. 同时类FutureTask中使用了Callable对象, Callable接口定义了call由用户实现并且注入到FutureTask中.

    由此可以猜测上例中thread中真正调用的是FutureTaskrun方法, 而run方法中实际调用了Callablecall方法并返回值, 关于取消获取返回值之类的方法都是FutureTask定义了一些逻辑来实现了Future的所有接口方法.

    源码

    接下来将分析一个FutureTask类.

    属性

    private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
        /** The underlying callable; nulled out after running */
        private Callable<V> callable;
        /** The result to return or exception to throw from get() */
        private Object outcome; // non-volatile, protected by state reads/writes
        /** The thread running the callable; CASed during run() */
        private volatile Thread runner;
        /** Treiber stack of waiting threads */
        private volatile WaitNode waiters;
    

    之前有说FutureTask是设计了一些逻辑来实现Future接口中的方法. 这些逻辑的基本线就是基于state, state是表示当前线程执行该任务的一些状态值. 状态值就是代码中对应的那些值, 他们的状态值转换只有下面这四种可能性.

    NEW -> COMPLETING -> NORMAL
    NEW -> COMPLETING -> EXCEPTIONAL
    NEW -> CANCELLED
    NEW -> INTERRUPTING -> INTERRUPTED
    

    接下来基于这四种可能性, 我们通过源码和例子共同来测试和查看, 弄明白每种可能性执行的代码逻辑. 最后加以总结.

    构造方法

        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    这个就不多说了, 可以看到初始状态为NEW.

    run 方法

    接着看要执行的run方法.

        /**
         *  最终运行的方法
         */
        public void run() {
            // 如果状态值不为NEW 表示已经有线程运行过该task了 因此返回
            // 如果状态值为NEW 则设置RUNNER为当前线程 如果设置不成功也返回
            if (state != NEW ||                                                                 // 1
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                return;
            // 进入到这里 表明执行该task的是当前线程已经被设置到RUNNER变量中并且状态值state为NEW
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {                                                // 2
                    /**
                     * result 接收callable的返回值
                     * ran    表示callable方法是否正确执行完成
                     */
                    V result;
                    boolean ran;
                    try {
                        // 调用callable的方法call 并把结果放到result中
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {                                                    // 3
                        // call()方法出现异常执行的操作
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    // call()正确执行完执行的操作
                    if (ran)                                                                    // 4
                        set(result);
                }
            } finally {                                                                         
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                /**
                 * 设置执行线程为null 不需要用CAS是因为前面的CAS会挡住其他的线程进入
                 */
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                /**
                 *  如果s >= INTERRUPTING 则调用handlePossibleCancellationInterrupt(s)方法
                 *  那什么时候会 s >= INTERRUPTING呢?
                 *      -> 调用cancel(true)并且没有在setException(ex)和set(result)发生前
                 */
                int s = state;
                if (s >= INTERRUPTING)                                                          // 5              
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    按代码中顺序:
    1. 如果状态值不为NEW, 表明有线程已经在执行这个run方法, 因此直接返回. 如果状态值为NEW则把当前线程设置到runner变量中,由于是多线程操作,为了保持线程间可见性, runner变量是volatile 并且用CAS设置.
    2. 进入到第2步, 为什么需要进行判断if (c != null && state == NEW)呢? 这是因为如果刚刚结束完第1步正在进入第2步的过程中,别的线程启动了cancel(false/true)方法(该方法会在后面分析,此时只需要知道这个即可), 都会导致state不为NEW的. 接着就开始执行callablecall方法, 用result接受返回值, 用ran表示call方法是否完整顺利的执行.
    3. 进入到第3步, 表明call方法出了异常没有正常顺利的执行完, 此时设置resultnull,ranfalse, 并且通过setException(ex)进入到异常结束状态. 看如下代码:

        protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    

    可以看到会把异常对象传给outcome, 并且设置状态. 由此可见状态值是
    NEW -> COMPLETING -> EXCEPTIONAL 并且 COMPLETING 是一个中间状态. finishCompletion()是做一些收尾工作,会放到后面分析.

    4. 如果call方法正常顺利执行完, 则会调用set(result)设置正常结束状态. 看如下代码段:

    protected void set(V v) {
            //System.out.println(" set after state:" + state);
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                //System.out.println(" set before state:" + state);
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    

    可以看到outcome是保存最终的结果包括返回值或者异常对象. 由此可见状态值是NEW -> COMPLETING -> NORMAL 并且 COMPLETING 是一个中间状态. finishCompletion()会放到后面分析.

    5. 这个是finally执行片段, 这段代码会把runner设置为null, 这里不需要用CAS是因为通过上面的片段,只有放到runner的那个线程才可以执行到try...finally...的片段. 上面也有提到过如果在第1步和第2步之间发生了cancel方法. 为了可以更加清晰的明白此代码段,看一下cancel方法的代码及其作用:

    cancel方法

    public boolean cancel(boolean mayInterruptIfRunning) {
            /**
             *  相当于 state != NEW || !UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
             *                         mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
             *  如果状态值不是NEW 则直接返回 (只有在状态值是NEW的情况下才进行取消操作)
             *  如果状态值是NEW并且CAS操作失败 会直接返回false
             *  CAS操作成功会继续执行后续的操作, CAS分两种:
             *  mayINterruptIfRunning = true:
             *      状态值 NEW -> INTERRUPTING -> INTERRUPTED (期间调用runner.interrupt()方法中断执行call方法的线程)
             *  mayINterruptIfRunning = false:
             *      状态值 NEW -> CANCELLED
             *
             *  最后都会调用finishCompletion();
             */
            if (!(state == NEW &&
                    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                            mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    

    mayInterruptIfRunning: 表示是否需要中断执行task的线程.
    (1) 如果state不为NEW, 表明state已经是此两种NEW -> COMPLETING -> EXCEPTIONALNEW -> COMPLETING -> NORMAL中的一种了, 说明已经task已经完成了, 已经没有必要取消任务了, 直接返回false.
    (2) 如果stateNEW, 则根据mayInterruptIfRunning的值来设置state的值, 如果为trueNEW -> INTERRUPTING (INTERRUPTING是一个中间状态), 如果falseNEW -> CANCELLED 是一个最终状态.
    (3) 如果myaInterruptIfRunningtrue则需要调用interrupt()去中断执行task的线程. 并且把NEW -> INTERRUPTING 设置为 NEW -> INTERRUPTING -> INTERRUPTED.
    (4) 最终执行finishCompletion(), 做一些收尾工作, 会在后续分析.

    另外这里需要注意几点:
    1. 通过interrupt()中断线程, 只有在此线程在执行wait,sleep,park等方法的时候才会真正中断,意思是会从这些方法中返回并抛出InterrruptedException, 其余的情况只是把此线程的中断状态设置为true后什么也没有做, 并不能真正的中断此线程. 因此如果自定义的call方法中在别的线程调用interrupt()方法的时候没有在执行wait,sleep,park这些方法的时候并不会起作用. 关于中断的详细理解可以参考我的另一篇博客 [并发J.U.C] 用例子理解线程中断.
    2.cancel方法和run方法中可以看到(当然一般情况下这两个方法是通过两个不同线程来执行的), 这两个方法执行的逻辑顺序是通过state来决定的. 接下来是分析一下cancel方法会出现在run方法的那个代码片段.

    2.1 如果cancel执行的时候state的时候不为NEW, 则run中已经调用了setException(ex)set(result)方法并且已经改变了state值, 此时cancel已经无意义了, 直接返回false.
    2.2 如果cancel执行的时候state的时候为NEW, 那cancel方法此时可以出现在run方法中的那几个时间点呢?

    2.2.1 出现在run方法中的12之间, 那如果此时cancelCAS执行设置NEW状态为INTERRUPTING或者CANCELLED, 则run会直接进入到finally代码块, 也就是run中的5部分的handlePossibleCancellationInterrupt(s)会被执行. 如果此时cancelCAS还没有执行,则run方法会进入到2中执行call方法, 在没有执行setException(ex)set(result)前, cancel中的CAS操作仍然可以成功, 其实就是看谁先改变state的状态值了. 自行分析即可.

    现在已经明白了什么时候会执行handlePossibleCancellationInterrupt(s), 接下来看看它做了什么事情:

    private void handlePossibleCancellationInterrupt(int s) {
            // It is possible for our interrupter to stall before getting a
            // chance to interrupt us.  Let's spin-wait patiently.
            if (s == INTERRUPTING)
                while (state == INTERRUPTING)
                    Thread.yield(); // wait out pending interrupt
    
            // assert state == INTERRUPTED;
    
            // We want to clear any interrupt we may have received from
            // cancel(true).  However, it is permissible to use interrupts
            // as an independent mechanism for a task to communicate with
            // its caller, and there is no way to clear only the
            // cancellation interrupt.
            //
            // Thread.interrupted();
        }
    

    作用: 可以看到该方法是为了保证让state值从INTERRUPTING变为INTERRUPTED, 从上面分析我们知道cancel方法在mayInterruptIfRunningtrue时会执行中断执行任务的线程,也知道执行cancel方法和执行run一般情况下是两个不同的线程, 所以执行任务的线程也就是执行handlePossibleCancellationInterrupt方法的线程让出cpu控制权,让其余的线程执行, 直到执行cancel的线程把状态值变为INTERRUPTED.

    由此我们已经分析了四种状态值的变化过程, 并且知道最终的返回值或者异常对象是保存在outcome中. 接下来主要分析一下别的线程是如何获得这个task的结果的.

    get方法

    获得返回值或者异常对象使用get() 或 带有超时时间的get(long timeout, TimeUnit unit)方法. 有一个线程执行task, 从上面的代码分析中我们也知道当有一个线程执行task的时候, 别的线程就无法执行task了,直接取结果就可以了. 接下来看看get方法的逻辑.

    /**
         * @throws CancellationException {@inheritDoc}
         */
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    

    可以看到该方法会响应InterruptedExceptionExecutionException. 由于取返回值可以有很多线程来取值, 如果任务还没有执行完成, 则需要把这些线程休眠等待用一个链表来串联这些线程. 当状态为COMPLETING或者NEW的时候(也就是s <= COMPLETING), 表明任务还在进行中, 需要调用awaitDone(false, 0L)来把当前线程加入到链表中. 如果s > COMPLETING, 则调用report(s)返回结果.

    接着看看awaitDone方法

        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
        /**
         * Awaits completion or aborts on interrupt or timeout.
         *
         * @param timed true true表示有超时限制/false表示会一直等待
         * @param nanos time 超时时间
         * @return state upon completion
         */
        private int awaitDone(boolean timed, long nanos)
                throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false; // 是否进队列/链表
            for (;;) {
                /**
                  *  如果线程已经被中断 则删除该节点并抛出InterruptedException
                  *  Thread.interrupted() 会将中断状态设置为false
                  */
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                /**
                 * s >COMPLETING 表明task已经完成
                 * 如果q的thread不为null 则设置为null
                 * 返回s
                 */
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                /**
                 * 在中间状态值此时让出cpu控制权
                 * 好让执行run方法的线程继续执行以达到最终状态
                 */
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                /**
                 * 此时状态值为NEW
                 * 如果q==null 就创建一个新的WaitNode
                 */
                else if (q == null)
                    q = new WaitNode();
                /**
                 * 此时状态值为NEW并且已经创建好了WaitNode即 q!=null
                 * 如果没有入队列即链表 则加入到链表中
                 */
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                            q.next = waiters, q);
                /**
                 *  判断是否是超时等待
                 */
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                /**
                 * 如果是一直等待则调用park方法使当前线程休眠
                 */
                else
                    LockSupport.park(this);
            }
        }
    

    作用: 是将当前线程加入到等待队列中并且返回状态值. 当当前线程被其他线程中断时抛出中断异常InterruptedException. 无限循环做以下事情:
    1. 如果线程已经被中断 则删除该节点并抛出InterruptedException.
    2. 如果task已经完成即s > COMPLETING, 则返回状态值.
    3. 如果s == COMPLETING表明执行task的线程正在执行task,那当前线程让出cpu控制权.
    4. 此时s == NEW 如果q没有初始化则初始化q.
    5. 如果s == NEW 并且 queued == false 则把当前节点q加入到链表中.
    6. 此时s == NEW 并且当前线程所代表的节点已经在链表中, 如果设置了超时等待, 则计算时间进行判断等等.
    7. 此时s == NEW 并且当前线程所代表的节点已经在链表中并且是一直等待, 因此直接调用park方法即可.

    简单看一下removeWaiter方法, 可以学习一下CAS在链表上的操作.

        private void removeWaiter(WaitNode node) {
            if (node != null) {
                node.thread = null;
                retry:
                for (;;) {          // restart on removeWaiter race
                    /**
                     *  q 为当前节点
                     *  s 为下一个节点
                     *  pred 是前一个节点
                     */
                    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                        s = q.next;
                        if (q.thread != null)
                            pred = q;
                        else if (pred != null) {
                            pred.next = s;
                            if (pred.thread == null) // check for race
                                continue retry;
                        }
                        else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                q, s))
                            continue retry;
                    }
                    break;
                }
            }
        }
    

    在理解了WaitNode后, 就可以看一下finishCompletion是如何做收尾工作了, 不过大致也可以猜测得到, 对链表进行清理工作,因为链表中所代表的线程都是在等待结果的,当调用finishCompletion表明已经结束了task任务,所以可以唤醒这些线程去获取结果了.

    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    /**
                     * 将waiters设置为null
                     * 用CAS意味着只能有一个线程可以进入到里面的for循环操作链表
                     */
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t); // 唤醒线程
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

    在理解了不带超时时间的get方法, 带超时时间的get逻辑也差不多.

        public V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                    (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    

    多了一个TimeoutException.

    接下来看看report(s)方法

    @SuppressWarnings("unchecked")
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            // 正常结束
            if (s == NORMAL)
                return (V)x;
            // 取消操作结束 抛出CancellationException(运行时异常)
            if (s >= CANCELLED)
                throw new CancellationException();
            // 异常结束 抛出ExecutionException异常
            throw new ExecutionException((Throwable)x);
        }
    

    作用: 返回最终的结果或者抛出异常.
    1. 正常结束(NEW -> COMPLETING -> NORMAL),返回task结束后的返回值.
    2. 取消操作结束(NEW -> INTERRUPTING -> INTERRUPTED 或者 NEW -> CANCELLED), 抛出运行时异常CancellationException.
    3. 异常结束(NEW -> COMPLETING -> EXCEPTIONAL), 抛出ExecutionException异常.

    例子

    接下来将用几个简单的例子来看看这几种状态. 在文章最开始的例子中很明显的就是NEW -> COMPLETING -> NORMAL这种路线的变化.

    例子 NEW -> COMPLETING -> EXCEPTIONAL

    Callable中的call方法在执行过程中抛出了异常, 最后看看结果会如何.

    package com.futuretask;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    
    public class FutureTaskTest03 {
        public static void main(String[] args) {
            FutureTask<String> futureTask = new FutureTask< >(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println(Thread.currentThread().getName() + " starts to run.");
                    //Thread.sleep(5000);
                    if(true) throw new InterruptedException();
                    System.out.println(Thread.currentThread().getName() + " wakes up.");
                    return "futurecall";
                }
            });
    
            Thread thread = new Thread(futureTask);
            thread.start();
            try {
                System.out.println(Thread.currentThread().getName() + "->" + futureTask.get() + "<-");
            } catch (ExecutionException ex) {
                System.out.println("ExecutionException exception:" + ex);
            } catch (InterruptedException ex) {
                System.out.println("InterruptedException exception:" + ex);
            } catch (Exception ex) {
                System.out.println("Exception exception:" + ex);
            }
            System.out.println(Thread.currentThread().getName() + " finished!");
        }
    }
    

    输出: 可以看到最终是抛出了ExecutionException异常.由此可见在report方法中是NEW -> COMPLETING -> EXCEPTIONAL, 因为run方法中执行call的时候抛出了异常. 同样可以在代码中进行调试查看.

    Thread-0 starts to run.
    ExecutionException exception:java.util.concurrent.ExecutionException: java.lang.InterruptedException
    main finished!
    

    例子 NEW -> INTERRUPTING -> INTERRUPTED 或者 NEW -> CANCELLED Cancel真的是否可以取消任务?

    通过以下两个例子来验证cancel方法是否真的可以取消任务.

    package com.futuretask;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    
    public class FutureTaskTest {
        static final int COUNT = 1000000000;
        public static void main(String[] args) throws Exception {
            FutureTask<String> futureTask = new FutureTask< >(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println(Thread.currentThread().getName() + " starts to run.");
                    //Thread.sleep(5000);
                    int i = 0;
                    while (i < COUNT) {
                        i++;
                        int j = 0;
                        while (j < COUNT) {
                            j++;
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + " wakes up.");
                    return "futurecall";
                }
            });
    
            Thread thread = new Thread(futureTask);
    
            thread.start();
    
            System.out.println(Thread.currentThread().getName() + " finished to start thread.");
    
            futureTask.cancel(true);
    
            try {
                System.out.println(Thread.currentThread().getName() + "->" + futureTask.get() + "<-");
            } catch (ExecutionException ex) {
                System.out.println("ExecutionException exception:" + ex);
            } catch (InterruptedException ex) {
                System.out.println("InterruptedException exception:" + ex);
            } catch (Exception ex) {
                System.out.println("Exception exception:" + ex);
            }
            System.out.println(Thread.currentThread().getName() + " finished!");
        }
    }
    

    输出: 可以看到最终的report方法抛出了CancellationException异常. 同时也可以看到Thread-0(执行task的线程)在task被取消后依然完成了其余的工作. 可以得知Thread-0在执行call方法中两层while循环时被别的线程(在这里是主线程)取消任务, 然后在setException(Throwable t)或者set(V v)执行改变了state的值. 但是cancel操作并没有真正取消task的任务. 如果把COUNT值变小一点, 有可能会是NEW -> COMPLETING -> NORMAL流程(也就是cancel操作发生在set(V v)之后)

    Thread-0 starts to run.
    main finished to start thread.
    Exception exception:java.util.concurrent.CancellationException
    main finished!
    Thread-0 wakes up.
    

    接下来看个可以cancel部分任务的操作.

    static final int COUNT = 1000000000;
        public static void main(String[] args) throws Exception {
            FutureTask<String> futureTask = new FutureTask< >(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println(Thread.currentThread().getName() + " starts to run.");
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName() + " wakes up.");
                    return "futurecall";
                }
            });
    
            Thread thread = new Thread(futureTask);
    
            thread.start();
    
            System.out.println(Thread.currentThread().getName() + " finished to start thread.");
    
            futureTask.cancel(true);
    
            try {
                System.out.println(Thread.currentThread().getName() + "->" + futureTask.get() + "<-");
            } catch (ExecutionException ex) {
                System.out.println("ExecutionException exception:" + ex);
            } catch (InterruptedException ex) {
                System.out.println("InterruptedException exception:" + ex);
            } catch (Exception ex) {
                System.out.println("Exception exception:" + ex);
            }
            System.out.println(Thread.currentThread().getName() + " finished!");
        }
    

    执行结果: 可以看到cancel后并没有执行System.out.println(Thread.currentThread().getName() + " wakes up.");这句话, 因为在cancel方法中调用interrupt()方法会使得Thread-0sleep中返回并抛出InterruptedException. 关于中断那部分可以看一下[并发J.U.C] 用例子理解线程中断.

    Thread-0 starts to run.
    main finished to start thread.
    Exception exception:java.util.concurrent.CancellationException
    main finished!
    

    参考

    1. Java 1.8 源码
    2. Java并发编程的艺术

    相关文章

      网友评论

        本文标题:[Java源码][并发J.U.C]---解析FutureTask

        本文链接:https://www.haomeiwen.com/subject/shystqtx.html