美文网首页
不怕难之FutureTask源码分析

不怕难之FutureTask源码分析

作者: 天外流星for | 来源:发表于2018-10-23 20:19 被阅读0次

一、引言

1. FutureTask在高并发场景下能确保任务只执行一次吗?

2. 任务还在执行的时候用户调用cancel能否让任务停止执行?

二、功能简介

FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要负责的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。

与1.6版本不同,1.7的FutureTask不再基于AQS来构建,而是在内部采用简单的Treiber Stack来保存等待线程。

三、前置知识

LockSupport

LockSupport是用来创建锁及其他同步类的基本线程阻塞元素,它的park和 unpark能够分别阻塞线程和解除线程阻塞。它提供了可以指定阻塞时长的park方法。park和unpark的基本接口为:

public static void park() {

    unsafe.park(false, 0L);

}

public static void unpark(Thread thread) {

    if (thread != null)

        unsafe.unpark(thread);

}

Unsafe

Java不能够直接访问操作系统底层,而是通过本地方法来访问。Unsafe提供了硬件级别的原子访问,主要提供一下功能:

1. 分配释放内存

2. 定位某个字段的内存位置

3. 挂起一个线程和恢复,更多的是通过LockSupport来访问。park和unpark

4. CAS操作,比较一个对象的某个位置的内存值是否与期望值一致,一致则更新对应值,此更新是不可中断的。主要方法是compareAndSwap*

并发工具三板斧

状态,队列,CAS

四、源码分析

1. FutureTask介绍

FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,可以把它理解为是可以返回结果的Runnable。

使用FutureTask的优势有:

可以获取线程执行后的返回结果;

提供了超时控制功能。

它实现了Runnable接口和Future接口:

2. FutureTask的状态

在FutureTask中,状态是由state属性来表示的,不出所料,它是volatile类型的,确保了不同线程对它修改的可见性:

private volatile int state;

private static final int NEW          =0;

private static final int COMPLETING  =1;

private static final int NORMAL      =2;

private static final int EXCEPTIONAL  =3;

private static final int CANCELLED    =4;

private static final int INTERRUPTING =5;

private static final int INTERRUPTED  =6;

状态转换路径

栈结构


run方法

public void run(){

/*

    * 首先判断状态,如果不是初始状态,说明任务已经被执行或取消;

    * runner是FutureTask的一个属性,用于保存执行任务的线程,

    * 如果不为空则表示已经有线程正在执行,这里用CAS来设置,失败则返回。

    */

if(state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))

 return;

try{

        Callable<V> c = callable;

          // 只有初始状态才会执行

         if(c !=null&& state == NEW) {

            V result;

           boolean ran;

try{

               // 执行任务

                result = c.call();

               // 如果没出现异常,则说明执行成功了

               ran =true;

}catch(Throwable ex) {

result =null;

ran =false;

// 设置异常

setException(ex);

  }

// 如果执行成功,则设置返回结果

if(ran)

                set(result);

        }

}finally{

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

// 无论是否执行成功,把runner设置为null

runner =null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

ints = state;

// 如果被中断,则说明调用的cancel(true),

// 这里要保证在cancel方法中把state设置为INTERRUPTED

// 否则可能在cancel方法中还没执行中断,造成中断的泄露

if(s >= INTERRUPTING)

            handlePossibleCancellationInterrupt(s);

    }

}

run方法总结

校验当前任务状态是否为NEW以及runner是否已赋值。这一步是防止任务被取消。

double-check任务状态state

执行业务逻辑,也就是c.call()方法被执行

如果业务逻辑异常,则调用setException方法将异常对象赋给outcome,并且更新state值

如果业务正常,则调用set方法将执行结果赋给outcome,并且更新state值

awaitDone方法

//  第一次循环:创建栈头结点

// 第二次循环: 入栈操作

// 第三次循环: 开始阻塞,等待通知

//  队列针对一个FutureTask示例

// 一个FutureTask示例的多个线程,依次入栈,通过next节点链接,后面再依次唤醒

private int awaitDone(boolean timed, long nanos)throws InterruptedException {

// 计算到期时间

    final long deadline = timed ? System.nanoTime() + nanos :0L;

    WaitNode q =null;

    boolean queued =false;

    for (; ; ) {

// 如果被中断,删除节点,抛出异常

        if (Thread.interrupted()) {

            removeWaiter(q);

            throw new InterruptedException();

        }

       int s = state;

        // 如果任务执行完毕并且设置了最终状态或者被取消,则返回

        if (s > COMPLETING) {

          if (q !=null)

          q.thread =null;

            return s;

        }

// s == COMPLETING时通过Thread.yield();让步其他线程执行,

// 主要是为了让状态改变

        else if (s == COMPLETING)// cannot time out yet

            Thread.yield(); // 主动让出CPU

            // 创建一个WaitNode

        else if (q ==null)

          q =new WaitNode();   // 第一次头节点

            // CAS设置栈顶节点

        else if (!queued)

         queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);  // 入栈操作

            // 如果设置了超时,则计算是否已经到了开始设置的到期时间

        else if (timed) {

             nanos = deadline - System.nanoTime();

            // 如果已经到了到期时间,删除节点,返回状态

            if (nanos <=0L) {

              removeWaiter(q);

                return state;

            }

            // 阻塞到到期时间

            LockSupport.parkNanos(this, nanos);

        }

        // 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒

        else

            LockSupport.park(this);

    }

}

awaitDone方法总结

计算deadline,也就是到某个时间点后如果还没有返回结果,那么就超时了。

进入自旋,也就是死循环。

首先判断是否响应线程中断。对于线程中断的响应往往会放在线程进入阻塞之前,这里也印证了这一点。

判断state值,如果>COMPLETING表明任务已经取消或者已经执行完毕,就可以直接返回了。

如果任务还在执行,则为当前线程初始化一个等待节点WaitNode,入等待队列。这里和AQS的等待队列类似,只不过Node只关联线程,而没有状态。AQS里面的等待节点是有状态的。

计算nanos,判断是否已经超时。如果已经超时,则移除所有等待节点,直接返回state。超时的话,state的值仍然还是COMPLETING。

如果还未超时,就通过LockSupprot类提供的方法在指定时间内挂起当前线程,等待任务线程唤醒或者超时唤醒。

入栈示意图

五、总结

FutureTask是线程安全的,在多线程下任务也只会被执行一次;

注意在执行时各种状态的切换;

get方法调用时,如果任务没有结束,要阻塞当前线程,法阻塞的线程会保存在一个Treiber Stack中;

get方法超时功能如果超时未获取成功,会抛出TimeoutException;

注意在取消时的线程中断,在run方法中一定要保证结束时的状态是INTERRUPTED,否则在cancel方法中可能没有执行interrupt,造成中断的泄露。

相关文章

网友评论

      本文标题:不怕难之FutureTask源码分析

      本文链接:https://www.haomeiwen.com/subject/dgtzzftx.html