概念
- IO指的是输入输出,主要是指文件的IO和网络的IO。
- 对于java程序来说,分成BIO,NIO,AIO。
- 各种IO的操作流程:
1.向操作系统请求数据。
2.操作系统将外部数据复制到内核缓冲区。
3.操作系统将缓冲区数据复制到进程缓冲区。
4.进程获取数据,完成IO操作。
BIO
- BIO是最早的阻塞IO,是同步阻塞IO,对应操作系统调用socket的函数,这些函数都是阻塞的,所以在BIO中,读取文件,等待网络连接,等到socket数据,这些操作统统都是阻塞的。
NIO
- 因为BIO的阻塞特性,所以每次网络操作,包括等待连接,获取数据,发送数据,都是阻塞的,给变成带来很大麻烦,只能是针对每一个网络客户端创建新的线程来处理,带来了严重的系统负担。
- NIO是基于操作的系统的select函数,linux的poll函数进行调用,针对于IO操作流程的第二点和第三点,这两部操作对于java进程来说,都是非阻塞的。所以可以实现为:向 系统注册了感兴趣的事件后,只要检查这些事件是否完成,即可得到所需的数据。
- 这时候可能读者要问:在我们写NIO的时候,也需要while(true)啊,如下代码:
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8090));
serverSocketChannel.configureBlocking(false);
//向通道注册选择器,并且注册接受事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//获取已经准备好的通道数量
int readyChannels = selector.selectNow();
//如果没准备好,重试
if (readyChannels == 0) continue;
//获取准备好的通道中的事件集合
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = (SelectionKey) keyIterator.next();
if (key.isAcceptable()) {
这里注册的是accept事件,
} else if (key.isConnectable()) {
} else if (key.isReadable()) {
} else if (key.isWritable()) {
}
//注意每次迭代末尾的keyIterator.remove()调用。
//Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。
//下次该通道变成就绪时,Selector会再次将其放入已选择键集中
keyIterator.remove();
}
}
这里第一个while(true)难道不会把进程阻塞起来?
其实这里的阻塞和操作系统调用时候的阻塞并不是同一个含义,NIO的非阻塞指的是客户端链接,客户端数据的读写这些操作是非阻塞的,也就是说我们的java进程不需要在等待客户端链接的时候阻塞,而是有客户端链接之后,我们的进程就会可以拿到这个链接,注意这里是可以拿到,而并不是已经拿到,并没有回调的功能,如果注册了回调,客户端链接上来我们的回调函数执行,这就是AIO的范畴了。
所以,我们可以拿到链接,可以拿到客户端发来的数据,可以写数据。但是这只是可以,具体的拿这个操作,还是需要自己完成的。
这里可以想象一个极端的例子,客户端链接成功能后,五分钟内肯定会发来一条消息,那么我们真的可以不要代码中的while(true)
,而是等五分钟后,直接从selector中获取数据。
- 如果像想使用NIO一样使用BIO呢? 等待连接是阻塞的,这里没办法了。链接上来之后保存这些链接,每个链接一个线程,然后阻塞的read,哪个线程read完成了,告诉另一个控制线程说我读完了。然后控制线程while(true)轮询,拿到各个客户端的数据。这里的每个客户端线程的阻塞read,read之后的挂载数据,其实就是NIO中操作系统代替我们完成的工作。
AIO
- 根据NIO的while(true), 如果不需要我们自己拿已完成的事件和数据,而是提供回调函数,等客户端链接完成或者数据收到,自动执行回调函数,这就是AIO了。这时候就真的是异步操作了。
Reactor模型
- 经过之前的基础概念的说明,可以看出NIO使用一个线程就可以完成所有感兴趣事件的注册和轮询了,那么NIO该如果设计和优化我们的服务端代码呢?
- 自POSA2 中的关于Reactor Pattern 介绍中,我们了解了Reactor 的处理方式:
1.同步的等待多个事件源到达(采用select()实现)
2.将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)
3.分解的事件以及对应的事件服务应用从分派服务中分离出去(handler)
- 为何要是用reactor模型?
先来看一段NIO的代码:
public NIOServer(int port) throws Exception {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
//阻塞等待事件
selector.select();
// 事件列表
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
it.remove();
//分发事件
dispatch((SelectionKey) (it.next()));
}
} catch (Exception e) {
}
}
}
private void dispatch(SelectionKey key) throws Exception {
if (key.isAcceptable()) {
register(key);//新链接建立,注册
} else if (key.isReadable()) {
read(key);//读事件处理
} else if (key.isWritable()) {
wirete(key);//写事件处理
}
}
private void register(SelectionKey key) throws Exception {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 获得和客户端连接的通道
SocketChannel channel = server.accept();
channel.configureBlocking(false);
//客户端通道注册到selector 上
channel.register(this.selector, SelectionKey.OP_READ);
}
我们可以看到上述的NIO例子已经差不多拥有reactor的影子了:
1.基于事件驱动-> selector(支持对多个socketChannel的监听)
2.统一的事件分派中心-> dispatch
3.事件处理服务-> read & write
- 事实上NIO已经解决了上述BIO暴露的一些问题了,服务器的并发客户端有了量的提升,不再受限于一个客户端一个线程来处理,而是一个线程可以维护多个客户端(selector 支持对多个socketChannel 监听)。但这依然不是一个完善的Reactor Pattern ,首先Reactor 是一种设计模式,好的模式应该是支持更好的扩展性,显然以上的并不支持,另外好的Reactor Pattern 必须有以下特点:
1.更少的资源利用,通常不需要一个客户端一个线程
2.更少的开销,更少的上下文切换以及locking
3.能够跟踪服务器状态
4.能够管理handler 对event的绑定
那么好的Reactor Pattern应该是怎样的?
首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:
- Reactor将I/O事件分派给对应的Handler
- Acceptor处理客户端新连接,并分派请求到处理器链中
- Handlers执行非阻塞读/写 任务
单Reactor单线程模型
image.png/**
* 等待事件到来,分发事件处理
*/
class Reactor implements Runnable {
private Reactor() throws Exception {
SelectionKey sk =
serverSocket.register(selector,
SelectionKey.OP_ACCEPT);
// attach Acceptor 处理新连接
sk.attach(new Acceptor());
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
it.remove();
//分发事件处理
dispatch((SelectionKey) (it.next()));
}
}
} catch (IOException ex) {
//do something
}
}
void dispatch(SelectionKey k) {
// 若是连接事件获取是acceptor
// 若是IO读写事件获取是handler
Runnable runnable = (Runnable) (k.attachment());
if (runnable != null) {
runnable.run();
}
}
}
/**
* 连接事件就绪,处理连接事件
*/
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {// 注册读写
new Handler(c, selector);
}
} catch (Exception e) {
}
}
}
/**
* 处理读写业务逻辑
*/
class Handler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;
public Handler(SocketChannel socket, Selector sl) throws Exception {
this.state = READING;
this.socket = socket;
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socket.configureBlocking(false);
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}
private void read() {
process();
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}
private void write() {
process();
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}
/**
* task 业务处理
*/
public void process() {
//do something
}
}
这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder 处理。Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的hanlder上,对应的SocketChannel有读写事件之后,基于reactor 分发,hanlder就可以处理了(所有的IO事件都绑定到selector上,有Reactor分发)。该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。
单Reactor多线程模型
image.png/**
* 多线程处理读写业务逻辑
*/
class MultiThreadHandler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;
//多线程处理业务逻辑
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
this.state = READING;
this.socket = socket;
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socket.configureBlocking(false);
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}
private void read() {
//任务异步处理
executorService.submit(() -> process());
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}
private void write() {
//任务异步处理
executorService.submit(() -> process());
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}
/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}
相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。
多Reactor多线程模型
image.png/**
* 多work 连接事件Acceptor,处理连接事件
*/
class MultiWorkThreadAcceptor implements Runnable {
// cpu线程数相同多work线程
int workCount =Runtime.getRuntime().availableProcessors();
SubReactor[] workThreadHandlers = new SubReactor[workCount];
volatile int nextHandler = 0;
public MultiWorkThreadAcceptor() {
this.init();
}
public void init() {
nextHandler = 0;
for (int i = 0; i < workThreadHandlers.length; i++) {
try {
workThreadHandlers[i] = new SubReactor();
} catch (Exception e) {
}
}
}
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {// 注册读写
synchronized (c) {
// 顺序获取SubReactor,然后注册channel
SubReactor work = workThreadHandlers[nextHandler];
work.registerChannel(c);
nextHandler++;
if (nextHandler >= workThreadHandlers.length) {
nextHandler = 0;
}
}
}
} catch (Exception e) {
}
}
}
/**
* 多work线程处理读写业务逻辑
*/
class SubReactor implements Runnable {
final Selector mySelector;
//多线程处理业务逻辑
int workCount =Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(workCount);
public SubReactor() throws Exception {
// 每个SubReactor 一个selector
this.mySelector = SelectorProvider.provider().openSelector();
}
/**
* 注册chanel
*
* @param sc
* @throws Exception
*/
public void registerChannel(SocketChannel sc) throws Exception {
sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
}
@Override
public void run() {
while (true) {
try {
//每个SubReactor 自己做事件分派处理读写事件
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
read();
} else if (key.isWritable()) {
write();
}
}
} catch (Exception e) {
}
}
}
private void read() {
//任务异步处理
executorService.submit(() -> process());
}
private void write() {
//任务异步处理
executorService.submit(() -> process());
}
/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}
第三种模型比起第二种模型,是将Reactor分成两部分:
- mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。
- subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另其扔给worker线程池来完成。
第三种模型中,我们可以看到,mainReactor 主要是用来处理网络IO 连接建立操作,通常一个线程就可以处理,而subReactor主要做和建立起来的socket做数据交互和事件业务处理操作,它的个数上一般是和CPU个数等同,每个subReactor一个县城来处理。此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。关于此种模型的应用,目前有很多优秀的矿建已经在应用了,比如mina 和netty 等。上述中去掉线程池的第三种形式的变种,也 是Netty NIO的默认模式。下一节我们将着重讲解netty的架构模式。
网友评论