NIO是什么?
NIO全称NON-BLOCKING I/O,非阻塞I/O。在传统IO中, 一个线程调用read()或者write()的时候,当数据尚未到达时,线程会发生阻塞。而在NIO中,线程会不断轮询注册到selector的通道,检测通道是否准备就绪,如果已经有响应,就把该通道返回过来。
为什么不会阻塞?
不同于传统IO基于流,NIO基于通道和缓冲区。
打个不恰当的比喻,我们可以把程序比作工厂,把数据比作工人;工厂位于用户态中,而工人们在内核态里;工人要去工厂,但是中间隔了一条河(可以理解成用户态和内核之间的隔阂)。我们可以把流(传统IO)或者通道(NIO)理解成连接系统用户态和内核态的桥梁,有了这座桥梁,内核态的工人才能到用户态的工厂上班,但是工人只能达到桥梁上,而不能直接找到工厂,这时就需要一个车带他们到工厂中。在传统IO中,工厂会派车(可以理解成各种inputstream/outputstream)到桥边等工人来,如果没人,那辆车就会一直等啊等(即阻塞)。而在NIO中,我们可以把缓存区看做是这辆车。那什么是selector呢?selector可以看做是一个监控室,每一条桥梁都会注册到监控室某一台的显示器上。线程每隔一段时间会遍历一遍所有的显示器,如果某些桥上已经有工人来了,就会派车去桥上把人载走。(Linux上的显示器更牛逼,如果桥上来人,他还会报警,线程就不用遍历所有显示器了,只需要留意报警的显示器就可以,这就是epoll)。这也是不会阻塞的原因。
怎么使用NIO
-
channel
定义 channel类似于传统IO中的流,不同的是channel可以双向。数据可以从channel写到buffer中,也可以从buffer读到channel中。
实现
- FileChannel(文件的通道)
- DatagramChannel (UDP的通道)
- SocketChannel (从TCP中读写网络数据的通道)
- ServerSocketChannel (监听TCP连接的通道,可以监听所有新进来的TCP连接,像web服务器那样。对每一个新进来的连接创建SocketChannel。)
-
buffer
定义 buffer本质上一块可以从中读取数据并且把数据写到其中的内存。这块内存被封装成NIO对象。
buffer详解
- 当数据从channel写入到buffer中时,buffer处在写模式中(channel.write()/buffer.put())
- 当我们需要从buffer中获取数据时, buffer处于读状态(channel.read()/buffer.get())
- capacity 内存块的固定大小值,buffer不管是读还是写模式,其值一样
- position 当你把数据写到buffer中时,position代表当前位置,从0开始。当切换到写模式的时候,position会被重新置为0。
- limit 写模式下含义跟capacity一样。当buffer从写模式切换到读模式,其limit等于写模式下的position值,就是这个buffer中一共有多少数据。
使用思想
- 把channel的数据写入buffer中
- 调用buffer的flip方法,调用flip方法会把buffer从写模式切换到读模式
- 从buffer中读取数据, 比如 byteBuffer.get()获取一个字节
- 调用clear方法或者compact方法,清空buffer中的数据,clear是清空buffer中的所有数据,compact是清空buffer中所有读过的数,任何未读的数据会被移到buffer的起始位置,新鞋进来的数据会被放到缓冲区的后面。
实现
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
- MappedByteBuffer
基本用法
- 分配
ByteBuffer buffer = ByteBuffer.allocate(1024);
- 读数据
buffer.get() / channel.write(buffer)
- 写数据
buffer.put() / channel.read(buffer)
- buffer反转,从写状态转成读状态,limit=position,position=0
buffer.flip()
- 标记与重置
mark/reset
- 清除
clear/compact
-
selector
定义 可以用于监听多个channel的事件,比如连接打开,数据到达。因此一个线程可以处理多个数据通道。
使用思想 要使用selector,首先要向selector注册channel及其对应事件,然后调用它的select()方法,这个方法会阻塞到channel中有事件就绪,一旦这个方法返回,线程就可以处理这些事件。
使用
- 创建selector
Selector selector = Selector.open();
- 将通道注册到selector中
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
1. connect SelectionKey.OP_CONNECT 连接就绪
2. accept SelectionKey.OP_ACCEPT 接收就绪
3. read SelectionKey.OP_READ 读事件
4. write SelectionKey.OP_WRITE 写事件
当要监听多个事件的时候,可以用 | 来表示:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
- SelectionKey
通道注册到selector之后,会把channel,selector, interest等封装成一个Selectionkey对象。
public class SelectionKeyImpl extends AbstractSelectionKey {
//通道
final SelChImpl channel;
//选择器
public final SelectorImpl selector;
//其值表示该SelectionKey对象存储在与其关联的Selector对象中所在的位置
private int index;
//监听的事件
private volatile int interestOps;
//已经准备就绪的事件集合
private int readyOps;
...
}
- interest集合 int,就是register中的事件码
isAcceptable() 当有SocketChannel连接到ServerSocketChannel时,ServerSocketChannel会有接收就绪的状态
isConnectable() 当SocketChannel成功连接到ServerSocketChannel时,SocketChannel会是连接就绪的状态
isReadable() 当有数据来到通道时,通道会是可读就绪的状态。
isWritable() 等待写数据的通道可以说是“写就绪”。
- select()
当selector注册了channel以后,就可以调用selector的select方法,来获取已经就的通道key了。
- int select() 阻塞到在你所注册的通道上至少有一个事件就绪
- int select(long timeout) 阻塞到在你所注册的通道上至少有一个事件就绪,如果在timeout事件内没有就返回空
- int selectNow() 不会阻塞,不管通道是否就绪都立即返回
注意:当调用一次select以后,返回一个就绪事件,在这个时间内你没有处理事件,当再次调用select,如果此刻没有事件方法,就会返回0
- selectedKeys()
返回所有准备就绪的通道集合。
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
如果处理完事件以后,需要remove移除掉。selectionKey.channel()转为你需要处理的事件类型,比如SocketChannel。
- wakeUp
wakeUp 当对selector调用select阻塞了,让其它线程再去调用该selector的wakeUp方法即可返回。
close 关闭selector,通道本身不会关闭。
使用NIO做一个聊天室
服务端代码:
public class Server {
private int port;
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private List<SocketChannel> clients = newArrayList();
/**
* 读buffer
*/
private ByteBuffer rBuffer = ByteBuffer.allocate(1024);
/**
* 写buffer
*/
private ByteBuffer wBuffer = ByteBuffer.allocate(1024);
/**
* charset, 方便string与buffer的转换
*/
private Charset charset = Charset.forName("utf-8");
public Server(int port) {
this.port = port;
init();
}
/**
* 初始化
*/
private void init() {
try {
selector = Selector.open();
//初始化serverSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
//注册为非阻塞的
serverSocketChannel.configureBlocking(false);
//将通道注册到选择器上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
private void listen() {
while (true) {
try {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
handle(key);
//移除已经处理过的key
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handle(SelectionKey key) throws IOException {
//无效的selectionKey不处理
if (!key.isValid())
return;
if (key.isAcceptable()) {
//当serverSocketChannel有接收事件
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//接收连接进来的socketChanel
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//注册到选择器上,读事件
socketChannel.register(selector, SelectionKey.OP_READ);
//添加到客户端列表
clients.add(socketChannel);
System.out.println("accept client success, channel = " + socketChannel);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
int length = socketChannel.read(rBuffer);
if (length > 0) {
//接收并处理信息
rBuffer.flip();
String message = charset.decode(rBuffer).toString();
System.out.println(socketChannel + ":" + message);
rBuffer.clear();
//转到其他socketChannel上去
dispatch(socketChannel, message);
}
}
}
private void dispatch(SocketChannel socketChannel, String message) {
clients.stream()
.filter(channel -> channel != socketChannel)
.forEach(channel -> {
try {
//将数据放到写buffer上
wBuffer.put(charset.encode(message));
//反转
wBuffer.flip();
//写到channel中去
channel.write(wBuffer);
} catch (IOException e) {
e.printStackTrace();
}finally {
//不管有没有写成功,都必须把buffer清空
wBuffer.clear();
}
});
}
public static void main(String[] args) {
Server server = new Server(7789);
server.listen();
}
}
客户端代码:
public class Client {
private int port;
private String host;
private String name;
private Selector selector;
private SocketChannel socketChannel;
private ByteBuffer rBuffer = ByteBuffer.allocate(1024);
private ByteBuffer wBuffer = ByteBuffer.allocate(1024);
private Charset charset = Charset.forName("utf-8");
public Client(String host, int port, String name) {
this.port = port;
this.host = host;
this.name = name;
try {
init();
listen();
} catch (IOException e) {
e.printStackTrace();
}
}
private void init() throws IOException{
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress(host, port));
}
private void listen() throws IOException{
while (true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(!key.isValid()) continue;
handle(key);
}
}
}
private void handle(SelectionKey key) throws IOException{
if(key.isConnectable()){
SocketChannel channel = (SocketChannel)key.channel();
if(!channel.isConnectionPending()) return;
//channel是非阻塞的,为确保连接成功
while (!channel.finishConnect()){}
System.out.println("连接成功!!");
key.interestOps(SelectionKey.OP_READ);
new Thread(new Runnable() {
@Override
public void run() {
while (true){
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = name + ":" + scanner.next() ;
try {
wBuffer.put(charset.encode(message));
wBuffer.flip();
channel.write(wBuffer);
} catch (IOException e) {
e.printStackTrace();
}finally {
wBuffer.clear();
}
}
}
}
}).start();
}else if(key.isReadable()){
SocketChannel channel = (SocketChannel)key.channel();
try {
int length = channel.read(rBuffer);
if(length > 0){
rBuffer.flip();
System.out.println(charset.decode(rBuffer));
}
rBuffer.clear();
} catch (IOException e) {
e.printStackTrace();
}finally {
rBuffer.clear();
}
}
}
public static void main(String[] args) {
Client client = new Client("localhost", 7789, "小蓝");
}
}
NIO进阶
pipe
定义
两个线程之间的单向数据连接。有一个source管道和一个sink管道,数据写入到sink管道,然后从source管道被读取。
用法
-
创建
Pipe pile = Pipe.open();
-
写入
Pipe.SinkChannel sinkChannel = pipe.sink(); sinkChannel.write(buf);
-
读取
Pipe.SourceChannel sourceChannel = pipe.source(); sourceChannel.read(buff);
网友评论