// 1. 配置 bossGroup 和 workerGroup
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
final EventLoopGroup workerGroup = new NioEventLoopGroup();
// 2. 创建业务逻辑处理器
final EchoServerHandler serverHandler = new EchoServerHandler();
// 3. 创建并配置服务端启动辅助类 ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
// 4. 阻塞绑定端口
ChannelFuture f = b.bind(8081).sync();
// 5. 为服务端关闭的 ChannelFuture 添加监听器,用于实现优雅关闭
f.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
一、代码执行流程梯形图
/*********************************************** 1. 创建 NioEventLoopGroup ***********************************************/
new NioEventLoopGroup
--> SelectorProvider.provider() -- args
--> new DefaultSelectStrategyFactory() -- args
--> RejectedExecutionHandlers.reject() -- args
<!-- 1.1 创建 EventExecutor 选择器工厂 -->
--> new DefaultEventExecutorChooserFactory()
--> MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
<!-- 1.2 创建线程工厂 -->
--> ThreadPerTaskExecutor(newDefaultThreadFactory())
<!-- 1.3 创建 EventExecutor 数组并且循环实例化数组元素 -->
--> EventExecutor[] children = new EventExecutor[nThreads]
--> children[i] = newChild(executor, args)
--> NioEventLoopGroup.newChild(Executor executor, Object... args)
<!-- 1.3.1 创建 select 策略 -->
--> DefaultSelectStrategyFactory.newSelectStrategy()
--> SelectStrategy INSTANCE = new DefaultSelectStrategy()
--> new NioEventLoop(NioEventLoopGroup parent,
Executor executor,
SelectorProvider selectorProvider,
SelectStrategy strategy,
RejectedExecutionHandler rejectedExecutionHandler)
--> SingleThreadEventExecutor(EventExecutorGroup parent,
Executor executor,
boolean addTaskWakesUp,
int maxPendingTasks,
RejectedExecutionHandler rejectedHandler)
<!-- 1.3.2 创建 taskQueue,用于存放非 NioEventLoop 线程提交的 task -->
--> Queue<Runnable> taskQueue = NioEventLoop.newTaskQueue(int maxPendingTasks)
--> PlatformDependent.<Runnable>newMpscQueue()
--> SingleThreadEventLoop.Queue<Runnable> tailTasks = NioEventLoop.newTaskQueue(int maxPendingTasks)
<!-- 1.3.3 创建 Selector -->
--> Selector selector = provider.openSelector() // 简化,netty 有优化过
<!-- 1.4 创建 -->
--> EventExecutorChooserFactory.EventExecutorChooser chooser = chooserFactory.newChooser(children)
--> new PowerOfTwoEventExecutorChooser(executors)
最终的 NioEventLoopGroup 实例:
-- EventExecutor[] children = new EventExecutor[1](实例是 NioEventLoop)
-- Thread thread = null // NIO 线程
-- Executor executor = new ThreadPerTaskExecutor(new DefaultThreadFactory)// 线程创建器
-- Selector selector = 根据系统创建不同的 selector // selector 选择器
-- SelectedSelectionKeySet selectedKeys // 存储被选中的 SelectionKey 列表
-- SelectionKey[] keys
-- SelectStrategy selectStrategy = new DefaultSelectStrategy()
-- int ioRatio = 50 // selected和队列中任务的执行时间比例
-- Queue<Runnable> taskQueue = new MpscUnboundedArrayQueue<T>(1024) // 任务队列
-- Queue<Runnable> tailTasks = new MpscUnboundedArrayQueue<T>(1024)
-- PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = null
-- RejectedExecutionHandler rejectedExecutionHandler // 回绝策略
-- EventExecutorGroup parent = this 即 NioEventLoopGroup 实例 // 所属的 NioEventLoopGroup
-- DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser // 线程选择器:从children中选择一个EventExecutor实例
-- EventExecutor[] children = new EventExecutor[1]
/*********************************************** 2. 创建并设置 ServerBootstrap ***********************************************/
new ServerBootstrap()
--> Map<ChannelOption<?>, Object> options = new LinkedHashMap<>()
--> Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<>()
--> Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<>()
--> Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<>()
--> ServerBootstrapConfig config = new ServerBootstrapConfig(this)
// 设置 group
ServerBootstrap.group(bossGroup, childGroup)
--> EventLoopGroup group = bossGroup
--> EventLoopGroup childGroup = childGroup
// 设置 channel
ServerBootstrap.channel(NioServerSocketChannel.class)
--> ChannelFactory channelFactory = new ReflectiveChannelFactory(Class<? extends T> clazz) // 设置channel创建工厂,反射建立 channel
// 设置 option
ServerBootstrap.option(ChannelOption.SO_BACKLOG, 100)
--> Map<ChannelOption<?>, Object> options.put
// 设置 handler
ServerBootstrap.handler(new LoggingHandler(LogLevel.INFO))
--> ChannelHandler handler = LoggingHandler实例
// 设置 childHandler
ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>{})
--> ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>(){}
--> ChannelHandler childHandler = channelInitializer
/*********************************************** 3. bind ***********************************************/
ServerBootstrap.bind(int inetPort)
--> AbstractBootstrap.doBind(SocketAddress localAddress)
--> ChannelFuture regFuture = initAndRegister()
/********** 3.1 创建 NioServerSocketChannel *********/
--> Channel channel = channelFactory.newChannel() // channelFactory=ReflectiveChannelFactory
--> new NioServerSocketChannel()
--> newSocket(SelectorProvider provider)
--> provider.openServerSocketChannel() // 创建 java.nio.channels.ServerSocketChannel
--> NioServerSocketChannel(ServerSocketChannel channel)
--> AbstractChannel(Channel parent) // parent=null
--> CloseFuture closeFuture = new CloseFuture(this)
--> ChannelId id = DefaultChannelId.newInstance()
--> Unsafe unsafe = new NioMessageUnsafe() // 每一个 Channel 都有一个 Unsafe 对象
--> ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this)
--> List<Object> readBuf = new ArrayList<Object>()
--> DefaultChannelPipeline pipeline = new DefaultChannelPipeline(this) // 每一个 Channel 都有一个 ChannelPipeline 对象
--> this.channel = = this
--> AbstractChannelHandlerContext tail = new TailContext(this)
--> boolean inbound = true
--> boolean outbound = false
--> this.pipeline = pipeline
--> AbstractChannelHandlerContext head = new HeadContext(this)
--> boolean inbound = true
--> boolean outbound = true
--> this.pipeline = pipeline
--> Unsafe unsafe = pipeline.channel().unsafe() // NioMessageUnsafe
--> head<->tail 组建双链表 // 每一个 ChannelPipeline 对象都有一条 ChannelHandlerContext 组成的双向链表,每一个handler都由ChannelHandlerContext包裹
--> SelectableChannel ch = channel // channel = java.nio.channels.ServerSocketChannel
--> int readInterestOp = 16 // SelectionKey.OP_ACCEPT
--> ch.configureBlocking(false) // 配置 ServerSocketChannel 为非阻塞
--> ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket()) // tcp 参数配置类
--> ServerSocket javaSocket = javaChannel().socket() // ServerSocketChannel.socket(), 创建 ServerSocket
--> RecvByteBufAllocator rcvBufAllocator = new AdaptiveRecvByteBufAllocator()
--> ByteBufAllocator allocator = PooledByteBufAllocator(preferDirect = true)
--> int connectTimeoutMillis = 30000
--> WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT // low=32 * 1024 hign=64 * 1024
--> Channel channel = this,即 NioServerSocketChannel
/********** 3.2 初始化 NioServerSocketChannel 属性并添加 acceptorInitializer *********/
--> ServerBootstrap.init(Channel channel)
--> channel.config().setOption((ChannelOption<Object>) option, value) // ServerSocketChannelConfig 设置 option 配置
--> channel.attr(key).set(e.getValue()) // 设置 attr
--> ChannelInitializer acceptorInitializer = new ChannelInitializer<Channel>() {} // 非常重要
--> channel.pipeline().addLast(acceptorInitializer)
--> AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, childExecutor(group), name, handler)
// this=pipeline, handler=acceptorInitializer, childExecutor(group)=null
--> boolean inbound = true
--> boolean outbound = false
--> this.pipeline = pipeline
--> ChannelHandler handler = acceptorInitializer
--> addLast0(newCtx) // 将 acceptorInitializer 加入链表:head <-> acceptorInitializer <-> tail
--> callHandlerCallbackLater(newCtx, true) // 创建 new PendingHandlerAddedTask(ctx) 加入 PendingHandlerCallback 链表
/********** 3.3 执行注册:此时启动Nio线程 + 注册 channel 到 selector *********/
--> ChannelFuture MultithreadEventLoopGroup.register(Channel channel) // channel=NioServerSocketChannel
--> EventExecutor eventLoop = PowerOfTwoEventExecutorChooser.next()
--> SingleThreadEventLoop.register(Channel channel)
--> new DefaultChannelPromise(channel, this)
--> EventExecutor executor = eventLoop
--> Channel channel = NioServerSocketChannel
--> promise.channel().unsafe().register(this, promise) // this=eventLoop
--> AbstractChannel$AbstractUnsafe.register(this, promise)
--> EventLoop eventLoop = this // 同一个 channel 只由一个 EventLoop 来处理
--> 因为当前线程main不是当前 eventLoop 的那条NioEventLoop线程,所以将 register0(promise) 封装为 task,丢入 eventLoop 任务队列
--> SingleThreadEventExecutor.execute(Runnable task) // task register0
--> addTask(Runnable task)
--> offerTask(Runnable task)
--> taskQueue.offer(task) // 如果添加失败(例如队列容量满了),则使用回绝处理器,此处是抛出RejectedExecutionException
--> startThread()
--> 封装线程启动task 该任务就是 Runnable command,command 内容如下
// 该task 是在下一步创建线程并启动之后执行的
--> NioEventLoop.Thread thread = 下一步创建出来的线程
--> NioEventLoop.run()
--> processSelectedKeys()
--> runAllTasks(long timeoutNanos)
--> fetchFromScheduledTaskQueue() // 将到期的 scheduledTaskQueue 中的 task 加入 taskQueue
--> Runnable task = pollTask() // taskQueue.poll()
--> safeExecute(task) // task.run() 此时会执行 register0 任务
--> afterRunningAllTasks()
--> runAllTasksFrom(tailTasks) // 执行 tailTasks 中的所有 task
--> ThreadPerTaskExecutor.execute(Runnable command) // threadFactory.newThread(command).start() 创建线程并启动
--> wakeup(boolean inEventLoop) // inEventLoop=false
--> selector.wakeup() // 唤醒阻塞
/********** 3.4 执行bind *********/
--> Channel channel = regFuture.channel()
--> ChannelPromise promise = channel.newPromise()
--> pipeline.newPromise()
--> new DefaultChannelPromise(channel)
--> doBind0(regFuture, channel, localAddress, promise)
--> 将 channel.bind 封装为 task,丢入 eventLoop 任务队列
--> channel.eventLoop().execute(bind任务) // bind task
register0 task
--> doRegister()
--> selectionKey = javaChannel().register(Selector sel, // eventLoop().unwrappedSelector()
int ops, // 0
Object att) // this
--> pipeline.invokeHandlerAddedIfNeeded() // 会执行一些 handlerAdded() 事件
--> callHandlerAddedForAllHandlers() // 执行 pendingHandlerCallback 链表
--> acceptorInitializer.initChannel(final Channel ch)
--> pipeline.addLast(config.handler()) // handler配置
--> pipeline.addLast(new ServerBootstrapAcceptor(...)) // 添加 ServerBootstrapAcceptor
--> 从 pipeline 删除 acceptorInitializer(因为此时的 acceptorInitializer 的职责已经结束)
--> pipeline.fireChannelRegistered() // 会执行一些 channelRegister() 事件
bind task
--> doBind() // javaChannel().bind(localAddress, config.getBacklog())
--> pipeline.fireChannelActive()
--> ctx.fireChannelActive() // 执行 channelActive() 事件
--> readIfIsAutoRead()
--> NioMessageUnsafe.beginRead()
--> selectionKey.interestOps(interestOps | readInterestOp) // 为 NioServerSocketChannel 绑定 ACCEPT 事件
总结:
- 创建 NioEventLoopGroup
- 创建一个 EventExecutor[],并且实例化其内的每一个元素为 NioEventLoop(EventExecutor 是 NioEventLoop 的子类)
- 每个 NioEventLoop 都包含一条线程 Thread,Netty 默认是 FastThreadLocalThread,此处为 null,后续在执行注册任务的时候,会赋值并且开启线程(每个 NioEventLoop 都包含事先被创建好的 ThreadPerTaskExecutor,该线程执行器用于线程 FastThreadLocalThread 的创建)
- 每个 NioEventLoop 都包含一个 Selector,用于死循环进行 NIO 感兴趣事件的监听
- 每个 NioEventLoop 都包含一个 taskQueue,用于存放非 NioEventLoop 线程提交的任务
- 每个 NioEventLoop 都包含一个 scheduledTaskQueue,用于存放定时任务(例如连接超时任务),此处为 null - 懒创建
- 每个 NioEventLoop 都包含一个 ioRatio,用于决定 IO 事件和队列任务的执行时间比例
- 创建 NioEventLoop 选择器,用于后续从 EventExecutor[] 选择一个 NioEventLoop
- 创建并设置 ServerBootstrap
- options、attrs、handler 都是针对 ServerSocketChannel 起作用的;
- childOptions、childAttrs、childHandler 都是针对由 ServerSocketChannel.accept() 出来的 SocketChannel 起作用的
- 设置 channel 时,创建了 ReflectiveChannelFactory,用于反射创建 NioServerSocketChannel
- 绑定操作
- 使用 ReflectiveChannelFactory 反射创建 NioServerSocketChannel
- 创建 java.nio.channels.ServerSocketChannel,NioServerSocketChannel 是其包装类
- 为 NioServerSocketChannel 设置唯一ID
- 为 NioServerSocketChannel 设置 Unsafe 实例(服务端是 NioMessageUnsafe),Unsafe 是真正的进行底层 IO 操作的类 - 每一个 Channel 都有一个 Unsafe 对象
- 为 NioServerSocketChannel 设置 ChannelPipeline 对象 - 每一个 Channel 都有一个 ChannelPipeline 对象,每一个 ChannelPipeline 都包含一条由 ChannelHandlerContext 组成的双向链表(这条双向链表至少有两个节点 HeadContext 和 TailContext),每个 ChannelHandlerContext 内部都包裹着一个 ChannelHandler。
- 设置 java.nio.channels.ServerSocketChannel 为非阻塞
- 记录感兴趣事件为 ACCEPT 事件
- 初始化 NioServerSocketChannel 属性并添加 acceptorInitializer
- 设置 options、attrs 到 NioServerSocketChannel
- 创建 acceptorInitializer 并添加其到 NioServerSocketChannel 的 ChannelPipeline 对象中,此时的 ChannelPipeline 链是
HeadContext <-> acceptorInitializer <-> TailContext
,acceptorInitializer 包含一个 ServerBootstrapAcceptor
- 执行注册:此时启动Nio线程 + 注册 channel 到 selector
- 使用 NioEventLoopGroup 的线程选择器从 bossGroup 中选出一个 NioEventLoop
X
- 最终调用 NioMessageUnsafe 执行注册,由于当前执行线程不是当前 eventLoop 的那条 NioEventLoop 线程,所以创建注册任务,并加入
X
的 taskQueue 中,然后创建线程,赋值给X
的 thread 属性,之后启动线程,执行 NIO 死循环- NIO 死循环会按照 ioRatio 计算出来的时间比分别执行 “处理 NIO 感兴趣的事件” 和 “处理队列中的任务”(会将到期的 scheduledTaskQueue 中的 task 加入 taskQueue,之后统一从 taskQueue pollTask),此时会执行注册任务
- 注册任务:
- 将 NioServerSocketChannel 中的 java.nio.channels.ServerSocketChannel 注册到
X
的 Selector 上,选择键为0(此时不监听任何事件),attachment 为 NioServerSocketChannel 本身- pipeline.invokeHandlerAddedIfNeeded() 会执行到 acceptorInitializer,首先执行其 initChannel,此时创建
ServerBootstrapAcceptor
并添加到 ChannelPipeline,如下HeadContext <-> acceptorInitializer <-> ServerBootstrapAcceptor <-> TailContext
,最后删除 acceptorInitializer,最终的 ChannelPipeline 链是HeadContext <-> ServerBootstrapAcceptor <-> TailContext
(其中,ServerBootstrapAcceptor 中存储着 childOptions、childAttrs、childHandler 以及 workerGroup,后续有客户端进行连接时,服务端会监听到 ACCEPT 事件,进而会使用 ServerBootstrapAcceptor 做一些逻辑)- 注册完毕之后 执行 channelRegister() 事件
- 绑定
- 与注册操作一样绑定操作也是非当前 Channel 所属的 NioEventLoop 线程发起的,所以也要封装为任务,加入到
X
的 taskQueue 中(每加入队列一个任务,都会做一次 selector.wakeup 操作,起到及时执行任务的作用)
- 绑定任务的内容:pipeline.fireChannelActive()
- 执行 channelActive() 事件
- 为 NioServerSocketChannel 设置感兴趣的监听键为创建 NioServerSocketChannel 时所存储的 ACCEPT 事件
注意:
- bind 一定要发生在 register 之后,但是 Netty 中所有的执行都是异步的,register 也不例外,那么怎么保证 bind 一定发生在 register 之后,Netty 使用为 regsiter 添加执行完成的回调监听器,在该监听器中完成 bind 操作。
- 一个 Channel 的事情只能由一个 NioEventLoop 来操作,所以 EventExecutor#inEventLoop() 不是判断当前线程是不是 NioEventLoop 中的那条线程,而是判断是否是当前所操作的 Channel 所属的 NioEventLoop 的那条线程。在 accept 的操作过程中,调用注册的线程是 NioServerSocketChannel 的那条线程,而注册的 Channel 是 SocketChannel,所以还是要包装成任务,添加到 SocketChannel 所属的那条 NioEventLoop 中。
- 当 j 是 2的n次方的时候,i % j == i &(j-1),后者效率更高,PowerOfTwoEventExecutorChooser 是后者,也是默认;GenericEventExecutorChooser 是前者。
网友评论