一般使用多线程操作的时候会使用Thread+Runnable进行处理,但是这种方式中,Runnable是没有返回值的,假设我们需要获取Runnable的返回值,可能需要如下特殊处理,伪代码如下
String returnValue1 = "";
String returnValue2 = "";
CountDownLatch cdl = ....
new Thread(()->{
// xxxx操作
returnValue1 = "返回值";
cdl.countDown
});
new Thread(()->{
// xxxx操作
returnValue2 = "返回值";
cdl.countDown
});
cdl wait// 程序阻塞在这
print returnValue1
print returnValue2
当Runnable运行完并且赋值完毕则通知CDL,最后主线程在wait处等待两个线程执行完毕,然后获取Runnable的返回值。
这样的做法比较麻烦,而JDK提供了一个叫做Future的东西,他实现了上述的功能,且使用上更加的简便,看下例子
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<String> future = executorService.submit(() -> {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "xxxx";
});
try {
String value = future.get();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
操作上更加的简便,调用Future的get方法的时候,就类似cdl wait+获取returnValue1
submit方法
那么下面看下其中是如何实现的,先看下submit方法,实现在java.util.concurrent.AbstractExecutorService中
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 将Callable转换成RunnableFuture,它继承了Runnable和Future
RunnableFuture<T> ftask = newTaskFor(task);
// 线程池的execute方法,参数类型为Runnable
execute(ftask);
return ftask;
}
从submit方法可以看出,submit也是执行的execute方法,虽然参数不一样,但是其中Callable转换成Runnable,即RunnableFuture的实现,并将其返回,也就是上述例子中的Future。
这里和第一个例子做类比,这里返回的Future共包含几个功能,简化了使用
- 赋值->returnValue1="xxx"
- 阻塞->cdl.wait
- 取值
newTaskFor
newTaskFor方法如下:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public FutureTask(Runnable runnable, V result) {
// runnable封装成callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
返回一个FutureTask,即RunnableFuture的实现类,也是一个Runnable的子类,当调用execute方法的时候,就会执行FutureTask的run方法,这里先不看run方法实现,先看下FutureTask的内部属性
/**
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// 当前Future的状态值,加了volatile修改,则代表一个线程改变后,马上对另外线程可见
private volatile int state;
private static final int NEW = 0;// 初始状态
private static final int COMPLETING = 1;// 执行完毕,还未进行赋值
private static final int NORMAL = 2;// 最终完成状态
private static final int EXCEPTIONAL = 3;// 出现异常
private static final int CANCELLED = 4;// 取消状态
private static final int INTERRUPTING = 5;// 正被中断
private static final int INTERRUPTED = 6;// 被中断
private Callable<V> callable;
private Object outcome; // 返回值,不一定是Callable的返回值,出现异常的时候放的是异常对象
private volatile Thread runner;// 执行run方法的线程
private volatile WaitNode waiters;//阻塞等待的节点
run方法
这时候再看下run方法
public void run() {
// 状态不为初始化值,证明已经执行过,直接返回
// 状态为NEW,则将runner设置为当前线程,如果失败,证明别的线程已经在操作,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 状态为初始化状态才执行,因为有可能被中断或者调用cancel取消
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行用户定义的call方法,并取的返回值
result = c.call();
// 执行成功
ran = true;
} catch (Throwable ex) {
// 有异常出现,返回值置空,设置成执行失败
result = null;
ran = false;
// 设置状态,和设置返回值为异常对象
setException(ex);
}
if (ran)// 执行成功,设置返回值
set(result);
}
} finally {
// runner在状态值改变之后才能设置为空,否则可能出现多个线程执行run的情况
runner = null;
// 中断处理
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
setException和set方法
其中主要看setException和set方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
调用了set或者setException方法,状态会变成COMPLETING,执行完成后,会将状态设置为NORMAL或者EXCEPTIONAL,而成功时outcome是正常返回值,失败则是Throwable对象。
finishCompletion方法
最后,调用finishCompletion将阻塞的线程唤醒,遍历waiters,调用unpark唤醒线程,处理和AQS有相似之处
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null;
}
get方法
get方法有两种形式,一种是程序会一直等待结果返回,而另外一种是有等待时间的,当时间到了之后,还未返回,则会抛出异常
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 还未完成,则进入awaitDone,阻塞线程
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 完成之后调用方法获取返回值
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 还未完成,则进入awaitDone,阻塞线程
// 时间到了之后会返回状态值s,如果此时还未完成,那么抛出异常
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
可以看到两种方法实现是类似的,但是有时间参数的方法,在指定时间内无法获得返回值的话是会抛出异常的,这点在使用的时候需要注意
awaitDone方法会在状态为未完成状态下阻塞线程,当完成或者指定时间到达的时候返回当前状态,此时有两种情况
- 如果没有时间参数:返回完成状态(>COMPLETING的)、
- 如果有时间参数:返回当时的状态,有可能是NEW,COMPLETING或者其他
接下来看下awaitDone方法实现
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 是否有指定时间,如果没有则为0
// 如果有,则deadLine为当前时间+超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;// 表示当前线程节点
boolean queued = false;// 是否在排队
for (;;) {
if (Thread.interrupted()) {// 当前线程被中断
removeWaiter(q);// 从链表中移除当前线程节点
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {// 正常完成状态,直接返回
if (q != null)
q.thread = null;
return s;
}
// 还在执行当中,但是从上可以知道,这种状态很快变化为最终状态
// 所以这里使用yield而不是阻塞可能就是这个原因吧
else if (s == COMPLETING)
Thread.yield();
else if (q == null)
// 状态为NEW且为第一次循环,则代表还未结束处理,则先构造一个线程节点
q = new WaitNode();
else if (!queued)
// 在上一个分支判断后,进入下一次循环,如果还是NEW的状态
// 则将线程节点挂在链表头部
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 如果有时间参数,则阻塞特定时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 减少得到deadLine已经到了,则将该节点移除,并返回当前状态
removeWaiter(q);
return state;
}
// 阻塞特定时间
LockSupport.parkNanos(this, nanos);
}
else
// 没有时间参数的话则一直阻塞知道整个任务完成
LockSupport.park(this);
}
}
这里逻辑不太难懂,总结一下分支:
- 线程如果被中断,则从连接移除当前线程并返回异常
- 如果为正常完成状态,则直接返回状态值
- 如果为COMPLETING状态则让出CPU,重新进入循环重新判断
- 如果为NEW状态且第一次循环则创建线程节点
- 如果为NEW状态且非第一次循环,则将线程节点挂在链表头部
- 如果为NEW状态且已经挂在了链表头部但是有时间参数则阻塞特定时间
- 如果为NEW状态且已经挂在了链表头部但是没有时间参数则一直阻塞直到任务完成
从上可以知道,一个线程调用get方法后,最多会执行4次循环:
- 创建线程节点
- 节点加入链表
- 阻塞
- 任务完成唤醒后再进行一次判断并返回
可以看到这里的循环次数还是比较多的,相当于先来几次自旋再阻塞。
cancel方法
Future还提供了取消任务的入口,即cancel方法,内部将对应的线程进行中断,使正在执行的线程退出
public boolean cancel(boolean mayInterruptIfRunning) {
// 状态为NEW
// 如果mayInterruptIfRunning为true,状态设置成INTERRUPTING状态
// 否则设置成CANCELLED
// 如果状态不为NEW或者说状态设置失败了,则返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果设置为true,那么取出当前正在执行的线程,并将其中断
// 最后设置中断完成状态
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒阻塞线程
finishCompletion();
}
return true;
}
- mayInterruptIfRunning为true:状态设置为INTERRUPTING,设置运行run方法的线程的中断标志
- mayInterruptIfRunning为false:状态设置为CANCELLED
综合run方法的逻辑,cancel方法不能取消非NEW状态的任务,且只是设置了一个中断位的标志,如果run方法已经执行到判断状态位后的代码准备运行或者已经运行了,那么cancel还是无法终止任务的执行
题外话
Dubbo实现了自己的Future,整体的交互过程其实是类似的,但是逻辑会比JDK自带的Future会简单一点,因为其中没有多个线程对Future进行get的操作,所以从get的性能上讲,Dubbo的会快一点
网友评论