Neety 是 Reactor 模型的一个实现,那么什么是 Reactor 模型呢?
关于 Reactor 线程模型
- Reactor 模型是基于事件驱动的,特别适合处理海量的 I/O 事件
-
单线程模型:
- 单线程模型指的是所有的 I/O 操作都是在同一个 NIO 线程上面完成,由于 Reactor 模型使用的是 NIO,I/O 操作不会导致阻塞,理论上一个线程可以独立处理所有 I/O 相关的操作
- 从架构层面看,一个 NIO 线程确实可以完成其承担的职责,例如通过 Acceptor 类接收客户端的 TCP 连接请求,链路建立成功后通过 Dispatch 将对应的 ByteBuffer 派发到指定的 Handler 上进行消息处理并响应客户端
- 但一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,即便 NIO 线程的 CPU 符合达到 100%,也无法满足海量消息处理。当负荷后处理速度变慢,导致大量客户端连接超时,最终导致大量消息积压和超时。且一旦 NIO 线程发生故障则会导致整个通信模块不可用
-
多线程模型:
- 多线程模型与单线程模型最大区别就是有一组 NIO 线程处理 I/O 操作
- 有专门一个 NIO 线程(Acceptor)用于接收客户端 TCP 连接请求,读写 I/O 操作由一个 NIO 线程池负责
- 一个 NIO 线程可以同时处理 N 条链路,但一个链路只对应一个 NIO 线程,防止并发操作问题
-
主从多线程模型:
- 绝大多数场景下,多线程模型都可以满足性能需求,但是再极个别特殊场景中,一个 NIO 线程处理客户端连接请求可能会存在性能问题,例如百万客户端连接,在这种情况下单独一个 Acceptor 线程可能会存在性能问题,为了解决性能问题,产生了主从多线程模型
- 它的特点是:服务端用于接收客户端连接的不再是单独一个 NIO 线程,而是一个独立的 NIO 线程池
- Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的 SocketChannel 注册到 I/O 线程池的某个线程上,由它负责 SocketChannel 的读写和编解码工作
- Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,由 I/O 线程负责后续的 I/O 操作
你可以先这样理解,EventLoop 是一个线程、EventLoopGroup 本质是一个线程池
NioEventLoopGroup 与 Reactor 线程模型的对应
- 上面介绍了三种 Reactor 的线程模型, 那么它们和 NioEventLoopGroup 又有什么关系呢? 其实, 不同的设置 NioEventLoopGroup 的方式就对应了不同的 Reactor 的线程模型
-
单线程模型
- 下面代码实例化了一个 NioEventLoopGroup,构造器参数为 1(表示线程池大小)
EventLoopGroup bossGroup = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup) .channel(NioServerSocketChannel.class) ...
- 值得注意的是这里只使用了一个 bossGorup,通过查看 ServerBootstrap 重写的 group 方法可以得知,当传入一个 group 时,那么 bossGourp 和 workGourp 就是同一个 NioEventLoopGroup,且这个 NioEventLoopGroup 只有一个线程,这样就会导致 Netty 中的 acceptor 和后续的所有客户端连接的 I/O 操作都是在同一个线程中处理的。就相当于 Reactor 单线程模型
@Override public ServerBootstrap group(EventLoopGroup group) { return group(group, group); }
-
多线程模型
- bossGroup 中只有一个线程, 而 workerGroup 中的线程是 CPU 核心数乘以2, 对应 Reactor 多线程模型
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
-
主从多线程模型
EventLoopGroup bossGroup = new NioEventLoopGroup(4); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
- 但 Netty 没有使用主从多线程模型,服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费
NioEventLoopGroup
NioEventLoopGroup- EventLoopGroup(其实是 MultithreadEventExecutorGroup)内部维护一个类型为 EventExecutor 的数组,其大小时 nThreads,这样就构成了一个线程池
- 如果我们在实例化 NioEventLoopGroup 时指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2
- 抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例
- NioEventLoop 属性:
- SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
- Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.
NioEventLoop
- NioEventLoop 肩负着两种任务:
- 执行与 Channel 相关的 I/O 操作, 包括 调用 select 等待就绪的 I/O 事件、读写数据与数据的处理等
- 作为任务队列, 执行 taskQueue 中的任务, 例如调用 eventLoop.schedule 提交的定时任务也是这个线程执行的
- Netty 中, 每个 Channel 都有且仅有一个 EventLoop 与之关联
NioEventLoop 类层次结构
NioEventLoop- NioEventLoop 继承链如下:
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
- AbstractScheduledEventExecutor 中实现了 NioEventLoop 的 schedule 功能,即我们可以通过 NioEventLoop 实例的 schedule 方法来运行一些定时任务
- SingleThreadEventExecutor 内部通过 threadFactory.newThread 创建了一个新的 Java 线程,一个 NioEventLoop 和一个特定的线程绑定,且在其生命周期内都不会改变。这个线程中所做的事情主要就是调用其 run() 方法
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略清理代码
...
}
}
});
threadProperties = new DefaultThreadProperties(thread);
taskQueue = newTaskQueue();
}
- SingleThreadEventLoop 实现了任务队列的功能, 通过它我们可以调用 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.
EventLoop 的启动
- NioEventLoop 本身就是一个 SingleThreadEventExecutor, 因此 NioEventLoop 的启动, 其实就是 NioEventLoop 所绑定的本地 Java 线程的启动
- 从代码中搜索, thread.start() 被封装到 SingleThreadEventExecutor.startThread() 方法中
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
thread.start();
}
}
}
- 而 startThread 是在 SingleThreadEventExecutor.execute 方法中调用的
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread(); // 调用 startThread 方法, 启动EventLoop 线程.
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
- 当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动.
网友评论