美文网首页
你一定会需要的FutureTask在线程池中应用和源码解析

你一定会需要的FutureTask在线程池中应用和源码解析

作者: java菜 | 来源:发表于2018-11-23 15:02 被阅读4次

    FutureTask 是一个支持取消的异步处理器,一般在线程池中用于异步接受callable返回值。

    主要实现分三部分:

    封装 Callable,然后放到线程池中去异步执行->run。

    获取结果-> get。

    取消任务-> cancel。

    接下来主要学习下该模型如何实现。

    举例说明FutureTask在线程池中的应用

    // 第一步,定义线程池,

    ExecutorService executor = new ThreadPoolExecutor(

            minPoolSize,

            maxPollSize,

            keepAliveTime,

            TimeUnit.SECONDS,

            new SynchronousQueue<>());

    // 第二步,放到线程池中执行,返回FutureTask

    FutureTask  task = executor.submit(callable);

    // 第三步,获取返回值

    T data = task.get();

    学习FutureTask实现

    类属性

    //以下是FutureTask的各种状态

    private volatile int state;

    private static final int NEW          = 0;

    private static final int COMPLETING   = 1;

    private static final int NORMAL       = 2;

    private static final int EXCEPTIONAL  = 3;

    private static final int CANCELLED    = 4;

    private static final int INTERRUPTING = 5;

    private static final int INTERRUPTED  = 6;

    private Callable<V> callable; //执行的任务

    private Object outcome; //存储结果或者异常

    private volatile Thread runner;//执行callable的线程

    private volatile WaitNode waiters; //调用get方法等待获取结果的线程栈

    其中各种状态存在 最终状态 status>COMPLETING

    1)NEW -> COMPLETING -> NORMAL(有正常结果)

    2) NEW -> COMPLETING -> EXCEPTIONAL(结果为异常)

    3) NEW -> CANCELLED(无结果)

    4) NEW -> INTERRUPTING -> INTERRUPTED(无结果)

    类方法

    从上面举例说明开始分析。

    run()方法

    FutureTask 继承 Runnable,ExecutorService submit 把提交的任务封装成 FutureTask 然后放到线程池 ThreadPoolExecutor 的 execute 执行。

    public void run() {

        //如果不是初始状态或者cas设置运行线程是当前线程不成功,直接返回

        if (state != NEW ||

            !UNSAFE.compareAndSwapObject(this, runnerOffset,

                                         null, Thread.currentThread()))

            return;

        try {

            Callable<V> c = callable;

            if (c != null && state == NEW) {

                V result;

                boolean ran;

                try {

                  // 执行callable任务 这里对异常进行了catch

                    result = c.call();

                    ran = true;

                } catch (Throwable ex) {

                    result = null;

                    ran = false;

                    setException(ex); // 封装异常到outcome

                }

                if (ran)

                    set(result);

            }

        } finally {

            runner = null;

            int s = state;

            // 这里如果是中断中,设置成最终状态

            if (s >= INTERRUPTING)

                handlePossibleCancellationInterrupt(s);

        }

    }

    以上是 run 方法源码实现很简单,解析如下:

    如果不是始状态或者 cas 设置运行线程是当前线程不成功,直接返回,防止多个线程重复执行。

    执行 Callable 的 call(),即提交执行任务(这里做了catch,会捕获执行任务的异常封装到 outcome 中)。

    如果成功执行 set 方法,封装结果。

    set 方法

    protected void set(V v) {

        //cas方式设置成completing状态,防止多个线程同时处理

        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

            outcome = v; // 封装结果

            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终设置成normal状态

            finishCompletion();

        }

    }

    解析如下:

    cas方式设置成completing状态,防止多个线程同时处理

    封装结果到outcome,然后设置到最终状态normal

    执行finishCompletion方法。

    finishCompletion方法

    // state > COMPLETING; 不管异常,中断,还是执行完成,都需要执行该方法来唤醒调用get方法阻塞的线程

    private void finishCompletion() {

        // assert state > COMPLETING;

        for (WaitNode q; (q = waiters) != null;) {

            // cas 设置waiters为null,防止多个线程执行。

            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

                // 循环唤醒所有等待结果的线程

                for (;;) {

                    Thread t = q.thread;

                    if (t != null) {

                        q.thread = null;

                        //唤醒线程

                        LockSupport.unpark(t);

                    }

                    WaitNode next = q.next;

                    if (next == null)

                        break;

                    q.next = null; // unlink to help gc

                    q = next;

                }

                break;

            }

        }

       //该方法为空,可以被重写

        done();

        callable = null;        // to reduce footprint

    }

    解析如下:

    遍历waiters中的等待节点,并通过 LockSupport 唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能 cancel,异常等)。

    以上就是执行的过程,接下来分析获取结果的过程->get。

    get 方法

    public V get() throws InterruptedException, ExecutionException {

        int s = state;

        if (s <= COMPLETING)

            s = awaitDone(false, 0L);

        return report(s);

    }

    public V get(long timeout, TimeUnit unit)

            throws InterruptedException, ExecutionException, TimeoutException {

            if (unit == null)

                throw new NullPointerException();

            int s = state;

            if (s <= COMPLETING &&

                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

                throw new TimeoutException();

            return report(s);

        }

    解析如下:

    以上两个方法,原理一样,其中一个设置超时时间,支持最多阻塞多长时间。

    状态如果小于 COMPLETING,说明还没到最终状态,(不管是否是成功、异常、取消)。

    调用 awaitDone 方法阻塞线程,最终调用 report 方法返回结果。

    awaitDone 方法

    private int awaitDone(boolean timed, long nanos)

            throws InterruptedException {

            final long deadline = timed ? System.nanoTime() + nanos : 0L;

            WaitNode q = null;

            boolean queued = false;

            for (;;) {

                //线程可中断,如果当前阻塞获取结果线程执行interrupt()方法,则从队列中移除该节点,并抛出中断异常

                if (Thread.interrupted()) {

                    removeWaiter(q);

                    throw new InterruptedException();

                }

                int s = state;

                // 如果已经是最终状态,退出返回

                if (s > COMPLETING) {

                    if (q != null)

                        q.thread = null;

                    return s;

                }

                //这里做了个优化,competiting到最终状态时间很短,通过yield比挂起响应更快。

                else if (s == COMPLETING) // cannot time out yet

                    Thread.yield();

                // 初始化该阻塞节点

                else if (q == null)

                    q = new WaitNode();

                // cas方式写到阻塞waiters栈中

                else if (!queued)

                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

                                                         q.next = waiters, q);

                // 这里做阻塞时间处理。

                else if (timed) {

                    nanos = deadline - System.nanoTime();

                    if (nanos <= 0L) {

                        removeWaiter(q);

                        return state;

                    }

                    // 阻塞线程,有超时时间

                    LockSupport.parkNanos(this, nanos);

                }

                else

                    // 阻塞线程

                    LockSupport.park(this);

            }

        }

    解析如下:

    整体流程已写到注解中,整体实现是放在一个死循环中,唯一出口,是达到最终状态。

    然后是构建节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。

    report 方法

    private V report(int s) throws ExecutionException {

        Object x = outcome;

        if (s == NORMAL)

            return (V)x;

        if (s >= CANCELLED)

            throw new CancellationException();

        throw new ExecutionException((Throwable)x);

    }

    然后是report方法,如果是正常结束,返回结果,如果不是正常结束(取消,中断)抛出异常。

    最后分析下取消流程。

    cancel 方法

    public boolean cancel(boolean mayInterruptIfRunning) {

        if (!(state == NEW &&

              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

            return false;

        try {    // in case call to interrupt throws exception

            if (mayInterruptIfRunning) {

                try {

                    Thread t = runner;

                    if (t != null)

                        t.interrupt();

                } finally { // final state

                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

                }

            }

        } finally {

            finishCompletion();

        }

        return true;

    }

    解析如下:

    mayInterruptIfRunning参数是是否允许运行中被中断取消。

    根据入参是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则直接返回false。

    如果允许运行中被中断取消,调用runner.interupt()进行中断取消,设置状态为INTERRUPTED。

    唤醒所有在get()方法等待的线程。

    此处有两种状态转换:

    如果mayInterruptIfRunning为true:status状态转换为 new -> INTERRUPTING->INTERRUPTED。主动去中断执行线程,然后唤醒所有等待结果的线程。

    如果mayInterruptIfRunning为false:status状态转换为 new -> CANCELLED。

    不会去中断执行线程,直接唤醒所有等待结果的线程,从 awaitDone 方法中可以看到,唤醒等待线程后,直接从跳转回 get 方法,然后把结果返回给获取结果的线程,当然此时的结果是 null。

    总结

    以上就是 FutureTask 的源码简单解析,实现比较简单,FutureTask 就是一个实现 Future 模式,支持取消的异步处理器。

    欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 854393687

    群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

    相关文章

      网友评论

          本文标题:你一定会需要的FutureTask在线程池中应用和源码解析

          本文链接:https://www.haomeiwen.com/subject/mxujqqtx.html