美文网首页
JUC--FutureTask解析

JUC--FutureTask解析

作者: 噜啦噜啦嘞_d31b | 来源:发表于2021-06-13 13:33 被阅读0次
FutureTask的继承结构为:
  1. FutureTask实现RunnableFuture接口,RunnableFuture接口实现了Runnable接口和Future接口,Future接口为实现异步线程的接口其他提供多种方法,cancel()取消线程,isDone(),判断线程是否执行完毕,get()获取线程执行结构,get(long timeout, TimeUnit unit)在规定时间内获取线程返回结果。
  2. 对future理解可为:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。
  3. FutureTask解析:
    当我们用Callable方法去创建线程的时候,步zou为,创建一个callable接口的实现类然后将其作为参数构造出FuturnTask对象,将FutureTask对象作为具体任务的承载体交给Thread去执行,当调用Thread的run方法的时候,执行FuturnTask的run方法
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }

futurnTask中定义了线程任务执行的7个状态,分别为NEW(0新建),COMPLETING(1完成)NORMAL(2普通),EXCEPTIONAL(3异常),CANCELLED(4取消),INTERRUPTING(5中断),INTERRUPTED(6中断)。
当执行run方法的时候先进行任务状态的判断,不是新建状态代表已经执行完毕或者发生异常,直接renturn,否为继续向下,取当前FutureTask承载的callable对象,继续进行判空处理,然后调用承载对象的call方法进行执行。当执行完毕之后执行set方法
将当前执行任务的接口赋值给outcome然后使用CAS替换当前FutureTask对象的状态.

 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }

之后执行finishCompletion()方法进行后续处理,该方法主要作用为使用 LockSupport.unpark(t);方法唤醒因为调用get方法获取任务返回值结果而被阻塞的线程(被阻塞的线程存在链表之中)

   for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
  1. get()方法
    FutureTask为我们提供两个get方法一个定时获取一个阻塞等待获取,本文以get()为例.
 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

同样先是判断状态,判断当前FutureTask承载的任务是否执行完毕,如果没有执行完毕,则执行awaitDone方法
先判断是否设置超时时间 timed为false为没有设置 true为设置了等待时间然后进行循环,如果当前任务在执行过程中被中断则直接移除当前调用get方法的线程抛出异常,然后就是进行状态判断如果小于完成状态则表示为新建,将当前线程包装为一个WaitNode节点使用CAS方式存入阻塞队列之中(之后继续进行循环,如果状态不满足完成状态则 LockSupport.park(this)阻塞当前线程,当执行完毕的时候会run方法会进行一系列操作将状态设置为完成状态,然后唤醒被阻塞的线程,否则会一直阻塞在这里)被唤醒后,如果等于完成状态表示任务已经执行完毕,在进行后续处理,使当前线程让步即可,无需等待阻塞,如果大于完成状态则表示执行完毕,返回状态。回到get()方法执行report(s)方法。

   private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;   //包装当前调用get方法的线程为WaitNode节点
        boolean queued = false;  //调用get()方法线程的阻塞队列
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

该方法的判断任务执行状态如果为普通则代表没有发生异常状态(将返回值包装为对应的泛型对象返回),如不是普通则代表被取消或者执行异常,直接抛出异常

 private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

相关文章

网友评论

      本文标题:JUC--FutureTask解析

      本文链接:https://www.haomeiwen.com/subject/astgeltx.html