美文网首页Springboot
FutureTask在线程池中应用和源码解析

FutureTask在线程池中应用和源码解析

作者: hcy0411 | 来源:发表于2018-07-26 15:47 被阅读1274次
    image

    FutureTask是一个支持取消的异步处理器,一般在线程池中用于异步接受callable返回值。
    主要实现分三部分:
    1、封装callable,然后放到线程池中去异步执行->run。
    2、获取结果->get。
    3、取消任务->cancel。
    接下来主要学习下该模型如何实现

    举例说明FutureTask在线程池中的应用

    // 第一步,定义线程池,
    ExecutorService executor = new ThreadPoolExecutor(
            minPoolSize,
            maxPollSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            new SynchronousQueue<>());
    // 第二步,放到线程池中执行,返回FutureTask
    FutureTask  task = executor.submit(callable);
    // 第三步,获取返回值
    T data = task.get();
    

    学习下FutureTask实现

    类属性

    //以下是FutureTask的各种状态
    private volatile int state; 
    private static final int NEW          = 0; 
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    private Callable<V> callable; //执行的任务
    private Object outcome; //存储结果或者异常
    private volatile Thread runner;//执行callable的线程
    private volatile WaitNode waiters; //调用get方法等待获取结果的线程栈
    
    其中各种状态存在 最终状态 status>COMPLETING
    1)NEW -> COMPLETING -> NORMAL(有正常结果)
    2) NEW -> COMPLETING -> EXCEPTIONAL(结果为异常) 
    3) NEW -> CANCELLED(无结果) 
    4) NEW -> INTERRUPTING -> INTERRUPTED(无结果)
    

    类方法

    从上面举例说明开始分析

    run()方法

    FutureTask继承runnable,ExecutorService submit把提交的任务封装成FutureTask然后放到线程池ThreadPoolExecutor的execute执行。

    public void run() {
        //如果不是初始状态或者cas设置运行线程是当前线程不成功,直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                  // 执行callable任务 这里对异常进行了catch
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); // 封装异常到outcome
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            // 这里如果是中断中,设置成最终状态
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    以上是run方法源码实现很简单,解析如下:
    1、如果不是始状态或者cas设置运行线程是当前线程不成功,直接返回,防止多个线程重复执行。
    2、执行callable的call(),即提交执行任务(这里做了catch,会捕获执行任务的异常封装到outcome中)
    3、如果成功执行set方法,封装结果。

    set方法

    protected void set(V v) {
        //cas方式设置成completing状态,防止多个线程同时处理
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v; // 封装结果
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终设置成normal状态
            
            finishCompletion();
        }
    }
    

    解析如下:
    1、cas方式设置成completing状态,防止多个线程同时处理
    2、封装结果到outcome,然后设置到最终状态normal
    3、执行finishCompletion方法。

    finishCompletion方法

    // state > COMPLETING; 不管异常,中断,还是执行完成,都需要执行该方法来唤醒调用get方法阻塞的线程
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // cas 设置waiters为null,防止多个线程执行。
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 循环唤醒所有等待结果的线程
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //唤醒线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
       //该方法为空,可以被重写
        done();
        callable = null;        // to reduce footprint
    }
    

    解析如下:
    遍历waiters中的等待节点,并通过 LockSupport唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能cancel,异常等)

    以上就是执行的过程,接下来分析获取结果的过程->get

    get方法

        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    

    解析如下:
    以上两个方法,原理一样,其中一个设置超时时间,支持最多阻塞多长时间。
    状态如果小于COMPLETING,说明还没到最终状态,(不管是否是成功,还是异常,还是取消)
    调用awaitDone方法阻塞线程,最终调用report方法返回结果。

    awaitDone方法

        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                //线程可中断,如果当前阻塞获取结果线程执行interrupt()方法,则从队列中移除该节点,并抛出中断异常
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                int s = state;
                // 如果已经是最终状态,退出返回
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                //这里做了个优化,competiting到最终状态时间很短,通过yield比挂起响应更快。
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                // 初始化该阻塞节点
                else if (q == null)
                    q = new WaitNode();
                // cas方式写到阻塞waiters栈中
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                // 这里做阻塞时间处理。
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    // 阻塞线程,有超时时间
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    // 阻塞线程
                    LockSupport.park(this);
            }
        }
    

    解析如下:
    整体流程已写到注解中,整体实现是放在一个死循环中,唯一出口,是达到最终状态。
    然后是构建节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。

    report方法

        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    然后是report方法,如果是正常结束,返回结果,如果不是正常结束,(取消,中断)抛出异常。

    最后分析下取消

    cancel方法

        public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    

    解析如下:
    mayInterruptIfRunning参数是是否允许运行中被中断取消。
    1、根据入参是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则直接返回false。
    2、如果允许运行中被中断取消,调用runner.interupt()进行中断取消,设置状态为INTERRUPTED
    唤醒所有在get()方法等待的线程
    此处有两种状态转换
    1)如果mayInterruptIfRunning为true
    status状态转换为 new -> INTERRUPTING->INTERRUPTED
    主动去中断执行线程,然后唤醒所有等待结果的线程
    2)如果mayInterruptIfRunning为false
    status状态转换为 new -> CANCELLED。
    不会去中断执行线程,直接唤醒所有等待结果的线程,从awaitDone方法中可以看到,唤醒等待线程后,直接从跳转回get方法,然后把结果返回给获取结果的线程,当然此时的结果是null。

    总结,以上就是FutureTask的源码简单解析,实现比较简单,FutureTask就是一个实现Future模式,支持取消的异步处理器。

    相关文章

      网友评论

        本文标题:FutureTask在线程池中应用和源码解析

        本文链接:https://www.haomeiwen.com/subject/wijmmftx.html