写的比较糙,大家可能会看的比较懵。其实本文就是把debug出来的逻辑给记录下来了而已。
正文
从ListeningExecutorService的submit开始分析。
在AbstractListeningExecutorService中重写了newTaskFor方法。
newTaskFor返回的是TrustedListenableFutureTask对象。
AbstractExecutorService的submit逻辑:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
所以submit执行的是TrustedListenableFutureTask的run逻辑。
TrustedListenableFutureTask的run逻辑执行的是TrustedFutureInterruptibleTask的run逻辑。
TrustedFutureInterruptibleTask的run逻辑是继承InterruptibleTask的。
InterruptibleTask的run逻辑:
public final void run() {
Thread currentThread = Thread.currentThread();
if (this.compareAndSet((Object)null, currentThread)) {
boolean run = !this.isDone();
T result = null;
Throwable error = null;
try {
if (run) {
result = this.runInterruptibly();
}
} catch (Throwable var9) {
error = var9;
} finally {
if (!this.compareAndSet(currentThread, DONE)) {
this.waitForInterrupt(currentThread);
}
if (run) {
if (error == null) {
this.afterRanInterruptiblySuccess(NullnessCasts.uncheckedCastNullableTToT(result));
} else {
this.afterRanInterruptiblyFailure(error);
}
}
}
}
}
模板模式。
其中调用的runInterruptibly、afterRanInterruptiblySuccess、afterRanInterruptiblyFailure都是抽象方法,在子类实现。
@ParametricNullness
abstract T runInterruptibly() throws Exception;
abstract void afterRanInterruptiblySuccess(@ParametricNullness T var1);
abstract void afterRanInterruptiblyFailure(Throwable var1);
子类TrustedFutureInterruptibleTask的这三个方法:
@ParametricNullness
V runInterruptibly() throws Exception {
return this.callable.call();
}
void afterRanInterruptiblySuccess(@ParametricNullness V result) {
TrustedListenableFutureTask.this.set(result);
}
void afterRanInterruptiblyFailure(Throwable error) {
TrustedListenableFutureTask.this.setException(error);
}
runInterruptibly执行的就是Callable的call方法。
如果把子类实现串进去,整体上InterruptibleTask的run逻辑其实类似于JDK FutureTask的run逻辑。
外部类TrustedListenableFutureTask的set和setException方法(继承自TrustedFuture),都会调用complete方法。
complete方法会调用executeListener执行所有的回调逻辑。
回调逻辑封装在CallbackListener的run:
public void run() {
if (this.future instanceof InternalFutureFailureAccess) {
Throwable failure = InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess)this.future);
if (failure != null) {
this.callback.onFailure(failure);
return;
}
}
Object value;
try {
value = Futures.getDone(this.future);
} catch (ExecutionException var3) {
this.callback.onFailure(var3.getCause());
return;
} catch (Error | RuntimeException var4) {
this.callback.onFailure(var4);
return;
}
this.callback.onSuccess(value);
}
Futures.getDone
@ParametricNullness
@CanIgnoreReturnValue
public static <V> V getDone(Future<V> future) throws ExecutionException {
Preconditions.checkState(future.isDone(), "Future was expected to be done: %s", future);
return Uninterruptibles.getUninterruptibly(future);
}
Uninterruptibles.getUninterruptibly
@ParametricNullness
@CanIgnoreReturnValue
public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
boolean interrupted = false;
try {
while(true) {
try {
Object var2 = future.get();
return var2;
} catch (InterruptedException var6) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
这里的get不会阻塞,因为Callable任务已经执行完了,这里只是单纯获取执行结果。
回到上面的run逻辑,后面就是根据future的get结果调用对应的回调逻辑。
网友评论