20201022_Nio单线程模型入门
1概述
现在稳定推荐使用的主流版本还是Netty4,Netty5 中使用了 ForkJoinPool,增加了代码的复杂度,但是对性能的改善却不明显,所以这个版本不推荐使用,官网也没有提供下载链接。
高性能通信中间件Netty典型应用:高性能web服务器Nginx、高性能缓存服务器Redis。本节从简单的java nio入门开始,后续重点涉及如下知识点:
-
IO线程模型演进(单线程、主从2个线程、主从多线程)
-
NIO多路复用原理
-
NIO核心组件:selector、channel、buffer
-
netty核心组件:
Channel(NioServerSocketChannel(服务端)和NioSocketChannel(客户端))、ChannelFuture、EventLoop(Netty 中最核心的概念)、ChannelHandler 和 ChannelPipeline
-
netty串形化处理读写:借助pipe避免使用锁带来的性能开销。
EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。
并且 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,即 Thread 和 EventLoop 属于 1 : 1 的关系,从而保证线程安全。
-
netty内存零拷贝(堆外内存):申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
-
netty高性能序列化协议:支持 protobuf 等高性能序列化协议。
-
netty服务端、客户端启动过程
1.1线程模型
1.1.1单线程模型
2021052711094668.png所谓单线程,即反应器和处理器在同一个线程中,new Runnable()只是生成对象。
所有I/O操作都由一个线程完成,即多路复用Selector、事件分发Dispatcher和处理Handler都是在一个Reactor线程上完成的。既要接收客户端的连接请求,向服务端发起连接,又要发送/读取请求或应答/响应消息。
一个NIO 线程同时处理成百上千的链路,性能上无法支撑,速度慢,若线程进入死循环,整个程序不可用,对于高负载、大并发的应用场景不合适。
1.1.2多线程模型
有一个NIO 线程(Acceptor) 只负责监听服务端,接收客户端的TCP 连接请求;NIO 线程池负责网络IO 的操作,即消息的读取、解码、编码和发送;1 个NIO 线程可以同时处理N 条链路,但是1 个链路只对应1 个NIO 线程,这是为了防止发生并发操作问题。但在并发百万客户端连接或需要安全认证时,一个Acceptor 线程可能会存在性能不足问题。
1.1.3主从多线程模型
Acceptor 线程用于绑定监听端口,接收客户端连接,将SocketChannel 从主线程池的Reactor 线程的多路复用器上移除,重新注册到Sub 线程池的线程上,用于处理I/O 的读写等操作,从而保证mainReactor只负责接入认证、握手等操作;
1.1.4Netty线程模型
Netty通过Reactor模型基于多路复用器接收并处理用户请求,内部实现了两个线程池,
boss线程池和work线程池,其中boss线程池的线程负责处理请求的accept事件,当接收到accept事件的请求时,把对应的socket封装到一个NioSocketChannel中,并交给work线程池,
其中work线程池负责请求的read和write事件(即IO读写事件),由对应的Handler处理(即逻辑处理)。
20210527110947446.png1.2Reactor反应器模式
Reactor模式也叫Dispatcher模式,即I/O多了复用统一监听事件,收到事件后分发(Dispatch给某进程),是编写高性能网络服务器的必备技术之一。
Reactor模型中有2个关键组成:
- Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人
- Handlers 处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作
1.3关于selector.select()
线程启动时调用SingleThreadEventExecutor的构造方法,执行NioEventLoop类的run方法,首先会调用hasTasks()方法判断当前taskQueue是否有元素。如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow(),该方法会立即返回。如果taskQueue没有元素,执行 select(oldWakenUp) 方法
select(oldWakenUp):解决了nio中的bug,selectCnt记录selector.select()执行次数和标识是否执行过selector.selectNow(),若触发epool空轮询bug,则会返回执行selector.selector(timeoutMillis),变量selectCnt会逐渐变大,当selectCnt达到阈值(512),会进行selector重建,解决cpu占用100%的bug。
1.3.1netty在NIOEventLoopGroup源码中如何解决的?
rebuildSelector方法先通过openSelector方法创建一个新的selector。然后将old selector的selectionKey执行cancel。最后将old selector的channel重新注册到新的selector中。rebuild后,需要重新执行方法selectNow,检查是否有已ready的selectionKey。
接下来调用processSelectedKeys 方法(处理I/O任务),当selectedKeys != null时,调用processSelectedKeysOptimized方法,迭代 selectedKeys 获取就绪的 IO 事件的selectkey存放在数组selectedKeys中, 然后为每个事件都调用 processSelectedKey 来处理它,processSelectedKey 中分别处理OP_READ;OP_WRITE;OP_CONNECT事件。
非IO任务:最后调用runAllTasks方法(非IO任务),该方法首先会调用fetchFromScheduledTaskQueue方法,把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行,
IO任务:然后依次从taskQueue中取任务执行,每执行64个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。
每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。
1.4NioEventLoop
1.4.1每个Boss NioEventLoop循环执行的任务包含3步
- 1 轮询accept事件
- 2 处理accept I/O事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上
- 3 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。
1.4.2每个Worker NioEventLoop循环执行的任务包含3步
- 1 轮询read、write事件;
- 2 处I/O事件,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理
- 3 处理任务队列中的任务,runAllTasks。
1.5任务队列中的task有3种典型使用场景
- 1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
//...
}
});
- 2 非当前reactor线程调用channel的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景。最终的write会提交到任务队列中后被异步消费。
- 3 用户自定义定时任务
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
}
}, 60, TimeUnit.SECONDS);
2代码实战(单线程版本)
2.1maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
2.2配置
2.3服务端
2.3.1SingleThreadEchoServerReactor
package com.kikop.myreactor.singlethreadapp.server;
import com.kikop.myreactor.singlethreadapp.config.NioDemoConfig;
import com.kikop.myreactor.singlethreadapp.server.accept.BossHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: EchoServerReactor
* @desc 单线程反应器(从头至尾, 只有一个线程)
* 问题:
* 1.未完全解决网络通信中的粘包、半包问题
* 2.发送次数与接收次数不对应
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class SingleThreadEchoServerReactor implements Runnable {
private ServerSocketChannel serverSocket;
private Selector selector;
/**
* 单线程版构造函数
*
* @throws IOException
*/
public SingleThreadEchoServerReactor() throws IOException {
// Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
// 绑定本地端口
serverSocket.socket().bind(address);
// 非阻塞
serverSocket.configureBlocking(false);
// 接收accept连接事件
// 服务端选择器(先注册连接接收OP_ACCEPT事件)
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 绑定 SelectionKey的附件对象:BossHandler
// 注意:BossHandler 仍然和主 runnable同一线程
sk.attach(new BossHandler(selector, serverSocket));
System.out.println("服务端启动成功!");
}
/**
* 主线程监听轮询事件
* 等同于netty中 boss线程
* 1.连接(rdList就绪事件列表)
* 2.获得就绪IO事件并完成事件分发
*/
public void run() {
try {
while (!Thread.interrupted()) {
// 1.阻塞等待
selector.select();
// 2.获得就绪事件列表
Set<SelectionKey> selected = selector.selectedKeys();
// 3.遍历并进行分发
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
// Reactor负责 dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
// 4.置空,准备下一次遍历
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
/**
* 获得就绪IO事件并完成事件分发
*
* @param sk
*/
void dispatch(SelectionKey sk) {
// 1.获取 handler
// 1.1.如果是 ServerSocketChannel(代表服务端),则handler为 BossHandler
// 1.2.如果是 SocketChannel(代表连接的某个客户端网络通道),则handler为 MyWorkHandler
Runnable handler = (Runnable) sk.attachment();
// 调用之前 attach附件:BossHandler或 MyWorkHandler,执行选择键的 handler处理器对象
if (handler != null) {
System.out.println("dispatch:"+handler.getClass().toString());
handler.run();
}
}
public static void main(String[] args) throws IOException {
// 单线程版本:从头至尾,只有一个线程
new Thread(new SingleThreadEchoServerReactor()).start();
}
}
2.3.2BossHandler
package com.kikop.myreactor.singlethreadapp.server.accept;
import com.kikop.myreactor.singlethreadapp.server.handler.MyWorkHandler;
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: BossHandler
* @desc Boss连接处理器,等同 于netty:bossNioEventLoop
* boosGroup用于Accetpt连接建立事件并分发请求
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class BossHandler implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocket;
public BossHandler(Selector selector, ServerSocketChannel serverSocket) {
this.selector = selector;
this.serverSocket = serverSocket;
}
/**
* 事件分发 BossHandler 对应的处理逻辑
* 构建 MyWorkHandler,如果是多线程,则会将此时的 SocketChannel 注册到另外一个 selector
*/
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null) {
new MyWorkHandler(selector, channel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.3.3MyWorkHandler
package com.kikop.myreactor.singlethreadapp.server.handler;
import com.kikop.util.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: EchoServerReactor
* @desc Worker处理器, 等同 于netty:workNioEventLoop
* 执行读写的业务处理逻辑(主线程main中)
* workerGroup用于处理I/O读写事件和业务逻辑
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class MyWorkHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk; // 选择键
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
/**
* MyWorkHandler
*
* @param selector
* @param c 和某个客户端建立的 SocketChannel
* @throws IOException
*/
public MyWorkHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
// 取得选择键设置感兴趣的IO事件
// 注意,此时 selector 中注册的通道有两种:
// 1.ServerSocketChannel
// 2.SocketChannel
sk = channel.register(selector, 0);
// 将 this:MyWorkHandler作为选择键的附件,
// 用于事件 dispatch
sk.attach(this);
// 注册Read就绪事件:服务端选择器(然后注册IO读-->写事件)
sk.interestOps(SelectionKey.OP_READ);
// 使尚未返回的第一个选择操作立即返回,唤醒的
// 原因是:注册了新的channel或者事件;channel关闭,取消注册;优先级更高的事件触发(如定时器事件),希望及时处理。
selector.wakeup();
}
/**
* SingleThreadEchoServerReactor主线程.dispatch来触发
* 处理IO读写事件和业务逻辑
*/
public void run() {
try {
if (state == SENDING) {
// 写入通道
channel.write(byteBuffer);
System.out.println("server send:" + new String(byteBuffer.array(), 0, byteBuffer.array().length));
// 写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
// 写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
// 写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
// 从通道读
int length = 0;
// 注意:channel.read 定义 1024,kernel内核有个socket网络缓冲区
// 数据很少可能会出现粘包
// 数据很少可能会出现拆包
while ((length = channel.read(byteBuffer)) > 0) {
// 打印:模仿业务逻辑
//Logger.info(new String(byteBuffer.array(), 0, length));
System.out.println("server recv:" + new String(byteBuffer.array(), 0, length));
}
// 读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
// 读完后,注册 write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
// 读完后,进入发送的状态
state = SENDING;
}
// 处理结束了, 这里不能关闭select key,需要重复使用
// sk.cancel();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
2.4客户端
2.4.1EchoClient
package com.kikop.myreactor.singlethreadapp.client.handler;
import com.kikop.myreactor.singlethreadapp.config.NioDemoConfig;
import com.kikop.util.Dateutil;
import com.kikop.util.Logger;
import com.kikop.util.Print;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: MyTaskRunnable
* @desc 客户端业务处理线程
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class MyTaskRunnable implements Runnable {
// 客户端选择器(提供通道的IO读、写事件的注册)
private final Selector selector;
private final SocketChannel channel;
/**
* MyTaskRunnable
*
* @param channel
* @throws IOException
*/
public MyTaskRunnable(SocketChannel channel) throws IOException {
selector = Selector.open(); // Reactor初始化
this.channel = channel;
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
// 获取就绪事件列表,底层用两个数组进行筛选,Buffer有读写事件,操作系统产生中断
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) { // 通道对应的缓存可写
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("请输入发送内容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
// 先写数据到buffer
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
// buffer切为读
buffer.flip();
// 发送数据
// 通过 DatagramChannel数据报通道
socketChannel.write(buffer);
// 清空缓存
buffer.clear();
}
}
if (sk.isReadable()) { // 通道对应的缓存可读
// 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) sk.channel();
//读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
// 注意:socketChannel.read需要等到一定的数量
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip(); // 方便下一次写数据到buffer
Logger.info("client recv:" + new String(byteBuffer.array(), 0, length));
// 业务处理完成,清空缓存
byteBuffer.clear();
}
}
// 处理结束了, 这里不能关闭 select key,需要重复使用
// selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
2.4.2MyTaskRunnable
package com.kikop.myreactor.singlethreadapp.client.handler;
import com.kikop.myreactor.singlethreadapp.config.NioDemoConfig;
import com.kikop.util.Dateutil;
import com.kikop.util.Logger;
import com.kikop.util.Print;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: MyTaskRunnable
* @desc 客户端业务处理线程
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class MyTaskRunnable implements Runnable {
// 客户端选择器(提供通道的IO读、写事件的注册)
private final Selector selector;
private final SocketChannel channel;
/**
*
*
* @param channel
* @throws IOException
*/
public MyTaskRunnable(SocketChannel channel) throws IOException {
selector = Selector.open(); // Reactor初始化
this.channel = channel;
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
// 获取就绪事件列表,底层用两个数组进行筛选,Buffer有读写事件,操作系统产生中断
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) { // 通道对应的缓存可写
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("请输入发送内容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
// 先写数据到buffer
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
// buffer切为读
buffer.flip();
// 发送数据
// 通过 DatagramChannel数据报通道
socketChannel.write(buffer);
// 清空缓存
buffer.clear();
}
}
if (sk.isReadable()) { // 通道对应的缓存可读
// 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) sk.channel();
//读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
// 注意:socketChannel.read需要等到一定的数量
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip(); // 方便下一次写数据到buffer
Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
// 业务处理完成,清空缓存
byteBuffer.clear();
}
}
// 处理结束了, 这里不能关闭 select key,需要重复使用
// selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
总结
1.1优化演进
模式演进分为2个方面:
1)升级Refactor反应器。引入多个selector选择器,提升选择大量通道的能力。
2)升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以使用线程池。
总体如下:
1)将负责输入输出处理的IO Handler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
2)如果服务器为多核CPU,可以将反应器线程拆分为多个子反应器线程;同时,引入多个选择器,每一个反应器线程负责一个选择器,这样,充分释放系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。
参考
1大规模分布式系统架构与设计实战(彭渊)
2 fourinone分布式协调设计解析
3Netty面试题(2021 最新版)
4阿里大牛总结的Netty最全常见面试题,面试再也不怕被问Netty了
https://www.zhihu.com/column/p/148726453?utm_medium=social&utm_source=weibo
网友评论