还有一种提交任务的方式,就是ExecutorService的submit,再之前的文章介绍了这种提交任务方法的好处。
继承图
AbstractExecuotrService:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
将Callable封装成一个FutureTask。
image.pngFuture
Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。Future接口的定义如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- cancel():cancel()方法用来取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。如果任务还没有被执行,则会返回true并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
- isCanceled():判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
- isDone():判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
- get():获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出CancellationException异常,如果任务执行过程发生异常则会抛出ExecutionException异常(最好对其进行转换处理,如下面实例中的做法),如果阻塞等待过程中被中断则会抛出InterruptedException异常。
- get(long timeout,Timeunit unit):带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常
submit返回了Future,我们就可以利用Future来查看任务的状态isCancelled(是否被取消),isDone(是否完成),get阻塞直到任务完成返回任务执行结果;
FutureTask
FutureTask是Future的实现类,如上面类继承关系,它还继承了Runnable;所以FutureTask即可以作为Runnable被Thread执行,也可以作为Future获取Callable的结果。
实例
/**
* FutureTask一般配合ExecutorService使用
*/
public class FutureTest {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Thread " + Thread.currentThread().getName() + " is running");
int result = 0;
for(int i = 0; i < 100; i++){
result += i;
}
Thread.sleep(3000);
return result;
}
}
public static void main(String[] args) {
// 第一种方式:Future + ExecutorService
// Task task = new Task();
// ExecutorService executor = Executors.newCachedThreadPool();
// Future<Integer> future = executor.submit(task);
// executor.shutdown();
// 第二种方式: FutureTask + ExecutorService
// Task task = new Task();
// FutureTask<Integer> futureTask = new FutureTask<>(task);
// ExecutorService executor = Executors.newCachedThreadPool();
// executor.submit(futureTask); //将其用作Runnable
// executor.shutdown();
//第三种方式:Thread + FutureTask
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<>(task);
Thread thread = new Thread(futureTask);
thread.setName("task thread");
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread " + Thread.currentThread().getName() + " is running");
if (!futureTask.isDone()){
System.out.println("task is not done");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int result = 0;
try {
result = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
throw Throwable2RunEx.launderThrowable(e.getCause());
}
System.out.println("result: " + result);
}
}
public class Throwable2RunEx {
public static RuntimeException launderThrowable(Throwable t){
if (t instanceof RuntimeException){
return (RuntimeException) t;
}else if(t instanceof Error){
throw (Error)t;
}else {
throw new IllegalStateException("Not Checked", t);
}
}
}
原理
FutureTask有两个构造函数,先来看看其中一个,另一个只是将Runnable包装成了Callable:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
初始将状态这位NEW,FutureTask有7状态来表示任务的执行情况
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** 代表存储被阻塞的线程的链表的头节点 */
private volatile WaitNode waiters;
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
NEW:初始状态。
COMPLETING :任务完成(正常完成或者抛异常),该状态是种中间状态,此时结果(计算结果或异常原因)还没有保存到outcome字段。
NORMAL:任务正常完成,结果被保存在了outcome字段,为最终状态。
EXCEPTIONAL :任务异常,异常原因保存在outcome字段,为最终状态。
CANCELLED:任务还没开始执行或者正在执行还没完成,此时用户调用cancel(false)方法取消任务但不中断任务执行线程,状态由NEW变为CANCELLED,为最终状态。
INTERRUPTING:任务还没开始执行或者正在执行还没完成,此时用户调用cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态由NEW变为INTERRUPTING,为中间状态。
INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。
可能的状态转化:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
关系转换
在上面的例子里,无论是将FutureTask传给线程池,还是new thread本质上是将它做为一个Runnable,所以我们来看看run方法:
run()
public void run() {
//将执行线程保存在runner字段;这里若状态不是NEW,任务可能被cancel,直接返回
//不执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//任务执行异常
setException(ex);
}
if (ran)
//任务计算完成,写入结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
//如果任务中断,执行中断操作
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
setException
任务执行异常时调用的方法,主要对状态进行转化与保存异常原因
结合COMPLETING定义,与状态转换就很好理解下面的逻辑了
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
set
任务计算完成时调用,主要逻辑状态转换与保存任务计算结果
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
FutureTask有很多其他方法,用于获取任务执行结果,取消任务,查看任务执行状态等,接下来分析分析它们:
get
获取任务执行结果,没完成则阻塞等待
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
COMPLETING是任务执行完成的临界点,上面意思是还没执行完(任务完成,执行异常,任务被取消)就阻塞等待,完成了就report()返回结果。
awaitDone
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//线程中断,等待队列中删除节点,并抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//任务完成(正常,异常,取消),置空thread,返回状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//此时outcome未赋值,让出执行权,重新竞争
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//构造一个等待节点
else if (q == null)
q = new WaitNode();
//未入队,利用CAS 将当前节点加入到等待队列头部
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//如果有时限,超时则删除节点返回状态,否则阻塞一定时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//否则阻塞
else
LockSupport.park(this);
}
}
设计逻辑是:当调用线程获取任务时,如果任务未完成(state<COMPLETING),就创建当前线程的等待节点,再次循环检查一次,状态仍未变则将其加入阻塞队列头部,再循环检查,若状态改变,则最后会返回状态结果,否则会被挂起等待唤醒。这里如果任务线程被中断抛异常给调用方。
cancel
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
逻辑是:任务还没开始执行或者已经开始还没完成,都会根据mayInterruptIfRunning将状态改为INTERRUPTING或CANCELLED,不同的是任务开始执行线程不为空(run方法中开始会将runner赋值)会将线程中断;
任务已将完成(完成,异常,取消,中断)都会返回false,代表不能取消。
所谓的取消成功它影响的是setXXX操作
我们来举例说明下可能的操作情况:
情况一:任务没开始,由于状态被改为INTERRUPTING或CANCELLED,那么它就不会被执行;
情况二:任务开始了,没完成,此时cancel并不一定会立即中止任务(设置中断标记,wait,sleep,join立刻抛出InterruptedException异常),任务继续计算,直到计算完毕调用setXXX,该方法开始时通过CAS改变状态,它会失败,也就是任务被取消;
还有一点:若你想中断执行任务的线程,mayInterruptIfRunning传true,执行t.interrupt(),那么你设计的执行任务线程应该有相应的处理策略,所以建议使用线程池,它为你提供了健全的线程设计;
finishCompletion
前面的set,setException,cancel操作最后都会调用该方法。
FutureTask主要有两条线,一是:由任务线程调用run,然后set或setException,改变状态,它在那一直计算执行着,执行而完后再从waiterNode(头节点)开始唤醒阻塞着的线程;另一条是:调用方所在线程(如上面例子里的主线程)获取任务结果,任务没完成被阻塞着。那么如何实现的这种功能?
利用LockSupport,主调用方线程get在任务未完成状态下(<=COMPLETING)会创建一个节点WaitNode ,此时主调用方线程被保存在节点里;LockSupport.park(this)阻塞主调用方线程
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
LockSupport.park(this)
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
那么什么时候唤醒呢?任务完成(状态为最终态)时唤醒,即set,setException,cancel方法成功取消后,唤醒主调用方线程获取结果;
如何唤醒?在finishCompletion方法中唤醒
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//waiters变量重置为null,因为该阻塞链表将全部被唤醒
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//循环取出各节点,唤醒相应线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
该方法就是在任务最终完成后,将等待队列中的节点全部唤醒;
被阻塞的线程停在了awaitDone的for循环中,唤醒后,将确认任务最终状态返回,逻辑回到get方法,调用report方法返回结果;
report
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
image.png
看源码容易迷失再里面,所以建议带着问题来看源码,否则每个方法你都能看懂,最后你还是没啥印象。
总结:
一方面:FutureTask的run里计算任务,正常完成或执行异常都会调用setXXX来写入结果,状态变为NORMAL/EXCEPTIONAL,这里也是cancel起作用的地方,cancel阻止了结果的写入或是压根没让你计算任务,状态变为CANCELED/INTERPUTED,之后finishCompletion总会被调用,作用就是唤醒所有链表中的线程。
另一方面:主调用线程利用状态变量来查看任务状态,最重要的方法get。阻塞直到任务完成被唤醒,调用report返回结果,结果通过状态来决定最终选择。
还有需要注意的就是cancel(true),建议与线程池搭配使用,否则你的执行线程应该健全,有处理中断标记的逻辑。
网友评论