美文网首页
并发编程系列之FutureTask源码学习笔记

并发编程系列之FutureTask源码学习笔记

作者: smileNicky | 来源:发表于2021-12-21 22:50 被阅读0次

    并发编程系列之FutureTask源码学习笔记

    1、什么是FutureTask类?

    上一章节的学习中,我们知道了Future类的基本用法,知道了Future其实就是为了监控线程任务执行的,接着本博客继续学习FutureTask。然后什么是FutureTask类?

    Future是1.5版本引入的异步编程的顶层抽象接口,FutureTask则是Future的基础实现类。同时FutureTask还实现了Runnable接口,所以FutureTask也可以作为一个独立的Runnable任务

    2、使用FutureTask封装Callable任务

    线程中是不能直接传入Callable任务的,所以需要借助FutureTask,FutureTask可以用来封装Callable任务,下面给出一个例子:

    package com.example.concurrent.future;
    
    import java.util.Random;
    import java.util.concurrent.*;
    
    /**
     * <pre>
     *      FutureTask例子
     * </pre>
     * <p>
     * <pre>
     * @author nicky.ma
     * 修改记录
     *    修改后版本:     修改人:  修改日期: 2021/08/28 18:04  修改内容:
     * </pre>
     */
    public class FutureTaskExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask futureTask = new FutureTask(new CallableTask());
            Thread t = new Thread(futureTask);
            t.start();
            System.out.println(futureTask.get());
        }
        static class CallableTask implements Callable<Integer> {
            @Override
            public Integer call() throws Exception{
                Thread.sleep(1000L);
                return new Random().nextInt();
            }
        }
    }
    

    3、FutureTask UML类图

    翻下FutureTask的源码,可以看出实现了RunnableFuture接口

    public class FutureTask<V> implements RunnableFuture<V> {
    // ...
    }
    

    RunnableFuture接口是怎么样的?可以看出其实是继承了Runnable,Future

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    
    }
    
    

    在idea里画出FutureTask的uml类图:


    在这里插入图片描述

    所以,可以说FutureTask本质就是一个Runnable任务

    4、FutureTask源码学习

    • FutureTask类属性
    public class FutureTask<V> implements RunnableFuture<V> {
        // 状态:存在以下7中状态
        private volatile int state;
        // 新建
        private static final int NEW          = 0;
        // 任务完成中
        private static final int COMPLETING   = 1;
        // 任务正常完成
        private static final int NORMAL       = 2;
        // 任务异常
        private static final int EXCEPTIONAL  = 3;
        // 任务取消
        private static final int CANCELLED    = 4;
        // 任务中断中
        private static final int INTERRUPTING = 5;
        // 任务已中断
        private static final int INTERRUPTED  = 6;
    
        // 支持结果返回的Callable任务
        private Callable<V> callable;
        
        // 任务执行结果:包含正常和异常的结果,通过get方法获取
        private Object outcome; 
        
        // 任务执行线程
        private volatile Thread runner;
        
        // 栈结构的等待队列,该节点是栈中的最顶层节点
        private volatile WaitNode waiters;
    }
    
    • 构造方法
    // 传入callable任务
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    
    // 传入runnable任务、结果变量result
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
        
    
    
    • 是一个Runnable任务,run方法实现
    public void run() {
         // 两种情况直接返回
         // 1:状态不是NEW,说明已经执行过,获取已经取消任务,直接返回
         // 2:状态是NEW,将当前执行线程保存在runner字段(runnerOffset)中,如果赋值失败,直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行了给如的Callable任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 异常的情况,设置异常
                    setException(ex);
                }
                if (ran)
                    // 任务正常执行,设置结果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            // 任务被中断,执行中断处理
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    setException方法:

    protected void setException(Throwable t) {
      // CAS,将状态由NEW改为COMPLETING(中间状态)
       if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 返回结果
            outcome = t;
            // 将状态改为EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
    • get获取执行结果
    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            // 任务还没完成,调用awaitDonw
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            // 返回结果
            return report(s);
        }
    

    get超时的方法

    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            // unit是时间单位,必须传
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            // 超过阻塞时间timeout,抛出TimeoutException
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    

    重点看下awaitDone方法:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 计算截止时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        // 
        boolean queued = false;
        // 无限循环,判断条件是否符合
        for (;;) {
            // 1、线程是否被中断,是的情况,移除节点,同时抛出InterruptedException
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            // 2、获取当前状态,如果状态大于COMPLETING
            // 说明任务完成了,有可能正常执行完成,也有可能是取消了任务
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    // thread置为null 等待JVM gc
                    q.thread = null;
                    //返回结果
                return s;
            }
            //3、如果状态处于中间状态COMPLETING
            //表示任务已经结束但是任务执行线程还没来得及给outcome赋值
            else if (s == COMPLETING) // cannot time out yet
                // 这种情况线程yield让出执行权,给其它线程先执行
                Thread.yield();
             // 4、如果等待节点为空,则构造一个等待节点
            else if (q == null)
                q = new WaitNode();
           // 5、如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                //如果需要等待特定时间,则先计算要等待的时间
                // 如果已经超时,则删除对应节点并返回对应的状态
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞等待特定时间
                LockSupport.parkNanos(this, nanos);
            }
            else
               // 让线程等待,阻塞当前线程
                LockSupport.park(this);
        }
    }
    
    • cancel取消任务
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 如果任务已经结束,则直接返回false
       if (!(state == NEW &&
             UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                 mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
           return false;
       try {    // in case call to interrupt throws exception
            // 需要中断任务的情况
           if (mayInterruptIfRunning) {
               try {
                   Thread t = runner;
                   // 调用线程的interrupt来停止线程
                   if (t != null)
                       t.interrupt();
               } finally { // final state
                   // 修改状态为INTERRUPTED
                   UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
               }
           }
       } finally {
           finishCompletion();
       }
       return true;
    }
    

    finishCompletion方法:

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 无限循环,遍历waiters列表,唤醒节点中的线程,然后将Callable置为null
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        // 唤醒线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                   // 置为null,让JVM gc
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
    
        done();
        
        callable = null;        // to reduce footprint
    }
    

    相关文章

      网友评论

          本文标题:并发编程系列之FutureTask源码学习笔记

          本文链接:https://www.haomeiwen.com/subject/svufqrtx.html