nio核心三组件
Buffer
Buffer 是一个特定原始类型的容器。Buffer 是一个原始类型的线性的、有限序列,除了 Buffer 存储的内容外,关键属性还包括:capacity, limit 和 position。
capacity:Buffer 包含的元素的数量,capacity 永远不会为负,也不会改变。
limit:Buffer 中第一个不能读取或写入的元素索引。limit 永远不会为负,且永远小于等于 capacity。
position:下一个待读取、写入的元素索引。position 永远不会为负,且永远小于等于 limit。
每个基本类型(布尔类型除外),都有一个 Buffer 的子类。Java NIO 中 Buffer 的一些实现,其中最重要的是 ByteBuffer
创建
两种方式:
- 堆内存,读写效率相对低下,收到GC的影响
- 直接内存:读写相对快(少一次拷贝),不受GC影响,但是分配空间效率低
public static void main(String[] args) {
System.out.println(ByteBuffer.allocate(16).getClass());
System.out.println(ByteBuffer.allocateDirect(16).getClass());
}
class java.nio.HeapByteBuffer
class java.nio.DirectByteBuffer
写入buffer
- 使用channel
- 直接put
每写入一个元素,position += 1
int readByted = channel.read(buf);
buf.put((byte)126);
buf.put(index, byte b);
buf.put(ByteBuffer)
读取buffer
读取也是用的positon,而position在写入后对应的是下一个写入的位置的索引,
需要调用flip
方法来切换模式, 在读取的时候,读取当前的position上的值。
可以通过channel写入buffer,也可以通过channel读取buffer。
Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
对于
get() // position += 1,
get(Int index) // position = position
// channel读取buffer
int writed = channel.write(buf)
变量mark
, 用于暂时保存position的值, 以便待会能够回来。
Buffer mark() {
mark = position;
return this;
}
Buffer reset() {
int m = mark;
if( m < 0 ) {
throw InvalidMarkException(); // 说明没有调用 mark() 方法
}
position = m;
return this;
}
其他的一些方法
rewind(), clear(), compact()
在读模式下,调用rewind会重置position和mark到初始0和-1. 即重头开始读吧。
而clear,会重置position=0, limit = capacity, mark = -1。这是清空了。而compact会将上一次没读完的移动到前面来
string <=> byte数组
- "string".getBytes();
- StandardCharsets.UTF_8.encode("string");
- 读模式下: StandardCharsets.UTF_8.decode(buf).toString();
分散读取和集中写入
有点是减少数据在buffer中的读写次数
分散读取
将数据从channel读出来,写入到buffer里,需要buffer数组,按照顺序填满buffer
// 分散读取
try(FileChannel channel = new RandomAccessFile("D:\\develop\\idea_projects\\nio_stuff\\nio_selector\\src\\main\\java\\bytebuff_stuff\\TestByteBufferAllocate.java", "r").getChannel()) {
ByteBuffer b1 = ByteBuffer.allocate(16000);
ByteBuffer b2 = ByteBuffer.allocate(16000);
ByteBuffer b3 = ByteBuffer.allocate(16000);
channel.read(new ByteBuffer[]{b1,b2,b3});
b1.flip(); // 必须切换到读模式
b2.flip();
b3.flip();
System.out.println(StandardCharsets.UTF_8.decode(b1));
System.out.println(StandardCharsets.UTF_8.decode(b2));
System.out.println(StandardCharsets.UTF_8.decode(b3));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
集中写入
// 集中写入
ByteBuffer bx = StandardCharsets.UTF_8.encode("enjoy");
ByteBuffer by = StandardCharsets.UTF_8.encode("happy");
ByteBuffer bz = StandardCharsets.UTF_8.encode("幸福");
try(FileChannel channel = new RandomAccessFile("./tmp.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{bx, by, bz});
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
粘包与半包的解决
粘包:合并多条消息,然后依次发送
半包: 因为接受的缓冲区大小有限,一条数据可能缓冲区存不下
两个需要解决的问题:1. 接收端拆粘包 2. 接收端合并半包
public static void main(String[] args) {
ByteBuffer bf = ByteBuffer.allocate(32);
bf.put("hello world\ni am xxx\n ho".getBytes(StandardCharsets.UTF_8));
LinkedList<ByteBuffer> ts = new LinkedList<>(split(bf));
Consumer<ByteBuffer> c = byteBuffer -> System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
bf.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
ts.addAll(split(bf));
ts.forEach(c);
}
private static LinkedList<ByteBuffer> split(ByteBuffer bf) {
// 首先切换成读模式,如果还是写模式的话
bf.flip();
LinkedList<ByteBuffer> ts = new LinkedList<>();
for(int i = 0; i < bf.limit(); ++ i) {
if(bf.get(i) == '\n') {
int length = i - bf.position();
ByteBuffer t = ByteBuffer.allocate(length);
for(int j=0; j < length; j ++) {
t.put(bf.get());
}
t.flip();
ts.add(t);
}
}
bf.compact(); // 丢弃已读
return ts;
}
Channel
是对设备,文件,网络,进程间的连接的一种抽象,channel 有两种模式: 阻塞和非阻塞
- FileChannel
- DatagramChannel : udp链接
- SocketChannel: Tcp链接通道, tcp客户端
- ServerSocketChannel: Tcp对应的服务端, 用来监听端口进来的请求
FileChannel只能在阻塞模式下工作, 网络channel既可以阻塞也可以非阻塞模式。
channel开启用完后必须关闭, 任何新的调用都会抛异常,用isOpen来检测。
FileChannel
channel.position(idx)
如果设置成末尾,那么再读的时候就是-1,如果超过了文件末尾, 再写入的话,中间会出现空洞0000.
使用transfer
的优势是零拷贝,效率高,缺点是最大2g。但是可以通过不断判断transferTo返回的大小,和文件剩余大小,循环transferTo,因为是追加动作。
try(FileChannel from = new FileInputStream("tmp.txt").getChannel();
FileChannel to = new FileOutputStream("out.txt").getChannel();
) {
//
from.transferTo(1, from.size(), to);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
path和files
用Path
和工具类Paths
以及Files
来表示文件路径,和获取Path实例,这样可以解决文件路径名的繁琐,通过相对路径,绝对路径,目录操作,linux和windows的路径操作,支持.和../ ,甚至通过Files
工具类拷贝文件(效率也高,调用的os的接口),walkFileTree,原子move,删除等等。
网络channel
// server
ByteBuffer bf = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new LinkedList<>();
while(true) {
SocketChannel sc = ssc.accept();
channels.add(sc);
for(SocketChannel c: channels) {
c.read(bf);
bf.flip();
bf.clear();
}
}
// client
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
阻塞方法
ServerSocketChannel.accept 和 SocketChannel.read是阻塞的
非阻塞方法
ServerSocketChannel打开后, 只需要在
ssc.configureBlocking(false);
非阻塞,线程不被被accept方法阻塞,如果没有客户端连接进来,那么accept()方法直接返回null.
SocketChannel同样
sc.configureBlocking(false);
read方法也不会阻塞线程了,如果没有数据,read返回0;
非阻塞模式虽然解决了阻塞的问题,但是没有办法解决空忙的问题,所以我们用Selector。
Selector
Selector管理多个channel, 需要运行在非阻塞模式下(FileChannel不行)。
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ByteBuffer bf = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 必须的
// 建立selector和channel的关系
// 将这个channel上的实践注册到selector上,让他来关注和管理
SelectionKey sscKey = ssc.register(selector, 0, null);
// 设置需要关注的事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select(); // 阻塞的,只有有时间发生,才会被唤醒
// 唤醒后,keys里面包含这次发生的所有事件(因为只关注了OP_ACCEPT,所以其实只有客户端连接进来)
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeysIter = selectionKeys.iterator();
while(selectionKeysIter.hasNext()) {
SelectionKey key = selectionKeysIter.next();
// selectionKeysIter.remove();
System.out.println(key);
ServerSocketChannel sccKey = (ServerSocketChannel) key.channel();
SocketChannel sc = sccKey.accept();
System.out.println("new channel");
System.out.println(sc);
// 取消
// key.cancel();
//还可以继续把 sc 注册到Selector上,关注它的READAble
}
}
}
四种事件:
static final int OP_READ = 1 << 0; //可读
static final int OP_WRITE = 1 << 2; // 可写
static final int OP_CONNECT = 1 << 3; // 客户端连接
static final int OP_ACCEPT = 1 << 4; // 客户端请求连接
在事件发生后,要么处理,要么取消,如果两者都不做的话,就会留在Selector中,使得下次Selector立刻返回上次未处理的事件。尤其要警惕,客户端断开连接(用channel.read的返回值判断是异常断开还是主动断开-1),对的情况,因为没法处理,会抛弃异常,需要finally取消它。
处理粘包与半包
无论采用固定大小,还是变长(判断分割符),或者申明数据长度的方式,都需要每个channel独享各自的ByteBuffer,通过附件的方法,独享的也要和消息大小适配:1. 大小可变 2. 多个buffer组成的数组(分散读,集中写)
SelectionKey scKey = sc.register(selector, 0, ByteBuffer.allocate(16));
ByteBuffer bf = (ByteBuffer)key.attachment();
selector的方法调用的时候是阻塞的,只有在这些时候不阻塞:
- 发生关注的事件的时候
- 客户端的连接请求,accept
- 客户端的发数据,客户端的断开,都会触发读事件,且如果发送的数据大于buffer缓冲区,会多次触发读
- channel可以写,触发write事件
- selector.wakeup()方法
- 调用selector.close()
- selector所在的线程被interrupt
网友评论