美文网首页
FutureTask

FutureTask

作者: 小刀厨师 | 来源:发表于2016-12-01 16:35 被阅读0次

    FutureTask的是实现类图如下:

    Screenshot - 2016年12月01日 - 14时08分12秒.png

    它的实现也比较简单,主要是用了UNSAFE类来实现大部分的功能。
    它的属性主要有state(表示状态),callable(FutureTask指向的要运行的callable对象),outcome(Object对象,运行结果,可能是异常), runner(Thread对象 线程),waiters(WaitNode对象,它是链表元素,表示等待结果的线程)
    它的许多操作都是以状态驱动的,通过CAS改变状态成功后进行后续操作:
    下面两个方法在类中其他方法中会用到如run()方法中运行成功后设置结果值

        /**
         * Sets the result of this future to the given value unless
         * this future has already been set or has been cancelled.
         *
         * <p>This method is invoked internally by the {@link #run} method
         * upon successful completion of the computation.
         *
         * @param v the value
         */
        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    
        /**
         * Causes this future to report an {@link ExecutionException}
         * with the given throwable as its cause, unless this future has
         * already been set or has been cancelled.
         *
         * <p>This method is invoked internally by the {@link #run} method
         * upon failure of the computation.
         *
         * @param t the cause of failure
         */
        protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    

    FutureTask的大部分方法比较简单,重点是一下几个:

        public boolean cancel(boolean mayInterruptIfRunning) {//取消方法
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//向置状体为取消
                return false;
            try {    // in case call to interrupt throws exception//置状态成功后
                if (mayInterruptIfRunning) {//是否使用中断线程
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();//取消线程运行后的后续操作
            }
            return true;
        }
    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {//唤醒等待结果的线程
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//原子替换waiters变量为空
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);//唤醒线程
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();//这个方法为空,子类可以实现它增加额外的的操作
    
            callable = null;        // to reduce footprint
        }
    

    第二个获取结果方法

    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//判断状态,然后等待,也会判断等待后返回的结果状态
                throw new TimeoutException();
            return report(s);//使用report方法发会结果
        }
        这个方法就是无限循环,每次循环只做满足条件的一件事
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {//状态已经表示结束了
                    if (q != null)
                        q.thread = null;//把q的线程之空,它是当前线程,表示当前线程已经不等待了,不必在其他的方法中唤醒了,q也没从队列移除,主要减少移除队列的次数,提高性能
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();//生成等待者,详见相应类,很简单
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);//加入等待队列
                else if (timed) {//等待一段时间
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);//一直等待
            }
        }
        private void removeWaiter(WaitNode node) {//移除等待者,
            if (node != null) {
                node.thread = null;//首先将移除者的线程置空,后面删除线程为空的等待者
                retry:
                for (;;) {          // restart on removeWaiter race
                    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {//一直到末尾才结束
                        s = q.next;
                        if (q.thread != null)//不是需要移除的节点,则增加个前节点
                            pred = q;
                        else if (pred != null) {//q现在是移除节点,但是前节点不为空,直接改链表结构
                            pred.next = s;
                            if (pred.thread == null) // check for race//前置节点的线程为空,则从头再来一次
                                continue retry;
                        }
                        else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                              q, s))//并发修改了首节点,失败则从头再来
                            continue retry;
                    }
                    break;
                }
            }
        }
    
    

    //以当前线程来运行

        public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))//置runner为当前线程
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    相关文章

      网友评论

          本文标题:FutureTask

          本文链接:https://www.haomeiwen.com/subject/hwmxmttx.html