概述
NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。
传统IO基于字节流和字符流
面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区进行操作;
NIO基于Channel和Buffer(缓冲区)
进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但同时也需要编程人员注意及时检查和处理缓冲区中的数据,否则可能导致被清除或者被覆盖的问题。
Selector(选择区)
用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。
IO的各种流是阻塞
的。这意味着,当一个线程调用read()
或 write()
时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
NIO的非阻塞
模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
Channel
首先说一下Channel,国内大多翻译成“通道”。Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的,譬如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel的主要实现有:
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
Buffer
NIO中的关键Buffer实现有:ByteBuffer
, CharBuffer
, DoubleBuffer
, FloatBuffer
, IntBuffer
, LongBuffer
, ShortBuffer
,分别对应基本数据类型: byte
, char
,double
, float
, int
, long
, short
。当然NIO中还有MappedByteBuffer
, HeapByteBuffer
, DirectByteBuffer
等这里先不进行陈述。
Selector
Selector
运行单线程处理多个Channel
,如果你的应用打开了多个通道,但每个连接的流量都很低,使用Selector就会很方便。例如在一个聊天服务器中。要使用Selector, 得向Selector注册Channel,然后调用它的select()
方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新的连接进来、数据接收等。
FileChannel
BIO vs NIO
bio读取文件
private String read(File file) throws IOException {
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
String buf;
while ((buf = br.readLine()) != null) {
sb.append(buf);
}
}
return sb.toString();
}
nio读取文件
private String nioRead(File file) throws IOException {
StringBuilder sb = new StringBuilder();
try (FileChannel channel = new FileInputStream(file).getChannel()) {
// 分配大小为1024的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readLen = 0;
while (-1 != (readLen = channel.read(buffer))) {
// 将缓冲区指针重置到起点
buffer.flip();
byte[] bytes = buffer.array();
sb.append(new String(bytes, 0, readLen));
buffer.clear();
}
}
return sb.toString();
}
Buffer
buffer
可以理解为在channel
中传输数据的载体,nio中所有的读写都必须使用buffer后面会专门有一期出关于buffer的内容。
Selector
Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
image.png使用selector的流程:
- 创建一个
ServerSocketChannel
并配置阻塞模式
为非阻塞,配置需要绑定的端口信息。
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
- 创建一个selector,将serverSocketChannel注册给selector
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey
为通过Selector监听Channel时对什么事件感兴趣,有以下4种类型
类型 | 说明 |
---|---|
Connect | 链接就绪 |
Accept | 接收就绪 |
Read | 读就绪 |
Write | 写就绪 |
如果一个selector想监听通道的多个事件,可以使用|
进行链接,如:
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);
一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。
下面是select()方法:
- int select()
- int select(long timeout)
- int selectNow()
select()
阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout)
和select()
一样,除了最长会阻塞timeout毫秒(参数)。
selectNow()
不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。
select()
方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()
方法后有多少通道变成就绪状态。如果调用select()
方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()
方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
一旦调用了select()
方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector
的selectedKeys()
方法,访问“已选择键集(selected key set)”中的就绪通道。如下所示:
Set selectedKeys = selector.selectedKeys();
注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
SelectionKey.channel()
方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。
SelectionKey
这个对象包含了一些你感兴趣的属性:
- interest集合
- ready集合
- Channel
- Selector
- 附加的对象(可选)
-
interest集合:就像向Selector注册通道一节中所描述的,interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合。
-
ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。可以这样访问ready集合:
int readySet = selectionKey.readyOps();
可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
- 从SelectionKey获取Channel和Selector。如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
- 可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
也可以在用register()方法向Selector注册Channel的时候附加对象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
Talk is Cheap show me the code
知道了上面的基础,就可以进行简单的编程了。
一个简单的示例,客户端不断发送hello,server,服务端接收到消息后,回复hello,client
- 服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
public class NioServer {
@SuppressWarnings("all")
public static void main(String[] args) throws Exception {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();
Thread bossThread = new Thread(() -> {
try {
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false);
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
if (serverSelector.select(1) > 0) {
Set<SelectionKey> selectionKeys = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
try {
SocketChannel clientChannel = ((ServerSocketChannel) (key.channel())).accept();
clientChannel.configureBlocking(false);
clientChannel.register(clientSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
});
bossThread.start();
Thread workerThread = new Thread(() -> {
try {
while (true) {
if (clientSelector.select(1) > 0) {
Set<SelectionKey> selectionKeys = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
try {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
if (selectionKey.isReadable()) {
clientChannel.read(byteBuffer);
byteBuffer.flip();
String msg = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
System.out.println(msg);
selectionKey.interestOps(SelectionKey.OP_WRITE);
selectionKey.attach("hello,client");
} else if (selectionKey.isWritable()) {
Object attachment = selectionKey.attachment();
if (Objects.nonNull(attachment)) {
byteBuffer.put(((String) attachment).getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
clientChannel.write(byteBuffer);
}
selectionKey.interestOps(SelectionKey.OP_READ);
}
} finally {
byteBuffer.clear();
keyIterator.remove();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
});
workerThread.start();
}
}
- 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
public class NioClient {
@SuppressWarnings("all")
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
InetSocketAddress socketAddress = new InetSocketAddress("localhost", 8000);
socketChannel.configureBlocking(false);
socketChannel.connect(socketAddress);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
try {
SelectionKey selectionKey = iterator.next();
process(selector, selectionKey);
} finally {
iterator.remove();
}
}
}
}
}
private static void process(Selector selector, SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if (selectionKey.isConnectable()) {
while (!socketChannel.finishConnect()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selectionKey.attach("hello,server");
} else if (selectionKey.isReadable()) {
socketChannel.read(buffer);
buffer.flip();
String msg = Charset.defaultCharset().newDecoder().decode(buffer).toString();
buffer.clear();
System.out.println(msg);
socketChannel.register(selector, SelectionKey.OP_WRITE);
selectionKey.attach("hello,server");
} else if (selectionKey.isWritable()) {
Object attachment = selectionKey.attachment();
if (Objects.nonNull(attachment)) {
buffer.put(((String) attachment).getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
}
上面的代码只是一个简单的Demo,里面还有很多细节问题没有处理,比如:断开连接处理,消息的拆包封包等。而且,直观的看,nio的编程模型比bio要复杂一些,如果细节处理不到位的话,很容易导致我们的应用程序出现问题。
网友评论