图解IO模型——BIO,NIO,AIO
服务架构模式——TPC/PPC模式
服务架构模式——单Reactor模式
服务架构模式——多Reactor模式
服务架构模式——Proactor模式
Reactor基于NIO,本质上是事件驱动模式。服务端Reactor线程监听多路IO事件(accept,read,write等),把收到的事件分发给不同的类进行处理。
单Reactor单线程模式
只有一个Reactor线程负责处理多个连接,Reactor线程负责处理连接、IO事件以及业务逻辑。
服务端初始化
- ServerSocketChannel监听端口
- ServerSocketChannel向selector注册,关注Accept事件
- 当收到Accept事件,调用dispatch分发给Acceptor处理
public class Reactor implements Runnable {
private Charset utf8 = Charset.forName("UTF-8");
private Selector selector;
private ServerSocketChannel serverSocket;
public static void main(String[] args) {
new Thread(new Reactor(), "Reactor Thread").start();
}
@Override
public void run() {
try {
selector = Selector.open();//创建选择器
/*-------------------创建server Channel,监听端口-------------**/
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), Constant.port), 100); //绑定监听端口,设定最大连接缓冲数为100
System.out.printf("Listening at " + InetAddress.getLocalHost());
/*-------------------注册感兴趣的TCP连接事件-------------**/
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
/*-------------------监听事件,处理请求-------------**/
while (true) {
selector.select(); //阻塞事件
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> itor = selected.iterator();
while (itor.hasNext()) {
SelectionKey key = itor.next();
itor.remove(); //清理key, key的remove很重要防止因为处理异常,导致无法remove
try {
dispatch(key);
} catch (Exception ex) {
key.cancel(); //一些异常状况会导致死循环,需要cancel掉监听
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
事件分发
Reactor线程负责接收所有的客户端IO事件,把连接事件分发给handleNewConnection方法处理,把read事件分发给handleRead方法处理,把write事件分发给handleWrite方法处理
private void dispatch(SelectionKey key) throws Exception {
/*-------------------TCP连接事件-------------*/
if (key.isAcceptable()) {
handleNewConnection();
}
/*-------------------读事件-------------------*/
else if (key.isReadable()) {
handleRead(key);
}
/*---------------------写事件------------------*/
else if (key.isWritable()) {
handleWrite(key);
}
else {
System.out.printf("Unknown event");
}
}
处理连接事件
private void handleNewConnection() throws IOException {
SocketChannel socketChannel = serverSocket.accept(); //accept一个SocketChannel,每个SocketChannel对应一个客户端连接,
socketChannel.configureBlocking(false);
//注册socketChannel到selector,感兴趣事件为读事件
socketChannel.register(selector, SelectionKey.OP_READ, new Buffers());
}
处理读事件
private void handleRead(SelectionKey key) throws Exception {
//获取通道
SocketChannel channel = (SocketChannel) key.channel();
//获取通道对应的缓冲区
Buffers buffers = (Buffers) key.attachment();
ByteBuffer readBuff = buffers.getReadBuff();
//从ByteBuffer读取请求信息
String request = decode(readBuff )
String response = handleRequest(request )
//构建响应消息,写入到writebuff中
writeBuff.put(response.getBytes());
//设置socketChannel对写事件感兴趣
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
处理写事件
private void handleWrite(SelectionKey key) throws IOException {
Buffers buffers = (Buffers) key.attachment();
ByteBuffer writeBuff = buffers.getWriteBuff();
writeBuff.flip();
SocketChannel channel = (SocketChannel) key.channel();
while (writeBuff.hasRemaining()) {
int len = channel.write(writeBuff);
//说明底层缓冲区已满
if (0 == len) {
break;
}
}
writeBuff.flip();
writeBuff.clear();
//数据写入完成后,取消对写事件的感兴趣
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
Reactor单线程模式基于NIO异步处理IO事件,一个线程可以处理多个连接。但是把IO和业务处理放到一个线程里,由于IO速度远远慢于CPU的速度,在大并发场景下会导致频繁的CPU切换。因此需要把业务处理从Reactor线程中剥离出来,也就是Reactor多线程模式。
单Reactor多线程模式
在单线程模式的基础上,多线程模式把业务逻辑处理从Reactor线程中剥离,Reactor专注于处理IO事件, 业务逻辑由专门的业务线程池处理。
reactor-worker-pool.PNG
代码实现
代码实现时,只要在单线程模式基础上修改handleRead方法。我们引入一个线程池专门用来处理业务逻辑。
- 从channel读取请求
- 把请求提交给专用的线程池进行处理
- 向selector注册关注写事件
ExecutorService pool = Executors.newFixedThreadPool(10);
private void handleRead(SelectionKey key) throws Exception {
try{
String request = readRequest();
//把业务处理请求从IO请求中隔离
pool.submit(()->{
//处理业务逻辑
response = process(request)
//修改为对写事件感兴趣,attack一个Sender
key.attach(new Sender());
key.interestOps(SelectionKey.OP_WRITE);
//wakeup的作用的触发select,如果不显式调用,会卡住不动
selector.wakeup();
});
}catch (Exception ex){
key.cancel(); //一些异常状况会导致死循环,需要cancel掉监听
}
}
Reactor专门处理IO事件,业务处理交给业务线程,这样就可以提高服务的吞吐量。如果用户连接数继续增加,一个Reactor可能处理不过来,这时就需要多Reactor模式了。
网友评论