我们知道 netty
里面基本上都是异步操作,那么如何在异常操作完成通知使用者,获取异常操作的结果呢?
在
jdk
中提供了Future
接口,来获取异常操作的结果,我们先从这个Future
接口开始讲起。
一. java.util.concurrent.Future 类
package java.util.concurrent;
public interface Future<V> {
/**
* 取消当前的Future。会唤醒所有等待结果值的线程,抛出CancellationException异常
* @param mayInterruptIfRunning 是否中断 计算结果值的那个线程
* @return 返回true表示取消成功
*/
boolean cancel(boolean mayInterruptIfRunning);
// 当前的Future是否被取消,返回true表示已取消。
boolean isCancelled();
// 当前Future是否已结束。包括运行完成、抛出异常以及取消,都表示当前Future已结束
boolean isDone();
// 获取Future的结果值。如果当前Future还没有结束,那么当前线程就等待,
// 直到Future运行结束,那么会唤醒等待结果值的线程的。
V get() throws InterruptedException, ExecutionException;
// 获取Future的结果值。与get()相比较多了允许设置超时时间。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
这个接口的方法都写好注释了,很容易理解。
我们思考一下,jdk
为什么给这个接口定义这五个方法呢?
首先我们考虑一下,对于一个异步任务,使用者关心那些方面:
- 获取异步操作结果,如果异步操作还未完成,那么就阻塞当前线程,直到异步操作完成,唤醒当前等待结果值的线程,得到结果。如果获取过程发生异常,那么也唤醒当前等待结果值的线程,抛出异常。这就是
V get()
方法.- 为了防止异步操作太耗时,等待结果线程一直被阻塞?提供两种方式:
- 获取结果时,可以设置超时时间,即
V get(long timeout, TimeUnit unit)
方法。- 可以手动取消这个异步操作,即
boolean cancel(boolean mayInterruptIfRunning)
方法。
具体可以参考我的 Java线程池_Future与Callable原理分析 这一章,有更详细的分析。
但是 java.util.concurrent.Future
接口有一个非常大的缺陷,它只能通过线程阻塞的方式等待异步任务完成。
可能有人觉得这个有什么大问题么?因为想获取异步操作结果的线程,只能阻塞等待,那么在异步操作完成这段时间里,这个线程不能做其他事情,全部时间都浪费在等待结果值上了。
根本原因是java.util.concurrent.Future
接口没有提供一种通知的方式,当异步操作完成时,直接将结果值回传给想获取值的线程,而不需要它们等待啊。
因此针对这种情况,netty
提供了自己的 Future
接口。
二. io.netty.util.concurrent.Future 类
2.1 Future
类源码
public interface Future<V> extends java.util.concurrent.Future<V> {
/**
* 只有当异步操作成功后,这个方法才会返回 true。
* 否则不管异步操作失败完成,还是被取消完成,这个方法都是返回false
*/
boolean isSuccess();
/**
* 只有当这个异步操作可以被取消,即可以调用 {@link #cancel(boolean)} 方法,这个方法才返回 true
*/
boolean isCancellable();
/**
* 只有当异步操作失败或者异步操作被取消,这个方法才会返回,异步操作失败的异常原因。
* 注意:如果异步任务是被取消的,那么这个方法返回 CancellationException 异常。
* 只有当异步操作成功完成或者还未完成,这个方法才是返回 null
*/
Throwable cause();
/**
* 添加一个回调监听方法,当异步操作完成时,就会调用这个监听方法。
* 异步任务完成,即包括运行完成、抛出异常以及取消
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* 添加多个回调监听方法,当异步操作完成时,就会调用这个监听方法。
* 异步任务完成,即包括运行完成、抛出异常以及取消
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* 移除一个回调监听方法
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* 移除多个回调监听方法
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* 当前线程等待,直到这个异步操作完成。但是如果是异常完成,即 cause 方法有返回值,那么就抛出这个异常。
*/
Future<V> sync() throws InterruptedException;
/**
* 与 sync 方法作用相同,只不过不会抛出中断异常
*/
Future<V> syncUninterruptibly();
/**
* 当前线程等待,直到这个异步操作完成。
* 注:这个方法和 sync 方法的区别时,如果异步操作出现异常,并不会抛出这个异常
*/
Future<V> await() throws InterruptedException;
/**
* 当前线程等待,直到这个异步操作完成。
*/
Future<V> awaitUninterruptibly();
/**
* 当前线程等待,直到这个异步操作完成或者等待时间超时。
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 当前线程等待,直到这个异步操作完成或者等待时间超时。
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* 当前线程等待,直到这个异步操作完成或者等待时间超时。
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* 当前线程等待,直到这个异步操作完成或者等待时间超时。
*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
* 不会阻塞当前线程,直接返回值。如果异步操作还未完成,那么返回值为 null
*/
V getNow();
/**
* 手动取消当前异步操作
*/
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
我们可以看到 Netty
的 Future
比 jdk
中的 Future
多了很多方法:
-
isSuccess()
和cause()
方法,能够知道当前Future
到底是成功了,还是失败了。而jdk
中的Future
只有isDone()
表示当前Future
已完成了。而当前Future
是否失败,只能通过get()
抛出异常才知道。 -
boolean isCancellable()
表示当前这个异步操作能不能取消。如果返回false
表示这个异步操作不能被取消。 -
addListener
和removeListener
系列方法,这个就是比jdk
中的Future
多的核心方法,注册或移除回调GenericFutureListener
,当异步操作Future
完成后,会回调GenericFutureListener
传递结果值。因此
netty
的Future
就不需要一直等待异步操作的完成了,只需要注册回调GenericFutureListener
就可以了。 -
sync
和await
系列方法,提供同步获取结果值的方法,即当前线程阻塞,一直等到Future
完成。其实
sync
系列方法方法就是调用await
系列方法实现的。只不过当异步操作发生异常时,sync
系列方法会重新抛出这个异常。 -
getNow
方法:不会阻塞当前线程,直接返回值。如果异步操作还未完成,那么返回值为null
。
2.2 AbstractFuture
类
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get() throws InterruptedException, ExecutionException {
// 通过 await() 方法,等待异步操作 Future 执行完成
await();
// 先判断异步操作过程中是否产生异常
Throwable cause = cause();
// 没有异常,直接返回结果值
if (cause == null) {
return getNow();
}
// 判断是取消异步操作异常还是其他异常
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 抛出执行异常
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 通过 await(timeout, unit) 方法,超时等待异步操作 Future 执行完成
if (await(timeout, unit)) {
// 先判断异步操作过程中是否产生异常
Throwable cause = cause();
if (cause == null) {
return getNow();
}
// 判断是取消异步操作异常还是其他异常
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 抛出执行异常
throw new ExecutionException(cause);
}
// 抛出超时异常
throw new TimeoutException();
}
}
AbstractFuture
主要是为了实现 java.util.concurrent.Future
中的两个方法,这样子类就不用继续实现了。
当然它们都是通过
await
系列方法实现对应功能的。
2.3 重要的子接口
2.3.1 Promise
接口
public interface Promise<V> extends Future<V> {
/**
* 手动设置异步操作成功完成,并唤醒那些等待异步操作结果值的线程。
* 如果这个异步操作已完成,那么调用这个方法会抛出 {@link IllegalStateException} 异常
*/
Promise<V> setSuccess(V result);
/**
* 尝试设置异步操作成功完成,并唤醒那些等待异步操作结果值的线程。
* 如果返回 true,表示设置成功,如果返回false,说明这个异步操作已经完成了。
*/
boolean trySuccess(V result);
/**
* 手动设置异步操作失败完成,并唤醒那些等待异步操作结果值的线程。
* 如果这个异步操作已完成,那么调用这个方法会抛出 {@link IllegalStateException} 异常
*/
Promise<V> setFailure(Throwable cause);
/**
* 尝试设置异步操作失败完成,并唤醒那些等待异步操作结果值的线程。
* 如果返回 true,表示设置成功,如果返回false,说明这个异步操作已经完成了。
*/
boolean tryFailure(Throwable cause);
/**
* 强制设置这个异步操作不能被取消。
* 如果返回true,表示设置成功。 如果返回false,表示这个异步操作已经被取消了
*/
boolean setUncancellable();
// 剩下的方法 与 Future 一样,复写方法只不过是为了更换返回值 Promise。
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
这个是 Future
最重要的一个子接口,它对比Future
,多了一个非常神奇的功能,Promise
可以主动设置异步操作结果。而 Future
只能等待异步操作完成,或者手动取消这个异步任务。
- 通过
setSuccess(V result)
和trySuccess(V result)
来手动设置异步操作成功完成。- 通过
setFailure(Throwable cause)
和tryFailure(Throwable cause)
来手动设置异步操作失败完成。- 通过
setUncancellable()
方法强制设置这个异步操作不能被取消。- 最后在
Future
接口中,所有返回值是Future
方法都要被复写,返回值变成Promise
。
2.3.2 ChannelFuture
接口
public interface ChannelFuture extends Future<Void> {
/**
* 返回一个通道,与此通道相关联的I/O操作在该通道中发生。
* @return
*/
Channel channel();
// 剩下的方法 与 Future 一样,复写方法只不过是为了更换返回值 ChannelFuture。
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
/**
* 如果ChannelFuture是一个void future,则返回true,因此不允许调用以下任何方法:
* <ul>
* <li>{@link #addListener(GenericFutureListener)}</li>
* <li>{@link #addListeners(GenericFutureListener[])}</li>
* <li>{@link #await()}</li>
* <li>{@link #await(long, TimeUnit)} ()}</li>
* <li>{@link #await(long)} ()}</li>
* <li>{@link #awaitUninterruptibly()}</li>
* <li>{@link #sync()}</li>
* <li>{@link #syncUninterruptibly()}</li>
* </ul>
*/
boolean isVoid();
}
这个ChannelFuture
接口也是比较重要的子接口,它将 Future
和 Channel
联系到一起了。当然它还有什么 ChannelPromise
和 ChannelProgressivePromise
这些衍生接口啊,这不过都多了几个方法。
2.3.3 ChannelGroupFuture
接口
public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture> {
/**
* 返回与此future关联的ChannelGroup。
*/
ChannelGroup group();
/**
* 返回与指定通道相关联的单个I/O操作的ChannelFuture。
*
* @return 如果找到匹配的ChannelFuture。否则无效。
*/
ChannelFuture find(Channel channel);
/**
* 当且仅当与此future关联的所有I/O操作都成功而没有任何失败时返回true。
*/
@Override
boolean isSuccess();
@Override
ChannelGroupException cause();
/**
* 当且仅当与此未来相关的I/O操作部分成功而部分失败时返回true。
*/
boolean isPartialSuccess();
/**
* 当且仅当与此未来相关的I/O操作部分失败而部分成功时返回true。
*/
boolean isPartialFailure();
@Override
ChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelGroupFuture await() throws InterruptedException;
@Override
ChannelGroupFuture awaitUninterruptibly();
@Override
ChannelGroupFuture syncUninterruptibly();
@Override
ChannelGroupFuture sync() throws InterruptedException;
/**
* 返回枚举与此future关联的所有ChannelFutures的迭代器。
* 请注意,返回的Iterator是不可修改的,
* 这意味着ChannelFuture不能从这个future中删除。
*/
@Override
Iterator<ChannelFuture> iterator();
}
这个ChannelGroupFuture
接口也算重要的子接口,它将 Future
和 ChannelGroup
联系到一起了。
2.4 小结
Future
和它几个比较重要的子接口都讲解完了,下面我们将介绍它最重要的一个实现类 DefaultPromise
, Future
和 Promise
的基本功能都是在这个里面完成的。
三. DefaultPromise 类
3.1 重要静态属性
// 通过 cas 的方式,更改 result 变量
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 如果异步操作Future 没有结果值,那么就设置成SUCCESS, 和初始值 null 做区别
private static final Object SUCCESS = new Object();
// 将异步操作Future 设置成不可取消时,result的值就设置成 UNCANCELLABLE, 表示此时不可取消
private static final Object UNCANCELLABLE = new Object();
// 异步操作Future被取消后,result的值就设置成 CANCELLATION_CAUSE_HOLDER, 表示异步操作已经取消了啊
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
3.2 重要成员属性
// 异步操作Future的结果值
private volatile Object result;
// 与这个异步操作Future 关联的 事件执行器executor,
// 监控异步任务结果值的回调都必须在这个事件执行器executor中执行。
private final EventExecutor executor;
/**
* 一个或多个侦听器。
* 一个侦听器是 GenericFutureListener;
* 多个侦听器是 DefaultFutureListeners。
* 如果为 `null`, 则意味着要么没有添加监听器,要么所有监听器都得到了通知。
*/
private Object listeners;
/**
* 表示当前阻塞等待结果值的线程数量
* 线程同步。我们需要持有同步锁才能调用Java底层的wait()/notifyAll()。
*/
private short waiters;
/**
* 表示当前是否正在进行通知
* 线程同步,如果执行器发生变化,我们必须防止并发通知冲突,而是按照FIFO进行侦听器通知。
*/
private boolean notifyingListeners;
-
result
表示异步操作Future
的结果值,它是最重要的属性,也代表当前异步操作Future
的状态。- 当
result == null
表示异步操作Future
还没有完成。 - 当
result == UNCANCELLABLE
异步操作Future
也没有完成,但是这个异步操作Future
不可被取消。 - 当
result instanceof CauseHolder
表示异步操作Future
发生了异常。也分为两种:result.cause instanceof CancellationException
表示异步操作取消完成,其他的就表示异步操作异常完成。 - 除了上述三种情况,就表示异步操作
Future
成功完成。
- 当
-
executor
: 与这个异步操作Future
关联的 事件执行器executor
,监控异步任务结果值的回调都必须在这个事件执行器executor中执行。 -
waiters
表示正在同步阻塞等待异步操作结果的等待者数量,一般都是调用await
系列方法产生的。 -
listeners
和notifyingListeners
和异步任务结果监听器通知有关,防止并发通知冲突。
3.3 重要方法
3.3.1 设置异步操作完成
- 成功完成
private boolean setSuccess0(V result) { // 通过 setValue0 方法设置完成 // 如果 result == null,就使用 SUCCESS,和 null 进行区别 return setValue0(result == null ? SUCCESS : result); } @Override public Promise<V> setSuccess(V result) { // 通过 setSuccess0 尝试设置成功,如果成功就返回,失败就抛出异常 if (setSuccess0(result)) { return this; } throw new IllegalStateException("complete already: " + this); } @Override public boolean trySuccess(V result) { // 通过 setSuccess0 尝试设置成功 return setSuccess0(result); }
最终都是通过
setValue0()
方法进行异步操作完成设置。 - 失败完成
/** * 检查是否有等待者waiters,如果有,通知他们。 */ private synchronized boolean checkNotifyWaiters() { if (waiters > 0) { // 唤醒那些正在阻塞等待异步操作结果的线程 notifyAll(); } // 异步操作监听器是否有 return listeners != null; } private boolean setFailure0(Throwable cause) { // 通过 setValue0 方法设置完成, // 创建 CauseHolder 对象包装 cause, // 这样只要是 CauseHolder 实例,就表示失败了啊 return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); } @Override public Promise<V> setFailure(Throwable cause) { // 通过 setFailure0 尝试设置失败, // 如果设置成功就返回,设置失败就抛出异常 if (setFailure0(cause)) { return this; } throw new IllegalStateException("complete already: " + this, cause); } @Override public boolean tryFailure(Throwable cause) { // 通过 setFailure0 尝试设置失败 return setFailure0(cause); }
最终都是通过
setValue0()
方法进行异步操作完成设置。 - 设置完成
private boolean setValue0(Object objResult) { // 未完成情况有两种,result == null 或 result == UNCANCELLABLE // 使用 CAS 来更新,如果失败,说明异步操作被其他线程完成了,那么直接返回 false // 如果成功,那么表示异步操作完成了,就要通知监听器 和 阻塞等待异步任务结果的线程 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // 这里需要通知两个: // 1. 同步阻塞等待异步任务结果的线程,调用 await 系列方法 // 2. 监听异步任务完成的监听器 if (checkNotifyWaiters()) { // 通知监听器,异步任务已经完成了 notifyListeners(); } return true; } return false; }
当异步操作完成后,即需要通知异步任务完成的监听器,还需要唤醒同步阻塞等待异步任务结果的线程。
3.3.2 异步操作状态
-
是否完成
private static boolean isDone0(Object result) { // result 不是 null 和 UNCANCELLABLE,就表示这个异步操作已经完成 return result != null && result != UNCANCELLABLE; } @Override public boolean isDone() { return isDone0(result); }
-
是否成功完成
@Override public boolean isSuccess() { Object result = this.result; // 成功完成,首先必须已经完成,即 result != null && result != UNCANCELLABLE // 不能是失败完成,即 !(result instanceof CauseHolder) return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); }
-
是否可取消
@Override public boolean isCancellable() { // 只有 result == null 时,这个异步操作才能取消 return result == null; }
-
是否已取消
private static boolean isCancelled0(Object result) { // 结果值 result 必须 CauseHolder,且 cause 是 CancellationException 异常 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; }
-
获取结果值的异常
@Override public Throwable cause() { return cause0(result); } private Throwable cause0(Object result) { // 如果 result 不是 CauseHolder 实例,就没有异常,直接返回 if (!(result instanceof CauseHolder)) { return null; } if (result == CANCELLATION_CAUSE_HOLDER) { CancellationException ce = new LeanCancellationException(); if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) { return ce; } result = this.result; } // 返回包裹的异常 return ((CauseHolder) result).cause; }
3.3.3 同步等待结果
要同步等待异步操作结果,基本上都是靠 await
系列方法实现的。
-
await()
没有超时时间等待方法protected void checkDeadLock() { EventExecutor e = executor(); // 等待结果线程与异步操作线程是同一个线程 if (e != null && e.inEventLoop()) { throw new BlockingOperationException(toString()); } } @Override public Promise<V> await() throws InterruptedException { // 如果已经完成,那么不用等待了,直接返回 if (isDone()) { return this; } // 如果其他线程已经调用了本线程中断方法,那么直接就直接抛出中断异常 if (Thread.interrupted()) { throw new InterruptedException(toString()); } // 检查有没有产生死锁情况,即在异步操作线程中调用await()方法等待操作完成。 // 抛出 BlockingOperationException 异常 checkDeadLock(); // 获取同步锁 synchronized (this) { // 只要异步操作没有完成,继续循环 while (!isDone()) { // 将 waiters 值加一,表示多一个等待者 incWaiters(); try { // 等待,直到 checkNotifyWaiters 方法中调用 notifyAll() 方法唤醒 wait(); } finally { // 已经完成,最后将waiters 值减一 decWaiters(); } } } return this; }
利用同步锁
synchronized
和wait()
方法,让当前线程等待,直到checkNotifyWaiters
方法中调用notifyAll()
方法唤醒。 -
await0(long timeoutNanos, boolean interruptable)
带超时时间等待方法private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { // 如果已经完成,那么不用等待了,直接返回 true if (isDone()) { return true; } // 超时时间已经到了,返回 isDone() 的值 if (timeoutNanos <= 0) { return isDone(); } // 需要抛出中断异常时,才进行判断 if (interruptable && Thread.interrupted()) { throw new InterruptedException(toString()); } // 检查有没有产生死锁情况,即在异步操作线程中调用await()方法等待操作完成。 // 抛出 BlockingOperationException 异常 checkDeadLock(); // 从这里开始计算时间,而不是从方法的第一行开始计算,以避免和推迟 System.nanoTime()的性能成本。 final long startTime = System.nanoTime(); // 获取同步锁 synchronized (this) { // 是否有中断异常 boolean interrupted = false; try { // 记录超时时间 long waitTime = timeoutNanos; // 只有当异步操作没有完成 且 超时时间waitTime还大于0的时候,继续循环 while (!isDone() && waitTime > 0) { // 将 waiters 值加一,表示多一个等待者 incWaiters(); try { // 调用 wait 超时的等待方法进行等待。 // 直到超时时间到了或者checkNotifyWaiters 方法中调用 notifyAll() 方法唤醒 wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } finally { // 已经完成,最后将waiters 值减一 decWaiters(); } // 提前检查isDone(),避免以后计算耗时。 if (isDone()) { return true; } // 在这里而不是在while条件中计算经过的时间,尽量避免在while的第一个循环中System.nanoTime()的性能代价。 waitTime = timeoutNanos - (System.nanoTime() - startTime); } return isDone(); } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } }
和
await
方法实现基本逻辑差不多,只不过多了超时的判断。@Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(MILLISECONDS.toNanos(timeoutMillis), true); } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) { // Should not be raised at all. throw new InternalError(); } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) { // Should not be raised at all. throw new InternalError(); } }
-
sync
系列方法,都是借助await
实现的private void rethrowIfFailed() { Throwable cause = cause(); if (cause == null) { return; } // 如果有异常,就抛出 PlatformDependent.throwException(cause); } @Override public Promise<V> sync() throws InterruptedException { await(); // 如果有异常,重新抛出异常 rethrowIfFailed(); return this; } @Override public Promise<V> syncUninterruptibly() { awaitUninterruptibly(); // 如果有异常,重新抛出异常 rethrowIfFailed(); return this; }
3.3.4 监听器相关
-
添加监听器
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); // 使用同步锁,防止并发冲突 synchronized (this) { // 添加监听器 addListener0(listener); } // 如果已经完成,那么就要通知监听器 if (isDone()) { notifyListeners(); } return this; } @Override public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { checkNotNull(listeners, "listeners"); // 使用同步锁,防止并发冲突 synchronized (this) { for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { if (listener == null) { break; } // 添加监听器 addListener0(listener); } } // 如果已经完成,那么就要通知监听器 if (isDone()) { notifyListeners(); } return this; } private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { // 当前 listeners 是 null, 直接将 listener 赋值给listeners if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { // 如果 listeners 已经是DefaultFutureListeners,调用它的 add 方法添加监听器 ((DefaultFutureListeners) listeners).add(listener); } else { // 创建新的 DefaultFutureListeners 对象,包含两个监听器 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } }
-
移除监听器
@Override public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { removeListener0(listener); } return this; } @Override public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) { checkNotNull(listeners, "listeners"); synchronized (this) { for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { if (listener == null) { break; } removeListener0(listener); } } return this; } private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners instanceof DefaultFutureListeners) { // 如果 listeners 是DefaultFutureListeners,调用它的remove 移除对应监听器 ((DefaultFutureListeners) listeners).remove(listener); } else if (listeners == listener) { // 唯一一个监听器移除了,将listeners重新设置成 null listeners = null; } }
-
通知监听器
private void notifyListeners() { // 获取事件执行器 EventExecutor executor = executor(); // 当前线程是不是事件执行器的线程 if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { // 通知监听器 notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 将通知监听器调用包装成一个 Runnable ,交给事件执行器, // 保证 notifyListenersNow 方法是在事件执行器线程中执行 safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
保证通知
notifyListenersNow()
操作是在事件执行器的线程中调用。private void notifyListenersNow() { Object listeners; // 使用同步锁,防止并发冲突 synchronized (this) { // 当监听器listeners为null或者 现在已经在通知了 notifyingListeners == true,直接返回 if (notifyingListeners || this.listeners == null) { return; } // 表示正在通知 notifyingListeners = true; listeners = this.listeners; this.listeners = null; } // 这里使用死循环,是为了处理在通知过程中,又有新的监听器添加了,也要通知这些添加的监听器 for (;;) { if (listeners instanceof DefaultFutureListeners) { // 如果是 DefaultFutureListeners,那么遍历一下,每个监听器都要通知 notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<?>) listeners); } // 使用同步锁,处理可能添加的新的监听器 synchronized (this) { // listeners 还是 null,说明没有新的监听器添加 if (this.listeners == null) { // 将 notifyingListeners 设置成 false,表示已经完成通知了 // 因为这个内部方法不会抛出异常,所以设置 notifyingListeners = false不用放到 finally 的代码块中。 notifyingListeners = false; return; } // 得到新添加的监听器,进行通知 listeners = this.listeners; // 再将 this.listeners 设置为 null this.listeners = null; } } } private void notifyListeners0(DefaultFutureListeners listeners) { GenericFutureListener<?>[] a = listeners.listeners(); int size = listeners.size(); for (int i = 0; i < size; i ++) { // 遍历 DefaultFutureListeners 中的监听器 notifyListener0(this, a[i]); } } @SuppressWarnings({ "unchecked", "rawtypes" }) private static void notifyListener0(Future future, GenericFutureListener l) { try { // 回调监听器的方法,进行通知 l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } } }
在
notifyListenersNow()
方法中,为了防止并发冲突,使用同步锁;为了防止在通知过程中,新添加监听器不能通知,使用for (;;)
死循环,保证新添加监听器也能被通知到。
网友评论