美文网首页java多线程
多线程-源码解析RunnableFuture

多线程-源码解析RunnableFuture

作者: 余生爱静 | 来源:发表于2021-05-13 12:03 被阅读0次
    /**
     * A {@link Future} that is {@link Runnable}. Successful execution of
     * the {@code run} method causes completion of the {@code Future}
     * and allows access to its results.
     * @see FutureTask
     * @see Executor
     * @since 1.6
     * @author Doug Lea
     * @param <V> The result type returned by this Future's {@code get} method
     */
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    
    
    A {@link Future} that is {@link Runnable}. Successful execution of
     * the {@code run} method causes completion of the {@code Future}
     * and allows access to its results.
    

    译文:可运行的Future。 成功执行run方法会导致Future的完成,并允许访问其结果。

    从源码可知,RunnableFuture继承了Future和Runnable,所以其就具有了可以运行在线程中,能够取消并且能够异步获取到运行结果。接下来学习一下它的一个具体实现类FutureTask

    概论

    /**
     * A cancellable asynchronous computation.  This class provides a base
     * implementation of {@link Future}, with methods to start and cancel
     * a computation, query to see if the computation is complete, and
     * retrieve the result of the computation.  The result can only be
     * retrieved when the computation has completed; the {@code get}
     * methods will block if the computation has not yet completed.  Once
     * the computation has completed, the computation cannot be restarted
     * or cancelled (unless the computation is invoked using
     * {@link #runAndReset}).
     *
     * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
     * {@link Runnable} object.  Because {@code FutureTask} implements
     * {@code Runnable}, a {@code FutureTask} can be submitted to an
     * {@link Executor} for execution.
     *
     * <p>In addition to serving as a standalone class, this class provides
     * {@code protected} functionality that may be useful when creating
     * customized task classes.
     *
     * @since 1.5
     * @author Doug Lea
     * @param <V> The result type returned by this FutureTask's {@code get} methods
     */
    public class FutureTask<V> implements RunnableFuture<V>
    
    * A cancellable asynchronous computation.  This class provides a base
     * implementation of {@link Future}, with methods to start and cancel
     * a computation, query to see if the computation is complete, and
     * retrieve the result of the computation.  The result can only be
     * retrieved when the computation has completed; the {@code get}
     * methods will block if the computation has not yet completed.  Once
     * the computation has completed, the computation cannot be restarted
     * or cancelled (unless the computation is invoked using
     * {@link #runAndReset}).
    

    译文:可取消的异步计算。 此类提供Future的基本实现,其中包含启动和取消计算,查询计算是否完成以及返回计算结果的方法。 只有在计算完成后才能返回结果; 如果计算尚未完成,则get方法将阻塞。 一旦计算完成,除非使用runAndReset调用计算,否则无法重新启动或取消计算。

    FutureTask主要成员

    public class FutureTask<V> implements RunnableFuture<V> {
         /*
         * FutureTask中定义了一个state变量,用于记录任务执行的相关状态 ,状态的变化过程如下
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        private volatile int state;
        //主流程状态
        private static final int NEW = 0; //当FutureTask实例刚刚创建到callbale的call方法执行完成前,处于此状态
        private static final int COMPLETING  = 1; //callable的call方法执行完成或出现异常时,首先进行此状态
        private static final int NORMAL    = 2;//callable的call方法正常结束时,进入此状态,将outcom设置为正常结果
        private static final int EXCEPTIONAL = 3;//callable的call方法异常结束时,进入此状态,将outcome设置为抛出的异常
        //取消任务执行时可能处于的状态
        private static final int CANCELLED= 4;// FutureTask任务尚未执行,即还在任务队列的时候,调用了cancel方法,进入此状态
        private static final int INTERRUPTING = 5;// FutureTask的run方法已经在执行,收到中断信号,进入此状态
        private static final int INTERRUPTED  = 6;// 任务成功中断后,进入此状态
        
        private Callable<V> callable;//需要执行的任务,提示:如果提交的是Runnable对象,会先转换为Callable对象,这是构造方法参数
        private Object outcome; //任务运行的结果
        private volatile Thread runner;//执行此任务的线程
      
        //等待该FutureTask的线程链表,对于同一个FutureTask,如果多个线程调用了get方法,对应的线程都会加入到waiters链表中,同时当FutureTask执行完成后,也会告知所有waiters中的线程
        private volatile WaitNode waiters;
        ......
    }
    

    FutureTask的成员变量并不复杂,主要记录以下几部分信息:
    1、状态
    2、任务(callable)
    3、结果(outcome)
    4、等待线程(waiters)

    构造方法

    Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似。

    
    FutureTask(Callable<V> callable);
    FutureTask(Runnable runnable, V result);
    

    FutureTask的方法

    run()方法

    FutureTask执行任务的方法当然还是run方法:

    public void run() {
            if (state != NEW ||
                !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        //执行任务
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        //设置结果
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                   //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    run方法的大概逻辑如下:
    1、如果状态不为new或者运行线程runner失败,说明当前任务已经被其他线程启动或者已经被执行过,直接返回false
    2、调用call方法执行核心任务逻辑。如果调用成功则执行set(result)方法,将state状态设置成NORMAL。如果调用失败抛出异常则执行setException(ex)方法,将state状态设置成EXCEPTIONAL,唤醒所有在get()方法上等待的线程
    3、如果当前状态为INTERRUPTING(步骤2已CAS失败),则一直调用Thread.yield()直至状态不为INTERRUPTING

    set()方法

    除非已经设置或取消了该Future,否则将此Future的结果设置为给定值

    /**
         * Sets the result of this future to the given value unless
         * this future has already been set or has been cancelled.
         *
         * <p>This method is invoked internally by the {@link #run} method
         * upon successful completion of the computation.
         *
         * @param v the value
         */
        protected void set(V v) {
            if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                outcome = v;
                U.putOrderedInt(this, STATE, NORMAL); // final state
                finishCompletion();
            }
        }
    

    finishCompletion()

    删除并发送所有等待线程的信号,调用done(),并使callable无效。

     /**
         * Removes and signals all waiting threads, invokes done(), and
         * nulls out callable.
         */
        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                 ////通过CAS把栈顶的元素置为null,相当于弹出栈顶元素
                if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                              //唤醒等待的线程
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

    finishCompletion的逻辑也比较简单:
    1、遍历waiters链表,取出每一个节点:每个节点都代表一个正在等待该FutureTask结果(即调用过get方法)的线程
    2、通过 LockSupport.unpark(t)唤醒每一个节点,通知每个线程,该任务执行完成

    get()方法

    get方法很简单,主要就是调用awaitDone方法:

    /**
         * @throws CancellationException {@inheritDoc}
         */
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
        /**
         * @throws CancellationException {@inheritDoc}
         */
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    

    awaitDone()

    /**
         * Awaits completion or aborts on interrupt or timeout.
         *
         * @param timed true if use timed waits
         * @param nanos time to wait, if timed
         * @return state upon completion or at timeout
         */
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            // The code below is very delicate, to achieve these goals:
            // - call nanoTime exactly once for each call to park
            // - if nanos <= 0L, return promptly without allocation or nanoTime
            // - if nanos == Long.MIN_VALUE, don't underflow
            // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
            //   and we suffer a spurious wakeup, we will do no worse than
            //   to park-spin for a while
            long startTime = 0L;    // Special value 0L means not yet parked
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                int s = state;
                if (s > COMPLETING) { //如果state状态大于COMPLETING 则说明任务执行完成,或取消
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING)//如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。
                    // We may have already promised (via isDone) that we are done
                    // so never return empty-handed or throw InterruptedException
                    Thread.yield();
                else if (Thread.interrupted()) { //如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                else if (q == null) { //构建节点
                    if (timed && nanos <= 0L)
                        return s;
                    q = new WaitNode();
                }
                else if (!queued)//把当前节点入栈
                    queued = U.compareAndSwapObject(this, WAITERS,
                                                    q.next = waiters, q);
                 //如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间
                //如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态
                else if (timed) {
                    final long parkNanos;
                    if (startTime == 0L) { // first time
                        startTime = System.nanoTime();
                        if (startTime == 0L)
                            startTime = 1L;
                        parkNanos = nanos;
                    } else {
                        long elapsed = System.nanoTime() - startTime;
                        if (elapsed >= nanos) {
                            removeWaiter(q);
                            return state;
                        }
                        parkNanos = nanos - elapsed;
                    }
                    // nanoTime may be slow; recheck before parking
                    if (state < COMPLETING)
                        LockSupport.parkNanos(this, parkNanos);//阻塞当前线程nanos秒
                }
                else
                    LockSupport.park(this);//阻塞当前线程
            }
        }
    

    整个方法的大致逻辑主要分为以下几步:
    1>如果当前状态值大于COMPLETING,说明已经执行完成或者取消,直接返回
    2>如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快
    3>如果当前线程是首次进入循环,为当前线程创建wait节点加入到waiters链表中
    4>根据是否定时将当前线程挂起(LockSupport.parkNanos LockSupport.park)来阻塞当前线程,直到超时或者线程被finishCompletion方法唤醒
    5>当线程挂起超时或者被唤醒后,重新循环执行上述逻辑

    cancel()

     public boolean cancel(boolean mayInterruptIfRunning) {
            //根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
            if (!(state == NEW &&
                  U.compareAndSwapInt(this, STATE, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {//如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        U.putOrderedInt(this, STATE, INTERRUPTED);
                    }
                }
            } finally {
                //唤醒所有在get()方法等待的线程
                finishCompletion();
            }
            return true;
        }
    
    public class FutureTaskDemo {
        public static void main(String[] args) {
            FutureTask<String> futureTask=new FutureTask<>(new Runnable(){
    
                @Override
                public void run() {
                    try {
                        Thread.sleep(20*1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"Hello world");
            Thread thread = new Thread(futureTask,"Thread Future");
            thread.start();
            try {
                String result=futureTask.get();
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    运行时线程堆栈信息

    D:\android\progect\IdeaDemo>jps
    10624 Launcher
    1296 FutureTaskDemo
    5360 Jps
    12872
    
    D:\android\progect\IdeaDemo>jstack 1296
    2021-05-13 00:03:51
    Full thread dump Java HotSpot(TM) Client VM (25.202-b08 mixed mode, sharing):
    
    "Thread Future" #9 prio=5 os_prio=0 tid=0x15a1d800 nid=0x2040 waiting on condition [0x15cef000]
       java.lang.Thread.State: TIMED_WAITING (sleeping)
            at java.lang.Thread.sleep(Native Method)
            at com.idea.future.FutureTaskDemo$1.run(FutureTaskDemo.java:13)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.lang.Thread.run(Thread.java:748)
    "main" #1 prio=5 os_prio=0 tid=0x02a0dc00 nid=0x18a8 waiting on condition [0x00d4f000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x04cfd7e8> (a java.util.concurrent.FutureTask)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
            at java.util.concurrent.FutureTask.get(FutureTask.java:191)
            at com.idea.future.FutureTaskDemo.main(FutureTaskDemo.java:22)
    
    

    由于我在run方法里面设置了相当于耗时的操作,从堆栈信息可以得知,"Thread Future"的状态是TIMED_WAITING,而主线程"main"则处于WAITING的状态

    实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行

    又因为实现了 Future 接口,所以也能用来获得任务的执行结果

    提交给 ThreadPoolExecutor 去执行

    下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行

    
    // 创建FutureTask
    FutureTask<Integer> futureTask
      = new FutureTask<>(()-> 1+2);
    // 创建线程池
    ExecutorService es = 
      Executors.newCachedThreadPool();
    // 提交FutureTask 
    es.submit(futureTask);
    // 获取计算结果
    Integer result = futureTask.get();
    

    直接被 Thread 执行

    FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。

    
    // 创建FutureTask
    FutureTask<Integer> futureTask
      = new FutureTask<>(()-> 1+2);
    // 创建并启动线程
    Thread T1 = new Thread(futureTask);
    T1.start();
    // 获取计算结果
    Integer result = futureTask.get();
    

    总结

    利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。

    利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。

    相关文章

      网友评论

        本文标题:多线程-源码解析RunnableFuture

        本文链接:https://www.haomeiwen.com/subject/rvwzdltx.html