NIO学习

作者: 黄二的NPE | 来源:发表于2018-08-22 12:57 被阅读6次
  • NIO是什么?

NIO全称NON-BLOCKING I/O,非阻塞I/O。在传统IO中, 一个线程调用read()或者write()的时候,当数据尚未到达时,线程会发生阻塞。而在NIO中,线程会不断轮询注册到selector的通道,检测通道是否准备就绪,如果已经有响应,就把该通道返回过来。

  • 为什么不会阻塞?

不同于传统IO基于流,NIO基于通道和缓冲区。
打个不恰当的比喻,我们可以把程序比作工厂,把数据比作工人;工厂位于用户态中,而工人们在内核态里;工人要去工厂,但是中间隔了一条河(可以理解成用户态和内核之间的隔阂)。我们可以把流(传统IO)或者通道(NIO)理解成连接系统用户态和内核态的桥梁,有了这座桥梁,内核态的工人才能到用户态的工厂上班,但是工人只能达到桥梁上,而不能直接找到工厂,这时就需要一个车带他们到工厂中。在传统IO中,工厂会派车(可以理解成各种inputstream/outputstream)到桥边等工人来,如果没人,那辆车就会一直等啊等(即阻塞)。而在NIO中,我们可以把缓存区看做是这辆车。那什么是selector呢?selector可以看做是一个监控室,每一条桥梁都会注册到监控室某一台的显示器上。线程每隔一段时间会遍历一遍所有的显示器,如果某些桥上已经有工人来了,就会派车去桥上把人载走。(Linux上的显示器更牛逼,如果桥上来人,他还会报警,线程就不用遍历所有显示器了,只需要留意报警的显示器就可以,这就是epoll)。这也是不会阻塞的原因。

  • 怎么使用NIO

  • channel

定义 channel类似于传统IO中的流,不同的是channel可以双向。数据可以从channel写到buffer中,也可以从buffer读到channel中。


实现
  1. FileChannel(文件的通道)
  2. DatagramChannel (UDP的通道)
  3. SocketChannel (从TCP中读写网络数据的通道)
  4. ServerSocketChannel (监听TCP连接的通道,可以监听所有新进来的TCP连接,像web服务器那样。对每一个新进来的连接创建SocketChannel。)
  • buffer

定义 buffer本质上一块可以从中读取数据并且把数据写到其中的内存。这块内存被封装成NIO对象。

buffer详解

  1. 当数据从channel写入到buffer中时,buffer处在写模式中(channel.write()/buffer.put())
  2. 当我们需要从buffer中获取数据时, buffer处于读状态(channel.read()/buffer.get())
  3. capacity 内存块的固定大小值,buffer不管是读还是写模式,其值一样
  4. position 当你把数据写到buffer中时,position代表当前位置,从0开始。当切换到写模式的时候,position会被重新置为0。
  5. limit 写模式下含义跟capacity一样。当buffer从写模式切换到读模式,其limit等于写模式下的position值,就是这个buffer中一共有多少数据。

使用思想

  1. 把channel的数据写入buffer中
  2. 调用buffer的flip方法,调用flip方法会把buffer从写模式切换到读模式
  3. 从buffer中读取数据, 比如 byteBuffer.get()获取一个字节
  4. 调用clear方法或者compact方法,清空buffer中的数据,clear是清空buffer中的所有数据,compact是清空buffer中所有读过的数,任何未读的数据会被移到buffer的起始位置,新鞋进来的数据会被放到缓冲区的后面。

实现

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer
  • MappedByteBuffer

基本用法

  1. 分配
    ByteBuffer buffer = ByteBuffer.allocate(1024);
  2. 读数据
    buffer.get() / channel.write(buffer)
  3. 写数据
    buffer.put() / channel.read(buffer)
  4. buffer反转,从写状态转成读状态,limit=position,position=0
    buffer.flip()
  5. 标记与重置
    mark/reset
  6. 清除
    clear/compact
  • selector

定义 可以用于监听多个channel的事件,比如连接打开,数据到达。因此一个线程可以处理多个数据通道。

使用思想 要使用selector,首先要向selector注册channel及其对应事件,然后调用它的select()方法,这个方法会阻塞到channel中有事件就绪,一旦这个方法返回,线程就可以处理这些事件。

使用

  1. 创建selector
    Selector selector = Selector.open();
  2. 将通道注册到selector中
    SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
1. connect SelectionKey.OP_CONNECT 连接就绪
2. accept SelectionKey.OP_ACCEPT 接收就绪
3. read SelectionKey.OP_READ 读事件
4. write SelectionKey.OP_WRITE 写事件
当要监听多个事件的时候,可以用 | 来表示:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
  1. SelectionKey
    通道注册到selector之后,会把channel,selector, interest等封装成一个Selectionkey对象。
public class SelectionKeyImpl extends AbstractSelectionKey {
    //通道
    final SelChImpl channel;
    //选择器
    public final SelectorImpl selector;
    //其值表示该SelectionKey对象存储在与其关联的Selector对象中所在的位置
    private int index;
    //监听的事件
    private volatile int interestOps;
    //已经准备就绪的事件集合
    private int  readyOps;
    ...
}
  • interest集合 int,就是register中的事件码
isAcceptable() 当有SocketChannel连接到ServerSocketChannel时,ServerSocketChannel会有接收就绪的状态
isConnectable() 当SocketChannel成功连接到ServerSocketChannel时,SocketChannel会是连接就绪的状态
isReadable() 当有数据来到通道时,通道会是可读就绪的状态。
isWritable() 等待写数据的通道可以说是“写就绪”。
  1. select()
    当selector注册了channel以后,就可以调用selector的select方法,来获取已经就的通道key了。
  • int select() 阻塞到在你所注册的通道上至少有一个事件就绪
  • int select(long timeout) 阻塞到在你所注册的通道上至少有一个事件就绪,如果在timeout事件内没有就返回空
  • int selectNow() 不会阻塞,不管通道是否就绪都立即返回

注意:当调用一次select以后,返回一个就绪事件,在这个时间内你没有处理事件,当再次调用select,如果此刻没有事件方法,就会返回0

  1. selectedKeys()
    返回所有准备就绪的通道集合。
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

如果处理完事件以后,需要remove移除掉。selectionKey.channel()转为你需要处理的事件类型,比如SocketChannel。

  1. wakeUp
    wakeUp 当对selector调用select阻塞了,让其它线程再去调用该selector的wakeUp方法即可返回。
    close 关闭selector,通道本身不会关闭。
  • 使用NIO做一个聊天室

服务端代码:

public class Server {

    private int port;

    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    private List<SocketChannel> clients = newArrayList();

    /**
     * 读buffer
     */
    private ByteBuffer rBuffer = ByteBuffer.allocate(1024);

    /**
     * 写buffer
     */
    private ByteBuffer wBuffer = ByteBuffer.allocate(1024);

    /**
     * charset, 方便string与buffer的转换
     */
    private Charset charset = Charset.forName("utf-8");

    public Server(int port) {
        this.port = port;
        init();

    }

    /**
     * 初始化
     */
    private void init() {
        try {
            selector = Selector.open();
            //初始化serverSocketChannel
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            //注册为非阻塞的
            serverSocketChannel.configureBlocking(false);
            //将通道注册到选择器上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void listen() {
        while (true) {
            try {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    handle(key);
                    //移除已经处理过的key
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handle(SelectionKey key) throws IOException {
        //无效的selectionKey不处理
        if (!key.isValid())
            return;
        if (key.isAcceptable()) {
            //当serverSocketChannel有接收事件
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            //接收连接进来的socketChanel
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            //注册到选择器上,读事件
            socketChannel.register(selector, SelectionKey.OP_READ);
            //添加到客户端列表
            clients.add(socketChannel);

            System.out.println("accept client success, channel = " + socketChannel);
        } else if (key.isReadable()) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int length = socketChannel.read(rBuffer);
            if (length > 0) {
                //接收并处理信息
                rBuffer.flip();
                String message = charset.decode(rBuffer).toString();
                System.out.println(socketChannel + ":" + message);
                rBuffer.clear();
                //转到其他socketChannel上去
                dispatch(socketChannel, message);
            }
        }
    }

    private void dispatch(SocketChannel socketChannel, String message) {
        clients.stream()
                .filter(channel -> channel != socketChannel)
                .forEach(channel -> {
                    try {
                        //将数据放到写buffer上
                        wBuffer.put(charset.encode(message));
                        //反转
                        wBuffer.flip();
                        //写到channel中去
                        channel.write(wBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }finally {
                        //不管有没有写成功,都必须把buffer清空
                        wBuffer.clear();
                    }
                });
    }

    public static void main(String[] args) {
        Server server = new Server(7789);
        server.listen();
    }
}

客户端代码:

public class Client {

    private int port;

    private String host;

    private String name;

    private Selector selector;

    private SocketChannel socketChannel;

    private ByteBuffer rBuffer = ByteBuffer.allocate(1024);

    private ByteBuffer wBuffer = ByteBuffer.allocate(1024);

    private Charset charset = Charset.forName("utf-8");

    public Client(String host, int port, String name) {
        this.port = port;
        this.host = host;
        this.name = name;
        try {
            init();
            listen();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void init() throws IOException{
        selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress(host, port));
    }

    private void listen() throws IOException{
        while (true){
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if(!key.isValid()) continue;
                handle(key);
            }
        }
    }

    private void handle(SelectionKey key) throws IOException{
        if(key.isConnectable()){
            SocketChannel channel = (SocketChannel)key.channel();
            if(!channel.isConnectionPending()) return;
            //channel是非阻塞的,为确保连接成功
            while (!channel.finishConnect()){}
            System.out.println("连接成功!!");
            key.interestOps(SelectionKey.OP_READ);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        Scanner scanner = new Scanner(System.in);
                        while (scanner.hasNext()){
                            String message = name + ":" + scanner.next() ;
                            try {
                                wBuffer.put(charset.encode(message));
                                wBuffer.flip();
                                channel.write(wBuffer);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }finally {
                                wBuffer.clear();
                            }

                        }
                    }


                }
            }).start();


        }else if(key.isReadable()){
            SocketChannel channel = (SocketChannel)key.channel();
            try {
                int length = channel.read(rBuffer);
                if(length > 0){
                    rBuffer.flip();
                    System.out.println(charset.decode(rBuffer));
                }
                rBuffer.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                rBuffer.clear();
            }

        }
    }

    public static void main(String[] args) {
        Client client = new Client("localhost", 7789, "小蓝");

    }
}
  • NIO进阶

pipe

定义
两个线程之间的单向数据连接。有一个source管道和一个sink管道,数据写入到sink管道,然后从source管道被读取。

用法

  1. 创建
    Pipe pile = Pipe.open();

  2. 写入
    Pipe.SinkChannel sinkChannel = pipe.sink(); sinkChannel.write(buf);

  3. 读取
    Pipe.SourceChannel sourceChannel = pipe.source(); sourceChannel.read(buff);

相关文章

网友评论

      本文标题:NIO学习

      本文链接:https://www.haomeiwen.com/subject/plxzuftx.html