概述
- JDK 1.4出现
- Buffer
- ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer
- MappedByteBuffer(内存映射文件)
- Selector
- 允许单线程处理多Channel
- 将Channel注册到Selector上,调用Selector的select()进入阻塞
- 注册通道有事件就绪时select()返回,线程处理事件
- 事件例子:新连接建立,收到数据
- 其他
Channel
- 可从通道中读数据,也可写数据到通道;流的读写通常是单向
- 可异步读写;流是同步的
- Channel -> Buffer -> Channel,一定有Buffer的参与
- 实现类
- FileChannel:文件读写(无法channel.configureBlocking(false),即无法使用Selector)
- DatagramChannel:UDP
- SocketChannel:TCP
- ServerSocketChannel:监听TCP,对请求连接创建SocketChannel
RandomAccessFile aFile = new RandomAccessFile("resources/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
Buffer
本质
- 一块可读写内存,被包装成Buffer对象,用于访问该块内存
基本用法
- 写数据到Buffer(从channel读)
- channel.read(buffer)
- buffer.put()
- 调flip()方法
- 从Buffer中读数据(写数据到channel)
- channel.write(buffer)
- buffer.get()
- 调clear()或者compact()
- clear:清空缓冲区
- compact:清空已读数据,未读数据被挪到缓冲区起始位置
ByteBuffer buf = ByteBuffer.allocate(48);
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
HeapByteBuffer(int cap, int lim) {
super(-1, 0, lim, cap, new byte[cap], 0);
}
ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
// 初始书签-1;初始位置0;写限制cap
super(mark, pos, lim, cap);
this.hb = hb; // 数据数组
this.offset = offset; // 初始偏移0
}
put
public ByteBuffer put(byte x) {
hb[ix(nextPutIndex())] = x;
return this;
}
// 增加偏移
protected int ix(int i) {
return i + offset;
}
final int nextPutIndex() {
if (position >= limit)
throw new BufferOverflowException();
return position++;
}
- position指向下一个可放位置
- position == limit-1时,可以最后调用一次put
- limit这个位置不可放数据
flip
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
get
public byte get() {
return hb[ix(nextGetIndex())];
}
protected int ix(int i) {
return i + offset;
}
final int nextGetIndex() {
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
- 写切换至读时,position变成limit,limit这个位置不可读数据
rewind-重读
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
clear-清空
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
compact-清空已读
public ByteBuffer compact() {
System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
position(remaining());
limit(capacity());
discardMark();
return this;
}
public final int remaining() {
return limit - position;
}
mark & reset
public final Buffer mark() {
mark = position;
return this;
}
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
equals
public boolean equals(Object ob) {
if (this == ob)
return true;
if (!(ob instanceof ByteBuffer))
return false;
ByteBuffer that = (ByteBuffer)ob;
if (this.remaining() != that.remaining())
return false;
int p = this.position();
for (int i = this.limit() - 1, j = that.limit() - 1; i >= p; i--, j--)
if (!equals(this.get(i), that.get(j)))
return false;
return true;
}
Scatter & Gather
- 分散(Scatter)
- 聚集(Gather)
- 场景
// Scattering Reads
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
- 一个Buffer写满后,下一个Buffer
- 无法处理动态消息(消息头大小不固定)
// Gathering Writes
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
- 按照Buffer在数组中的顺序写入Channel
- 可以处理动态消息
Selector
概述
- 检测一到多个NIO通道,并探知通道是否已为所感兴趣事件做好准备
- Why use Selector?
// Selector创建
Selector selector = Selector.open();
channel.configureBlocking(false);(默认是true)
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
- 与Selector一起使用时,Channel必须处于非阻塞模式下
- FileChannel不能切换到非阻塞模式,意味着FileChannel和Selector不能一起使用;套接字通道都可以
- register()的第二个参数:interest集合
监听事件类型
类型 |
SelectionKey |
含义 |
Connect |
SelectionKey.OP_CONNECT |
channel成功连接到服务器 |
Accept |
SelectionKey.OP_ACCEPT |
server channel接收到连接 |
Read |
SelectionKey.OP_READ |
channel有数据可读 |
Write |
SelectionKey.OP_WRITE |
channel等待数据写入 |
SelectionKey
- 向Selector注册Channel时,register()方法返回SelectionKey对象
- interest集合
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) != 0;
boolean isInterestedInConnect = (interestSet & SelectionKey.OP_CONNECT) != 0;
boolean isInterestedInRead = (interestSet & SelectionKey.OP_READ) != 0;
boolean isInterestedInWrite = (interestSet & SelectionKey.OP_WRITE) != 0;
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel channel = selectionKey.channel(); // 和SelectionKey一对一
Selector selector = selectionKey.selector();
- 附加对象(可选)
- 可以将一个对象附着到SelectionKey上,可方便的识别某个给定通道
- 例如:可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
Selector选择通道
- int select():阻塞直到至少有一个通道在感兴趣的事件上就绪
- int select(long timeout):和select()一样,阻塞timeout(毫秒)
- int selectNow():不阻塞,不管是否有通道就绪都立刻返回
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel
} else if (key.isConnectable()) {
// a connection was established with a remote server
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove(); // 已处理的key需要手动移除
}
wakeUp()
- 某个线程调用select()方法后阻塞,即使没有通道就绪,也有办法让其返回
- 其它线程在那个selector上调用wakeUp(),阻塞在select()方法上的线程会马上返回
- 如果有线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来”。和LockSupport.unpark有点类似
close()
- 关闭Selector,注册到该Selector上的所有通道生成的SelectionKey无效;通道不会关闭
FileChannel
概述
- FileChannel无法设置为非阻塞模式,总是运行在阻塞模式下,为什么???
打开FileChannel
- 使用前必须先打开,无法直接打开,需通过InputStream、OutputStream或RandomAccessFile获取FileChannel实例
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
读数据
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
写数据
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
channel.write(buf);
}
关闭FileChannel
channel.close();
position方法
long pos = channel.position();
channel.position(pos + 123);
- 将位置设置在文件结束符之后,试图读取数据,方法返回-1
- 将位置设置在文件结束符之后,写数据,文件将撑大到当前位置并写入数据,可能导致“文件空洞”
- 但是:toChannel.transferFrom(fromChannel, position, count);
- 这里好像不会撑大哦,测试下来,当position超过文件大小时,会直接被忽略,啥也不干
size方法
long fileSize = channel.size(); // 字节
truncate方法
channel.truncate(1024); // 截取文件前1024个字节,后面部分删除
force方法
channel.force(true); // true表示同时将文件元数据(权限信息等)写到磁盘上
Channel间数据传输
- 通道中有一个是FileChannel,那么可以直接将数据从一个Channel传输到另一个Channel
// transferFrom():源 -> FileChannel
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel(); // 可以不是FileChannel
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
- 从fromChannel获取最多count字节数据,写入toChannel的position位置
- 为什么最多count
- size计算Channel中所有数据,包括已read到Buffer的;但transferFrom时,已被read到Buffer的数据,不会进行transfer
- 在SoketChannel的实现中,只会传输准备好的数据(可能不足count字节)
// transferTo():FileChannel -> 其他Channel
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel(); // 可以不是FileChannel
long position = 0;
long count = fromChannel.size();
fromChannel.transferTo(position, count, toChannel); // position是fromChannel的
SocketChannel
概述
- 连接到TCP网络套接字的通道
- 创建方式
- 打开一个SocketChannel并连接到互联网上的某台服务器
- 一个新连接到达ServerSocketChannel时,server会创建一个SocketChannel
打开
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
关闭
socketChannel.close();
读数据
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf); // -1表示已读到流末尾,或者流已关闭
写数据
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
channel.write(buf);
}
非阻塞模式
- 设置非阻塞模式,可以在异步模式下调用connect(),read(),write()
- connect()
- 非阻塞模式下,connect()可能在连接建立之前就返回;为确定连接建立,可以调用finishConnect()
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
while(!socketChannel.finishConnect()){
//wait, or do something else...
}
- write()
- 非阻塞模式下,write()可能在尚未写入任何内容时就返回,需要循环
- read()
- 非阻塞模式下,read()可能在尚未读到任何数据时就返回,需要关注返回值
ServerSocketChannel
概述
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开
serverSocketChannel.bind(new InetSocketAddress(9999));
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}
关闭
serverSocketChannel.close();
监听
非阻塞模式
- 非阻塞模式下,accept()会立刻返回,无连接返回null
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel != null){
//do something with socketChannel...
}
}
DatagramChannel
概述
打开
DatagramChannel channel = DatagramChannel.open();
channel.bind(new InetSocketAddress(9999));
接收数据
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
channel.receive(buf); // 如果Buffer容不下所有数据,多出的数据被丢弃
发送数据
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));
连接到特定地址
- 可以将DatagramChannel“连接”到网络中特定地址
- UDP是无连接的,“连接”到特定地址并不像TCP通道那样真正创建一个连接
- 而是锁住DatagramChannel,让其只能从特定地址收发数据
channel.connect(new InetSocketAddress("jenkov.com", 80));
// 连接后,可以使用read()和write()方法,只是数据传送方面没有保证
int bytesRead = channel.read(buf);
int bytesWritten = channel.write(buf);
Pipe
概述
- 2个线程之间的单向数据连接,Pipe有一个source通道和一个sink通道,数据写到sink通道,从source通道读取
创建管道
Pipe pipe = Pipe.open();
写数据
Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
sinkChannel.write(buf);
}
读数据
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = sourceChannel.read(buf);
网友评论