前言
作为一个 Java 程序员,必须知道Java社区最强网络框架-------Netty,且必须看过源码,才能说是了解这个框架,否则都是无稽之谈。今天楼主不会讲什么理论和概念,而是使用debug 的方式,走一遍 Netty (服务器)的启动过程。
1. demo 源码
楼主 clone 的 netty 的源码,值得一提的是,netty 提供了大量的 demo 供用户使用和测试。今天我们就通过netty的例子,来逐步 debug。ok ,开始吧。
启动类源码
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
/**
* @see io.netty.channel.nio.NioEventLoop
*/
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(8);
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//new ReflectiveChannelFactory<C>(channelClass)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))// ServerSocketChannel 专属
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception { // SocketChannel 专属
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
处理器源码
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws UnsupportedEncodingException {
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.err.println(body);
String reqString = "Hello I am Server";
ByteBuf resp = Unpooled.copiedBuffer(reqString.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
这两个源码都在 io.netty.example.echo 包下,大家可以自行下载。
2. demo 分析
我们先分析一下我们的 demo 源码,知道他们有哪些作用。
先看启动类:
main 方法中,首先创建了关于SSL 的配置类,这个不是我们今天的重点。略过。
重点来了,创建了两个EventLoopGroup 对象:
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(8);
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受 Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作。
try 块中创建了一个 ServerBootstrap 对象,他是一个引导类,用于启动服务器和引导整个程序的初始化。
随后,变量 b 调用了 group 方法将两个 group 放入了自己的字段中,用于后期引导使用。
然后添加了一个 channel,其中参数一个Class对象,引导类将通过这个 Class 对象反射创建 Channel。
然后添加了一些TCP的参数。
再添加了一个服务器专属的日志处理器 handler。
再添加一个 SocketChannel(不是 ServerSocketChannel)的 handler。
然后绑定端口并阻塞至连接成功。
最后main线程阻塞等待关闭。
finally 块中的代码将在服务器关闭时优雅关闭所有资源。
再看 EchoServerHandler 类
这是一个普通的处理器类,用于处理客户端发送来的消息,在我们这里,我们简单的解析出客户端传过来的内容,然后打印,最后发送字符串给客户端。
好,我们已经大致讲解了我们的 demo 源码的作用。当然,这里讲的很简单,我们将在后面的debug 的时候详细介绍他们的作用。
3. 首先看创建 EventLoopGroup 的过程:
上面的这些都是一些重载的构造方法,并加入了一些默认值,比如为null 的 executor,还有熟悉的 NIO 的 SelectorProvider.provider(),也有一个单例的选择策略工厂,还有一个默认的线程池拒绝策略,最后还有一个线程的默认数量:CPU 核心数 * 2。最后还有一个默认的线程选择策略工厂。
最后,才是 NioEventLoopGroup 真正的构造方法,在抽象父类MultithreadEventExecutorGroup中,这里我们看到了模板模式,代码如下 :
// 1.默认0,2executor 默认null, 3.nio provider,4.new DefaultSelectStrategyFactory() 是个单例,5.默认拒绝策略:抛出异常
// args : 3-5, 线程数默认: NettyRuntime.availableProcessors() * 2,也就是 CPU core * 2
// 1.默认 core *2, 2.null, 3. 单例new DefaultEventExecutorChooserFactory(), 4, 3-5
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 类名为名称的线程工厂
// 该线程池没有任何队列,提交任务后,创建任何线程类型都是 FastThreadLocalRunnable, 并且立即start。
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建一个事件执行组
children = new EventExecutor[nThreads];
// 初始化线程数组
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建 new NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果创建失败,优雅关闭
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
虽然代码很长,实际上就那么几件事情,我们拆分来看:
- 如果 executor 是null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。
- 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
- 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
- 根据线程选择工厂创建一个
线程选择器
,默认是对2取余(位运算),也可以顺序获取。 - 为每一个单例线程池添加一个关闭监听器。
- 将所有的单例线程池添加到一个 HashSet 中。
本篇先于篇幅,不会再继续拆解这些步骤,后面我们找机会继续拆解。
4. 再看 ServerBootstrap 创建和构造过程
ServerBootstrap 是个空构造,什么都没有,但注意,有默认的变量:
注意其中的 ServerBootstrapConfig 对象,这个对象将会在后面起很大作用。
再看后面的链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 group 属性,worker 赋值给 childGroup 属性。
channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。
option 方法传入 TCP 参数,放在一个LinkedHashMap 中。
handler 方法传入一个 handler 中,这个hanlder 只专属于 ServerSocketChannel 而不是 SocketChannel。
childHandler 传入一个 hanlder ,这个handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用。
5. 再看 bind 方法
注意,整个服务器就是在这个方法里启动完成的,从这里开始,将会是一段跳来跳去的过程(因为是异步的),我们将使用强大的 IDEA 编辑器进行调试。
开始吧!
上面很简单,创建了一个端口对象,并做了一些空判断,就不讲了,最重要的是下面的 doBind 方法。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = ((NioServerSocketChannel)channel).newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
ServerBootstrap 的这个方法不长也不短,但经过我们的拆解,可以分为以下 2 部分:
- initAndRegister 初始化 NioServerSocketChannel 通道并注册各个 handler,返回一个 future。
- 执行 doBind0 方法,完成对端口的绑定。
看着很简单,就两个步骤。
但实际上,这两个步骤蜿蜒曲折,各种异步跳转,请做好准备。
6. initAndRegister 方法
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();//NioServerSocketChannel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
同样的,先进行拆解:
- 通过 ServerBootstrap 的通道工厂反射创建一个 NioServerSocketChannel。
- init 初始化这个 NioServerSocketChannel。
- config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 注册 NioServerSocketChannel。
- 最后,返回这个异步执行的占位符。
4个步骤,调用层次极深,非战斗人员请注意安全。
第 1 个步骤,反射创建。
channelFactory.newChannel() 方法
反射创建。 传入一个 NIO 的 provider 实例并调用 newSocket 方法 可以看到这个静态变量且是 final 的 熟悉的 NIO,创建了一个 ServerSocketChannel 实例 调用父类的构造器,并传入 ACCEPT,并创建了一个 config 对象用于展示自己 继续调用父类 这里也很熟悉了,调用父类,设置非阻塞,设置感兴趣的事件,设置 Channel 属性,也就是 JDK 的 ServerSocketChannelImpl ,Netty 的 NioServerSocketChannel 代理了 JDK 的 Socket。 这里继续给父类赋值, 通过 debug 看到,parent 是 null, id 是通过算法生成唯一ID, 并创建了一个 unsafe和 pipeline 一个工厂 创建 channelID 回到创建 unsafe 的过程,NIO 的 message 的操作类,是 Netty 的核心组件 回到创建 pipeline 的方法,一个默认的 pipeline ,参数是 this ,即 NioServerSocketChannel DefaultChannelPipeline 的构造方法,是一个双向链表,并将 NioServerSocketChannel 设置为自己的属性回到 NioServerSocketChannel 的构造方法,还有 config 的构造方法没看。
这里返回的是我们各个设置的 JDK 的 ServerSocketChannel 继续调用父类,但是,限于篇幅,我们不再深入,我们只需知道,这个 config 对象用于配置这个 NioServerSocketChannel ,用于外部获取参数和配置到此为止,我们看到了整个 NioServerSocketChannel 的构造过程,可谓非常的复杂,尽管我们贴了很多图,但仍然没有到底。但这不妨碍我们这次的主体内容。我们总结一下构造过程:
- 通过 NIO 的SelectorProvider 的 openServerSocketChannel 方法得到JDK 的 channel。目的是让 Netty 包装 JDK 的 channel。同时设置刚兴趣的事件为 ACCEPT和非阻塞
- 创建了一个唯一的 ChannelId,创建了一个 NioMessageUnsafe,用于操作消息,创建了一个 DefaultChannelPipeline 管道,是个双向链表结构,用于过滤所有的进出的消息。
- 创建了一个 NioServerSocketChannelConfig 对象,用于对外展示一些配置。
好,NioServerSocketChannel 对象创建完了, 现在进入到第二个步骤,init 方法,这是个抽象方法,由 ServerBootstrap 自己实现。
init 方法
代码如下:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = ((NioServerSocketChannel)channel).pipeline();//NioServerSocketChannel
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
(p).addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();// ServerBootstrapConfig
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
同样的,该方法不长也不短,继续拆解,不能陷入细节中:
- 设置 NioServerSocketChannel 的 TCP 属性。
- 由于 LinkedHashMap 是非线程安全的,使用同步进行处理。
- 对 NioServerSocketChannel 的 ChannelPipeline 添加 ChannelInitializer 处理器。
从上面的步骤可以看出, init 的方法的精华在和 ChannelPipeline 相关。
从 NioServerSocketChannel 的初始化过程中,我们知道,pipeline 是一个双向链表,并且,他本身就初始化了 head 和 tail,这里调用了他的 addLast 方法,也就是将整个 handler 插入到 tail 的前面,因为 tail 永远会在后面,需要做一些系统的固定工作。
我们进入到 addLast 方法内查看:
循环添加addLast(EventExecutorGroup group, String name, ChannelHandler handler) 方法:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
这里就是 pipeline 方法的精髓了,拆解如下:
- 检查该 handler 是否符合标准,如果没有 Sharable 注解且已经被使用过了,就抛出异常。
- 创建一个 AbstractChannelHandlerContext 对象,这里说一下,ChannelHandlerContext 对象是 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 Pipeline 中时,都会创建 Context。Context 的主要功能是管理他所关联的 Handler 和同一个 Pipeline 中的其他 Handler 之间的交互。
- 然后将 Context 添加到链表中。也就是追加到 tail 节点的前面。
- 最后,同步或者异步或者晚点异步的调用 callHandlerAdded0 方法,在该方法中,调用之前的 handler 的 handlerAdded 方法,而该方法内部调用了之前的 ChannelInitializer 匿名类的 initChannel 方法,并且参数就是 context 的 channel(通过 pipeline 获取),也就是 NioServerSocketChannel。这个 Context 的标准实现就是 DefaultChannelHandlerContext。这个 Context 内部会包含一些重要的属性,比如 pipeline,handler,属于出站类型还是入站类型等。
从上面的分析我们可以看出,pipeline 的 addLast 方法,实际上创建一个 Context 对象包装了 pipeline 和 handler,然后通过同步或者异步的方式,间接执行 handler 的 自定义方法-------initChannel 方法。而这个 context 也加入到了 pipeline 的链表节点中。
好了,针对 addLast 方法,我们暂且就分析到这里。
config().group().register(channel) 方法
回到 initAndRegister 方法中,继续看 config().group().register(channel) 这行代码,config 方法返回了 ServerBootstrapConfig,这个 ServerBootstrapConfig 调用了 group 方法,实际上就是 bossGroup。bossGroup 调用了 register 方法。
这个 next 方法调用的是 EventExecutorChooser 的 next 方法,我们看看该方法的实现:
注意,这里是著名的 Netty 对性能压榨的一个例子,Netty 对于选取数组中的线程有着2套策略。
- 如果数组是偶数,则使用位运算获取下一个EventLoop(单例线程池)(效率高)。
- 如果是奇数,使用取余(效率低)。
所以,如果是自定义数组长度的话,最好是偶数,默认的就是CPU 核心的2倍,即偶数。
并且,在判断数组是否是偶数的算法中,也没有使用取余,而是位运算。如下:
isPowerTwo 方法回到正题。拿到下一个单例线程池后,调用他的 register 方法:
this 就是 EventLoop(单例线程池),这里多次提到单例线程池,为什么使用单例线程池呢?一个线程还使用线程池有什么意义呢?答:需要任务队列,有很多任务需要进行调度,所以需要线程池的特性。但为了多线程的切换导致的性能损耗和为了消除同步,所以使用单个线程。
继续,这里创建了一个 DefaultChannelPromise ,这里需要说一下 Promise 的作用,其实类似 Future,事实上也继承了 JDK 的 Future,但增加了很多功能,比如 JDK 的 Future 虽然是异步的,但仍需要 get 方法 阻塞获取结果才能坐之后的事情,而 Promise 可以通过设置监听器的方式,在方法执行成功或者失败的情况下无需等待,就能执行监听器中的任务,效率币 Future 高很多。从某种程度上说,Future 是非阻塞,而Promise 才是正在的异步。
他的构造方法也很简单:
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = checkNotNull(channel, "channel");
}
// 父构造
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}
好了,回到 register 方法,有了刚刚创建的 Promise,EventLoop 继续调用自己的 register 方法:
register通过调用 promise 的 channel 方法获取了 NioServerSocketChannel ,然后再调用 NioServerSocketChannel 的 unsafe方法获取创建NioServerSocketChannel对象时同时创建的 NioMessageUnsafe 对象,最后调用 NioMessageUnsafe 的 register 方法,参数时 promise 和 NioEventLoop。最后返回了这个 promise 方法。
我们可以先思考一下,之所以使用 promise ,register 内部肯定时异步执行了某个方法,让 promise 立刻返回。执行完毕后再执行设置的监听器的方法。
我们去看个究竟:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {// 开始真正的异步,boss 线程开始启动
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
该方法不长也不短,我们拆解一下:
- 先是一系列的判断。
- 判断当前线程是否是给定的 eventLoop 线程。注意:这点很重要,Netty 线程模型的高性能取决于对于当前执行的Thread 的身份的确定。如果不在当前线程,那么就需要很多同步措施(比如加锁),上下文切换等耗费性能的操作。
- 异步(因为我们这里直到现在还是 main 线程在执行,不属于当前线程)的执行 register0 方法。
实际上,聪明如你,一定知道 register0(promise) 才是最重要的方法。我们来看看该方法逻辑:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
继续拆解:
- 首先状态判断。
- 执行 doRegister 方法。
- 执行 pipeline.invokeHandlerAddedIfNeeded() 方法。
- 执行 pipeline.fireChannelRegistered() 方法。
可以看到 doRegister 应该就是真正的执行方法,而后面的就是管道开始调用 handller 的一些注册成功之后的回调方法。先看doRegister 方法:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
是不是很熟悉?该方法在一个死循环中向 JDK 中注册感兴趣的事件。如果成功,则直接结束,如果失败,则 调用 EventLoop 内部的 JDK 的 select 的 selectNow 方法立即返回,然后尝试第二次注册,如果还是报错,则抛出异常。注意,这里同时还把自己(NioServerSocketChannel)作为attach 绑定了该 selectKey 上。大家可能奇怪,为什么注册的是0,而不是16 Accpet 事件呢?楼主也不知道。但是最后还是会删除这个读事件,重新注册 accpet 事件的。netty 不知道是怎么想的。
好了,回到 register0 方法,还有2个步骤,分别是执行 pipeline 的 invokeHandlerAddedIfNeeded 方法和 fireChannelRegistered 方法,同时设置 promise 为成功,这个时候,promise就会执行监听器的方法。
invokeHandlerAddedIfNeeded 方法是做什么的呢?还记得我们之前在 pipeline 的 addLast 方法中,添加了一个 handler 吗?我们说该方法可能会晚点执行,因为这个方法被包装成了 task,这里就会执行该方法。这个方法的意思就是,如果管道中有需要执行的任务,就去执行。我们回忆一下那个方法:
addLast 方法,added 属性为truecallHandlerCallbackLater 方法会根据 added 属性包装成一个 task(add 任务或 removed 任务),成为任务链表上的一个节点。
而 add 任务和 removed 任务的不同在于,add 任务是pipeline 初始化之后调用的任务(通过 Channel 的handlerAdded 方法),removed 是pipeline 结束后执行(通过 Handler 的 handlerRemoved 方法)。
回到 register0 方法,在执行完 invokeHandlerAddedIfNeeded 方法后,也就是我们刚开始的init 方法里的 ChannelInitializer 匿名类的 initChannel 方法。
safeSetSuccess(promise) 方法就是通知 promise 已经成功了,你可以执行监听器的方法了,而这里的监听器则是我们的 dobind 方法中设置的:
doBind方法至于内部执行,我们稍后再说,先回到我们的 register0 方法中,在 safeSetSuccess 方法执行后,执行 pipeline.fireChannelRegistered() 方法。看名字是执行 handler 的注册成功之后的回调方法。我们跟进去看看:
Pipeline 中的 静态方法,并传入了 head Context 获取 head 的 执行器EventLoop,用于判断是否在当前线程,如果在当前线程,则立即执行 invokeChannelRegistered 方法,否则异步执行,我们这里当然是在当前线程。所以同步执行 在这里执行 Context 对应的 handler 的 channelRegistered 方法, 和之前一样,会检查通道中是否有需要延迟执行的任务,如果有,就执行,然后调用 Context 的 fireChannelRegistered 方法,而不是 pipeline 的 fireChannelRegistered 方法 该方法调用的是 head Context 的 invokeChannelRegistered 静态方法,注意,这里的参数很重要,我们进入 findContextInbound 方法内部查看 注意,我们说 pipeline 是一个双向链表,这里链表起作用了,通过找到当前节点的下一个节点,并返回,但这判断的是:必须是入站类型的回到 fireChannelRegistered 方法,看看 invokeChannelRegistered 是如何调用的?
注意到了吗,next 节点就是我设置的 LoggingHandler 对应的 Context,获取对应的 EventLoop。从这里我们总结一下 netty 的 Handler 设计:Netty 初始了一个 pipeline,pipeline 内部维护着一个 ChannelContextContext 双向链表,Context 是对 Handler 的封装,是 pipeline 和 Handler 沟通的关键,每次信息入站,从 head 节点开始,执行 context 的 handler 的对应方法,执行结束通过 findContextInbound() 方法找到下一个节点,继续执行。
tail 节点的 channelRegistered 什么都不做。
好,终于可以回到我们 ServerBootStrap 的 initAndRegister 方法中了。
还没有完。我们上面对于第三个步骤 对 NioServerSocketChannel 的 ChannelPipeline 添加 ChannelInitializer 处理器。
只是一笔带过。也就是说,我们从NioServerSocketChannel 的pipeline 的的addLast 方法中一直分析到现在。我们再回头看看该方法:
我们分析了 addLast 方法,但下面还有一个回调方法,什么呢?想 NioServerSocketChannel 的 EventLoop 提交了一个任务,也就是 pipeline 的 addLast 方法。是一个 ServerBootstrapAcceptor 对象,而这个 ServerBootstrapAcceptor 也是一个 handler,你可以想到了吧,从该 handler 名字就可以看出来,该 handler 是用于处理 accept 事件的。我们看看他的构造方法:
上面没有上面好说的,下面有一个 task,任务内容是 设置 该 channel 的autoread 属性为 true,这里我们记一下。还要注意一点,有一个 childHandler 属性,是什么呢?就是我们 main 方法中的 ChannelInitializer 匿名内部类,聪明的你应该想到了,既然该 handler 是接受 accept 事件的,那么,肯定需要初始化管道等操作,不然我们怎么在管道中操作我们的逻辑呢?所以就需要这个 ChannelInitializer 通道初始化对象了。
好了,到这里,我们的 initAndRegister 方法终于算是结束了。
回到我们的 doBind 方法。
7. 回到 doBind 方法
doBind 方法终于回来了,继续分析,我们上面说完了 dobind 方法有2个重要的步骤,initAndRegister 说完了,接下来看 doBind0 方法,该方法的参数为 initAndRegister 的 future,NioServerSocketChannel,端口地址,NioServerSocketChannel 的 promise。我们进入看看:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
该方法想 NioServerSocketChannel 的 eventLoop 提交了一个任务,当 future(其实就是 promise) 成功后执行
NioServerSocketChannel 的 bind 方法,并添加一个关闭监听器。我们主要关注 bind 方法。
层层调用来到了 NioServerSocketChannel 的 pipeline 的 tail 节点的 bind 方法,该方法首先找到出站节点,然后执行出站节点的 invokeBind 方法。
寻找 tail 节点的上一个节点,且必须是出站类型的,根据我们的设置,tail 的上一个节点应该是 LoggingHandler 因为他既是是出站类型也是入站类型 根据 UML 可知 LoggingHandler 类型接下来,将调用 LoggingHandler 的 invokeBind 方法。
context 调用 handler 的 bind 方法 当然就是打印日志而已 继续循环,ctx 的 bind 方法就是先寻找下一个节点或者下一个节点,然后调用节点的 invokeBind 方法,然后调用 handler 的 bind 方法 来到了head 节点的bind方法,这里调用了 unsafe 的 bind方法 这里的 unsafe 来自 NioServerSocketChannel 的 unsafe我们看看 unsafe 的 bind 方法:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
可以看到,这里最终的方法就是 doBind 方法,执行成功后,执行通道的 fireChannelActive 方法,告诉所有的 handler,已经成功绑定。那我们就进入 doBind 方法查看:
很熟悉,获取 JDK 的channel 进行绑定。
回到 bind 方法,然后调用 invokeLater 方法,代码如下:
private void invokeLater(Runnable task) {
try {
eventLoop().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
}
将这个任务提交。而这个 fireChannelActive 和之前 pipeline 的所有方法都类似,遍历所有节点,执行 ChannelActive 方法。
回到 bind 方法,最后一步:safeSetSuccess(promise),告诉 promise 任务成功了。其可以执行监听器的方法了。虽然这个 promise 没有任何监听方法。
如果到这里,楼主告诉你,整个启动过程已经结束了,你肯定和诧异,什么?服务器不应该是监听 Accept 事件吗,我们分析了这么多,只发现在 doRegister 方法中注册了 0 (read) 事件,竟然没有监听 Accept 事件,和我们平时写的 Nio 代码不同啊?
是的,如果你想到了这里,说明你思考了。
一切就在上面的 fireChannelActive 方法中。该方法回先调用 head 节点的 channelActive 方法,而 head 节点的 channelActive 代码如下:
readIfIsAutoRead 默认返回 true。然后像之前的 pipeline 一样,继续在链表中调用。最后,来到了一个关键的地方:
Head 节点 的read 方法调用的是 NioServerSocketChannel 的 unsafe 的 beginRead 方法。继续查看:
最后来到了这里
我们看看 NioServerSocketChannel 的 doBeginRead 方法。
拿到 selectionKey ,如果 key 的监听事件是0 的话,就改为 readInterestOp ,也就是我们初始化NioServerSocketChannel 时设置的值:
好了,到这里,整个服务器就完整的启动了,可谓艰难。但整体而言,Netty 的结构设计还是很紧凑的。虽然调用层次很深,但这是所有源码的特点。
8. 总结启动过程
好了,从源码层面已经分析完了,我们来总结一下启动的过程。
- 首先创建2个 EventLoopGroup 线程池数组。数组默认大小CPU*2,方便chooser选择线程池时提高性能。
- BootStrap 将 boss 设置为 group属性,将 worker 设置为 childer 属性。
- 通过 bind 方法启动,内部重要方法为 initAndRegister 和 dobind 方法。
- initAndRegister 方法会反射创建 NioServerSocketChannel 及其相关的 NIO 的对象, pipeline , unsafe,同时也为 pipeline 初始了 head 节点和 tail 节点。同时也含有 NioServerSocketChannelConfig 对象。然后向 pipeline 添加自定义的处理器和 ServerBootstrapAcceptor 处理器。这个处理器用于分配接受的 请求给 worker 线程池。每次添加处理器都会创建一个相对应的 Context 作为 pipeline 的节点并包装 handler 对象。注册过程中会调用 NioServerSocketChannel 的 doRegister 方法注册读事件。
- 在register0 方法成功以后调用在 dobind 方法中调用 doBind0 方法,该方法会 调用 NioServerSocketChannel 的 doBind 方法对 JDK 的 channel 和端口进行绑定,之后在调用 pipeline 的fireChannelActive 最后会调用 NioServerSocketChannel 的 doBeginRead 方法,将感兴趣的事件设置为Accept,完成 Netty 服务器的所有启动,并开始监听连接事件。
好了,时间不早了,good luck! 请期待后续关于 Netty 源码分析的文章!
网友评论