美文网首页
走马观花-FutureTask和Callable

走马观花-FutureTask和Callable

作者: OkCoco | 来源:发表于2018-03-27 20:56 被阅读0次

    创建线程的方式

    • 直接创建Thread
            new Thread() {
                    @Override
                    public void run() {
                        super.run();
                        setContent("来自Thread");
                    }
                }.start();
    
    • 通过Thread执行Runnable
            new Thread(new Runnable() {
                    @Override
                    public void run() {
                        setContent("来自Runnable");
                    }
                }).start();
    
    • 通过FutureTask和Callable
            //1.创建一个类实现Callable接口并得到实现类对象
            TestCallable callable = new TestCallable();
            //2.将Callable对象构建成FutureTask
            FutureTask task = new FutureTask<>(callable);
            //通过FutureTask实例创建Thread对象
            Thread thread = new Thread(task);
    
            thread.start();
    

    这些方法都可以开启一个新线程。那么他们之间有什么不同呢?
    Thread继承自Runnable,Runnable的源码:

        public interface Runnable {
            /**
             * 当一个对象实现了Runnable接口去实现一个线程是,开启该线程就会在此线程中回调run()方法
             */
            public abstract void run();
        }
    

    Runnable类的介绍:

      Runnable接口应该被任何一个想要执行一个新线程的类对象实现。且这个类必须定义一个不带参数的方法run()
      这个接口被设计来为那些存活着的时候想要去执行某些代码的对象提供一个协议。例如:Thread实现了Runnable接口,Thread对象存活意味着Thread启动了且还未停止。
      此外,Runnable提供了一种方法,让类在不继承Thread的情况下处于活动状态。一个实现了Runnable的类可以通过实例化一个Thread实例并将它自己作为目标进行传递,而无需创建Thread的子类。 在大多数情况下,如果您只打算覆盖run()方法并且没有其他Thread方法,则应该使用Runnable接口。 这很重要,因为类不应该被子类化除非程序员打算修改或增强班级的基本行为.总而言之,就是借助Runnable来开启一个线程。

    Callable:

        @FunctionalInterface
        public interface Callable<V> {
            /**
             * 返回结果,或者抛出异常
             */
            V call() throws Exception;
        }
    

    Callable类的介绍:

      一个可以返回结果或者抛出异常的任务,需要实现一个不带参数的方法call()
      Callable接口通Runnable接口很相似,两者都是被设计来开启一个新线程的。不同的是,Runnable接口不能返回 一个结果或抛出异常。
      Executors类包含从其他常见形式转换为Callable类的实用程序方法。

    FutureTask类的介绍:

      可取消的异步计算。 该类提供Future的基本实现,其中包含启动和取消任务的方法,查询任务是否完成以及检索任务结果的方法。
      只有在任务完成后才能检索结果; 如果任务还没有完成,get方法将会被阻塞。 一旦任务完成,任务就不能重新启动或取消(除非使用runAndReset调用任务)
      FutureTask可以用来包装一个Callable对象。因为FutureTask实现了Runnable接口,且FutureTask能被Executor提交
      除了作为独立类使用外,此类还提供受保护的功能,这在创建自定义任务类时可能非常有用。

    FutureTask运行状态:

        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    

      任务的运行状态,初始化为NEW。 运行状态仅在方法set,setException和cancel中转换为终端状态。
      在完成期间,状态可能会呈现COMPLETING(结果正在设置)或INTERRUPTING(中断赛跑者以满足取消(true))的瞬态值。
      从这些中间状态到最终状态的转换使用更便宜的有序/惰性写入,因为值是唯一的并且不能进一步修改。

    可能的状态转换:

             NEW -> COMPLETING -> NORMAL
             NEW -> COMPLETING -> EXCEPTIONAL
             NEW -> CANCELLED
             NEW -> INTERRUPTING -> INTERRUPTED
    

    FutureTask构造器:

            public FutureTask(Callable<V> callable) {
                if (callable == null)
                    throw new NullPointerException();
                this.callable = callable;
                this.state = NEW;       // ensure visibility of callable
            }
    
    
            //以适配模式将Runnable对象适配成一个Callable对象
            public FutureTask(Runnable runnable, V result) {
                this.callable = Executors.callable(runnable, result);
                this.state = NEW;       // ensure visibility of callable
            }
    

    启动线程时,会调用FutureTask的run()方法:

        public void run() {
            //如果当前FutureTask的装填不是New,直接return
            //直接将runner变成当前线程
            if (state != NEW ||
                !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        //调用Callable的call方法(异步线程,可以做耗时操作)
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    compareAndSwapInt()方法作用及其参数意思

        /**
         *public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5)方法
         *  var1:需要改变的对象
         *  var2:偏移量
         *  var4:期待的值
         *  var5:改变后的值
         *  作用:调用该方法时,若var1所在地址偏移了var2之后所在的地址位置取到的值(value)和var4相等,则将
         *        value的值替换成var5
         *  返回:true :value的值被更改
         *
         *public native void putOrderedInt(Object obj, long offset, int value);
         *  作用:设置obj对象中offset偏移地址对应的整型field的值为指定值。这是一个有序或者
         *  有延迟的putIntVolatile方法,并且不保证值的改变被其他线程立即看到
         */
    

    set()方法:

          protected void set(V v) {
                 //若当前对象的state偏移了STATE之后的值等于NEW,则将state置为COMPLETETING
                 if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                     outcome = v;
                     //将state置为NORMAL
                     U.putOrderedInt(this, STATE, NORMAL); // final state
                     finishCompletion();
                 }
             }
    

    finishCompletion()方法:

        private void finishCompletion() {
            // assert state > COMPLETING;
            ...
    
            //任务执行完成之后调用,protected方法,子类实现
            done();
    
            //重置Callable
            callable = null;
        }
    

    cancel(boolean)方法:

        public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  U.compareAndSwapInt(this, STATE, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        U.putOrderedInt(this, STATE, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    

    cancel()方法可以看到,将正在运行的线程通过调用interrupt()方法打断,然后重置state的值为INTERRUPTED.最后调用finishCompletion()方法重置一些变量与状态。

    isDone()和isCancelled():

        public boolean isCancelled() {
            return state >= CANCELLED;
        }
    
        public boolean isDone() {
            return state != NEW;
        }
    

    get()方法获取结果:
      get()是一个阻塞方法,为什么阻塞?其实就是调用该方法之后,开启一个无限循环,直到state的值变为Normal或大于等于CANCELLED才将值通过report()方法返回。

        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    

    运行中,进入awaitDone(false, 0L)方法开启循环等待:

        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            // The code below is very delicate, to achieve these goals:
            // - call nanoTime exactly once for each call to park
            // - if nanos <= 0L, return promptly without allocation or nanoTime
            // - if nanos == Long.MIN_VALUE, don't underflow
            // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
            //   and we suffer a spurious wakeup, we will do no worse than
            //   to park-spin for a while
            long startTime = 0L;    // 0L表示未停止
            WaitNode q = null;      //等待节点
            boolean queued = false; //是否入队
    
            //开启死循环,知道结果出现
            for (;;) {
                int s = state;
                //执行完成,返回结果
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                //正在执行
                else if (s == COMPLETING)
                    // We may have already promised (via isDone) that we are done
                    // so never return empty-handed or throw InterruptedException
                    //线程处于可执行状态
                    Thread.yield();
                else if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                //new 一个WaitNode对象
                else if (q == null) {
                    if (timed && nanos <= 0L)
                        return s;
                    q = new WaitNode();
                }
                else if (!queued)
                    queued = U.compareAndSwapObject(this, WAITERS,
                                                    q.next = waiters, q);
                else if (timed) {
                    final long parkNanos;
                    if (startTime == 0L) { // first time
                        startTime = System.nanoTime();
                        if (startTime == 0L)
                            startTime = 1L;
                        parkNanos = nanos;
                    } else {
                        long elapsed = System.nanoTime() - startTime;
                        if (elapsed >= nanos) {
                            removeWaiter(q);
                            return state;
                        }
                        parkNanos = nanos - elapsed;
                    }
                    // nanoTime may be slow; recheck before parking
                    if (state < COMPLETING)
                        LockSupport.parkNanos(this, parkNanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    

    其实可以看到,返回结果要么正常,要么抛出异常。最后调用report(int)方法将结果返回:

        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                //返回正常结果
                return (V)x;
            if (s >= CANCELLED)
                //抛出异常
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    相关文章

      网友评论

          本文标题:走马观花-FutureTask和Callable

          本文链接:https://www.haomeiwen.com/subject/zmtecftx.html