美文网首页Java 杂谈多线程技术分享
Java 多线程之 FutureTask 源码剖析

Java 多线程之 FutureTask 源码剖析

作者: 爱打乒乓的程序员 | 来源:发表于2019-10-15 09:24 被阅读0次

    系统通过多线程优化性能,实际上就是将串行操作转换为并行操作,也就是说将同步操作转换为异步操作。在众多并发类中,FutureTask 类可以接收线程返回的结果,并且可以取消或者中断线程。

    先看下 FutureTask 类的类图结构:


    由类图可以知道,FutureTask 类是 Runnable 的实现类,所以可以通过线程池 submit() 或者直接 new Thread() 启动线程。

    举个栗子:

    public class FutureTaskDemo {
        public static void main(String[] args) {
            MyCallableDemo demo = new MyCallableDemo();
    //        // 1.直接通过new Thread()启动线程
    //        FutureTask task = new FutureTask(demo);
    //        new Thread(task).start();
    //        try {
    //            System.out.println(task.get());
    //        } catch (Exception e) {
    //            e.printStackTrace();
    //        }
    
            // 2.通过线程池启动线程
            ExecutorService service = Executors.newSingleThreadExecutor();
    
            Future<String> result = service.submit(demo);
            try {
                System.out.println(result.get());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                service.shutdown();
            }
        }
    }
    
    class MyCallableDemo implements Callable {
        @Override
        public String call() throws Exception {
            return "MuggleLee";
        }
    }
    

    输出结果:

    MuggleLee
    

    通过例子可以看出,使用FutureTask类可以接收线程完成后返回的结果。如果使用场景是需要接收线程执行的结果(无论是成功执行的结果还是异常返回的信息),实现Callable接口结合FutureTask实现类接收返回数据是比较常见的一种做法。更为常见的做法是通过使用线程池submit()方法接收返回的结果。

    线程池的实现类 ThreadPoolExecutor 提供了3个 submit() 方法支持获取线程返回的结果。

    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    <T> Future<T> submit(Callable<T> task);
    

    可以发现,返回值类型都是 Future 接口。那继续看下 Future 接口有哪些抽象方法。

    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    

    通过方法名很明显可以知道各个抽象方法的作用。

    实际上,submit() 方法将线程的执行结果封装成 FutureTask 对象返回。

        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
        // 将 Callable 接口对象封装成 FutureTask 对象
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    

    接下来,看下创建 FutureTask 对象的源码。

        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;
        }
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;
        }
    

    Executor类部分源码:

        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

    FutureTask 类的第一个构造方法将参数 Callable 对象赋值给 FutureTask 对象的 callable 属性,并设置 state 变量为 NEW(稍后再解释 callable 和 state 两个变量的作用);有意思的是第二个构造方法,将第一个参数 Runnable 对象传给 Executor 类的 callable() 方法,再调用已经实现了 Callable 接口的 RunnableAdapter 适配器类,执行 Runnable 对象的 run() 方法。(设计模式中的适配器模式,不熟悉的可以参考我另外一篇拙作:设计模式之适配器模式

    emmmm...上面几个方法的确比较绕,画了一张流程图辅助理解吧。


    接下来,重点剖析 FutureTask 类的源码:

    FutureTask 类声明了7种状态:

        /**
         * Possible state transitions:(FutureTask状态的转换)
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        // 任务当前 FutureTask 对象的状态
        private volatile int state;
        // 新建任务(初始状态)
        private static final int NEW = 0;
        // 任务运行中
        private static final int COMPLETING = 1;
        // 任务正常完成
        private static final int NORMAL = 2;
        // 任务异常
        private static final int EXCEPTIONAL = 3;
        // 任务被取消
        private static final int CANCELLED = 4;
        // 任务中断中
        private static final int INTERRUPTING = 5;
        // 任务被中断
        private static final int INTERRUPTED = 6;
    
        // 提交的任务
        private Callable<V> callable;
        // 任务运行的结果
        private Object outcome;
        // 执行任务的线程
        private volatile Thread runner;
        // 等待结果的队列(单链表)
        private volatile WaitNode waiters;
    

    除了记录 FutureTask 对象状态之外,还声明了 state、runner、waiters的内存偏移量 和等待节点 WaitNode:

        private static final sun.misc.Unsafe UNSAFE;
        // FutureTask 对象状态在内存中的偏移量
        private static final long stateOffset;
        // 执行任务对象在内存中的偏移量
        private static final long runnerOffset;
        // 等待链表在内存中的偏移量
        private static final long waitersOffset;
    
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = FutureTask.class;
                // 通过反射获取各对象在内存中的偏移量
                stateOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("state"));
                runnerOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("runner"));
                waitersOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("waiters"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    
        /**
         * 单链表。存储等待线程
         */
        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
    
            WaitNode() {
                thread = Thread.currentThread();
            }
        }
    

    当程序执行到线程池的execute(Runnable runnable)方法的时候,由于 execute() 方法接收的参数是 FutureTask 对象,所以肯定是执行 FutureTask 类的 run() 方法。

    FutureTask.run()方法剖析

        public void run() {
            // 如果当前 FutureTask 对象的状态不是 NEW 或者执行 CAS 操作赋值给 runnerOffset 失败直接跳出 run 方法
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // 设置 runner 为 null ,利于 GC
                runner = null;
                int s = state;
                // 如果有其它线程在中断任务,会调用 handlePossibleCancellationInterrupt 方法处理
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
        // 执行任务正常结束后调用此方法
        protected void set(V v) {
            // 执行 CAS 方法设置 FutureTask 对象的状态由 NEW -> COMPLETING -> NORMAL
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 将任务执行的结果赋值给 outcome 变量
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
                // 唤醒等待线程
                finishCompletion();
            }
        }
        // 执行任务异常会调用此方法
        protected void setException(Throwable t) {
            // 执行 CAS 方法设置 FutureTask 对象的状态由 NEW -> COMPLETING -> EXCEPTIONAL
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 将任务执行的结果赋值给 outcome 变量
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
                // 唤醒等待线程
                finishCompletion();
            }
        }
        /**
         * 唤醒等待线程
         */
        private void finishCompletion() {
            for (WaitNode q; (q = waiters) != null; ) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    // 自旋遍历单链表
                    for (; ; ) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            // 唤醒线程
                            LockSupport.unpark(t);
                        }
                        // 获取下一个节点,直到节点为null
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null;
                        q = next;
                    }
                    break;
                }
            }
            // 这是一个空方法,可以让开发中扩展使用
            done();
            callable = null;
        }
        private void handlePossibleCancellationInterrupt(int s) {
            // 双重判断,这里困扰我很久。为什么需要两次判断状态是否为 INTERRUPTING 呢?
            // 考虑的场景是:需要确保其它线程执行 cencel(true) 是在执行 run() 或者 runAndReset()的过程中
            if (s == INTERRUPTING)
                while (state == INTERRUPTING)
                    Thread.yield();// 通过自旋,优先让其它线程执行,等待 cancel(true) 执行完成
        }
    

    核心代码都加上了注释,结合源码,画了张流程图加深理解吧!

    FutureTask.get()方法剖析

        /**
         * 返回执行结果
         */
        public V get() throws InterruptedException, ExecutionException {
            // 获取当前 FutureTask 对象的状态
            int s = state;
            // 如果任务的状态是新建(NEW)或者运行中()就执行 awaitDone 方法等待获取
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            // 返回执行结果
            return report(s);
        }
    
        /**
         * 与上面 get() 相似,加上 timeout 设置过期时间,超时抛出异常
         *
         * @param timeout 过期时间
         * @param unit    时间单位
         * @return 返回执行结果
         */
        public V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                    (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    
        @SuppressWarnings("unchecked")
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V) x;
            if (s >= CANCELLED)
                throw new CancellationException();// 被取消或者中断,直接抛出异常
            throw new ExecutionException((Throwable) x);// 运行过程中发生异常
        }
    
        private int awaitDone(boolean timed, long nanos)
                throws InterruptedException {
            // 如果设置超时,计算截止时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 代表当前等待结果线程的等待节点
            WaitNode q = null;
            // 记录是否把当前线程加入到了队列
            boolean queued = false;
            for (; ; ) {
                if (Thread.interrupted()) {
                    removeWaiter(q);// 如果被中断了,删除当前线程节点,并抛出异常
                    throw new InterruptedException();
                }
                int s = state;
                // 如果 state 状态大于 COMPLETING 则说明任务已经执行,直接返回状态值
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                } else if (s == COMPLETING) // 如果 state 状态等于 COMPLETING,说明正在设置结果,则放弃时间片轮询等待
                    Thread.yield();
                else if (q == null) // 任务状态为 NEW ,构造等待节点
                    q = new WaitNode();
                else if (!queued) // 状态为 NEW ,并且节点不为 null ,并且该节点没有加入到 waiter 队列中
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                            q.next = waiters, q);
                else if (timed) { // 如果设定超时,进行超时判断
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                // 阻塞到超时时间
                    LockSupport.parkNanos(this, nanos);
                } else // 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒
                    LockSupport.park(this);
            }
        }    
        // 通过自旋链表删除指定节点
        private void removeWaiter(WaitNode node) {
            if (node != null) {
                // 设置节点为空,通过自旋找出空节点并删除
                node.thread = null;
                // 自旋保证删除成功
                retry:
                for (; ; ) {
                    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                        s = q.next;
                        if (q.thread != null)// 如果当前节点不是空,不需要删除
                            pred = q;
                        else if (pred != null) {
                            pred.next = s;
                            if (pred.thread == null)
                                continue retry;
                        } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                q, s))
                            continue retry;
                    }
                    break;
                }
            }
        }
    

    结合源码,画出了以下流程图加深理解:


    FutureTask.cancel()方法剖析

        /**
         * 任务取消
         * @param mayInterruptIfRunning true:代表中断线程 false:代表取消线程
         * @return
         */
        public boolean cancel(boolean mayInterruptIfRunning) {
            // 如果任务不在 NEW 状态或者执行 UNSAFE 操作失败直接返回 false
            if (!(state == NEW &&
                    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                            mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally {
                        // 设置最终的状态为中断状态
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                // 释放等待线程
                finishCompletion();
            }
            return true;
        }
    

    cancel 方法参数为 mayInterruptIfRunning,当参数值为 false 的时候,直接将 FutureTask 对象的状态设置为 CANCELLED,并释放等待线程;当参数值为 ture 的时候,调用 interrupt() 方法中断线程,并设置最终状态为中断状态。

    runAndReset()方法剖析

    值得一提的是, FutureTask 类有个方法 runAndReset() 可以让线程重复执行。譬如使用 ScheduledThreadPoolExecutor 线程池,这个类可以使线程周期性的重复执行,具体的可以自行查看相关源码。

        /**
         * 与 run() 方法类似,但该方法可以执行多次。
         * 不同点:1.不设置返回值 2.不设置 state 值(执行完任务后,FutureTask 对象状态还是 NEW )
         */
        protected boolean runAndReset() {
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                return false;
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        // 不设置返回值
                        c.call();
                        ran = true;
                    } catch (Throwable ex) {    
                        setException(ex);
                    }
                }
            } finally {
                runner = null;
                s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
            // 任务执行成功,且状态重置为NEW
            return ran && s == NEW;
        }
    

    isCancelled()、isDone()方法源码:

        // 根据状态码判断任务是否被取消
        public boolean isCancelled() {
            return state >= CANCELLED;
        }
    
        // 根据状态码判断任务是否执行完成
        public boolean isDone() {
            return state != NEW;
        }
    

    总结

    使用 FutureTask 类结合使用线程池,可以通过多线程异步计算任务,最后所有子线程执行完成之后再继续执行主线程。

    参考资料:

    《Java多线程编程实战指南-核心篇》

    http://www.tianshouzhi.com/api/tutorials/mutithread/317

    相关文章

      网友评论

        本文标题:Java 多线程之 FutureTask 源码剖析

        本文链接:https://www.haomeiwen.com/subject/bmermctx.html