一、BIO概述
BIO 就是传统的java.io包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。(面向字节流/字符流)
图1-1 IO流 同步与异步的差别在于是否需要等待被依赖项目完成,才开始执行任务。
阻塞与非阻塞针对CPU的消耗来说的,阻塞就是 CPU 停下来等待一个慢的操作完成 CPU 才接着完成其它的事。非阻塞就是在这个慢的操作在执行时 CPU 去干其它别的事,等这个慢的操作完成时,CPU 再接着完成后续的操作。
二、NIO概述
NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO程序,同时提供了更接近操作系统底层高性能的数据操作方式。
三、NIO核心组件
NIO的核心组件:1)Channel(负责连接) 2)Buffer(负责存储) 3)Selector。通过Channel,可以从Channel把数据写到Buffer中,也可以把数据从Buffer写入到Channel中。
图3-1 Channel & Buffer关系图选择器允许单线程操作多个通道。使用Selector时,需要将Channel注册到Selector上。
图3-2 选择器3.1 Channel通道
1. 通道是双向的,可读也可写,流一般是单向的。
2. 通道可读也可写。
3. 通道基于Buffer读取。
3.1.1 Channel实现
FileChannel:文件读写
DatagramChannel:UDP数据读写
SocketChannel:TCP数据读写
ServerSocketChannel:监听TCP链接请求,每个请求会创建会一个SocketChannel
3.2 Buffer缓冲区
Buffer是一块内存区,用于读写数据。分为非直接缓冲区(JVM)和直接缓存区(物理内存)。使用该内存区包含以下步骤:
1. 把数据写入Buffer(Channel->Buffer)
2. 调用flip方法(写模式->读模式,调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置)
3. 从Buffer中读取数据,get方法
4. 清空Buffer,调用clear(清空整个Buffer)或compact(清空已读取数据)方法
Buffer包含3个属性,capacity(容量,以字节为单位)、position(位置)、limit(限制)。读和写模式下position和limit含义如图3-1所示:
Buffer类型包括:1)ByteBuffer 2)MappedByteBuffer 3)CharBuffer 4)DoubleBuffer 5)FloatBuffer 6)IntBuffer 7)LongBuffer 8)ShortBuffer。
示例代码如下所示:
package com.example.niotest;
//import省略
/**
* @Author luffy
*/
public class NioTest {
public static void main(String[] args) {
try {
RandomAccessFile file = new RandomAccessFile("nio.txt", "rw");
FileChannel channel = file.getChannel();
//allocate()/allocateDirect()
ByteBuffer buffer = ByteBuffer.allocate(48);
//将channel读取到buffer
int flag = channel.read(buffer);
while (flag != -1) {
//切换为读模式
buffer.flip();
while (buffer.hasRemaining()) {
System.out.println((char) buffer.get());
}
//清空buffer
buffer.clear();
flag = channel.read(buffer);
}
file.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.3 分散读与聚合写
Scattering Reads:把数据从单个Channel写入到多个Buffer,必须写满一个buffer后才会向后移动到下一个buffer,不适合消息大小会动态改变的情形。(依次填满)示意图如下所示:
图3-2 分散读ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
Gathering Writes:把多个Buffer的数据写入到同一个Channel中,按顺序将数组内的内容写进channel,只写入Buffer中position到limit之间的数据,适用于可变大小的消息的情形。示意图如下所示:
图3-3 聚合写ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
3.4 通道传输接口
FileChannel.transferFrom方法把数据从通道源传输到FileChannel,transferTo方法把FileChannel数据传输到另一个Channel,具体查看API文档。(直接缓冲区)
3.5 Selector选择器
Selector用于检查一个或多个NIO Channel的状态是否处于可读、可写。如此可以实现单线程管理多个Channels,也就是可以管理多个网络链接。
1. Selector创建
Selector selector = Selector.open();
2. 向Selector注册通道(FileChannel不适用)
ServerSocketChannel channel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8080);
channel.bind(address);
//非阻塞
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
/**
SelectionKey.OP_CONNECT 连接就绪
SelectionKey.OP_ACCEPT 接收就绪
SelectionKey.OP_READ 读就绪
SelectionKey.OP_WRITE 写就绪
*/
3. 选择器选择通道
//select()方法会返回读事件已经就绪的那些通道
int select():阻塞到至少有一个通道在你注册的事件上就绪了
int select(long timeout):同select()方法,除了最长会阻塞timeout毫秒
int selectNow():不会阻塞,不管什么通道就绪都立刻返回
Set<SelectionKey> selectedKeys():一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,
然后可以通过调用selector的selectedKeys()方法,访问已选择键集中的就绪通道
SelectionKey.channel方法返回的channel实例需要强转为我们实际使用的具体的channel类型。
由于调用select而被阻塞的线程,可以通过调用Selector.wakeup()来唤醒即便此时已然没有channel处于就绪状态。具体操作是,在另外一个线程调用wakeup(),被阻塞的select方法的线程就会立刻返回。
当操作Selector完毕后,需要调用close方法。close的调用会关闭Selector并使相关的SelectionKey都无效,channel本身不会被关闭。
3.6 FileChannel
FileChannel总是运行在阻塞状态下。无法直接打开一个FileChannel,需要通过使用一个InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例,关闭FileChannel使用close方法。
RandomAccessFile file = new RandomAccessFile("nio.txt", "rw");
FileChannel channel = file.getChannel();
//关闭
channel.close();
从FileChannel中读取数据,read()方法返回的int值表示了有多少字节被读到了Buffer中。如果返回-1,表示到了文件末尾。
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
向FileChannel写数据,FileChannel.write()是在while循环中调用的。因为无法保证write()方法一次能向FileChannel写入多少字节,因此需要重复调用write()方法,直到Buffer中已经没有尚未写入通道的字节。
String newData = "test data";
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
channel.write(buf);
}
可能需要在FileChannel的某个特定位置进行数据的读/写操作。可以通过调用position()方法获取FileChannel的当前位置。也可以通过调用position(long pos)方法设置FileChannel的当前位置。(如果将位置设置在文件结束符之后,注意此时的读写情况)
long pos = channel.position();
channel.position(pos +123);
FileChannel的size方法:返回该实例所关联文件的大小;FileChannel的truncate方法:截取一个文件,截取文件时,文件将中指定长度后面的部分将被删除;FileChannel的force方法:将通道里尚未写入磁盘的数据强制写到磁盘上,该方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。
long fileSize = channel.size();
channel.truncate(1024);
channel.force(true);
3.7 SocketChannel套接字通道
创建一个SocketChannel有两种方式:
1. 开一个SocketChannel并连接网络上的一台服务器。
2. 当ServerSocketChannel接收到一个连接请求时,会创建一个SocketChannel。
建立一个SocketChannel连接,如下所示:
SocketChannel channel = SocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
channel.connect(address);
//关闭链接
channel.close();
向SocketChannel读写数据同FileChannel,将SocketChannel设置成非阻塞模式,此时connect()、read()、write()方法都是异步的。
channel.configureBlocking(false);
while (!channel.finishConnect()){
}
3.8 ServerSocketChannel服务端套接字通道
打开ServerSocketChannel,如下所示:
ServerSocketChannel channel=ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8080);
channel.socket().bind(address);
//非阻塞
channel.configureBlocking(false);
//关闭
channel.close();
监听链接,调用ServerSocketChannel的accept()方法,返回一个SocketChannel连接实例, accept()方法会一直阻塞到有新连接到达。
while (true) {
SocketChannel socketChannel = channel.accept();
if (socketChannel != null){
}
}
3.9 DatagramChannel通道
DatagramChannel是一个可以发送、接收UDP(面向无连接的网络协议)数据包的通道,无法直接读写数据。
DatagramChannel datagramChannel = DatagramChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
datagramChannel.socket().bind(address);
//关闭
datagramChannel.close();
接收数据与发送数据,如下所示:
ByteBuffer buffer = ByteBuffer.allocate(48);
buffer.clear();
datagramChannel.receive(buffer);
datagramChannel.send(buffer,new InetSocketAddress("127.0.0.1",8080));
连接到特定的地址,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。
datagramChannel.socket().connect(new InetSocketAddress("127.0.0.1",8080));
datagramChannel.read(buffer);
datagramChannel.write(buffer);
3.10 Pipe管道
Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。
图3-4 Pipe管道创建管道,进行读写如下所示:
Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();
String content = "hello world";
ByteBuffer buffer = ByteBuffer.allocate(48);
buffer.clear();
buffer.put(content.getBytes());
buffer.flip();
while (buffer.hasRemaining()){
sinkChannel.write(buffer);
}
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer newBuffer = ByteBuffer.allocate(48);
sourceChannel.read(newBuffer);
3.11 Path
创建Path实例,创建绝对/相对路径等如下所示:
Path path = Paths.get("c:\\data\\myfile.txt");
Path newPath = Paths.get("c:\\data","myfile.txt");
Path dir = Paths.get(".");
dir.toAbsolutePath();
// . 和 .. 的使用
//格式化
path.normalize();
Path搭配Files使用,具体参考API文档。
3.12 其他
异步文件通道和非阻塞式服务器见参考链接。
3.13 NIO Socket通信Demo
Demo包括服务端和客户端两部分,Github完整代码地址如下:
服务端代码如下所示:
//import 省略
/**
* @author luffy
* @date 2019-11-28
* @version 1.0
*/
public class ServerTest {
private static final String CHARSET = "utf-8";
private static final String END_MSG = "bye";
private Logger logger = Logger.getLogger(getClass().getName());
private Selector selector;
/**
* 连接准备
*/
private void connectReady() {
try {
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(9999);
serverSocketChannel.socket().bind(address);
//非阻塞
serverSocketChannel.configureBlocking(false);
//将ServerSocketChannel注册到Selector,将状态置为接收就绪
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 接收就绪
* @param key
*/
private void acceptReady(SelectionKey key) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel tmpChannel = null;
try {
//获取客户端连接
tmpChannel = channel.accept();
} catch (IOException e) {
e.printStackTrace();
}
logger.info("客户端IP:" + tmpChannel.socket().getRemoteSocketAddress() + "连接进来了!");
try {
//非阻塞
tmpChannel.configureBlocking(false);
//将channel注册到selector,切换为读就绪状态
tmpChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 读取客户端消息,切换为写就绪状态
* @param key
*/
private void readMsg(SelectionKey key) {
SocketChannel tmpSocketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int sum = 0;
try {
sum = tmpSocketChannel.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}
if (sum != -1) {
//切换到读模式
buffer.flip();
String msg = null;
try {
msg = new String(buffer.array(), buffer.position(), buffer.limit(), CHARSET);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
//清空buffer
buffer.clear();
logger.info("接收来自客户端的数据:" + new String(buffer.array()));
//切换为写就绪态
key.interestOps(SelectionKey.OP_WRITE);
//如果接受到"bye",客户端断开连接
if (END_MSG.equals(msg)) {
logger.info(tmpSocketChannel.socket().getRemoteSocketAddress() + "断开连接!");
try {
key.channel().close();
key.cancel();
tmpSocketChannel.close();
return;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 向客户端发送数据,切换为读就绪状态
* @param key
*/
private void writeMsg(SelectionKey key) {
SocketChannel tmpSocketChannel = (SocketChannel) key.channel();
String[] strArray = new String[]{"I do", "I love", "I think"};
Random random = new Random();
//随机数 0-2
int randomNum = random.nextInt(3);
String msg = strArray[randomNum];
byte[] resByte = msg.getBytes();
ByteBuffer buffer = ByteBuffer.wrap(resByte);
try {
tmpSocketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
//切换为读就绪状态
key.interestOps(SelectionKey.OP_READ);
logger.info("发送给客户端完毕!");
}
/**
* 开启服务端
*/
private void startServer() {
this.connectReady();
while (true) {
int num = 0;
try {
num = selector.select();
} catch (IOException e) {
e.printStackTrace();
}
if (num > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isValid() && key.isAcceptable()) {
this.acceptReady(key);
} else if (key.isValid() && key.isReadable()) {
this.readMsg(key);
} else if (key.isValid() && key.isWritable()) {
this.writeMsg(key);
}
}
} else {
logger.info("没有连接!");
}
}
}
public static void main(String[] args) {
new ServerTest().startServer();
}
}
客户端代码如下所示:
//import省略
/**
* @author luffy
* @date 2019-11-28
* @version 1.0
*/
public class ClientTest {
private Logger logger = Logger.getLogger(this.getClass().getName());
private Selector selector;
private static final String IP_ADDRESS = "127.0.0.1";
private static final int PORT = 9999;
private static final String END_MSG = "bye";
/**
* 连接服务
*/
private void connectServer() {
try {
SocketChannel socketChannel = SocketChannel.open();
selector = Selector.open();
InetSocketAddress address = new InetSocketAddress(IP_ADDRESS, PORT);
//非阻塞
socketChannel.configureBlocking(false);
//指定IP和端口-连接服务器
socketChannel.connect(address);
//将channel注册到selector并设置为连接态
socketChannel.register(selector, SelectionKey.OP_CONNECT);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 连接就绪状态,切换为写就绪状态
* @param key
*/
private void connectReady(SelectionKey key) {
try {
SocketChannel channel = (SocketChannel) key.channel();
channel.finishConnect();
key.interestOps(SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 读就绪状态,读取服务端发回的数据,切换为写就绪状态
* @param key
*/
private void readMsg(SelectionKey key) {
ByteBuffer buffer = null;
try {
SocketChannel channel = (SocketChannel) key.channel();
buffer = ByteBuffer.allocate(1024);
//读取服务端数据
channel.read(buffer);
key.interestOps(SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
logger.info("收到服务端数据:" + new String(buffer.array()));
}
/**
* 写就绪状态,向服务端发送数据,切换为读就绪状态
* @param key
*/
private void writeMsg(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
} catch (Exception e) {
e.printStackTrace();
}
String[] strArray = new String[]{"hello", "world", "bye"};
Random random = new Random();
//随机取数0-2 向服务端发送数据
int randomNum = random.nextInt(3);
String msg = strArray[randomNum];
byte[] array = msg.getBytes();
ByteBuffer buffer = ByteBuffer.wrap(array);
try {
//将数据写入channel
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
logger.info("已向服务器发送消息:" + msg);
//切换为读就绪状态
key.interestOps(SelectionKey.OP_READ);
//如果发送"bye",则和服务器断开连接
if (END_MSG.equals(msg)) {
logger.info(channel.socket().getRemoteSocketAddress() + "已和服务器断开连接!");
try {
key.channel().close();
key.cancel();
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 客户端启动
*/
private void clientStart() {
this.connectServer();
while (true) {
int num = 0;
try {
num = selector.select();
} catch (IOException e1) {
e1.printStackTrace();
}
if (num > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isValid() && key.isConnectable()) {
this.connectReady(key);
} else if (key.isReadable()) {
this.readMsg(key);
} else if (key.isValid() && key.isWritable()) {
this.writeMsg(key);
}
}
}
}
}
public static void main(String[] args) {
new ClientTest().clientStart();
}
}
参考链接一:Java NIO简明教程
参考链接二:Java NIO 系列教程
参考链接三:Java NIO教程
参考链接四:Java NIO Tutorial
参考链接五:Java 8 API-中文
参考链接六:Java 8 API-英文
网友评论