图解IO模型——BIO,NIO,AIO
服务架构模式——TPC/PPC模式
服务架构模式——单Reactor模式
服务架构模式——多Reactor模式
服务架构模式——Proactor模式
Proactor模式基于AIO,应用程序向内核注册一个Proactor和Handler,将所有I/O操作都交给内核来处理,内核完成数据读写后回调Handler完成业务处理。
Proactor模式
内核和应用程序交互分为“向内核注册”和“请求处理”两个过程,如下图所示。
向内核注册Proactor和Handler
- Proactor initiator创建Proactor
- Proactor initiator创建Handler
- Proactor通过内核提供的Asynchronous Operation processor把Proactor和Handle注册到内核
内核读取数据完成后回调handler
- 当IO操作完成时,内核通知Proactor
- Proactor根据不同的IO事件回调不同的Handler完成业务处理
Proactor代码实现
服务端初始化AsynchronousServerSocketChannel ,向内核注册连接事件由AIOAcceptHandler处理
public class AIOServer implements Runnable{
......
@Override
public void run() {
//打开异步ServerSocketChannel,绑定端口
try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()){
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), Constant.port));
System.out.println("AIO server started at port " + Constant.port);
//注册Accept事件由AIOAcceptHandler处理
serverSocketChannel.accept(serverSocketChannel, new AIOAcceptHandler());
serverStatus.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
AIOAcceptHandler继承CompletionHandler,当连接事件完成后内核会回调这个类 completed方法,连接失败时回调 failed方法。 在completed方法中,应用程序向内核注册“读事件”的处理类为ReadCompletionHanlder。
public class AIOAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
@Override
public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel attachment) {
//异步处理下一个连接
attachment.accept(attachment, this);
//处理本连接
Buffers buffers = new Buffers();
channel.read(buffers.getReadBuff(), buffers, new ReadCompletionHanlder(channel));
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
exc.printStackTrace();
}
}
ReadCompletionHanlder也继承CompletionHandler,当IO读操作完成时,内核回调这个类
public class ReadCompletionHanlder implements CompletionHandler<Integer, Buffers> {
......
//数据读取完成时,回调此方法
@Override
public void completed(Integer result, Buffers buff) {
//读取请求
ByteBuffer readBuff = buff.getReadBuff();
readBuff.flip();
CharBuffer cBuff = utf8.decode(readBuff);
String request = cBuff.toString();
System.out.println("Received request: " + request);
//构建响应
ByteBuffer writeBuff = buff.getWriteBuff();
writeBuff.put(("Hello " + request).getBytes());
writeBuff.flip();
}
//数据读取失败时,回调此方法
@Override
public void failed(Throwable exc, Buffers attachment) {
exc.printStackTrace();
}
}
网友评论