美文网首页
分析Java线程池Callable任务执行原理

分析Java线程池Callable任务执行原理

作者: lxqfirst | 来源:发表于2017-12-12 19:14 被阅读0次

    【转自】http://www.jianshu.com/p/f624934b9a23

    线程池的执行原理,主要关于线程池的生命周期和任务如何在池里创建、运行和终止。不过上次研究的是execute方法,执行的是Runnable任务,它不返回任何值。如果希望任务完成后返回结果,那么需要使用Callable接口,这也是本文要研究的主题。

    ExecutorService es = Executors.newSingleThreadExecutor();
    Future<?> task = es.submit(new MyThread());
    try {
        //限定时间获取结果
        task.get(5, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        //超时触发线程中止
        System.out.println("thread over time");
    } catch (ExecutionException e) {
       //抛出执行异常
        throw e;
    } finally {
       //如果任务还在运行,执行中断
        boolean mayInterruptIfRunning = true;
        task.cancel(mayInterruptIfRunning);
    }
    
    

    上面代码是Future的一个简单例子:MyThread实现Callable接口,执行时要求在限定时间内获取结果,超时执行会抛出TimeoutException,执行异常会抛出ExecutionException。最后在finally里,如果任务还在执行,就进行取消;如果任务已经执行完,取消操作也没有影响。

    image.png

    Future接口代表一个异步任务的结果,提供了相应方法判断任务是否完成或者取消。从图1可知,RunnableFuture同时继承了Future和Runnable,是一个可运行、可知结果的任务,FutureTask是具体的实现类。

    FutureTask的状态

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    

    FutureTask有7种状态,初始状态从NEW开始,状态转换路径可以归纳为图2所示。在后文的代码,会使用int的大小比较判断状态处于哪个范围,需要留意上面状态的排列顺序。

    [图片上传失败...(image-f7d0b4-1513077081753)]

    FutureTask的状态路径,取决于run和cancel的调用顺序,在后文分析时,对号入座这几条路径。

    1. NEW -> COMPLETING -> NORMAL 正常的流程

    2. NEW -> COMPLETING -> EXCEPTIONAL 异常的流程

    3. NEW -> CANCELLED 被取消流程

    4. NEW -> INTERRUPTING -> INTERRUPTED 被中断流程

    FutureTask的变量

    • int state

    • Thread runner

    • WaitNode waiters

    • Callable<V> callable

    • Object outcome

    state、runner、waiters三个变量没有使用原子类,而是使用Unsafe对象进行原子操作。代码中会见到很多形如compareAndSwap的方法,入门原理可以看我以前写的认识非阻塞的同步机制CAS

    callable是要执行的任务,runner是执行任务的线程,outcome是返回的结果(正常结果或Exception结果)

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
    
    

    waiters的数据结构是WaitNode,保存了Thread和下个WaitNode的引用。waiters保存了等待结果的线程,每次操作只会增减头,所以是一个栈结构,详细见后文对get方法的分析。

    FutureTask的创建

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    
    

    FutureTask可以接受Callable或者Runnable,state从NEW开始。如果是Runnable,需要调用Executors.callable转成Callable,返回的结果是预先传入的result。转换过程使用一个实现了Callable的RunnableAdapter包装Runnable和result,代码比较简单。

     static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
    
    

    提交FutureTask到线程池的submit定义在AbstractExecutorService,根据入参的不同,有三个submit方法。下面以提交Callable为例:

    public <T> Future<T> submit(Callable<T> task) {
       if (task == null) throw new NullPointerException();
       RunnableFuture<T> ftask = newTaskFor(task);
       execute(ftask);
       return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {    
       return new FutureTask<T>(callable);
    }
    
    

    FutureTask在newTaskFor创建,然后调用线程池的execute执行,最后返回Future。获取Future后,就可以调用get获取结果,或者调用cancel取消任务。

    FutureTask的运行

    FutureTask实现了Runnable,在线程池里执行时调用的方法是run。

    public void run() {
        //1
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
            return;
        //2
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
           //3
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    

    标记1处检查FutureTask的状态,如果不是处于NEW,说明状态已经进入四条路径之一,也就没有必要继续了。如果状态是NEW,则将执行任务的线程交给runner。

    标记2处开始正式执行任务,调用call方法获取结果,没有异常就算成功,最后执行set方法;出现异常就调用setException方法。

    标记3处,无论任务执行是否成功,都需要将runner重新置为空。

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
    

    任务执行成功与失败,分别对应NEW -> COMPLETING -> NORMAL和NEW -> COMPLETING -> EXCEPTIONAL两条路径。这里先将状态修改为中间状态,再对结果赋值,最后再修改为最终状态。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }
    
    

    最后调用finishCompletion执行任务完成,唤醒并删除所有在waiters中等待的线程。done方法是空的,供子类实现,最后callable也设置为空。


    FutureTask还有个runAndReset,逻辑和run类似,但没有调用set方法来设置结果,执行完成后将任务重新初始化。

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
    
    

    FutureTask的取消

    对于已经提交执行的任务,可以调用cancel执行取消。

    public boolean cancel(boolean mayInterruptIfRunning) {
       //1
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
           //2
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    
    

    标记1处判断任务状态,为NEW才能被取消。如果mayInterruptIfRunning是true,代表任务需要被中断,走NEW -> INTERRUPTING -> INTERRUPTED流程。否则代表任务被取消,走NEW -> CANCELLED流程。

    标记2处理任务被中断的情况,这里仅仅是对线程发出中断请求,不确保任务能检测并处理中断,详细原理去看Java的中断机制。

    最后调用finishCompletion完成收尾工作。

    public boolean isCancelled() {
        return state >= CANCELLED;
    }
    
    

    判断任务是否被取消,具体逻辑是判断state >= CANCELLED,包括了被中断一共两条路径的结果。

    FutureTask获取结果

    调用FutureTask的get方法获取任务的执行结果,可以阻塞直到获取结果,也可以限制范围时间内获取结果,否则抛出TimeoutException。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    
    

    get的核心实现调用了awaitDone,入参为是否开启时间限制和最大的等待时间。

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) {    //1
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet    //2
                Thread.yield();
            else if (q == null)     //3
                q = new WaitNode();
            else if (!queued)    //4
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {    //5
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else     //6
                LockSupport.park(this);
        }
    }
    
    

    awaitDone主要逻辑是一个无限循环,首先判断线程是否被中断,是的话移除waiter并抛出中断异常。接下来是一串if-else,一共六种情况。

    1. 判断任务状态是否已经完成,是就直接返回;

    2. 任务状态是COMPLETING,代表在set结果时被阻塞了,这里先让出资源;

    3. 如果WaitNode为空,就为当前线程初始化一个WaitNode;

    4. 如果当前的WaitNode还没有加入waiters,就加入;

    5. 如果是限定时间执行,判断有无超时,超时就将waiter移出,并返回结果,否则阻塞一定时间;

    6. 如果没有限定时间,就一直阻塞到下次被唤醒。

    LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。park和unpark的作用分别是阻塞线程和解除阻塞线程。

    private V report(int s) throws ExecutionException {
       Object x = outcome;
       if (s == NORMAL)
           return (V)x;
       if (s >= CANCELLED)
           throw new CancellationException();
       throw new ExecutionException((Throwable)x);
    }
    
    

    最后get调用report,使用outcome返回结果。

    image.png

    看图3,如果多个线程向同一个FutureTask实例get结果,但FutureTask又没有执行完毕,线程将会阻塞并保存在waiters中。待FutureTask获取结果后,唤醒waiters等待的线程,并返回同一个结果。

    总结

    image.png

    图4归纳了FutureTask的作用,任务的调用线程Caller和线程池的工作线程通过FutureTask交互。对比线程池的执行原理,FutureTask是比较简单的。

    相关文章

      网友评论

          本文标题:分析Java线程池Callable任务执行原理

          本文链接:https://www.haomeiwen.com/subject/vqnnixtx.html