Callable接口
Java中的子线程通常是通过Thread
或者Runnable
的方式实现的,但是这种方式只能通过回调,或者共享变量等方式来传递数据,而Callable
则是可以获取返回结果的一种子线程实现方式。
Callable是一个接口,源码如下:
public interface Callable<V> {
V call() throws Exception;
}
非常简单,只有一个方法,和一个泛型V,所以我们创建Callable
对象的时候,也只需要指定返回类型并实现call
方法就可以了。
Future接口
看完了Callable
接口,会发现它非常简单,没有办法在子线程中直接通过它来获取到返回结果的,这时候就需要Future
发挥作用了。源码如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
-
boolean cancel(boolean mayInterruptIfRunning)
在任务开始前取消,传入值表示任务开始后是否允许打断,返回值表示是否取消成功(任务已经开始不允许打断,已经运行结束,已经取消等等状态会返回失败) -
boolean isCancelled()
是否已经被取消 -
boolean isDone()
是否完成任务 -
V get()
尝试获取返回结果,阻塞方法 -
V get(long timeout, TimeUnit unit)
同上,可以指定超时时间
可以看到,Future
实际上可以理解为Callable
的管理类。
在线程池中执行任务时,除了execute
方法之外,还有一个submit
方法:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
它返回的就是一个Future
对象,可以通过它来回去Callable
任务的执行结果:
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Sub Thread is calculating...");
Thread.sleep(10000);
return 10;
}
};
Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
try {
System.out.println("Main Thread start waiting result... ");
int res = future.get();
System.out.println("Main Thread get result: " + res);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//运行结果:
Main Thread start waiting result...
Sub Thread is calculating...
Main Thread get result: 10
FutureTask
如果不用Java提供的线程池,直接用Thread
怎样在子线程中运行Callable
呢? 这时候就要用到FutureTask
类了。
FutureTask
实现了Future
和Runnable
接口,这就意味着,它既可以放在Thread
中去运行,又能够对任务进行管理,下面是源码:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可以看到,构造函数中传入了待运行的任务Callable
对象或者Runnable
对象和指定的返回结果,V result
用来指定运行完成后的返回值,如果不想用指定值,可以用Future<?> f = new FutureTask<Void>(runnable, null)
来返回null。采用Callable构造方法创建的FutureTask对象,执行完毕返回的是实际运算结果,而Runnable 构造函数返回值是传入的result。
task的状态
FutureTask中持有的任务对象,有以下几种状态:
private static final int NEW = 0; //新建或运行中
private static final int COMPLETING = 1;//任务运行结束,正在处理一些后续操作
private static final int NORMAL = 2;//任务已经完成,COMPLETING的下一个状态
private static final int EXCEPTIONAL = 3;//任务抛出异常,COMPLETING的下一个状态
private static final int CANCELLED = 4;//任务被取消
private static final int INTERRUPTING = 5;//收到打断指令,还没有执行interrupt
private static final int INTERRUPTED = 6;//收到打断指令,也执行了interrupt
可能的状态变化主要有以下几种:
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
task的执行过程
FutureTask实现了Runnable接口,是可以直接放在Thread中执行的,实际上运行的就是它的run方法:
public void run() {
//r如果当前状态不是NEW,说明任务已经执行完成了,直接返回
//如果当前状态是NEW,尝试用CAS方式将当前线程赋值给RUNNER,赋值前RUNNER的值应该是null,否则赋值失败
//赋值失败表示已经有线程执行了run方法,直接返回
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//运行中抛出异常
setException(ex);
}
//ran为true,说明正常运行结束,得到了返回结果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
执行结果其实是比较简单的,通过RUNNER
来记录执行任务的线程,从而保证只有一个线程可以执行该任务。运行结束后有两个出口:
-
setException(ex);
运行中出错,抛出异常 -
set(result);
任务执行完毕,获取到返回值
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒等待线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
这两个方法实际上是一样的,都是将状态赋值为COMPLETING
,然后保存结果(运行结果或错误信息),再执行finishCompletion
方法,通知WAITERS
里记录的等待线程继续执行,并清空WAITERS
。
获取返回结果
获取返回结果是通过get()
方法:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
//cas机制,将新建节点q的next指向原来的节点workers,然后将workers更新为新建的节点。workers(WAITERS)实际上就是持有了所有等待线程的一个链表
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
get()
方法比较简单,先判断当前状态,如果状态 < COMPLETING
,说明任务没有执行完毕,直接调用awaitDone
方法。
awaitDone
方法可以接受两个参数,用来指定是否设置超时时间。它内部是一个无限for循环。下面是awaitDone
方法的执行步骤(忽略超时设置):
-
进入
awaitDone
方法时,state一定是小于COMPLETING
的,第一次会走else if (q == null)
分支,创建一个WaitNode()
对象用来保存当前线程 -
第二次循环q已经不是null了,如果任务仍然没有结束,会执行
else if (!queued)
分支,queued
表示创建的WaitNode()
是否已经添加到链表里,如果没有尝试添加,直到添加成功为止。 -
等待线程添加成功以后进入下一个循环,此时如果任务仍然没有结束,会走到else分支,挂起当前线程(阻塞)
-
此处阻塞的是等待结果的线程,也就是调用
FutureTask
的get()
方法的线程,而不是执行任务的线程。阻塞线程用的是LockSupport.park(this)
方法,唤醒的方法是LockSupport.unpark()
,该方法在上边的finishCompletion()
中出现了,也就是说,任务执行结束(运行完,抛出异常,被取消)时,等待的线程才会被唤醒,继续下一次循环。 -
任务结束以后,如果state是
COMPLETING
状态,说明一些清理任务还没有执行完,等待的线程会让出cpu,让其他线程优先执行 -
直到state 大于
COMPLETING
,说明FutureTask
已经完全结束了,此时会会执行(s > COMPLETING)
分支,把节点置空,并返回。
awaitDone
返回以后,说明任务已经执行完成了,会进入report
方法:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
可以看到,如果是正常结束,或者抛出异常结束,会返回结果,而如果是被取消,则会抛出异常。
总结
-
FutureTask
可以视为一个管理Callable
任务的工具类,执行Callable
任务的是FutureTask
的run
方法,所以,可以通过new Thread(futuretask)
的方法来实现子线程执行任务 -
获取执行结果是通过
FutureTask
的get
方法,调用该方法后,如果线程会被挂起,知道任务结束为止 -
获取结果的线程数量没有限定,可以是任意个线程
-
获取结果的线程被挂起以后,可以通过取消,超时等方法在任务执行完毕以前结束挂起状态。
网友评论