美文网首页JUC并发相关
24. 并发终结之FutureTask

24. 并发终结之FutureTask

作者: 涣涣虚心0215 | 来源:发表于2020-09-24 00:07 被阅读0次
    关于Thread和Runnable的区别:

    1)从对象编程来说:Thread是继承;Runnable是组合,会比继承耦合性低,更加灵活。
    2)从对象共享角度:Runnable实例可以由过个线程实例共享,会产生并发问题。
    3)对象创建成本:Thread在创建的时候JVM就会为其分配调用栈空间,内核线程等资源;而Runnable是普通的类,作为参数传给Thread,所以Runnable创建成本相对较低。

    关于用户线程(User)和守护线程(Daemon):

    用户线程会阻止JVM正常停止;
    守护线程不会影响JVM正常停止,所以守护线程通常用于执行一些重要性不高的任务。

    关于Callable和Runnable的区别:

    Callable方式需要FutureTask实现类的支持,用于接收运算结果。
    FutureTask是Future接口的实现类,且FutureTask也可用于闭锁的操作,因为get() 会阻塞当前线程直到Callable返回结果。

    FutureTask

    FutureTask提供了支持cancel的异步计算方式,它实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable和Future接口。
    FutureTask可以用来包装一个Callable和Runnable对象,且因为实现了Runnable,所以FutureTask可以被提交到线程池处理。

    源码分析

    因为FutureTask实现了Future接口,所以它会实现Future接口的相关方法,比如说get(), cancel()等等。

    成员变量以及构造函数

    有三个volatile成员变量,会通过UNSAFE类来进行CAS操作。
    另外定义了state变量对应的7中状态。
    callable变量意味着FutureTask会将Runnable也封装成Callable来处理。
    outcome则是返回这或者异常的信息

    private volatile int state;
    private static final int NEW          = 0;//初始是NEW
    private static final int COMPLETING   = 1;//在set()和setException()里面先CAS更新成COMPLETING,加锁操作
    private static final int NORMAL       = 2;//set()方法执行成功从COMPLETING到NORMAL
    private static final int EXCEPTIONAL  = 3;//setException()方法执行成功从COMPLETING到EXCEPTIONAL
    private static final int CANCELLED    = 4;//cancel(false)的时候
    private static final int INTERRUPTING = 5;//cancel(true)的时候
    private static final int INTERRUPTED  = 6;//cancel(true)最终状态
    
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    构造函数

    FutureTask内部有Callable类型的成员变量,所以Runnable会通过一个适配器RunnableAdapter,转换成Callable,内部还是调用的Runnable的run()方法。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    //接受Runnable的构造函数
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    //适配器将Runnable转换成Callable
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    //Adapter类,Callable的call方法调用Runnable的run方法。
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
    
    run()

    run()方法主要是调用Callable的call方法,如果有异常则通过setException(Throwable t)设置异常;如果没有异常,则通过set(V result)方法,来设置返回值。且这两个set方法最终都会调用finishCompletion()来unpark唤醒WaitNode节点里的等待线程去获得结果。
    run()方法涉及到的方法有setException(), set(), finishCompletion(), handlePossibleCancellationInterrupt()。
    涉及到的state有COMPLETING, EXCEPTIONAL, NORMAL
    且在call()方法返回之前,都是NEW状态,这样cancel()才会有机会。
    且run方法最好检查INTERRUPTING状态,会在handlePossibleCancellationInterrupt自旋直到cancel()方法结束,state变成INTERRUPTED状态(下面代码里分析)。

    public void run() {
        //如果state不是NEW或者CAS更新runner成员变量失败,则直接return。
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //直接调用Callable的call方法并拿到result
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    //如果call()方法抛出异常,ran=false
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    //如果抛出异常,则调用该方法,更新state状态到COMPLETING,
    //将Throwable赋值给outcome变量,再更新state到EXCEPTIONAL.
    //最后finishCompletion()唤醒WaitNode中等待线程节点
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            //因为上面更新到COMPLETING成功,这边更新EXCEPTIONAL则不需要CAS操作,而是putOrderedInt内存操作
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            //先通过CAS获得更新资格,将waiter变量更新为null
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        //拿到q对应的线程,更新为null,并唤醒该线程
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    //然后操作q的后继节点
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //done()方法由子类实现
        done();
    
        callable = null;        // to reduce footprint
    }
    //如果没有异常,更新state到COMPLETING先,然后将result设置到outcome,更新state到NORMAL
    //最后finishCompletion唤醒等待线程
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    //final的时候会检查一下state是不是被中断,如果一直在中断过程中,则当前线程yield让出CPU
    //这边自旋等待调用中断的线程执行完毕,因为此时run方法已经结束,runner也被重置为null
    //   所以别的线程可能有机会提交这个FutureTask,而执行run方法,这时候cancel可能被应用到不同的Task上。
    //   所以这里要自旋直到cancel()方法执行结束
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    }
    
    cancel()

    cancel的时候有几种情况:
    1.任务还没开始,直接返回false
    2.任务已经开始:
    2.1调用cancel(false),就是最后调用finishCompletion()来唤醒等待线程
    2.2调用cancel(true),则通过runner获得当前运行线程,调用运行线程的Interrupt()方法来设置目标线程的中断标记位为true
    这里涉及到的state状态有INTERRUPTING, CANCELLED, INTERRUPTED

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              //如果参数是true,则更新为INTERRUPTING状态,否则CANCELLED状态
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();//通过设置中断标志位来控制目标线程的生命周期
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    
    get()

    如果state的状态还没到COMPLETING,就在awaitDone()里面通过LockSupport.park()挂起当前线程,直到run()方法执行结束唤醒等待线程。
    异常:
    1.get()方法响应异常中断,会抛出InterruptedException。
    2.且如果是get(long timeout, TimeUnit unit)方法,还会抛出TimeoutException。
    3.另外如果Callable的call方法没有捕捉异常而抛出异常,则get()方法会抛出ExecutionException。FutureTask的run()方法在执行Callable的call()方法时,会将call抛出的异常捕获包装成ExecutionException,这意味着call()方法出现异常,不会直接导致执行线程运行结束(相比正常的Thread的run()方法抛出异常可能导致执行线程提前终止生命周期)。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            //如果state还没到COMPLETING,则开始挂起当前线程
            s = awaitDone(false, 0L);
        //run()方法结束之后会唤醒当前线程,去拿到执行结果。
        return report(s);
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //判断当前线程是否被中断
            if (Thread.interrupted()) {
                //如果被中断,则需要把waitNode的节点都垃圾回收掉,抛出InterruptedException
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            //如果已经结束,则将waitNode的thread设置为null,并返回state值
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //COMPLETING表示set开始,快结束了,则让执行线程让出CPU等待一下
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //这里s就还没到COMPLETING,如果waitNode是null,则创建WaitNode
            else if (q == null)
                q = new WaitNode();
            //如果q不为null,则需要添加到WaitNode的next后继节点
            else if (!queued)
                //这个UNSAFE操作,先更新q的next=waiter,再将qCAS更新到waiters变量
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            //如果是有等待时间的,则在等待时间过后,removeWaiter()来处理等待的node
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    //处理最终结果
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            //如果state是NORMAL,则直接返回outcome值
            return (V)x;
        if (s >= CANCELLED)
            //则就是cancel()使得线程被Interrupt了或者cancel了,抛出CancellationException
            throw new CancellationException();
        //否则就是抛出异常了
        throw new ExecutionException((Throwable)x);
    }
    

    因为是无线循环,所以里面的q = new WaitNode()和UNSAFE更新操作在park之前都会做一边,即park之前会将当前线程封装到WaitNode链表里。

    相关文章

      网友评论

        本文标题:24. 并发终结之FutureTask

        本文链接:https://www.haomeiwen.com/subject/guowhktx.html