java并发编程之FutureTask

作者: miaoLoveCode | 来源:发表于2017-02-16 13:07 被阅读2244次

    引言

    FutureTask实现了接口Future,同Future一样,代表异步计算的结果。当然,FutureTask除了实现Future接口之外,还实现了Runnable接口,所以,FutureTask既可以由Executor来调度执行,也可以由调度线程调用FutureTask.run()直接执行。

    FutureTask状态

    根据FutureTask的run方法是否被执行以及是否被执行完成,FutureTask有3种状态:

    1. 未启动:run方法被执行前,FutureTask处于未启动状态;

    2. 已启动:run方法被执行的过程中,FutureTask处于已启动状态;

    3. 已完成:run方法执行完成后正常结束,或者被取消,或者是执行过程中抛出异常导致的异常结束,FutureTask处于已完成状态。

    FutureTask状态转换

    FutureTask状态转换可以总结为下图:


    FutureTask状态转换图
    • 当FutureTask处于未启动或者是已启动状态时,此时还未得到线程执行结果,调用FutureTask.get方法会导致线程阻塞;

    • 当FutureTask处于已完成状态时,此时已经得到线程执行结果,调用FutureTask.get方法会立即返回线程执行结果;

    • 当FutureTask处于未启动状态时,调用FutureTask.cancel方法将会导致该task永远不会被执行;

    • 当FutureTask处于启动状态时,调用FutureTask.cancel方法将会中断该任务的执行,至于会不会对任务产生影响由cancel方法的入参决定;

    • 当FutureTask处于已完成状态,调用FutureTask.cancel方法返回false。

    接下来就以run、get和cancel方法为切入点分析FutureTask具体实现。

    FutureTask源码分析

    在开始分析源码之前,我们先来看看FutureTask的成员变量:


    成员变量
    1. state:记录task状态,可取值为0~6;

    2. callable:task实际载体,run方法实际调用callable.call();

    3. outcome:线程执行任务结束后的返回结果;

    4. runner:记录执行task的线程;

    5. waiters:等待task执行结果的线程队列。

    构造方法
    构造方法

    FutureTask提供两个构造方法来封装Callable和Runnable,当构造方法传入参数为Runnable,会通过Executors.callable方法将其转换成Callable。


    Executors.callable方法实现
    get方法实现

    FutureTask提供带超时时间的get和不到超时时间的get:


    get方法实现

    对比带超时时间和不带超时时间的get方法实现,最为重要的实现就是等待直到task状态变为已完成状态或者等待时间超过超时时间,对应到源码就是加红框的awaitDone方法。接下来我们来具体分析一下awaitDone方法到底是如何来实现线程阻塞等待的。

    awaitDone方法实现

    awaitDone实现
    具体的执行流程如下:
    1. 计算等待时间deadline,如果是带超时时间的get,deadline = 当前时间 + 等待时间,如果是不带超时时间的get,deadline = 0;

    2. 判断线程是否中断,如果线程中断,将当前线程从等待队列waiters中移除,抛出中断异常,否则,跳转到步骤3;

    3. 获取task状态state:

    • 如果task状态为已完成状态,将等待线程节点的线程置为null,返回state;

    • 如果task状态为正在执行,调用Thread.yield()将线程从执行状态变为可执行状态;

    • 否则,跳转到步骤4;

    1. 如果等待线程节点q为null,初始化等待线程节点q,否则,跳转到步骤5;

    2. 如果当前等待线程节点q还未成功进入等待队列waiters,进入线程等待队列,否则,跳转到步骤6;

    3. 判断是否是带超时时间的get:

    • 如果是带超时时间get,判断当前是否超时,如果已经超时,将当前等待节点q从waiters中移出,返回task状态state,如果还未超时,调用LockSupport.parkNanos方法阻塞当前线程;

    • 否则,跳转到步骤7;

    1. 调用LockSupport.park方法,阻塞当前线程,然后跳转到步骤2。

    从get方法整个流程可以看出:

    • FutureTask维护一个等待线程队列waiters,如果task还未执行完毕,调用get方法的线程会先进入等待队列自旋等待;

    • awaitDone方法其实是个死循环,直到task状态变为已完成状态或者等待时间超过超时时间或者线程中断才会跳出循环,程序结束;

    • 为了节省开销,线程不会一直自旋等待,而是会阻塞,使用LockSupport的park系列方法实现线程阻塞;

    run方法实现
    run方法实现

    具体执行流程如下:

    1. 判断task状态,如果task还未执行,跳转到步骤2,否则,返回,程序结束;

    2. 通过CAS设置执行task的线程,设置成功,跳转到步骤3,否则,返回,程序结束;

    3. 执行callable.call方法,调用set方法设置call方法返回结果以及task状态;

    4. 设置当前运行当前task的线程为null;

    5. 判断当前task状态,如果task状态为正在中断或者已中断,调用Thread.yield()将线程从执行状态变为可执行状态。

    set方法实现

    set方法实现
    set方法主要干了这两件事:
    1. 设置返回结果outcome以及task状态state;

    2. 调用finishCompletion方法操作等待队列waiters中的等待线程。

    finishCompletion实现

    finishCompletion实现
    整个finishCompletion方法清除和唤醒了等待队列中的等待线程,调用get方法被阻塞的线程也就是在这里调用LockSupport.unpark方法被唤醒的。
    cancel方法实现
    cancel方法实现
    1. 判断task状态,如果不为未启动状态,返回false,程序结束,否则,跳转到步骤2;

    2. 判断入参mayInterruptIfRunning:

    • true:CSA设置state为正在中断,设置失败返回false,否则中断正在运行task的线程,CAS设置state为已中断;

    • false:CSA设置state为已取消,设置失败返回false,需要注意的是,正在运行task的线程是不会中断的,换句话说,入参为false时不会对task的执行有任何影响。

    注:根据代码实现:
    - 处于启动状态的task,调用cancel方法是否会对task的执行有所影响完全依赖于cancel方法的入参,true时会有影响,false时不会有影响;
    - 处于未启动状态的task,调用cancel方法后,该task将不会再被执行。

    1. 调用finishCompletion方法清除和唤醒等待队列waiters中的等待线程,返回true,程序结束。

    从get、run、cancel方法的实现,FutureTask的线程等待与唤醒可以总结为下图:


    FutureTask线程等待唤醒

    后记

    到这里为止,FutureTask的源码就分析就结束了。做一个简短的总结:

    1. FutureTask是通过LockSupport来阻塞线程、唤醒线程;

    2. 对于多线程访问成员变量waiters、state,都采用CAS来操作;

    总的来说,FutureTask是一个非常好的CAS和LockSupport搭配使用的例子。

    相关文章

      网友评论

      • 江江的大猪:cancel方法实现的2里true和false的写错字了,写成了csa:wink:
        miaoLoveCode:@肥肥小浣熊 哇呀,哈哈,我都没注意。
      • b5c514d458b5:加超时时间的等待,等待的开始时间是从加入队列等待的时间开始算起?还是从正式执行的时间算起?
        miaoLoveCode:从加入等待队列开始。
      • Terminalist:想问个问题,明明FutureTask就一个线程,为什么还要设置等待队列?在awaitDone方法还要做这么多操作?
        我的阿福:@Terminalist futuretask就是一个线程?
        外号菜菜子:@Terminalist 你细看一下awaitDone 你会发现如果future没有执行完 他会将当前线程封装为一个waitNode 然后置在FutureTask的waiters链表中 这样做是为了防止外部同时有多个线程get 而task又没有执行完毕时 将多个线程同时阻塞 然后当task执行完毕(抛出异常) 将waiters链表中的thread unpark
        Terminalist:目的是为了高并发场景下必须执行这个线程一次?

      本文标题:java并发编程之FutureTask

      本文链接:https://www.haomeiwen.com/subject/bacqwttx.html