美文网首页
FutureTask源码走读

FutureTask源码走读

作者: 忘净空 | 来源:发表于2017-04-03 22:00 被阅读18次

    FutureTask类图

    从类图可以看出FutureTask简介实现了Runnable、Future接口,我们知道Future用于表示异步计算的结果,所以通过它我们可以获得线程的执行结果。如何获得线程的执行结果呢?

    FutureTask获取执行结果

    一般我们按照下面的方式使用FutureTask

    //1. 创建FutureTask
    FutureTask futureTask = new FutureTask(new CallableTest());
    //2. 启动线程
    new Thread(futureTask).start();
    //3 获取线程执行结果
    String result = (String) futureTask.get();
    

    源码走读

    //创建FutureTask对象
    public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
    }
    
    //调用run方法,执行我们的业务逻辑
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //我们重写的call的方法的执行
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)//call方法执行完成执行唤醒操作
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    //执行call方法的时候,我们主线程接着执行get()方法
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            //阻塞主方法的逻辑
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    //未被唤醒会一直等待直到超时或知道被中断
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        //等待队列
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //阻塞操作(有超时时间)
                LockSupport.parkNanos(this, nanos);
            }
            else
                //阻塞操作
                LockSupport.park(this);
        }
    }
    
    //run()中的唤醒操作
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //执行后结果赋值给outcome
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    //依次唤醒阻塞的线程
    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; 
                        q = next;
                    }
                    break;
                }
            }
            done();
            callable = null;
        }
    
    

    从上面的分析可以看出:异步计算线程启动后,主线程(或者其他调用get()方法的线程)将被放在一个等待对列中,同时被阻塞(通过LockSupport类的park方法),知道异步计算线程执行完成后,等待队列中的线程将被依次唤醒,并且或得计算结果。

    相关文章

      网友评论

          本文标题:FutureTask源码走读

          本文链接:https://www.haomeiwen.com/subject/sknoottx.html