Netty 的启动流程
服务端 Nio 创建步骤包括:1. 创建;2. 注册;3. 监听。
所以需要在 netty 启动也需要完成以上步骤
创建
创建步骤分为:创建 selector,创建 ServerSocketChannel
1. 创建 Selector
在创建 NioEventLoopGroup 的时候,会对每个 EventLoop 进行初始化,初始化 EventLoop时,也初始化 EventLoop 里的 selector
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup 构造方法调用链
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// executor 为 null
// SelectorProvider.provider() 方法使用单例模式创建SelectorProvider
// SelectorProvider 是后续创建 Selector 的类
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// DefaultSelectStrategyFactory.INSTANCE 通过单例模式创建一个 DefaultSelectStrategyFactory
// DefaultSelectStrategyFactory 的用途是后续创建 SelectStrategy
// SelectStrategy 用来控制后续 select 轮训的策略
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
// RejectedExecutionHandlers.reject() 定义了 NioEventLoop 里线程池的拒绝策略
// super 表示调用 NioEventLoopGroup 的父类构造函数,即 MultithreadEventLoopGroup
super(nThreads, executor, selectorProvider,
selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// args[0]: selectorProvider
// args[1]: selectStrategyFactory
// args[2]: rejectedExecutionHandler
// super 表示 调用父类构造函数,即 MultithreadEventExecutorGroup
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// DefaultEventExecutorChooserFactory.INSTANCE 通过单例创建DefaultEventExecutorChooserFactory
// DefaultEventExecutorChooserFactory 的作用是通过 EventExecutor 数组创建一个 EventExecutorChooser
// EventExecutorChooser 的作用是通过不同的算法从 EventExecutor 数组 选择一个 EventExecutor
// NioEventLoop 继承了 EventExecutor,所以在 NioEventLoopGroup 里的 NioEventLoop数组即是EventExecutor数组
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
try{
// 创建 NioEventLoop
// 模板方法,由 NioEventLoopGroup 实现,selector 就在创建 NioEventLoop 时进行创建
children[i] = newChild(executor, args);
}
...
}
}
// NioEventLoopGroup#newChild()
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 创建 NioEventLoop
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
接下来就是 NioEventLoop 的构造函数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 创建 selector,并返回一个selector 的元组对象,此元组包含了一个 unwrappedSelector 和 一个 selector
// 默认情况下 unwrappedSelector 和 selector 是相同的对象,都是java nio 原生的 selector
// 只有开启了优化配置,才会创建优化了的 selector
// 但是即使优化了selector,在注册的时候依然使用 unwrappedSelector 这个原始的 selector
final SelectorTuple selectorTuple = openSelector();
// selector: 是被 Netty 优化,由 Netty 继承自 selector 接口实现的类
this.selector = selectorTuple.selector;
// unwrappedSelector: 没有被 Netty 优化的,原始的JDK Nio selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
至此,便完成了 selector 的创建
接下来就是 ServerSocketChannel 的创建
2. ServerSocketChannel 的创建和注册
- ServerSocketChannel 是Java 原始的 Channel,在 Netty 中对这个ServerSocketChannel 进行了扩展,使用 NioServerSocketChannel 持有 ServerSocketChannel
// Java ServerSocketChannel 的在 NioServerSocketChannel 的位置
// NioServerSocketChannel -> AbstractNioMessageChannel -> AbstractNioChannel
// SelectableChannel 就是 java nio ServerSocketChannel 的接口类
private final SelectableChannel ch;
protected SelectableChannel javaChannel() {
return ch;
}
- 所以 ServerSocketChannel 的创建过程应该在 NioServerSocketChannel 创建的时候进行的
- 而 NioServerSocketChannel 则是在 ServerBootstrap中被创建的
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
...
bootstrap.bind(8080);
与 NioServerSocketChannel 相关代码有两个
-
bootstrap.channel(NioServerSocketChannel.class)
方法会创建一个 channelFactory,channelFactory 是一个 netty channel 的工厂方法,用来创建 NioServerSocketChannel 实例
// AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {
// ReflectiveChannelFactory 封装了 channelClass,调用 ReflectiveChannelFactory#newChannel 方法时通过反射的方式实例化channelClass,返回channelClass类的对象实例
// channelFactory方法则将 ReflectiveChannelFactory 复制给 AbstractBootstrap的成员变量 channleFactory
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// ReflectiveChannelFactory 构造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
}
...
}
-
bootstrap.bind(8080)
用来实际创建NioServerSocketChannel
对象实例
进入 bootstrap.bind()
方法,该方法实际上进入了 ServerBootstrap 的父类 AbstractBootstrap的 bind 方法
// AbstractBootstrap#bind(int inetPort)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
// 验证 bossGroup 和 channelFactory 是否存在
validate();
// 真正进行 bind 的方法
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化 NioServerSocketChannel ,并将 NioServerSocketChannel 里的注册到 seletor 中就在这里
final ChannelFuture regFuture = initAndRegister();
...
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 调用 上面所说的 ReflectiveChannelFactory#newChannel() 方法
// 初始化并返回 NioServerSocketChannel
channel = channelFactory.newChannel();
init(channel);
}
...
ChannelFuture regFuture = config().group().register(channel);
}
//ReflectiveChannelFactory#newChannel()
@Override
public T newChannel() {
try {
// 相当于 new NioServerSocketChannel();
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
接下来就是创建 NioServerSocketChannel 的流程
// NioServerSocketChannel 的构造函数
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
// 创建有个 java Nio ServerSocketChannel 实例
return provider.openServerSocketChannel();
...
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 调用父类的构造方法,即AbstractNioMessageChannel的构造方法
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 调用父类的构造方法,即 AbstractNioChannel 的构造方法
// parent 为 null
// ch 为 ServerSocketChannel
// readInterestOp 为 SelectionKey.OP_ACCEPT
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// parent 为 null, 调用父类构造函数,即 AbstractChannel 的构造函数
super(parent);
// 将 serverSocketChannel 赋值给 NioServerSocketChannel 的成员变量 ch
this.ch = ch;
// serverSocketChannel 只对 SelectionKey.OP_ACCEPT 感兴趣
this.readInterestOp = readInterestOp;
try {
// 将 serverSocketChannel 设置为非阻塞
ch.configureBlocking(false);
}
...
}
protected AbstractChannel(Channel parent) {
// parent 为 null
this.parent = parent;
// NioServerSocketChannel的 id
id = newId();
// 最后会调用 AbstractNioByteChannel 的 newUnsafe() 方法,返回 NioMessageUnsafe 对象
unsafe = newUnsafe();
// 熟悉的 pipeline,返回 DefaultChannelPipeline 对象
pipeline = newChannelPipeline();
}
所以,在创建 NioServerSocketChannel 时的 ServerSocketChannel 便已经被创建了,并赋值给了成员变量 ch
接下来就是把这个 ServerSocketChannel 注册到 selector 中,回到刚才 AbstractBootstrap 的 initAndRegister 方法
final ChannelFuture initAndRegister() {
Channel channel = null;
// 完成了 ServerSocketChannel 的创建,接下来就是将 ServerSocketChannel 注册到 selector 中
// config().group() 会返回 bossGroup,即是一个 NioEventLoopGroup 对象register(channel)
// 调用 config().group().register(channel) 就是调用 NioEventLoopGroup.register(channel)
// NioEventLoopGroup.register(channel) 方法的实现在其父类 MultithreadEventLoopGroup
ChannelFuture regFuture = config().group().register(channel);
}
// MultithreadEventLoopGroup#register
@Override
public ChannelFuture register(Channel channel) {
// next() 会使用 chooser(EventExecutorChooser) 选择一个 NioEventLoop
// 调用 NioEventLoop.register 方法,NioEventLoop.register 方法在 SingleThreadEventLoop.register 中实现
return next().register(channel);
}
// SingleThreadEventLoop#register
@Override
public ChannelFuture register(Channel channel) {
// 根据 NioServerSocketChannle 生成一个 DefaultChannelPromise
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
// promise.channel 返回 NioServerSocketChannel
// NioServerSocketChannel.unsafe() 返回一个 Unsafe
// Unsafe.register 方法会进入 NioByteUnsafe.register 方法, 而 NioByteUnsafe.register 方法的实现在 AbstractNioUnsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
// AbstractNioUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前线程是否是在本 eventLoop,显然不是,当前线程是在 main thread,所以不进入
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 不是在 eventLoop中,则创建一个线程,放到 eventLoop中的线程池中,在线程池中调用register0() 方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
...
}
}
private void register0(ChannelPromise promise) {
...
// 此方法在 AbstractNioChannel中实现
doRegister();
...
}
// AbstractNioChannel#doRegister
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// javaChannel() 即是 ServerSocketChannel
// eventLoop().unwrappedSelector() 即是 selector
// 0 表示 ServerSocketChannel 对任何事件都不感兴趣,后续还会另外注册 Accept 事件到 selector
// this:表示把 NioServerSocketChannel 作为属性绑定到 serverSocketChannel 中,只要selector 返回了 serverSocketChannel 相关的事件,便会带回来这个 NioServerSocketChannel 对象
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
}
到目前为止,
- selector 的创建,
- serverSocketChannel 的创建,
- serverSocketChannel 向 selector 注册
已经完成
还有
- 给 serverSocketChannel设置监听端口
- 注册 serverSocketChannel accept事件 到 selector
- selector.调用 select() 方法轮询的步骤
还没完成
回到 AbstractBootstrap#dobind()
方法
private ChannelFuture doBind(final SocketAddress localAddress) {
// 上面的步骤创建了 NioServerSocketChannel,由于 NioServerSocketChannel注册到 selector 是放在 eventLoop 的线程池异步进行的,所以会返回一个regFuture 这个异步结果。
...
if (regFuture.isDone()) {
// 如果已经注册完成了NioServerSocketChannel,则开始 bind NioServerSocketChannel
// 具体的bind 方法在 doBind0() 中
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//拿到 NioServerSocketChannel 所在的 eventLoop,然后将将 bind 方法丢给 eventLoop 里的线程池进行处理
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 调用 NioServerSocketChannel 的 bind 方法,此方法的实现在 AbstractChannel 类
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
//AbstractChannel#bind
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 调用 DefaultChannelPipeline 的 bind 方法
return pipeline.bind(localAddress, promise);
}
// DefaultChannelPipeline#bind
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 调用 TailContext 的 bind 方法,该方法会调用 AbstractChannelHandlerContext.bind() 方法
// 该方法会去寻找pipeline 中所有的 executionMask 包含了 Mask_Bind 的 Outbound Handler,默认情况下只有 HeadContext 才有,所以实际上调用的是 HeadContext 的 bind 方法
return tail.bind(localAddress, promise);
}
// HeadContext#bind
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 该 unsafe 是 NioServerSocketChannel 中的 NioMessageUnsafe 对象,NioMessageUnsafe.bind 方法的实现在其父类 AbstractUnsafe 中
unsafe.bind(localAddress, promise);
}
// AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
try {
// 调用 NioServerSocketChannle 的 doBind 方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
...
}
// NioServerSocketChannel#Bind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用 java nio 的 ServerSocketChannel 的 bind 方法
// backlog 表示等候排队的连接队列长度
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
至此,ServerSocketChannel bind 监听端口也已经完成
接下来还有
- 注册 serverSocketChannel accept 事件 到 selector
- selector.调用 select() 方法轮询的步骤
回到 AbstractUnsafe.bind
方法
// AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发 pipeline 中 ChannelHandler 的 channelActive() 方法
// 这里直接到 HeadContext 的 active() 方法
pipeline.fireChannelActive();
}
});
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
// 注册 Accept 的事件
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
// 调用 NioServerSocketChannel 的 read 方法
// 该方法的实现在 AbstractChannel 中
channel.read();
}
}
//AbstractChannel#read
@Override
public Channel read() {
// 调用 DefaultChannelPipeline 的 read() 方法
pipeline.read();
return this;
}
//DefaultChannelPipeline#read
@Override
public final ChannelPipeline read() {
// 该方法从 tail 开始,调用所有的 ChannelHandler 的 read 的方法
// 最终会到达 HeadContext 的 read(ChannelHandlerContext ctx) 方法
tail.read();
return this;
}
// HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
// 调用 NioMessageUnsafe.beginRead() 方法
// 该方法的实现在 AbstractUnsafe 中
unsafe.beginRead();
}
// AbstractUnsafe#beginRead()
@Override
public final void beginRead() {
...
try {
// 调用 AbstractChannel 的 doBeginRead() 方法
// 该方法的实现在 AbstractNioChannel
doBeginRead();
} catch (final Exception e) {
...
}
...
}
// AbstractNioChannel#doBeginRead()
@Override
protected void doBeginRead() throws Exception {
// 这个 selectionKey 是在将 serverSocketChannel 注册到 selector 时返回的
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// readInterestOp 是在创建NioServerSocketChannel 时指定的,值为 SelectionKey.OP_ACCEPT
if ((interestOps & readInterestOp) == 0) {
// 设置对 accept 事件感兴趣
selectionKey.interestOps(interestOps | readInterestOp);
}
}
完成了 accept 注册后,最后一部就到了select 轮询
总结
netty启动过程.png
网友评论