Java NIO简介
- Java NIO(New I0或NonBlockingIO非阻塞)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO APl。
- NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持
面向缓冲区的
、基于通道
的IO操作。NIO将以更加高效的方式进行文件的读写操作。 - Java NIO和传统IO的主要区别:
IO | NIO |
---|---|
面向流(Stream Oriented) | 面向缓冲区(Buffer Oriented) |
阻塞IO(Blocking IO) | 非阻塞IO(Non Blocking IO) |
(无) | 选择器(Selectors) |
通道和缓冲区
- Java NIO系统的核心在于:
通道
(Channel)和缓冲区
(Buffer)。通道表示打开到IO设备(例如:文件、套接字)的连接。若需要使用NIO系统,需要获取用于连接IO设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。 - 简而言之,
Channel负责传输
,Buffer负责存储
。 - 缓冲区(Buffer):一个用于特定基本数据类型的容器(底层用
数组
实现)。由java.nio包
定义,所有缓冲区类都是Buffer抽象类的子类
,有:ByteBuffer
、CharBuffer
、ShortBuffer
、IntBuffer
、LongBuffer
、FloatBuffer
、DoubleBuffer
。注意:除boolean类型外
! - 上述缓冲区的管理方式几乎一致,都是通过
allocate(int capacity)
方法来获取缓冲区。 - Java NIO中的Buffer主要用于
与NIO通道进行交互
,Channel本身不存储数据。 - 缓冲区存取数据的两个核心方法:
put()
:存入数据到缓冲区中;get()
:从缓冲区中获取数据。 - 缓冲区中的4个核心属性:
capacity
:表示缓冲区中最大存储数据的容量。一旦声明就大小不能改变
。
limit
:表示缓冲区中可以操作数据的范围。
position
:表示缓冲区中正在操作数据的位置。
mark
:表示记录当前position的位置。下次通过reset()
方法可以恢复到mark标记的位置。
其中,大小必须满足 0 mark position limit capacity。
package com.zzw.nio;
import java.nio.ByteBuffer;
import org.junit.Test;
public class TestBuffer {
@Test
public void test1() {
String str = "abcde";
// 1、分配一个指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
System.out.println("-------------------allocate---------------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 1024
System.out.println(buf.capacity()); // 1024
// 2、利用put()存入数据到缓冲区中
buf.put(str.getBytes()); // put()操作会使得指针position移动
System.out.println("-------------------put---------------------");
for (int i = 0; i < 2; i++) {
System.out.println(buf.get()); // 注意:get()方法会使得position指针位置移动,这里移动2个单位,但是limit()值不变
}
System.out.println(buf.position()); // 7
System.out.println(buf.limit()); // 1024
System.out.println(buf.capacity()); // 1024
// 3、切换成读取数据的模式,那么position的值就变为0
buf.flip();
System.out.println("-------------------flip---------------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 1024
// 4、利用get()读取缓冲区的数据
byte[] dst = new byte[buf.limit()];
buf.get(dst);
System.out.println(new String(dst, 0, dst.length));
System.out.println("-------------------get---------------------");
System.out.println(buf.position()); // 5
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 1024
// 5、rewind():可重复读数据,重新定位position值为0
buf.rewind();
System.out.println("-------------------rewind---------------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 1024
// 6、clear():清空缓冲区,但是缓冲区中的数据依然存在,不够处于“被遗忘”的状态,指针位置回到最初状态值
buf.clear();
System.out.println("-------------------clear---------------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 1024
System.out.println(buf.capacity()); // 1024
}
@Test
public void test2() {
String str = "abcde";
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(str.getBytes());
buf.flip(); // 切换成读模式
byte[] dst = new byte[buf.limit()];
buf.get(dst, 0, 2); // 从下标0开始连续读2个字节,即取区间为[0,1]的子字符串
System.out.println(new String(dst, 0, 2));
System.out.println("当前position的值为" + buf.position()); // 2
// mark()标记当前position的值
buf.mark(); // mark = position;
buf.get(dst, 2, 2);// 从下标2开始连续读2个字节,即取区间为[0,1]的子字符串
System.out.println(new String(dst, 2, 2));
System.out.println("当前position的值为" + buf.position()); // 4
// reset():恢复到mark标记的位置
buf.reset(); // position = mark;
System.out.println("恢复后的position的值为" + buf.position()); // 2
// 判断缓冲区是否还有剩余数据可取
if (buf.hasRemaining()) {
// 获取缓冲区中还可以操作的数量
System.out.println(buf.remaining()); // 3
}
}
@Test
public void test3() {
// 分配直接缓冲区
ByteBuffer buf = ByteBuffer.allocateDirect(1024);
// 判断是直接还是非直接缓冲区
System.out.println(buf.isDirect());
}
}
直接缓冲区与非直接缓冲区:
- 字节缓冲区要么是直接的,要么是非直接的。若为直接字节缓冲区,则JVM会尽最大努力直接在此缓冲区上执行本机I/O操作。即在每次调用基础操作系统的一个本机I/O操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
- 直接字节缓冲区可以通过调用此类的
allocateDirect()
工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区
。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础操作系统的本机I/O操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配给它们。 - 直接字节缓冲区还可以通过
Filechannel的map()
方法将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer
。Java平台的实现有助于通过JNI从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访间的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访间期间或稍后的某个时间导致抛出不确定的异常。 - 字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其
isDirect()
方法来确定。提供此方法是为了能够在性能关键型代码中执行显式缓冲区管理。
-
非直接缓冲区
:通过allocate()
方法分配缓冲区,将缓冲区建立在JVM的内存中
。 -
直接缓冲区
:通过allocateDirect()
方法分配直接缓冲区,将缓冲区建立在操作系统的物理内存中
,可以提高效率!
- 通道(Channel):由
java.nio.channels包
定义。Channel表示源节点与目标节点的连接。Channel类似于传统的“流”。只不过Channel本身不能直接访问数据,Channel只能与Buffer进行数据交互。
- 通道的主要实现类:
FileChannel
、SocketChannel
、ServerSocketChannel
、DatagramChannel(数据包通道)
。 - 3种方式获取通道:(JDK1.7版本以后)
1、Java 针对支持通道的类提供了
getchannel()
方法,有
本地IO:FileInputStream
/FileOutputStream
、RandomAccessFile
网络IO:Socket
、ServerSocket
、DatagramSocket
。
2、在JDK1.7中的 NIO.2 针对各个通道提供了静态方法open()
。
3、在JDK1.7中的 NIO.2 的Files工具类提供的方法,如:newByteChannel()
等。
分散(Scatter)与聚集(Gather)
-
分散读取
(Scattering Reads)是指从Channel中读取的数据“分散”到多个Buffer中。 - 注意:从Channel中读取的数据按顺序依次将各个Buffer填满。
-
聚集写入
(Gathering Writes)是指将多个Buffer中的数据“聚集”
到Channel中。 - 注意:按照缓冲区的顺序,写入position和limit之间的数据到Channel。
package com.zzw.nio;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.junit.Test;
public class TestChannel {
// 1.利用通道完成文件的复制(非直接缓冲区)
@Test
public void test1() {
long start = System.currentTimeMillis(); // 单位:毫秒
FileInputStream fis = null;
FileOutputStream fos = null;
// ①获取通道
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
fis = new FileInputStream("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\1.jpg");
fos = new FileOutputStream("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\2.jpg");
inChannel = fis.getChannel();
outChannel = fos.getChannel();
// ②分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// Reads a sequence of bytes from this channel into a subsequence of the given
// buffers.
// ③将通道1中的数据存入缓冲区 buf 中
while (inChannel.read(buf) != -1) {
buf.flip(); // 切换成读取数据的模式,因为下面写操作要从缓冲区中读数据
// Writes a sequence of bytes to this channel from a subsequence of the given
// buffers.
// ④将缓冲区中的数据写入通道2中
outChannel.write(buf);
buf.clear(); // 清空缓冲区
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (outChannel != null) {
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inChannel != null) {
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
long end = System.currentTimeMillis();
System.out.println("复制文件所耗费的时间为:" + (end - start) + "ms!");
}
// 使用直接缓冲区完成文件的复制(内存映射文件)只支持ByteBuffer缓冲区,其他类型不支持
@Test
public void test2() throws IOException {
long start = System.currentTimeMillis(); // 单位:毫秒
FileChannel inChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\1.jpg"),
StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\3.jpg"),
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);// CREATE_NEW表示不存在则创建,存在则报错;而CREATE表示不存在则创建,存在则覆盖
// 内存映射文件,现在的缓冲区在物理内存当中,那么就直接操作缓冲区,不用通过通道来进行数据交互
MappedByteBuffer inMappedByteBuffer = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size()); // 只读通道
MappedByteBuffer outMappedByteBuffer = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size()); // 读写通道
// 直接对缓冲区进行数据的读写操作
byte[] dst = new byte[inMappedByteBuffer.limit()];
inMappedByteBuffer.get(dst);
outMappedByteBuffer.put(dst);
inChannel.close();
outChannel.close();
long end = System.currentTimeMillis();
System.out.println("复制文件所耗费的时间为:" + (end - start) + "ms!");
}
// 通道间的数据传输(直接缓冲区)
@Test
public void test3() throws IOException {
FileChannel inChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\1.jpg"),
StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\4.jpg"),
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);// CREATE_NEW表示不存在则创建,存在则报错;而CREATE表示不存在则创建,存在则覆盖
// inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
inChannel.close();
outChannel.close();
}
// 分散读取与聚集写入
@Test
public void test4() throws IOException {
RandomAccessFile raf1 = new RandomAccessFile("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\1.txt", "rw");
// 1、获取通道
FileChannel channel1 = raf1.getChannel();
// 2、分配多个指定大小的缓冲区
ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);
// 3、分散读取
ByteBuffer[] bufs = { buf1, buf2 };
channel1.read(bufs);
// 增强for
for (ByteBuffer byteBuffer : bufs) {
byteBuffer.flip(); // 将缓冲区都切换成读模式
}
System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("-----------------------------------------------");
System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));
// 4、聚集写入
RandomAccessFile raf2 = new RandomAccessFile("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\2.txt", "rw");
FileChannel channel2 = raf2.getChannel();
channel2.write(bufs); // 写入通道,也就是写到文件中
// 5、关闭流和通道
raf1.close();
channel1.close();
raf2.close();
channel2.close();
}
// 获取支持的字符集编码
@Test
public void test5() {
Map<String, Charset> sortedMap = Charset.availableCharsets();
Set<Entry<String, Charset>> entrySet = sortedMap.entrySet();
// 支持的字符集
for (Entry<String, Charset> entry : entrySet) {
System.out.println(entry.getKey() + "=" + entry.getValue());
}
}
// 操作编码和解码
// 编码:其他类型 -> 字节数组
// 解码:字节数组 -> 对应类型
@Test
public void test6() throws CharacterCodingException {
Charset cs1 = Charset.forName("GBK");
// 获取编码器
CharsetEncoder ce = cs1.newEncoder();
// 获取解码器
CharsetDecoder cd = cs1.newDecoder();
CharBuffer cBuf1 = CharBuffer.allocate(1024);
cBuf1.put("好好学习,天天向上!");
cBuf1.flip(); // cBuf1 切换成读模式,因为 解码要读取缓冲区
// 对字符缓冲区进行编码得到字节缓冲区
ByteBuffer bBuf = ce.encode(cBuf1);
for (int i = 0; i < bBuf.limit(); i++) {
// 为了安全起见,上界使用limit值,如果没有这个操作,那么下面直接将字节缓冲区切换成读模式时导致可读取的limit值为0
System.out.println(bBuf.get()); // position指针继续向后移动到最后
}
System.out.println("当前字节缓冲区的position的值为:" + bBuf.position());
System.out.println("当前字节缓冲区可操作范围limit的值为:" + bBuf.limit());
System.out.println("当前字节缓冲区的容量capacity的值为:" + bBuf.capacity());
bBuf.flip(); // 切换成读模式,因为解码要读取缓冲区,也就是将limit=position,并且重新置position=0
// 对字节缓冲区进行解码得到字符缓冲区
CharBuffer cBuf2 = cd.decode(bBuf);
System.out.println(cBuf2.toString());
System.out.println("-------------------------------------------");
Charset cs2 = Charset.forName("UTF-8");
bBuf.rewind(); // 设置为可重复读,即position = 0,也可使用bBuf.flip();
CharBuffer cBuf3 = cs2.decode(bBuf);
System.out.println(cBuf3.toString()); // 出现乱码
}
}
- 使用NIO完成网络通信的三个核心:
1、通道(Channel):负责连接。规范:java.nio.channels.Channel 接口;抽象类:SelectableChannel;子类实现:TCP协议:SocketChannel、ServerSocketChannel;UDP协议:DatagramChannel;管道:Pipe.SinkChannel、Pipe.SourceChannel。
2、缓冲区(Buffer):负责存取数据。
3、选择器(Selector):是SelectableChannel的多路复用器。用于监控SelectableChannel的IO状况。
package com.zzw.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import org.junit.Test;
// 阻塞式网络IO
public class TestBlockingNIO {
// 客户端1
@Test
public void client1() throws IOException {
// 1、获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
FileChannel inChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\1.jpg"),
StandardOpenOption.READ);
// 2、分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3、读取本地文件,并发送到服务端
while (inChannel.read(buf) != -1) {
buf.flip(); // 从缓冲区读取并写入通信通道前先将其切换成读模式
sChannel.write(buf);
buf.clear();
}
// 4、关闭通道
inChannel.close();
sChannel.close();
}
// 服务端1
@Test
public void server1() throws IOException {
// 1、获取服务端通信通道、本地文件写入通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\5.jpg"),
StandardOpenOption.WRITE, StandardOpenOption.CREATE);
// 2、绑定连接,指定通信端口
ssChannel.bind(new InetSocketAddress(9898));
// 3、获取客户端连接通道
SocketChannel sChannel = ssChannel.accept();
// 4、分配指定大小的缓冲区,获取客户端的数据,并保存到本地
ByteBuffer buf = ByteBuffer.allocate(1024);
while (sChannel.read(buf) != -1) {
buf.flip();
outChannel.write(buf);
buf.clear();
}
// 5、关闭通道
sChannel.close();
outChannel.close();
ssChannel.close();
}
// 客户端2
@Test
public void client2() throws IOException {
// 1、获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
FileChannel inChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\1.jpg"),
StandardOpenOption.READ);
// 2、分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3、读取本地文件,并发送到服务端
while (inChannel.read(buf) != -1) {
buf.flip(); // 从缓冲区读取并写入通信通道前先将其切换成读模式
sChannel.write(buf);
buf.clear();
}
// 告诉服务端,客户端已经将数据发送完毕,否则一直阻塞在这里
// 相当于给流中加入一个结束标记-1,这样子,服务器的输入流的read()方法就会读到一个-1退出循环写入
// sChannel.shutdownOutput();关闭的是客户端的输出流,同时服务器端的输入流也随之关闭。
// sChannel.shutdownIntput();也是同样的道理,关闭客户端的输入流,同时服务器端的输出流也随之关闭。
sChannel.shutdownOutput();// 当把客户端socket的输出流关闭了,服务端的输出流状态仍然处于开放状态。
// 4、接收服务端的反馈
int len = 0; // 实际读取的字节数
while ((len = sChannel.read(buf)) != -1) {
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
// 5、关闭通道
inChannel.close();
sChannel.close();
}
// 服务端2
@Test
public void server2() throws IOException {
// 1、获取服务端通信通道、本地文件写入通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("D:\\eclipse_pro\\threadTest\\src\\com\\zzw\\nio\\6.jpg"),
StandardOpenOption.WRITE, StandardOpenOption.CREATE);
// 2、绑定连接,指定通信端口
ssChannel.bind(new InetSocketAddress(9999));
// 3、获取客户端连接通道,accept()方法阻塞等待远程连接
SocketChannel sChannel = ssChannel.accept();
// 4、分配指定大小的缓冲区,获取客户端的数据,并保存到本地
ByteBuffer buf = ByteBuffer.allocate(1024);
while (sChannel.read(buf) != -1) {
buf.flip();
outChannel.write(buf);
buf.clear();
}
// 5、发送反馈给客户端
buf.put("服务端成功接收数据".getBytes());
buf.flip();
sChannel.write(buf);
// 6、关闭通道
sChannel.close();
outChannel.close();
ssChannel.close();
}
}
选择器(Selector)
-
选择器
(Selector)是SelectableChannel对象的多路复用器,Selector可以同时监控多个SelectableChannel的IO状况
,即利用Selector可使一个单独的线程管理多个Channel。Selector是非阻塞IO
的核心。
-
Selectionkey
:表示SelectableChannel和Selector之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含两个表示为整数值的操作集。操作集的每一位都表示该键的通道所支持的一类可选择操作。
方法 | 描述 |
---|---|
int interestOps() | 获取感兴趣事件集合 |
int readyOps() | 获取通道已经准备就绪的操作集合 |
SelectableChannel channel() | 获取注册通道 |
Selector selector() | 返回选择器 |
boolean isReadable() | 检测Channal中读事件是否就绪 |
boolean isWritable() | 检测Channal中写事件是否就绪 |
boolean isConnectable() | 检测Channel中连接是否就绪 |
boolean isAcceptable() | 检测Channel中接收是否就绪 |
- 当调用
register(Selector sel, int ops)
将通道注册到选择器时,选择器对通道的监听事件,需要通过第二个参数ops指定。其可以监听的事件类型(可使用SelectionKey的四个常量表示):
读:SelectionKey.OP_READ(1)
写:SelectionKey.OP_WRITE(4)
连接:SelectionKey.OP_CONNECT(8)
接收:SelectionKey.OP_ACCEPT(16)
- 若注册时不止监听一个事件,则可以使用
位或
:|
操作符连接。
package com.zzw.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;
import org.junit.Test;
public class TestNonBlockingNIO {
// 客户端
@Test
public void client() throws IOException {
// 1、获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
// 2、切换到非阻塞模式
sChannel.configureBlocking(false);
// 3、分配一个指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 4、发送数据给服务端
Scanner scanner = new Scanner(System.in);
String str = null;
while (scanner.hasNext()) {
str = scanner.next();
buf.put((new Date().toString() + "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
// 5、关闭流和通道
scanner.close();
sChannel.close();
}
// 服务端
@Test
public void server() throws IOException {
// 1、获取服务端通信通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 2、切换到非阻塞模式
ssChannel.configureBlocking(false);
// 3、绑定连接
ssChannel.bind(new InetSocketAddress(8888));
// 4、获取选择器
Selector selector = Selector.open();
// 5、将服务端通信通道注册到选择器上,并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6、轮询式地获取选择器上已经“准备就绪”的事件
while (selector.select() > 0) {
// 7、获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
// 8、获取准备“就绪”的事件
SelectionKey sk = it.next();
// 9、判断具体式什么事件准备就绪
if (sk.isAcceptable()) {
// 10、若接收就绪,则获取客户端连接
SocketChannel sChannel = ssChannel.accept();
// 11、切换成非阻塞模式
sChannel.configureBlocking(false);
// 12、将客户端通道注册到选择器上
sChannel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
// 13、获取当前选择器上“读就绪”状态的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
// 14、读取数据
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while ((len = sChannel.read(buf)) > 0) {
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
}
// 15、取消选择键SelectionKey
it.remove();
}
}
// 16、关闭选择器和通道
selector.close();
ssChannel.close();
}
}
- Java NIO中的
DatagramChannel
是一个能收发UDP包
的通道。操作步骤:1、打开DatagramChannel;2、发送/接收数据。
package com.zzw.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;
import org.junit.Test;
public class TestNonBlockingNIO2 {
@Test
public void send() throws IOException {
DatagramChannel dc = DatagramChannel.open();
dc.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
String str = null;
while (scanner.hasNext()) {
str = scanner.next();
buf.put((new Date().toString() + "\n" + str).getBytes());
buf.flip();
dc.send(buf, new InetSocketAddress("127.0.0.1", 7777));
buf.clear();
}
// 关闭流和通道
scanner.close();
dc.close();
}
@Test
public void receive() throws IOException {
DatagramChannel dc = DatagramChannel.open();
dc.configureBlocking(false);
dc.bind(new InetSocketAddress(7777));
Selector selector = Selector.open();
dc.register(selector, SelectionKey.OP_READ); // 当前通道来注册一个读事件
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isReadable()) {
ByteBuffer buf = ByteBuffer.allocate(1024);
dc.receive(buf);
buf.flip(); // 切换成读模式
System.out.println(new String(buf.array(), 0, buf.limit()));
buf.clear();
}
}
// 取消选择键SelectionKey
it.remove();
}
// 关闭选择器和通道
selector.close();
dc.close();
}
}
- Java NIO管道是2个线程之间的
单向数据连接
。Pipe有一个source
通道和一个sink
通道。数据会被写到sink通道,从source通道读取。
package com.zzw.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import org.junit.Test;
public class TestPipe {
@Test
public void test1() throws IOException {
// 1、获取管道
Pipe pipe = Pipe.open();
// 2、分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("通过单向管道发送数据!".getBytes());
buf.flip(); // 将缓冲区切换成读模式
// 3、将缓冲区的数据写入管道
sinkChannel.write(buf);
// 4、读取缓冲区中的数据
Pipe.SourceChannel sourceChannel = pipe.source();
buf.flip();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));
// 5、关闭通道
sourceChannel.close();
sinkChannel.close();
}
}
网友评论