在基于nio的编程中,一般是声明一个ServerSocketChannel对象,然后注册到selector中,监听accept事件,当有新的连接请求时,select方法返回,通过ServerSocketChannel的accept方法获取到新建立的SocketChannel连接,并注册到selector,同时监听read事件,那么netty是如何处理这一过程的呢
我们先来看下一个netty的常规编程模式,然后再跟踪源码,来了解这个注册监听流程
//1. 启动器,负责组装netty组件,启动服务器
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
new ServerBootstrap()
.group(bossGroup, workerGroup)
//选择服务器ServerSocketChannel的实现
.channel(NioServerSocketChannel.class)
//4 boss 负责处理连接 worker(child)负责处理读写,决定worker能执行哪些操作
.childHandler(
//5 代表和客户端进行数据读写的通道
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8899);
这里面除了ServerBootstrap组件没有讲过,其他的在前面的几篇文章都说过了,可以简单的理解为ServerBootstrap就是负责连接各个组件,启动服务器。
这里直接看下bind方法的逻辑
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind方法,大概可以分为这三个大步骤init、register、doBind0
其中,init就是初始化一个Channel对象,生成关联的pipeline对象,并且添加一个入站处理器ChannelInitializer,这个入站处理器的handlerAdd方法会在channel注册到selector后才会被调用,handlerAdd方法
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
这里面有一个很重要是,当handlerAdd方法触发时,又会往pipeline添加一个入站处理器ServerBootstrapAcceptor,并且将当前的处理器从pipeline移除。这个处理器就是用来注册新连接的读事件。
初始化channel对象后,接着就调用register方法,就会从NioEventLoopGroup(boss group)中挑选出一个NioEventLoop对象来注册channel,而NioEventLoop对象对channel的注册逻辑,跟进代码可以看到如下
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//省略....
}
}
}
这里最重要的有一个判断eventLoop.inEventLoop(),这个是判断currentThread是否为eventLoop的执行线程(EventLoop第一次执行任务时会关联到一个thread对象),若是的话,直接执行register0方法;不是的话,将其作为runnable对象提交给eventLoop的任务队列,在事件循环中处理。
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
这个注册方法里面第一步先执行doRegister,这个就是将channel注册到selector上,但此时并没有监控accept事件。然后再调用pipeline上ChannelHandler的handlerAdd方法,这里会执行上面提到的ChannelInitializer的initChannel方法,添加一个ServerBootstrapAcceptor入站处理器。
接着,调用safeSetSuccess方法,表明这个channel的注册完成,回调promise的监听器,然后调用pipeline上的ChannelHandler的channelRegistered方法。方法后面会判断isActive方法,因为netty中调用register方法是异步的,当方法register返回后,便会紧接着调bind方法,也就是刚刚说的三大步骤的第三部doBind0方法。
网友评论