美文网首页
FutureTask

FutureTask

作者: Pillar_Zhong | 来源:发表于2019-06-21 14:20 被阅读0次
FutureTask.png 1561082003855.png
1561082025770.png

Runnable

run

public void run() {
    // Task的状态必须是NEW才能开始run
    // 将当前线程设置为task的runner,如果失败,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行任务,并拿到返回结果
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 如果执行失败
                result = null;
                ran = false;
                // 唤醒等待队列,并返回异常
                setException(ex);
            }
            if (ran)
                // 唤醒等待队列,并返回执行结果
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        // 最后将runner置空
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // 再次查看任务状态,看是不是被人cancel了
        if (s >= INTERRUPTING)
            // 等待中断完毕
            handlePossibleCancellationInterrupt(s);
    }
}

setException

protected void setException(Throwable t) {
    // 首先设置任务为COMPLETING的中间状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {     
        outcome = t;
        // 如果前面进入中间状态成功,那么直接设为EXCEPTIONAL,不再去做CAS
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state        
        finishCompletion();
    }
}

set

protected void set(V v) {
    // 首先设置任务为COMPLETING的中间状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 保存执行结果
        outcome = v;
        // 如果前面进入中间状态成功,那么直接设为EXCEPTIONAL,不再去做CAS
        // 也可以认为,在进入NORMAL前,必须是COMPLETING,所以也没必要再去做CAS
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

finishCompletion

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // 首先先拿到等待队列的同时,清空列表
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 将等待队列依次唤醒,说明任务执行的结果有了
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    
    // 钩子,子类定义
    done();

    callable = null;        // to reduce footprint
}

handlePossibleCancellationInterrupt

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    // INTERRUPTING就代表该任务执行线程正在中断中,一直自旋到中断完毕才退出
    // 否则让出CPU资源,等待下个调度再来查看是否中断完毕,如果往复
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

Future

cancel

public boolean cancel(boolean mayInterruptIfRunning) {
    // 任务状态必须为NEW,且能更新为INTERRUPTING或CANCELLED,就往下继续
    // 否则,cancel失败
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // 如果mayInterruptIfRunning为true,就表示当前在执行的任务会被中断
        // 反之,会让任务执行完,也就是说只会设置状态为CANCELLED,并不会做其他操作
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    // 中断任务执行线程
                    t.interrupt();
            } finally { // final state
                // 设置状态为INTERRUPTED,表示中断完成
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒等待列表
        finishCompletion();
    }
    return true;
}

isDone

public boolean isDone() {
    // 只要不是NEW,就认为任务已经完成,不管是成功,异常,还是取消。
    // 因为只要不是NEW,最终都会流转成终态,而不是中间态
    return state != NEW;
}

isCancelled

public boolean isCancelled() {
    // 从前面的Cancel来看,只要是CANCELLED/INTERRUPTING/INTERRUPTED,都认为是被取消
    return state >= CANCELLED;
}

get

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果是NEW或COMPLETING,说明任务没有完成,那么开始等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

awaitDone

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 自旋
    for (;;) {
        // 如果该线程已经被中断,那么移除该线程,抛出InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 如果大于COMPLETING,说明进入终态,也就是拿到了执行结果,返回s
        // 且将执行线程置空
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 如果还在COMPLETING,说明正在执行中,暂停等待
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 如果当前的waitnode为空,新建一个
        else if (q == null)
            q = new WaitNode();
        // 这里的意义是将新节点作为栈顶
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 超时处理                                         
        else if (timed) {
            nanos = deadline - System.nanoTime();
            // 如果已经到期,那么移除该waitnode,并返回任务状态
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // 否则,超时等待
            LockSupport.parkNanos(this, nanos);
        }
        // 否则,阻塞,等待任务执行完唤醒
        else
            LockSupport.park(this);
    }
}

report

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 只有当任务状态时NORMAL,才返回执行结果
    if (s == NORMAL)
        return (V)x;
    // 如果任务被取消,抛出CancellationException
    if (s >= CANCELLED)
        throw new CancellationException();
    // 否则任务的状态是EXCEPTIONAL,再抛出捕获的执行异常
    throw new ExecutionException((Throwable)x);
}

runAndReset

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 跟run不同的是,这里会重置任务状态为NEW,以便下次周期性的重复调用
    return ran && s == NEW;
}

相关文章

网友评论

      本文标题:FutureTask

      本文链接:https://www.haomeiwen.com/subject/mlpyqctx.html