本篇文章主要梳理了Netty服务端的一个启动过程,比较直接,阅读此篇文章需要对Netty的基本组件以及模型有一个基本的了解。
一个典型的Netty服务端代码如下所示:
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
ch.pipeline().addLast(new LineBasedFrameDecoder(2014));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
步骤如下:
1、创建两个EventLoopGroup
,一个是bossGroup,一个是workerGroup,前者主要负责获取新连接的操作,后者负责处理新连接的I/O操作。
2、定义一个ServerBootStrap
实例,初始化线程池(两个)、channel类型(NioServerSocketChannel
)、channel参数选项(SO_BACKLOG
)、添加NioServerSocketChannel
的handler(LoggingHandler
)以及新连接channel的handler(ChannelInitializer
)。
3、绑定端口号,启动服务器,主线程同步阻塞等待。
4、服务端的channel关闭之后,优雅关闭线程池
========================bind(port)=================================
bootstrap的bind
调用链:
AbstractBootstrap.bind(int inetPort)
-> AbstractBootstrap.bind(SocketAddress localAddress)
-> AbstractBootstrap.doBind(final SocketAddress localAddress)
最终调用的是下面的逻辑:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
通过上面的代码,可以看出来doBind
主要做了两件事:
1、initAndRegister()
;主要负责初始化NioServerSocketChannel
实例、以及将该channel注册到eventLoop等操作。
2、绑定地址;如果上一步的注册操作完成了,直接绑定地址, 没完成的话,对注册返回的Future
绑定监听器,在监听器中绑定地址。
======================initAndRegister()========================
initAndRegister()
的逻辑,去除了非关键的try catch
语句以及相关的异常处理:
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
通过上面的代码,可以看出来initAndRegister
主要做了以下几件事:
1、创建ServerSocketChannel
实例,并为之创建了关联ChannelPipeline
实例,具体逻辑可以看ServerSocketChannel
的无参构造函数,这里就不详述了。
2、执行init
方法,进行channel的初始化操作。
3、执行注册逻辑,将ServerSocketChannel
实例注册到NioEventLoop
中。
======================init逻辑==============================
init
方法在ServerBootStrap
类中实现:
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
通过上面的代码,可以看出来初始化操作的主要逻辑有:
1、设置NioServerSocketChannel
的options参数以及attrs参数。
2、准备NioSocketChannel
的各种参数配置,包括eventLoop线程池、事件handler、options参数以及attrs参数,以备ServerBootstrapAcceptor
之用。
3、向NioServerSocketChannel
实例的pipeline中添加一个ChannelInitializer
实例。ChannelInitializer
的主要作用就是负责channel被注册到eventLoop后的初始化操作。
这里重点看一下pipeline的addLast()
操作,该方法在DefaultChannelPipeline
实现:
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1、
checkMultiplicity(handler);
//2、
newCtx = newContext(group, filterName(name, handler), handler);
//3、
addLast0(newCtx);
//4、
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//下面的逻辑都不会执行
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
主要的执行逻辑就在上面这个addLast
方法里,可以看一下该方法的主要逻辑,这里被添加进pipeline的handler就是上面的ChannelInitializer
实例:
1、首先检查要添加的handler的重复性。如果该handler不是Sharable
的并且已经被添加到其他的pipeline,就抛异常。
2、根据所给的eventLoopGroup、handler的name以及handler创建一个ChannelHandlerContext
实例。
3、将新创建的context实例添加到pipeline的双向链表中。
4、此时registered
为false
,也就是说pipeline对应的channel还没有被注册到eventLoop中,那么就设置该context的状态为ADD_PENDING
,同时将该context封装成一个PendingHandlerAddedTask
实例,将该实例添加到pipeline的一个专门的链表中,以备在channel被注册到eventLoop后执行该实例的调用。简单地看一下具体实现:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
pendingHandlerCallbackHead
就是pipeline的一个专门的链表,用来维护需要执行的PendingHandlerAddedTask
实例任务。
以上的逻辑就是init初始化的一个具体操作。
======================register()=============================
接下来是register
逻辑,首先回顾一下initAndRegister
方法:
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
register
的调用链(config().group()
返回的就是bossGroup):
MultithreadEventLoopGroup.register(Channel channel)
-> SingleThreadEventLoop.register(Channel channel)
-> SingleThreadEventLoop.register(final ChannelPromise promise)
-> AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)
调用的代码逻辑如下所示:
public ChannelFuture register(Channel channel) {
//在EventLoopGroup中选择一个EventLoop线程进行注册操作。
return next().register(channel);
}
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
promise.channel().unsafe().register(this, promise);
return promise;
}
//AbstractUnsafe类
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//前面是检查操作,不贴代码了,
//如果eventLoop == null,抛出异常
//如果该Channel已经注册过,ChannelPromise设置失败
//如果该eventLoop不是NioEventLoop,ChannelPromise设置失败
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//异常处理
}
}
}
register
的主要操作逻辑如下:
1、将channel注册到所选择的eventLoop上,执行到这,channel就算找到了自己的归属。之后在该channel的整个生命周期内,所有的事件执行操作都由eventLoop负责。
2、判断当前执行线程是不是eventLoop所属的线程
如果是,直接执行register0
操作
否则,在eventLoop的线程中执行register0
3、如果你自己调试的话,此时你会发现eventLoop里面的线程属性还为空,所以当前执行线程肯定不是eventLoop中的线程。那么eventLoop中的线程什么时候创建呢,别急,往下看。
紧接着会执行eventLoop.execute()
(SingleThreadEventExecutor
类中):
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
**startThread();**
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
当前执行线程还是主线程,虽然进入到eventLoop的执行逻辑里面,但是线程还没有切换(也没有线程可以切换,因为此时eventLoop的线程还没有被创建)。所以紧接着会执行下面的逻辑。
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
//当前线程就是新创建的线程,分配给eventLoop:eventloop.thread=Thread.currentThread();
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//非关键代码不贴
}
}
});
}
上面代码中executor的类型是ThreadPerTaskExecutor
,其execute()
方法的逻辑就是每来一个任务,创建一个新线程执行,所以上面的new Runnable(...)
任务会在新创建的线程中执行,而这个新创建的线程就被分配给了eventLoop。
//ThreadPerTaskExecutor
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
到现在为止,eventLoop中的新线程已经开始执行任务了,什么任务?那就是NioEventLoop
中的run
方法:
SingleThreadEventExecutor.this.run();
回到eventLoop中的execute
代码中来,现在startThread
方法已经执行完,新线程已经创建,正在运行当中,接下来就是向eventLoop所属的队列中添加任务。代码比较简单就不贴了。
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
添加了一个什么任务呢?就是执行register0
的任务。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
任务添加到队列之后,eventLoop线程就去队列中取任务,然后执行。所以register0
是在eventLoop线程中执行的。
下面是eventLoop现成的调用栈,可以沿着调用栈查看eventLoop线程的执行逻辑,这里就不详述了:
image.png
=========================register0==========================
以下逻辑都是在eventLoop线程中执行的:
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See [https://github.com/netty/netty/issues/4805](https://github.com/netty/netty/issues/4805)
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
}
}
从上面的代码可以看出,主要的逻辑有:
1、执行doRegister
操作,将channel注册到selector上。
2、设置已注册标志:registered=true
3、回调handlerAdded
方法
4、回调ChannelRegistered
方法
这里重点看一下3和4:
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
HandlerAdded
方法确保是在eventLoop线程中执行的。可以看到callHandlerAddedForAllHandlers
的方法比较简单:首先拿到PendingHandlerCallBack
的链表头,依次执行链表中的每一个任务。PendingHandlerCallBack
实现了Runnable
接口,可以被线程调用。
private abstract static class PendingHandlerCallback implements Runnable
下面是PendingHandlerAddedTask
实现的execute
方法:
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
//当前线程不是eventLoop线程,调用eventLoop线程执行this任务,
//最终还是执行callHandlerAdded0(ctx)方法,看下面实现的run方法。
executor.execute(this);
} catch (RejectedExecutionException e) {
remove0(ctx);
ctx.setRemoved();
}
}
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
eventLoop线程直接执行callHandlerAdded0(ctx)
方法。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
//异常处理,将handler从pipeline中移除掉,并调用handler的handlerRemoved方法。
}
}
这里面,ctx.handler()
返回的就是之前init过程中的ChannelInitializer
实例。所以这里调用的是ChannelInitializer
里面的handlerAdded
方法。
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
initChannel
主要做了这几件事:
1、初始化channel,这里面的initChannel
方法就是在init
里面实现的。
2、出现异常的话进行异常捕获
3、最终,将ChannelInitializer
的实例从pipeline中移除。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
initChannel
的逻辑就是:
1、向pipeline中添加用户在ServerBootStrap
中指定的handler。
2、调用eventLoop线程执行pipeline的addLast
逻辑,向pipeline中添加一个ServerBootstrapAcceptor
实例,该实例的作用就是当有连接到来,创建了新channel之后,对该channel进行初始化。
这里由于当前线程就是eventLoop线程,所以execute
执行逻辑就是将该任务添加到队列中,等该线程执行完当前任务后再从队列中取任务执行:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//非关键代码不贴
}
//非关键代码不贴
}
以上的逻辑就是channel注册到eventLoop之后回调的handlerAdded
方法。一句话,就是执行ChannelInitializer
的initChannel
方法,对channel进行初始化,添加额外的handler。
接下来将回调channelRegistered
方法。
pipeline.fireChannelRegistered();
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
回调完成之后,整个register0
的逻辑计算大体执行完了。
整个register0
方法就是在eventLoop中执行的,执行完之后,eventLoop线程继续进行无限for循环:如果队列中有任务,就取任务执行,否则进行selector.select
操作,具体逻辑可以到NioEventLoop
的run
方法去查看。整个eventLoop线程的大体执行逻辑就是这样。
===========================================================
然后我们回到主线程:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
整个register
逻辑就算执行完了,返回regFuture
。
紧接着就是执行doBind0
操作
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
//非关键代码不贴
}
}
可以看到绑定地址的操作也是在eventLoop线程中执行的,这里execute
就是提交任务到队列,由eventLoop线程去队列取任务执行。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
执行完之后,返回了一个ChannelPromise
实例,然后调用其同步方法sync
:
ChannelFuture future = bootstrap.bind(port).sync();
此时主线程已经被阻塞住,查看线程状态:
image.png主线程阻塞,eventLoop线程一直loop。至此,整个服务端就算已经启动完毕了。
网友评论