学习目标
- 为什么了解Netty异步监听?
- Netty如何实现异步监听的?
Future简介
我们知道Netty的I/O操作都是异步的,例如bind,connect,write等操作,会返回一个Future接口。Netty源码中大量使用了异步回调处理模式。在做Netty应用开发时,我们也会用到,所以,了解Netty的异步监听,无论是做Netty应用开发还是阅读源码都是十分重要的。
Future接口剖析
image咱们通过上图的Echo Server的bind(...)方法来分析,返回一个ChannelFuture实例,点ChannelFuture接口,来看下类层次结构图:
future-2.png
可知:ChannelFuture继承了Netty的Future,Netty的Future继承了JDK的Future。
JDK的Future,我想大家应该非常熟悉了,用的最多的就是在线程池ThreadPoolExecutor时,通过submit方法提交任务返回一个Future实例,通过它来查询任务的执行状态和执行结果,最用常用的方法isDone()和get()。
Netty的Future,在继承JDK的Future基础上,扩展了自己方法,我们来看下:
public interface Future<V> extends java.util.concurrent.Future<V> {
// 任务执行成功,返回true
boolean isSuccess();
// 任务被取消,返回true
boolean isCancellable();
// 支付执行失败,返回异常
Throwable cause();
// 添加Listener,异步非阻塞处理回调结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除Listener
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 同步阻塞等待任务结束;执行失败话,会将“异常信息”重新抛出来
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
// 同步阻塞等待任务结束,和sync方法一样,只不过不会抛出异常信息
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 非阻塞,获取执行结果
V getNow();
// 取消任务
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
我们知道JDK的Future的任务结果获取需要主动查询,而Netty的Future通过添加监听器Listener,可以做到异步非阻塞处理任务结果,可以称为被动回调。
同步阻塞有两种方式:sync()和await(),区别:sync()方法在任务失败后,会把异常信息抛出;await()方法对异常信息不做任何处理,当我们不关心异常信息时可以用await()。通过阅读源码可知sync()方法里面其实调的就是await()方法。推荐使用:sync()方法。
// DefaultPromise 类
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
我们可以看到Future接口没有和IO操作关联在一起,接下来让我们来看看Future的子接口ChannelFuture,它和IO操作中的channel通道关联在一起了,用于异步处理channel事件,这个接口用的最多。
public interface ChannelFuture extends Future<Void> {
// 获取channel通道
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
// 标记Futrue是否为Void,如果ChannelFuture是一个void的Future,不允许调// 用addListener(),await(),sync()相关方法
boolean isVoid();
}
ChannelFuture接口相比父类Future接口,就增加了channel()和isVoid()两个方法,其他方法都是覆盖父类接口,没有别扩展。
到这了,顺便了解下ChannelFuture的状态转换:
* <pre>
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = true |
* +--------------------------+ | | isSuccess() = true |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = false | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = true |
* | isCancelled() = false | | | cause() = non-null |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = true |
* | isCancelled() = true |
* +---------------------------+
* </pre>
ChannelFuture就两种状态Uncompleted(未完成)和Completed(完成),Completed包括三种,执行成功,执行失败和任务取消。注意:执行失败和任务取消都属于Completed。
让我们在来看下Future的另一个子接口Promise,它是个可写的Future,到底是写什么东西呢?
public interface Promise<V> extends Future<V> {
// 执行成功,设置返回值,并通知所有listener,如果已经设置,则会抛出异常
Promise<V> setSuccess(V result);
// 设置返回值,如果已经设置,返回false
boolean trySuccess(V result);
// 执行失败,设置异常,并通知所有listener
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 标记Future不可取消
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
- Future接口只提供了获取返回值的get()方法,不可设置返回值。
- Promise接口在Future基础上,还提供了设置返回值和异常信息,并立即通知listeners。而且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。
接下来,让我们来看看ChannelFuture的可写的子接口ChannelPromise:
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
// 覆盖ChannelFuture接口
@Override
Channel channel();
// 覆盖Promise接口
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
ChannelPromise unvoid();
}
ChannelPromise接口只是综合了ChannelFuture和Promise接口,没有新增功能。
截至到目前,异步监听相关的接口已经介绍完了,让我们通过一张图来概括下:
image
Promise的实现类
看看DefaultPromise的主要属性:
// setSuccess设置result为null时,设置成SUCCESS
private static final Object SUCCESS = new Object();
// 不可取消值
private static final Object UNCANCELLABLE = new Object();
// 取消异常信息持有者
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
//执行结果,使用volatile修饰,保证可见性
private volatile Object result;
// 当Promise执行完成时需要通知Listener,此时就使用这个executor
private final EventExecutor executor;
// 要通知的listener,因为它可能是不同的类型,所以定义为Object类型,使用时再判断类型
private Object listeners;
// 要使用wait()/notifyAll()机制,这个变量记录了waiter的数量
private short waiters;
// 是否正在通知listener
private boolean notifyingListeners;
从属性中可以看出, DefaultPromise 通过 Object 的 wait/notify 机制实现线程间的同步,通过 volatile 属性保证线程间的可见性。
通过setSuccess(...)方法,来了解下 执行成功后设置返回值并通知Listener的过程:
@Override
public Promise<V> setSuccess(V result) {
// 第一次成功设置返回值后,返回Promise对象
if (setSuccess0(result)) {
return this;
}
// 否则抛出异常
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// result为null,设置为SUCCESS
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// CAS设置result值,只有当result为null或者UNCANCELLABLE,才可以执行成功
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 唤醒等待的waiter,同时判断是否存在listener
if (checkNotifyWaiters()) {
// 通知所有的listener
notifyListeners();
}
return true;
}
return false;
}
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
// 通知所有waiters
notifyAll();
}
// 判断是否添加了监听者listeners
return listeners != null;
}
private void notifyListeners() {
EventExecutor executor = executor();
// 是否是同一个线程
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 用自己的executor执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
DefaultPromise的方法实现逻辑挺简单,不再全部讲解了,只需知道通过 Object 的 wait/notify 机制实现线程间的同步和观察者设计模式进行通知就可以了。
DefaultPromise还有个子类DefaultChannelPromise,这个类在Netty中用的最多的,其内部逻辑调用的都是父类DefaultPromise的方法。DefaultChannelPromise类层次结构图如下:
应用实战
public class ChannelFuture_01 {
public static void main(String[] args) throws InterruptedException {
EventExecutor executor = new SelfEventExecutor();
final Promise<String> promise = new DefaultPromise<String>(executor);
promise.addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
System.out.println("执行完成结果:" + future.get());
}
});
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
promise.setSuccess("hello world!");
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
thread.start();
promise.sync();
System.out.println("==================");
}
static class SelfEventExecutor extends SingleThreadEventExecutor {
public SelfEventExecutor() {
this(null);
}
public SelfEventExecutor(EventExecutorGroup parent) {
this(parent, new DefaultThreadFactory(SelfEventExecutor.class));
}
public SelfEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}
@Override
protected void run() {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
}
Future总结
Netty的Future继承JDK的Future,通过 Object 的 wait/notify机制,实现了线程间的同步;使用观察者设计模式,实现了异步非阻塞回调处理。
网友评论