Java NIO
Java.nio 全称 Java non-blocking IO [实际上是 new IO],是指JDK 1.4及以上版本里提供的新 API[New IO] ,为所有的原始类型(布尔 Boolean 类型除外)提供缓存支持的数据容器,使用它可以提供非阻塞式的高伸缩性网络。
可能你会问 IO 和 NIO 的区别。 IO 是面向流的,阻塞的。例如我们刚学 Java 的时候那些属于字节流
的 InputStream/OutputStream,例如字符流
的 Reader/Writer。而 NIO 是面向块的、非阻塞的,它们常用类下面将会详细讲。
可是为什么我们需要 Java NIO 呢?其实有几个主要的问题:
- 没有数据缓冲区,I/O 性能存在问题
- 没有 C 或 C++ 的 Channel 概念,只有输入/输出流
- 同步阻塞 I/O 通信通常导致通信线程被长时间阻塞
- 支持的字符集有限,硬件可移植性不好
所以,Java NIO横空出世!其实在这篇文章 IO原理及四种 IO 模型里面有简单的介绍过 NIO I/O 多路复用技术。
<u>I/O多路复用技术通过把多个 I/O 的阻塞复用到同一个Select的阻塞上,从而使得系统在单线程的情况下可以同时通过多个客户端请求</u>。所以,多路复用技术可以较少系统的开销,降低了系统的工作量,页节省了资源。I/O多路复用技术的主要应用场景如下:
- 服务器需要同时处理多个处于监听状态或多个连接状态的套接字
- 服务器需要同时处理多种网络协议的套接字
目前支持 I/O 多路复用的系统调用有 select/pselect/poll/epoll。最常见的有 select 和 epoll。在 Linux 系统中,select 曾经有非常长时间的使用。但是由于自身的缺陷无法有更大的性能。所以 Linux 会在后续的内核版本寻找 epoll 作为替代方案。而 epoll 做出了几方面的改进:
- 支持一个进程的Socket描述符(FD)的数量不受限制。也就是单个 epoll 可监听的文件描述符号,这样子就可以提高了线程处理的任务的数量。我们可以通过命令行去设置数量,但是一般要考虑硬件的内存大小。
- I/O效率不会随着FD的数量增加而下降。
- 使用mmap加速内核与用户空间的消息传递
- epoll的API更加简单。简单的程度包括了创建一个 epoll 描述符,添加监听事件,阻塞等待锁监听的事件发生,关闭 epoll 描述符。
从 BIO 到 NIO,从 select 到 epoll,我们可以看出一步一步的发展使Java得到更高的性能。所以我们可以来看看java的I/O的发展简史:
JDK 1.0 - JDK 1.3
Java 的I/O类库非常原始,很多 UNIX 网络编程中的概念或者接口在I/O类库中都没有体现。
JDK 1.4
新增内容 |
---|
异步I/O的操作的缓冲区 ByteBuffer等 |
异步I/O操作的管道 Pipe |
各种I/O(异步或同步)的Channel,包括 ServerSocketChannel 和 SocketChannel |
多种字符集的编码能力和解码能力 |
实现非阻塞 I/O 操作的多路复用起 selector |
基于流行的 Perl 实现的正则表达式类库 |
文件通道 FileChannel |
不足之处 |
---|
没有文件属性(读例如写权限) |
API 能力比较弱,例如目录的级联创建和和递归遍历,往往需要自己实现 |
底层存储系统的一些高级 API 无法使用 |
所有的文件操作都是同步阻塞调用,不支持异步文件读写操作 |
JDK 1.7
1.7 比较大的更新是升级了库类 - 原来的 NIO 库升级了新版本 NIO 2.0。它主要几方面改进:
新增内容 |
---|
提供批量获取文件属性的 API,这些 API 具有平台无关性,不与特性的文件系统相耦合,另外它还提供了标准文件系统的 SPI,供各个服务商扩展实现 |
提供了AIO,支持基于文件的异步 I/O 操作和针对网络套接字的异步操作 |
完成JSR51定义的通道功能,包括对配置和多播数据报的支持等 |
NIO 组件
初学者对于 NIO 的组件比较难理解的。但是如果你以计算机原理的角度去理解,那就比较容易接受了。NIO 的组件包括三种:
- 缓冲区
Buffer
- 通道
Channel
- 多路复用器
Selector
Buffer 缓冲区
为什么叫缓冲区呢?
其实你可以理解为<u>这是一个传输的介质,或者是一个可以写入数据的内存块</u>。
原本数据就是在内存上,何必搞多一个概念缓冲区的概念出来?其实用户线程与内核数据交互是通过内存来交互,这点本质上是没变的。但是有个问题,直接放在内存上并不好操作。而<u>缓冲区提供了对数据的机构化访问以及维护了读写的位置 (limit) 等信息</u>。在这方面 NIO 起的作用就是提供了丰富的API来可以让你轻松使用内存块。
Buffer 有支持多种 Buffer 类型,如下:
-
ByteBuffer
字节缓冲区 -
CharBuffer
字符缓冲区 -
ShortBuffer
短整型缓冲区 -
IntBuffer
整型缓冲区 -
LongBuffer
长整型缓冲区 -
FloatBuffer
浮点型缓冲区 -
DoubleBuffer
双精度浮点缓冲区
大多数标准的I/O操作都使用 ByteBuffer 所以它除了具有一般缓冲区的操作之外还提供一些特有的操作,方便网络读写。
Channel 通道
还记得Java IO中的 Stream 吗?Java 将可读写文件封装到 Stream,我们会通过 read/write 的方式往 Stream 读取或写入数据。而在 NIO 中,也有类似的概念,那就是Channel。Channel 可以把文件/网络连接等封装成一个一个 Channel。
<u>但是 Channel 在使用的过程中是需要搭配 Buffer </u>。为什么需要 Buffer ?上面说了 Buffer 是抽象了内存的块,而 Channel 是类似于流的(负责传输数据),它们负责工作不相同。当它两搭配起来就好像这样:
当从Channel读取数据到Buffer
|----------| |-----------|
| Channel | ----> | Buffer |
|----------| |-----------|
当讲Buffer写入Channel
|----------| |-----------|
| Buffer | ----> | Channel |
|----------| |-----------|
从上图我们也可以发现,<u>Channel 既可以读取也可以写入,具备双向特性,这比 Stream 方便多了(单向传输)</u>。
同样对 Channel ,在NIO中也提供了多种实现类满足各类需求:
-
FileChannel
负责文件 -
DatagramChannel
UDP读写网络中的数据 -
SocketChannel
TCP读写网络中的数据 -
ServerSocketChannel
负责 TCP 服务端读写网络中的数据
select 多路复用器
前篇文章说的 I/O 多路复用模型的实现基础就是 select。多路复用器的主要作用是能够提供选择已经就绪的任务的能力。而基<u>本原理就是 select 通过某种方式找到已经就绪,可以读写的 Channel,然后让开发者通过 SelectionKey 来获取就绪 Channel 的集合</u>。
<u>实现原理一般来说就是轮询或函数回调</u>。
我们开发中最长接触的两种实现模式就是:select 和 epoll。上面我们说过了,select 是有 fd 最大句柄的限制;而 epoll 已经在这面做了极大的优化。
NIO 原生示范
上面基本上我们已经介绍了 Java NIO 的一些基本面情况。所以接下来我做一个简单的 demo 作为文章的收尾。
Java NIO 的 API 提供的挺齐全的,然而用起来确实挺繁杂的。可能是为了保持足够的原生态和灵活吧。但是如果不是有足够经验的开发,一般我们并不建议你直接去使用原声的 Java NIO API 去开发程序。至于原因,有兴趣的可以去 Google 了解一下。
在写程序之前,我先写一下 demo 中<u>服务端</u>交互的一个流程:
在流程中有三个角色:NioServer
/ ReactorThread
/ IoHandler
-
NioServer
打开 ServerSocketChannel -
NioServer
绑定监听地址 InetSocketAddress -
ReactorThread
创建 Select,启动线程 -
NioServer
创建 select,将 ServerSocketChannel 注册上去 -
ReactorThread
使用 select 轮询就绪的 key -
ReactorThread
使用 handlerAccept() 处理新接入的客户端 -
IoHandler
设置新建客户端连接的 Socket 参数 -
ReactorThread
设置新建客户端连接的 Socket 参数 -
ReactorThread
向 select 注册监听度操作的 SelectionKey.OP_READ - 使用 handlerRead() 异步读请求消息到 ByteBuffer
- decode 请求消息
- 异步写 ByteBuffer 到 SocketChannel
sequenceDiagram
NioServer->>ReactorThread: 1.打开 ServerSocketChannel
ReactorThread-->>NioServer: 已收到消息
ReactorThread-->>IoHandler: 123
下面首先是启动代码
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
MultiplexerTimeServer server = new MultiplexerTimeServer(port);
new Thread(server, "server-01").start();
}
}
然后是服务端的实现内部代码
/* server */
public class MultiplexerTimeServer implements Runnable{
private Selector selector;
private ServerSocketChannel socketChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
try {
/* 初始化 selector socketChannel 以及各种参数 */
selector = Selector.open();
socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket().bind(new InetSocketAddress(port), 1024);
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server init finsih");
} catch (Exception e) {
}
}
public void stop(){
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
//获取已就绪的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
//迭代
while (it.hasNext()) {
key = it.next();
//这里需要移除
it.remove();
try {
//交给ioHandler处理
handlerInput(key);
}catch (Exception e ) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
if (selector != null) {
try{
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handlerInput(SelectionKey key) throws IOException {
//查看key是否失效了
if (key.isValid()) {
//如果是负责监听的key
if (key.isAcceptable()) {
//获取serverSocketChannle
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//获取进来的请求
SocketChannel sc = ssc.accept();
//设置参数
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
//如果是可读的key
if (key.isReadable()) {
//获取对应的可读的channel
SocketChannel sc = (SocketChannel) key.channel();
//获取一个Buffer,用于数据装载
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
//如果大于0说明有数据
if (readBytes > 0 ) {
//切换读模式
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "utf-8");
String currentTime = "QUERY CURRENT TIME"
.equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() :
"BAD ORDER";
doWrite(sc, currentTime);
}else if(readBytes < 0){ //小于0说明有问题,关闭channel和channel
key.cancel();
sc.close();
}else {
;
}
}
}
}
//写入channel的操作(记得channel是双向的)
private void doWrite(SocketChannel sc, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
//切换写模式
writeBuffer.flip();
sc.write(writeBuffer);
}
}
}
接着我们写客户端的代码
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
new Thread(new MultiplexerTimeClient("127.0.0.1", port), "123").start();
}
}
public class MultiplexerTimeClient implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public MultiplexerTimeClient(String host, int port) {
this.host = host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
//尝试连接
doConnnect();
}catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
SelectionKey key = null;
while (!stop) {
try {
//select
selector.select(1000);
//获取已就绪的key
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeySet.iterator();
while (it.hasNext()) {
key = it.next();
//移除
it.remove();
try {
//处理key
handlerInput(key);
}catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
//上面退出循环后,就停止selector
if (selector!=null) {
try {
selector.close();
}catch (IOException e) {
e.printStackTrace();
}
}
}
private void doConnnect() throws IOException {
//如果连接成功,就将SocketChannel挂上Selector,并且写入数据;如果失败,就说注册连接成功
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void handlerInput(SelectionKey key) throws IOException {
//判断key是否有效
if (key.isValid()) {
//
SocketChannel sc = (SocketChannel) key.channel();
//如果是连接的key,说明连接成功了
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}else {
System.exit(1);
}
}
//如果是可读key,说明是数据从服务端回来了
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "utf-8");
System.out.println("do connect finish result " + body);
this.stop = true;
}else if(readBytes < 0) {
//对端关闭链路
key.cancel();
sc.close();
}else {}
}
}
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "QUERY CURRENT TIME".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(req);
//切换写模式
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("send order 2 server successd");
}
}
}
本文结尾
用 Java NIO 写客户端与服务端真的挺麻烦的。而且在上面的 demo 中还没考虑的非常齐全,例如半包读
或半包写
。
但是虽然考虑周全后代码更多,但是不妨碍 NIO 给我们带来的便利性。例如客户端
发起的连接/读写都是异步的;线程模型的优化可以让一个 Selector 线程处理成千上万的客户端连接,而且性能也不会因此而下降。所以值得我们好好理解这个 NIO。
完!
网友评论