之前在讲channel是如何完成注册以及连接到服务端的时候,有这么一段代码。
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
1.完成初始化以及注册
final ChannelFuture regFuture = initAndRegister();
2.获取回调
final Channel channel = regFuture.channel();
3.如果完成了就直接连接服务端
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
连接服务端的代码
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
4.如果暂时没有完成注册
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
通过监听器,在完成注册之后,执行需要进行的操作。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
}
上面的代码其实蛮简单的,但是其实也涉及到不少东西,比如ChannlFutrue,ChannelFuture,Promise。
下面先来说说基本的概念。
Future
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isCancellable();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
从源码上看,这个接口其实是对java提供的Future进行拓展。
加了许多的新方法。
但这里感觉最重要的,就是对于监听器的增删改。GenericFutureListener。
说明了这个接口的基本功能。回调功能,以及监听器的管理(曾删改)
下面来说说GenericFutureListener
GenericFutureListener
public interface EventListener {
}
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception;
}
监听器里面的泛型是netty自己定义的Future而不是java本身提供的,这点需要注意。
这个监听器的功能,就是对future进行监听。
ChannelFuture
public interface ChannelFuture extends Future<Void> {
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
boolean isVoid();
}
ChannelFuture,这个名字很简单粗暴,其实就是将Future与Channel进行了绑定。
同时覆写了Futrue的方法,返回类型都变为ChannelFuture
Promise
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
Promise是对Future的拓展,比较重要的点,还是对回调监听器的管理。
新添加的功能就是,判断future的状态,同时决定是否要唤醒监听器,或者是否要抛异常。
ChannelPromise
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override
Channel channel();
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
ChannelPromise unvoid();
}
从结构上来看,大家就知道这个接口具体提供什么功能了。
Promise, 就是对状态的判断,以及根据状态是否对监听器进行唤醒。
Future,回调功能,同时增加监听器的增删改
ChannelFuture,将Future与Channel绑定。
ChannelPromise = Channel + future + Promise
ChannelPromise结构图
说完上面的结构,下面的话就讲一下,netty是如何使用ChannelPromise的。
下面还是通过上面那个例子来说明一下吧。
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
1.完成初始化以及注册=======================先说说如何获取回调
final ChannelFuture regFuture = initAndRegister();
2.获取通道
final Channel channel = regFuture.channel();
3.如果完成了就直接连接服务端
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
连接服务端的代码
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
4.如果暂时没有完成注册
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
通过监听器,在完成注册之后,执行需要进行的操作。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
}
从channel注册代码可以看到,netty中的注册流程大概是如下:
1.完成初始化以及注册,返回回调
2.从回调中获取通道,以供连接服务端使用
3.通过回调判断是否已经完成注册,从而觉得是进行连接,还是添加监听器。
ChannelFuture的是如何完成监听的
主要的代码位于io.netty.bootstrap.AbstractBootstrap.initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
最重要的还是这里,通过executor去注册通道。
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
一路debug,最终定位到生成ChannelFuture的位置
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
将当前eventLoop以及channel传入,生成DefaultChannelPromise
eventLoop是用于执行任务
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
unsafe其实是具体针对channel相关操作的实现,比如注册。
promise.channel().unsafe().register(this, promise);
return promise;
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
注册的操作
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
注册完成后,设置为成功,这里就是重点需要看的。。。。。。
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
设置成功的方法,这里面看是Promise接口的实现,因为本身promise用的是DefaultChannelPromise
protected final void safeSetSuccess(ChannelPromise promise) {
这里最重要的还是promise.trySuccess()方法。
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
}
DefaultChannelPromise中的实现
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
@Override
public boolean trySuccess() {
trySuccess方法是在父类DefaultPromise中实现的。
return trySuccess(null);
}
}
DefaultPromise中的实现
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
@Override
public boolean trySuccess(V result) {
这个方法必然是返回true
if (setSuccess0(result)) {
唤醒监听器
notifyListeners();
return true;
}
return false;
}
唤醒监听器
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
如果正在唤醒监听器,或者监听器为空则直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
将监听器列表赋值给局部变量
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
唤醒监听器
notifyListeners0((DefaultFutureListeners) listeners);
} else {
唤醒监听器
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
唤醒监听器之后,将监听器置空,说明其实监听器只会被执行一次。
if (this.listeners == null) {
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
看我们一开始举的例子里面,就是GenericFutureListener接口的实现。
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
这里其实为什么要用监听器呢?
因为很去执行注册,连接服务端的时候都是异步的(通过eventLoop去执行),获取到回调结果的时候,也许已经完成了,也许还没有开始,如果已经完成了的话,就没必要添加监听器了,如果没有完成了的话,添加监听器即可。
在完成操作后,就可以调用监听器去执行相应的操作了。
网友评论