1. nio的底层系统的支持
liunx的io模型:(只是大概一个感性的认识,liunx的网络原理不深究。)
堵塞io:数据准备好,并且复制到应用内存,才返回。期间一直堵塞。
非堵塞io:io命令并不堵塞,数据没准备好,则返回一个标识错误,轮询检查数据是否准备好并复制到应用内存。
i/o复用模型:一个select或poll轮询检查所有的socket io是否准备好数据,用一个select处理了所有socket io,liunx的epoll性能更高,一个select检查的socket io没有限制,基于事件驱动,而不是轮询扫描,数据准备好则回调方法。
信号驱动io模型:通过发送信号的方式,socket io发送信号,数据准备好发送信号回调。
异步io:当数据准备并且复制到应用内存之后,内核发送通知给应用可以进行io操作了。
nio使用的就是io多路复用原理
linux针对多路复用采用的select、poll等都存在缺陷,一个select轮询的socket数量有限制,效率低。之后采用epoll,epoll的优点有:轮询的socket数量没有限制,效率高:基于socket的callback回调,所以不会把性能浪费在非活跃socket上。epoll和内核mmap共享同一块内存区域,减少了一次复制过程。
nio的缺点:针对文件系统的处理方法能力有点不足。
2.bio
bio的缺点:
- 线程个数和客户连接数是1对1关系。(可以用线程池解决)
- read和write操作都是堵塞的,数据没准备好,则线程一直堵塞,浪费资源。
3. nio
new io或者no block io。
nio基于的主要的对象是:Buffer、Channel、Selector
Buffer:buffer可以开辟堆内存,堆内存就是普通的java堆,堆外内存则是直接在java堆之外,开辟和回收代价大,但是进行比如socket数据交换的时候少了一次java堆到操作系统的内存复制。
Channel:channel是一个双向操作的通道。
Select:多路复用select,select轮询注册到select上的channel,如果channel有状态变更,用户再去处理这个channel。
关键代码:
服务端:
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
// select轮询捕捉servChannel的accept状态
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void run() {
while (!stop) {
try {
// select轮询捕捉状态变更,如果没有channel状态发生变更,则一直堵塞,直到传入的1000毫秒超时了,返回捕捉数0。
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
// 把这个selectKey移除,不移除selectedKeys中永远存在这个selectkey
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// socketchannel也注册到select中去,select捕捉其read状态变更
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
String currentTime = "QUERY TIME ORDER"
.equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
private void doWrite(SocketChannel channel, String response)
throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
客户端
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else
System.exit(1);// 连接失败,进程退出
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
this.stop = true;
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
private void doConnect() throws IOException {
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining())
System.out.println("Send order 2 server succeed.");
}
}
nio总结:代码其实和啰嗦繁琐,而且很容易出问题,比如例子中的nio会有粘包拆包现象,所以nio一般不直接使用。
4.Aio
nio 2引入的概念。真正的异步非堵塞io,基于事件驱动的,而不是nio一样轮询select。
服务端
public class AsyncTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
// 这是nio包下的类
asynchronousServerSocketChannel = AsynchronousServerSocketChannel
.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
// countDownlatch只是为了不让线程结束
latch = new CountDownLatch(1);
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept() {
// 传入一个accept的处理类
asynchronousServerSocketChannel.accept(this,
new AcceptCompletionHandler());
}
}
public class AcceptCompletionHandler implements
CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
// 数据准备好并且复制到应用内存之后,回调这个方法
@Override
public void completed(AsynchronousSocketChannel result,
AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
客户端
public class AsyncTimeClientHandler implements
CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(
readBuffer,
readBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result,
ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer
.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,
"UTF-8");
System.out.println("Now is : "
+ body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,
ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
那么为什么netty不使用aio而使用nio呢?netty作者说,在unix系统上aio不会更快比nio,netty已经有一个稳定的nio的封装了。
Not faster than NIO (epoll) on unix systems (which is true)
There is no daragram suppport
Unnecessary threading model (too much abstraction without usage)
几种io对比:
网友评论