NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。
由于Java NIO属于基础部分,本文稍微讲解下,具体请自行了解更多NIO设计。
传统BIO模型分析
以下传统的服务器端同步阻塞I/O处理(也就是BIO,Blocking I/O)的经典编程模型:
public class BioServer {
//initialize socket and input stream
private Socket socket = null;
private ServerSocket server = null;
private DataInputStream in = null;
public static void main(String args[]) {
BioServer server = new BioServer(5000);
}
// constructor with port
public BioServer(int port) {
// starts server and waits for a connection
try {
server = new ServerSocket(port);
System.out.println("Server started");
System.out.println("Waiting for a client ...");
socket = server.accept();
System.out.println("Client accepted");
// takes input from the client socket
in = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
String line = "";
// reads message from client until "Over" is sent
while (!line.equals("Over")) {
try {
line = in.readUTF();
System.out.println(line);
} catch (IOException i) {
System.out.println(i);
}
}
System.out.println("Closing connection");
// close connection
socket.close();
in.close();
} catch (IOException i) {
System.out.println(i);
}
}
}
这是一个经典的每连接每线程的模型,可以使用多线程(上面代码只有一个主现场,可以改成一个链接新建一个线程),主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。
当然一般都使用线程池,可以让线程的创建和回收成本相对较低。
不过,这个模型最本质的问题在于,严重依赖于线程。但线程是很"贵"的资源,主要表现在:
1.线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。
2.线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。
3.线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
4.容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。
对于像BAT,TMD这样的企业,每天面对千万次连接,传统的BIO模型是无能为力的,必然需要一种更高效的I/O处理模型。
NIO模型分析
我们先看一下 NIO 涉及到的关联类图,如下:
serversocketchannel.png
上图中有两个关键类:Channel 和 Selector,它们是 NIO 中两个核心概念。我们还用前面的城市交通工具来继续比喻 NIO 的工作方式,这里的 Channel 要比 Socket 更加具体,它可以比作为某种具体的交通工具,如汽车或是高铁等,而 Selector 可以比作为一个车站的车辆运行调度系统,它将负责监控每辆车的当前运行状态:是已经出战还是在路上等等,也就是它可以轮询每个 Channel 的状态。这里还有一个 Buffer 类,它也比 Stream 更加具体化,我们可以将它比作为车上的座位,Channel 是汽车的话就是汽车上的座位,高铁上就是高铁上的座位,它始终是一个具体的概念,与 Stream 不同。Stream 只能代表是一个座位,至于是什么座位由你自己去想象,也就是你在去上车之前并不知道,这个车上是否还有没有座位了,也不知道上的是什么车,因为你并不能选择,这些信息都已经被封装在了运输工具(Socket)里面了,对你是透明的。NIO 引入了 Channel、Buffer 和 Selector 就是想把这些信息具体化,让程序员有机会控制它.
下面是典型的一段 NIO 代码:
public class PlainNioServer {
public static void main(String[] args) throws Exception{
int port = 9876;
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel;
Selector selector;
serverChannel = ServerSocketChannel.open();
// 开启非阻塞模式
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
//绑定相应IP及port
ss.bind(address);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
while (true) {
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
// handle in a proper way
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accept connection from " + client);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) {
break;
}
}
System.out.println("服务器端写到客户端信息:" + buffer.toString());
client.close();
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer);
byte[] data = buffer.array();
System.out.println("服务器端收到客户端信息:" + data);
}
}
catch (IOException e) {
key.cancel();
try {
key.channel().close();
} catch (IOException ioe) {
System.out.println(ioe);
}
}
}
}
}
}
这个是NIO的服务端代码,调用 Selector 的EpollSelectorProvider创建一个选择器Selector, 创建的服务端的 ServerSocketChannel 绑定到一个 Socket 对象,并把这个通信channel注册到Selector上而且要监听OP_ACCEPT,客户端连接事件,再把channel设置为非阻塞模式并绑定9876端口。
代码可以看到,方法整体是一个死循环,轮询访问Selector,发生某些已经注册在Selector上的事件时,该方法返回。可以通过selector.selectedKeys获取发生的事件,我们通过key获取到注册它的那个Channel,在这里是ServerSocketChannel,通过server.accept()获取客户端连接,这里同样可以类比到传统的阻塞IO,在阻塞IO中我们可以通过ServerSocket.accept获取到socket,唯一不同的是,阻塞IO中的accept方法是阻塞操作,而NIO中是非阻塞的。
而轮询到的selectionKey是客户端的Writable,Readable的话,直接进行读写操作,利用NIOBuffer。
nio.png
上图中的 Selector 可以同时监听一组通信信道(Channel)上的 I/O 状态,选择器 Selector 可以调用 select() 方法检查已经注册的通信信道上的是否有 I/O 已经准备好,如果没有至少一个信道 I/O 状态有变化,那么 select 方法会阻塞等待或在超时时间后会返回 0。上图中如果有多个信道有数据,那么将会将这些数据分配到对应的数据 Buffer 中。所以关键的地方是有一个线程来处理所有连接的数据交互,每个连接的数据交互都不是阻塞方式,所以可以同时处理大量的连接请求。
优化线程模型
上面铺垫了这么多BIO,NIO知识,终于来到了我们今天的主题,Reactor模式。NIO虽然大大改善了BIO中,原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。
但是现代计算机都是多核心的,如果能够利用多核心进行I/O,无疑对效率会有更大的提高。所以引入了Reactor 模式。
Reactor模式思想:分而治之+事件驱动
一. 分而治之
一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步。
Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行。
二. 事件驱动
每个Task对应特定网络事件。当Task准备就绪时,Reactor收到对应的网络事件通知,并将Task分发给绑定了对应网络事件的Handler执行。
三. 几个角色
Reactor:负责响应事件,将事件分发给绑定了该事件的Handler处理;
Handler:事件处理器,绑定了某类事件,负责执行对应事件的Task对事件进行处理;
Acceptor:Handler的一种,绑定了connect事件。当客户端发起connect请求时,Reactor会将accept事件分发给Acceptor处理。
单线程Reactor
单线程Reactor.png1. 优点:
不需要做并发控制,代码实现简单清晰。
2. 缺点:
a. 不能利用多核CPU;
b. 一个线程需要执行处理所有的accept、read、decode、process、encode、send事件,当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了). 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。
下面是单线程Reactor的代码:
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
int port;
public Reactor(int port) throws IOException {
this.port = port;
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
sk.attach(new Acceptor(this));
System.out.println("-->attach(new Acceptor()!");
}
@Override
public void run() {
while (true) {
try {
selector.select();
} catch (IOException e) {
e.printStackTrace();
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> it = readyKeys.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next()));
}
readyKeys.clear();
}
}
//运行Acceptor或handler
private void dispatch(SelectionKey sk) {
Runnable task = (Runnable) sk.attachment();
if (task != null) {
task.run();
}
}
}
public class Acceptor implements Runnable {
private Reactor reactor;
public Acceptor(Reactor reactor) {
this.reactor = reactor;
}
@Override
public void run() {
try {
SocketChannel socketChannel = reactor.serverSocketChannel.accept();
if (socketChannel != null) {
// 调用Handler来处理channel, 并向Reactor注册此Handler
new NioHandler(reactor.selector, socketChannel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class NioHandler implements Runnable {
final SocketChannel socketChannel;
final SelectionKey selectionKey;
ByteBuffer input = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
String clientName = "";
NioHandler(Selector selector, SocketChannel c) throws IOException {
socketChannel = c;
c.configureBlocking(false);
selectionKey = socketChannel.register(selector, 0);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
inputBuffer.clear();
try {
int n = socketChannel.read(inputBuffer);
if (n != -1) {
byte[] data = new byte[n];
System.arraycopy(inputBuffer.array(), 0, data, 0, n);
System.out.println("Output: " + new String(data) + "\n");
}
System.out.println("Finish......");
} catch (IOException e) {
e.printStackTrace();
}
}
}
多线程Reactor
多线程Reactor.png特点:
a. Reactor 的多线程模型与单线程模型的区别就是 acceptor 是一个单独的线程处理, 并且有一组特定的 NIO 线程来负责各个客户端连接的 IO 操作
b. 有专门一个线程, 即 Acceptor 线程用于监听客户端的TCP连接请求.
c. 客户端发送的IO操作由一个worker reactor线程池负责,由线程池中的NIO线程负责监听客户端SocketChannel事件,进行消息的读取、解码、编码和发送。
d. 一个NIO线程可以同时处理N条链路,但是一个链路只注册在一个NIO线程上处理,防止发生并发操作问题。
4、主从多线程Reactor
主从多线程Reactor.png在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。
特点:
a. 服务端用于接收客户端连接的不再是个1个单独的reactor线程,而是一个boss reactor线程池;
b. 服务端启用多个ServerSocketChannel监听不同端口时,每个ServerSocketChannel的监听工作可以由线程池中的一个NIO线程完成。
二、Netty线程模型
Netty线程模型.pnga. netty线程模型采用“服务端监听线程”和“IO线程”分离的方式,与多线程Reactor模型类似。
b. netty服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费.
c. Netty线程模型是多Reactor模型,bossGroup一个线程,workerGroup多个线程(cpu核数*2)。
网友评论