美文网首页程序员
JUC源码分析-线程池篇(二):FutureTask

JUC源码分析-线程池篇(二):FutureTask

作者: 泰迪的bagwell | 来源:发表于2018-02-07 22:13 被阅读0次

    Future 表示了一个任务的生命周期,是一个可取消的异步运算,可以把它看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。在并发包中许多异步任务类都继承自Future,其中最典型的就是 FutureTask,本章我们将通过对 FutureTask 的源码解析来看一下异步任务的实现方式。


    概述

    FutureTask 为 Future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。FutureTask 常用来封装 Callable 和 Runnable,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来保证。

    FutureTask 内部维护了一个由volatile修饰的int型变量—state,代表当前任务的运行状态,state有七种状态:

    • NEW:新建
    • COMPLETING:完成
    • NORMAL:正常运行
    • EXCEPTIONAL:异常退出
    • CANCELLED:任务取消
    • INTERRUPTING:线程中断中
    • INTERRUPTED:线程已中断

    在这七种状态中,有四种任务终止状态:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各种状态的转化如下:


    FutureTask 任务状态

    数据结构及核心参数

    FutureTask 继承关系
    //内部持有的callable任务,运行完毕后置空
    private Callable<V> callable;
    
    //从get()中返回的结果或抛出的异常
    private Object outcome; // non-volatile, protected by state reads/writes
    
    //运行callable的线程
    private volatile Thread runner;
    
    //使用Treiber栈保存等待线程
    private volatile WaitNode waiters;
    

    FutureTask 继承了RunnaleFuture,本身也作为一个线程运行。并且维护了一个内部类WaitNode,使用简单的Treiber栈(无锁并发栈)实现,用于存储等待线程。


    源码解析

    AbstractExecutorService.submit(Callable<T> task)

    FutureTask 内部的实现方法都很简单,这里我们先从线程池的submit方法开始分析,一步一步分析FutureTasksubmit方法默认实现在AbstractExecutorService中,几种实现源码如下:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    

    说明:方法比较简单,首先调用newTaskFor方法构造FutureTask,然后调用execute把任务放进线程池中,返回FutureTask


    FutureTask.run()

    public void run() {
        //新建任务,CAS替换runner为当前线程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);//设置执行结果
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);//处理中断逻辑
        }
    }
    

    说明

    1. 运行任务,如果任务状态为NEW状态,则利用CAS修改为当前线程。执行完毕调用set(result)方法设置执行结果。set(result)源码如下:
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();//执行完毕,唤醒等待线程
        }
    }
    
    1. 首先利用cas修改state状态为COMPLETING,设置返回结果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式设置state状态为NORMAL。结果设置完毕后,调用finishCompletion()方法唤醒等待线程,源码如下:
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待线程
                for (;;) {//自旋遍历等待线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒等待线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //任务完成后调用函数,自定义扩展
        done();
    
        callable = null;        // to reduce footprint
    }
    
    1. 回到run方法,如果在 run 期间被中断,此时需要调用handlePossibleCancellationInterrupt方法来处理中断逻辑,确保任何中断(例如cancel(true))只停留在当前runrunAndReset的任务中,源码如下:
    private void handlePossibleCancellationInterrupt(int s) {
        //在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    }
    

    FutureTask.runAndReset()

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
    

    说明runAndReset是 FutureTas k的另外一个任务运行的方法,它不会返回执行结果,而且在任务执行完之后会重置stat的状态为NEW,使任务可以多次执行。runAndReset的典型应用是在 ScheduledThreadPoolExecutor 中,周期性的执行任务。


    FutureTask.get()

    //获取执行结果
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    

    说明:FutureTask 通过get()方法获取任务执行结果。如果任务处于未完成的状态(state <= COMPLETING),就调用awaitDone方法(后面单独讲解)等待任务完成。任务完成后,通过report方法获取执行结果或抛出执行期间的异常。report源码如下:

    //返回执行结果或抛出异常
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    

    awaitDone(boolean timed, long nanos)

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {//自旋
            if (Thread.interrupted()) {//获取并清除中断状态
                removeWaiter(q);//移除等待WaitNode
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;//置空等待节点的线程
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                //CAS修改waiter
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);//超时,移除等待节点
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//阻塞当前线程
            }
            else
                LockSupport.park(this);//阻塞当前线程
        }
    }
    

    说明awaitDone用于等待任务完成,或任务因为中断或超时而终止。返回任务的完成状态。函数执行逻辑如下:

    1. 如果线程被中断,首先清除中断状态,调用removeWaiter移除等待节点,然后抛出InterruptedExceptionremoveWaiter源码如下:
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;//首先置空线程
            retry:
            for (;;) {          // restart on removeWaiter race
                //依次遍历查找
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替换
                        continue retry;
                }
                break;
            }
        }
    }
    
    1. 如果当前状态为结束状态(state>COMPLETING),则根据需要置空等待节点的线程,并返回 Future 状态;
    2. 如果当前状态为正在完成(COMPLETING),说明此时 Future 还不能做出超时动作,为任务让出CPU执行时间片;
    3. 如果stateNEW,先新建一个WaitNode,然后CAS修改当前waiters
    4. 如果等待超时,则调用removeWaiter移除等待节点,返回任务状态;如果设置了超时时间但是尚未超时,则park阻塞当前线程;
    5. 其他情况直接阻塞当前线程。

    FutureTask.cancel(boolean mayInterruptIfRunning)

    public boolean cancel(boolean mayInterruptIfRunning) {
        //如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//可以在运行时中断
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();//移除并唤醒所有等待线程
        }
        return true;
    }
    

    说明:尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。
    如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTINGCANCELLED
    如果当前状态不为NEW,则根据参数mayInterruptIfRunning决定是否在任务运行中也可以中断。中断操作完成后,调用finishCompletion移除并唤醒所有等待线程。


    小结

    本章重点:FutureTask 结果返回机制,以及内部运行状态的转变

    相关文章

      网友评论

        本文标题:JUC源码分析-线程池篇(二):FutureTask

        本文链接:https://www.haomeiwen.com/subject/xxlozxtx.html