美文网首页
FutureTask源码解析

FutureTask源码解析

作者: 多喝水JS | 来源:发表于2019-01-29 11:19 被阅读4次

    简介

    FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,可以把它理解为是可以返回结果的Runnable。

    使用FutureTask的优势有:

    1、可以获取线程执行后的返回结果;
    2、提供了超时控制功能。

    缺陷:
    1、虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果
    2、想完成一些复杂的任务可能就比较难。比如下面一些例子:
    ①将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
    ②当Future集合中某个任务最快结束时,返回结果。
    ③等待Future结合中的所有任务都完成。

    实现的接口

    实现了Runnable接口和Future接口:

    例子

    public class FutureTaskTest {
    
        public static void main(String[] args) {
            Callable<Integer> call = new Callable<Integer>() {
    
                @Override
                public Integer call() throws Exception {
                    System.out.println("process ...");
                    Thread.sleep(3000);
                    return 1;
                }
            };
            FutureTask<Integer> task = new FutureTask<>(call);
    
            Thread thread = new Thread(task);
            thread.start();
            Thread thread1 = new Thread(task);
            thread1.start();
    
            System.out.println("do something...");
    
            try {
                Integer result = task.get();
                System.out.println(result);
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
    }
    输出:
    process ...
    do something...
    1
    

    上面代码在在任务执行时,不需要一直等待其运行结束返回结果,而是可以先去处理其他的事情,然后再获取返回结果。

    原理

    FutureTask构造器

     public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
     }
    public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
    }
    public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    

    可以看出FutureTask提供两种方式来执行任务:
    第一种是直接通过callable。
    第二种构造方法传入一个Runnable对象和一个返回值对象,然后把他们封装为callable(RunnableAdapter),所以本质上还是通过callable来调用的。另因为Runnable是没有返回值的,所以要通过result参数在执行完之后返回结果。

    核心成员变量

    volatile int state;//表示对象状态,volatile关键字保证了内存可见性。futureTask中定义了7种状态,代表了7种不同的执行状态
    private static final int NEW          = 0; //任务新建和执行中
    private static final int COMPLETING   = 1; //任务将要执行完毕
    private static final int NORMAL       = 2; //任务正常执行结束
    private static final int EXCEPTIONAL  = 3; //任务异常
    private static final int CANCELLED    = 4; //任务取消
    private static final int INTERRUPTING = 5; //任务线程即将被中断
    private static final int INTERRUPTED  = 6; //任务线程已中断
    

    run方法

     public void run() {
            /*
         * 首先判断状态,如果不是初始状态,说明任务已经被执行或取消;
         * runner是FutureTask的一个属性,用于保存执行任务的线程,
         * 如果不为空则表示已经有线程正在执行,这里用CAS来设置,失败则返回。
         */
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        //这里调用目标方法
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        //设置返回结果对象
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
              // 无论是否执行成功,把runner设置为null
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                // 如果被中断,则说明调用的cancel(true),
              // 这里要保证在cancel方法中把state设置为INTERRUPTED
              // 否则可能在cancel方法中还没执行中断,造成中断的泄露
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
       //设置返回结果对象
         protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                //当前任务执行完成(不管成功还是失败),唤醒等待队列中的下一个任务
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    

    总结一下run方法的执行过程

    1、只有state为NEW的时候才执行任务;
    2、执行前要设置runner为当前线程,使用CAS来设置是为了防止竞争;
    3、如果任务执行成功,任务状态从NEW转换为COMPLETING,如果执行正常,设置最终状态为NORMAL;如果执行中出现了异常,设置最终状态为EXCEPTIONAL
    4、任务不管成功或者失败都唤醒并删除Treiber Stack中的所有节点;
    5、如果调用了cancel(true)方法进行了中断,要确保在run方法执行结束前的状态是INTERRUPTED

    finishCompletion方法

    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

    在调用get方法时,如果任务还没有执行结束,则会阻塞调用的线程,然后把调用的线程放入waiters中,这时,如果任务执行完毕,也就是调用了finishCompletion方法,waiters会依次出栈并逐个唤醒对应的线程。

    get方法

     public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
        /**
         * @throws CancellationException {@inheritDoc}
         */
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    1、这两个方法类似,首先判断状态,如果s <= COMPLETING,说明任务已经执行完毕,但set方法或setException方法还未执行结束(还未设置状态为NORMALEXCEPTIONAL),这时需要将当前线程添加到waiters中并阻塞。

    2、第二种get提供了超时功能,如果在规定时间内任务还未执行完毕或者状态还是COMPLETING,则获取结果超时,抛出TimeoutException。而第一种get会一直阻塞直到state > COMPLETING
    3、两个方法最终都调用report方法返回结果:如果state==NORMAL,将返回结果,否则抛异常

    cancel方法

     public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            // mayInterruptIfRunning参数表示是否要进行中断
            if (mayInterruptIfRunning) {
                try {
                    // runner保存着当前执行任务的线程
                    Thread t = runner;
                    // 中断线程
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // 设置最终状态为INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    

    如果状态不是NEW,或者设置状态为INTERRUPTINGCANCELLED失败,则取消失败,返回false。

    简单来说有一下两种情况:

    1、如果当前任务还没有执行,那么state == NEW,那么会尝试设置状态,如果设置状态失败会返回false,表示取消失败;
    2、如果当前任务已经被执行了,那么state > NEW,也就是!state == NEW为true,直接返回false。也就是说,如果任务一旦开始执行了(state != NEW),那么就不能被取消。
    如果mayInterruptIfRunning为true,要中断当前执行任务的线程。

    总结

    1、FutureTask是线程安全的,在多线程下任务也只会被执行一次;
    2、get方法调用时,如果任务没有结束,要阻塞当前线程,阻塞的线程将保存在一个Treiber Stack中;
    3、get方法超时功能如果超时未获取成功,会抛出TimeoutException;

    相关文章

      网友评论

          本文标题:FutureTask源码解析

          本文链接:https://www.haomeiwen.com/subject/hagtsqtx.html