美文网首页
java IO-3-NIO

java IO-3-NIO

作者: 宠辱不惊的咸鱼 | 来源:发表于2019-10-01 09:10 被阅读0次

概述

  • JDK 1.4出现
  • Buffer
    • ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer
    • MappedByteBuffer(内存映射文件)
  • Selector
    • 允许单线程处理多Channel
    • 将Channel注册到Selector上,调用Selector的select()进入阻塞
    • 注册通道有事件就绪时select()返回,线程处理事件
    • 事件例子:新连接建立,收到数据
  • 其他
    • Pipe
    • FileLock

Channel

  • 可从通道中读数据,也可写数据到通道;流的读写通常是单向
  • 可异步读写;流是同步的
  • Channel -> Buffer -> Channel,一定有Buffer的参与
  • 实现类
    • FileChannel:文件读写(无法channel.configureBlocking(false),即无法使用Selector)
    • DatagramChannel:UDP
    • SocketChannel:TCP
    • ServerSocketChannel:监听TCP,对请求连接创建SocketChannel
RandomAccessFile aFile = new RandomAccessFile("resources/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

Buffer

本质

  • 一块可读写内存,被包装成Buffer对象,用于访问该块内存

基本用法

  • 写数据到Buffer(从channel读)
    • channel.read(buffer)
    • buffer.put()
  • 调flip()方法
    • 切换Buffer模式,写 -> 读
  • 从Buffer中读数据(写数据到channel)
    • channel.write(buffer)
    • buffer.get()
  • 调clear()或者compact()
    • clear:清空缓冲区
    • compact:清空已读数据,未读数据被挪到缓冲区起始位置
ByteBuffer buf = ByteBuffer.allocate(48);

public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

HeapByteBuffer(int cap, int lim) {
    super(-1, 0, lim, cap, new byte[cap], 0);
}

ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
    // 初始书签-1;初始位置0;写限制cap
    super(mark, pos, lim, cap);
    this.hb = hb; // 数据数组
    this.offset = offset; // 初始偏移0
}

put

public ByteBuffer put(byte x) {
    hb[ix(nextPutIndex())] = x;
    return this;
}

// 增加偏移
protected int ix(int i) {
    return i + offset;
}

final int nextPutIndex() {
    if (position >= limit)
        throw new BufferOverflowException();
    return position++;
}
  • position指向下一个可放位置
  • position == limit-1时,可以最后调用一次put
  • limit这个位置不可放数据

flip

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

get

public byte get() {
    return hb[ix(nextGetIndex())];
}

protected int ix(int i) {
    return i + offset;
}

final int nextGetIndex() {
    if (position >= limit)
        throw new BufferUnderflowException();
    return position++;
}
  • 写切换至读时,position变成limit,limit这个位置不可读数据

rewind-重读

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

clear-清空

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

compact-清空已读

public ByteBuffer compact() {
    System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
    position(remaining());
    limit(capacity());
    discardMark();
    return this;
}

public final int remaining() {
    return limit - position;
}

mark & reset

public final Buffer mark() {
    mark = position;
    return this;
}

public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}

equals

public boolean equals(Object ob) {
    if (this == ob)
        return true;
    if (!(ob instanceof ByteBuffer))
        return false;
    ByteBuffer that = (ByteBuffer)ob;
    if (this.remaining() != that.remaining())
        return false;
    int p = this.position();
    for (int i = this.limit() - 1, j = that.limit() - 1; i >= p; i--, j--)
        if (!equals(this.get(i), that.get(j)))
            return false;
    return true;
}
  • 未读数据比较

Scatter & Gather

  • 分散(Scatter)
    • Channel数据 -> 多个Buffer
  • 聚集(Gather)
    • 多个Buffer数据 -> 一个Channel
  • 场景
    • 数据分开处理需求
    • 例如:消息头 + 消息体
// Scattering Reads
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
  • 一个Buffer写满后,下一个Buffer
  • 无法处理动态消息(消息头大小不固定)
// Gathering Writes
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
  • 按照Buffer在数组中的顺序写入Channel
  • 可以处理动态消息

Selector

概述

  • 检测一到多个NIO通道,并探知通道是否已为所感兴趣事件做好准备
  • Why use Selector?
    • 减少线程数量,降低操作系统开销
// Selector创建
Selector selector = Selector.open();
channel.configureBlocking(false);(默认是true)
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
  • 与Selector一起使用时,Channel必须处于非阻塞模式下
  • FileChannel不能切换到非阻塞模式,意味着FileChannel和Selector不能一起使用;套接字通道都可以
  • register()的第二个参数:interest集合

监听事件类型

类型 SelectionKey 含义
Connect SelectionKey.OP_CONNECT channel成功连接到服务器
Accept SelectionKey.OP_ACCEPT server channel接收到连接
Read SelectionKey.OP_READ channel有数据可读
Write SelectionKey.OP_WRITE channel等待数据写入

SelectionKey

  • 向Selector注册Channel时,register()方法返回SelectionKey对象
  • interest集合
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) != 0;
boolean isInterestedInConnect = (interestSet & SelectionKey.OP_CONNECT) != 0;
boolean isInterestedInRead    = (interestSet & SelectionKey.OP_READ) != 0;
boolean isInterestedInWrite   = (interestSet & SelectionKey.OP_WRITE) != 0;
  • ready集合
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel channel = selectionKey.channel(); // 和SelectionKey一对一
Selector selector = selectionKey.selector();
  • 附加对象(可选)
  • 可以将一个对象附着到SelectionKey上,可方便的识别某个给定通道
  • 例如:可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

Selector选择通道

  • int select():阻塞直到至少有一个通道在感兴趣的事件上就绪
  • int select(long timeout):和select()一样,阻塞timeout(毫秒)
  • int selectNow():不阻塞,不管是否有通道就绪都立刻返回
selector.select();

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel
    } else if (key.isConnectable()) {
        // a connection was established with a remote server
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove(); // 已处理的key需要手动移除
}

wakeUp()

  • 某个线程调用select()方法后阻塞,即使没有通道就绪,也有办法让其返回
  • 其它线程在那个selector上调用wakeUp(),阻塞在select()方法上的线程会马上返回
  • 如果有线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来”。和LockSupport.unpark有点类似

close()

  • 关闭Selector,注册到该Selector上的所有通道生成的SelectionKey无效;通道不会关闭

FileChannel

概述

  • FileChannel无法设置为非阻塞模式,总是运行在阻塞模式下,为什么???
    • 猜测和操作系统的IO实现有关

打开FileChannel

  • 使用前必须先打开,无法直接打开,需通过InputStream、OutputStream或RandomAccessFile获取FileChannel实例
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

读数据

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

写数据

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    channel.write(buf);
}

关闭FileChannel

channel.close();

position方法

long pos = channel.position();
channel.position(pos + 123);
  • 将位置设置在文件结束符之后,试图读取数据,方法返回-1
  • 将位置设置在文件结束符之后,写数据,文件将撑大到当前位置并写入数据,可能导致“文件空洞”
  • 但是:toChannel.transferFrom(fromChannel, position, count);
    • 这里好像不会撑大哦,测试下来,当position超过文件大小时,会直接被忽略,啥也不干

size方法

  • 实例所关联文件大小
long fileSize = channel.size(); // 字节

truncate方法

channel.truncate(1024); // 截取文件前1024个字节,后面部分删除

force方法

  • 将数据强制刷入磁盘
channel.force(true); // true表示同时将文件元数据(权限信息等)写到磁盘上

Channel间数据传输

  • 通道中有一个是FileChannel,那么可以直接将数据从一个Channel传输到另一个Channel
// transferFrom():源 -> FileChannel
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel(); // 可以不是FileChannel
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
  • 从fromChannel获取最多count字节数据,写入toChannel的position位置
  • 为什么最多count
    • size计算Channel中所有数据,包括已read到Buffer的;但transferFrom时,已被read到Buffer的数据,不会进行transfer
    • 在SoketChannel的实现中,只会传输准备好的数据(可能不足count字节)
// transferTo():FileChannel -> 其他Channel
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel(); // 可以不是FileChannel
long position = 0;
long count = fromChannel.size();
fromChannel.transferTo(position, count, toChannel); // position是fromChannel的

SocketChannel

概述

  • 连接到TCP网络套接字的通道
  • 创建方式
    • 打开一个SocketChannel并连接到互联网上的某台服务器
    • 一个新连接到达ServerSocketChannel时,server会创建一个SocketChannel

打开

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));

关闭

socketChannel.close();

读数据

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf); // -1表示已读到流末尾,或者流已关闭

写数据

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
   channel.write(buf);
}

非阻塞模式

  • 设置非阻塞模式,可以在异步模式下调用connect(),read(),write()
  • connect()
    • 非阻塞模式下,connect()可能在连接建立之前就返回;为确定连接建立,可以调用finishConnect()
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
while(!socketChannel.finishConnect()){
    //wait, or do something else...
}
  • write()
    • 非阻塞模式下,write()可能在尚未写入任何内容时就返回,需要循环
  • read()
    • 非阻塞模式下,read()可能在尚未读到任何数据时就返回,需要关注返回值

ServerSocketChannel

概述

  • 可以监听TCP连接的通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开
serverSocketChannel.bind(new InetSocketAddress(9999));
while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    //do something with socketChannel...
}

关闭

serverSocketChannel.close();

监听

  • accept()方法会阻塞直到有新连接到达

非阻塞模式

  • 非阻塞模式下,accept()会立刻返回,无连接返回null
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel != null){
       //do something with socketChannel...
    }
}

DatagramChannel

概述

  • 收发UDP包

打开

DatagramChannel channel = DatagramChannel.open();
channel.bind(new InetSocketAddress(9999));

接收数据

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
channel.receive(buf); // 如果Buffer容不下所有数据,多出的数据被丢弃

发送数据

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));

连接到特定地址

  • 可以将DatagramChannel“连接”到网络中特定地址
  • UDP是无连接的,“连接”到特定地址并不像TCP通道那样真正创建一个连接
  • 而是锁住DatagramChannel,让其只能从特定地址收发数据
channel.connect(new InetSocketAddress("jenkov.com", 80));
// 连接后,可以使用read()和write()方法,只是数据传送方面没有保证
int bytesRead = channel.read(buf);
int bytesWritten = channel.write(buf);

Pipe

概述

  • 2个线程之间的单向数据连接,Pipe有一个source通道和一个sink通道,数据写到sink通道,从source通道读取

创建管道

Pipe pipe = Pipe.open();

写数据

Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}

读数据

Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = sourceChannel.read(buf);

相关文章

  • java IO-3-NIO

    概述 JDK 1.4出现 BufferByteBuffer,CharBuffer,ShortBuffer,IntB...

  • Java(JavaEE)学习线路图1

    Java教程 Java 教程Java 简介Java 开发环境配置Java 基础语法Java 对象和类Java 基本...

  • Java学习线路图

    Java教程 Java 教程Java 简介Java 开发环境配置Java 基础语法Java 对象和类Java 基本...

  • 大数据学习线路图

    Java教程 Java 教程Java 简介Java 开发环境配置Java 基础语法Java 对象和类Java 基本...

  • 大数据学习教程

    Java教程 Java 教程Java 简介Java 开发环境配置Java 基础语法Java 对象和类Java 基本...

  • 一篇文章,全面解读Android面试知识点

    Java Java基础 Java集合框架 Java集合——ArrayList Java集合——LinkedList...

  • java学习路线

    javaSE java基础语法 java文件操作 java网络操作 java多线程 java数据库操作 java ...

  • java编程分类

    一、java简介java开发环境配置java基础语法java对象与类java基本数据类型java变量类型java修...

  • Java-01初识Java

    Java的3大版本 Java有三大版本,Java SE, Java ME, Java EE Java SE(Jav...

  • JAVA循环语句(四)9.29

    Java条件 if Java条件if...else Java条件多重if Java条件语句 嵌套if Java条件...

网友评论

      本文标题:java IO-3-NIO

      本文链接:https://www.haomeiwen.com/subject/aelkpctx.html