NIO学习

作者: 黄二的NPE | 来源:发表于2018-08-22 12:57 被阅读6次
    • NIO是什么?

    NIO全称NON-BLOCKING I/O,非阻塞I/O。在传统IO中, 一个线程调用read()或者write()的时候,当数据尚未到达时,线程会发生阻塞。而在NIO中,线程会不断轮询注册到selector的通道,检测通道是否准备就绪,如果已经有响应,就把该通道返回过来。

    • 为什么不会阻塞?

    不同于传统IO基于流,NIO基于通道和缓冲区。
    打个不恰当的比喻,我们可以把程序比作工厂,把数据比作工人;工厂位于用户态中,而工人们在内核态里;工人要去工厂,但是中间隔了一条河(可以理解成用户态和内核之间的隔阂)。我们可以把流(传统IO)或者通道(NIO)理解成连接系统用户态和内核态的桥梁,有了这座桥梁,内核态的工人才能到用户态的工厂上班,但是工人只能达到桥梁上,而不能直接找到工厂,这时就需要一个车带他们到工厂中。在传统IO中,工厂会派车(可以理解成各种inputstream/outputstream)到桥边等工人来,如果没人,那辆车就会一直等啊等(即阻塞)。而在NIO中,我们可以把缓存区看做是这辆车。那什么是selector呢?selector可以看做是一个监控室,每一条桥梁都会注册到监控室某一台的显示器上。线程每隔一段时间会遍历一遍所有的显示器,如果某些桥上已经有工人来了,就会派车去桥上把人载走。(Linux上的显示器更牛逼,如果桥上来人,他还会报警,线程就不用遍历所有显示器了,只需要留意报警的显示器就可以,这就是epoll)。这也是不会阻塞的原因。

    • 怎么使用NIO

    • channel

    定义 channel类似于传统IO中的流,不同的是channel可以双向。数据可以从channel写到buffer中,也可以从buffer读到channel中。


    实现
    1. FileChannel(文件的通道)
    2. DatagramChannel (UDP的通道)
    3. SocketChannel (从TCP中读写网络数据的通道)
    4. ServerSocketChannel (监听TCP连接的通道,可以监听所有新进来的TCP连接,像web服务器那样。对每一个新进来的连接创建SocketChannel。)
    • buffer

    定义 buffer本质上一块可以从中读取数据并且把数据写到其中的内存。这块内存被封装成NIO对象。

    buffer详解

    1. 当数据从channel写入到buffer中时,buffer处在写模式中(channel.write()/buffer.put())
    2. 当我们需要从buffer中获取数据时, buffer处于读状态(channel.read()/buffer.get())
    3. capacity 内存块的固定大小值,buffer不管是读还是写模式,其值一样
    4. position 当你把数据写到buffer中时,position代表当前位置,从0开始。当切换到写模式的时候,position会被重新置为0。
    5. limit 写模式下含义跟capacity一样。当buffer从写模式切换到读模式,其limit等于写模式下的position值,就是这个buffer中一共有多少数据。

    使用思想

    1. 把channel的数据写入buffer中
    2. 调用buffer的flip方法,调用flip方法会把buffer从写模式切换到读模式
    3. 从buffer中读取数据, 比如 byteBuffer.get()获取一个字节
    4. 调用clear方法或者compact方法,清空buffer中的数据,clear是清空buffer中的所有数据,compact是清空buffer中所有读过的数,任何未读的数据会被移到buffer的起始位置,新鞋进来的数据会被放到缓冲区的后面。

    实现

    • ByteBuffer
    • CharBuffer
    • DoubleBuffer
    • FloatBuffer
    • IntBuffer
    • LongBuffer
    • ShortBuffer
    • MappedByteBuffer

    基本用法

    1. 分配
      ByteBuffer buffer = ByteBuffer.allocate(1024);
    2. 读数据
      buffer.get() / channel.write(buffer)
    3. 写数据
      buffer.put() / channel.read(buffer)
    4. buffer反转,从写状态转成读状态,limit=position,position=0
      buffer.flip()
    5. 标记与重置
      mark/reset
    6. 清除
      clear/compact
    • selector

    定义 可以用于监听多个channel的事件,比如连接打开,数据到达。因此一个线程可以处理多个数据通道。

    使用思想 要使用selector,首先要向selector注册channel及其对应事件,然后调用它的select()方法,这个方法会阻塞到channel中有事件就绪,一旦这个方法返回,线程就可以处理这些事件。

    使用

    1. 创建selector
      Selector selector = Selector.open();
    2. 将通道注册到selector中
      SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
    1. connect SelectionKey.OP_CONNECT 连接就绪
    2. accept SelectionKey.OP_ACCEPT 接收就绪
    3. read SelectionKey.OP_READ 读事件
    4. write SelectionKey.OP_WRITE 写事件
    当要监听多个事件的时候,可以用 | 来表示:
    int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
    
    1. SelectionKey
      通道注册到selector之后,会把channel,selector, interest等封装成一个Selectionkey对象。
    public class SelectionKeyImpl extends AbstractSelectionKey {
        //通道
        final SelChImpl channel;
        //选择器
        public final SelectorImpl selector;
        //其值表示该SelectionKey对象存储在与其关联的Selector对象中所在的位置
        private int index;
        //监听的事件
        private volatile int interestOps;
        //已经准备就绪的事件集合
        private int  readyOps;
        ...
    }
    
    • interest集合 int,就是register中的事件码
    isAcceptable() 当有SocketChannel连接到ServerSocketChannel时,ServerSocketChannel会有接收就绪的状态
    isConnectable() 当SocketChannel成功连接到ServerSocketChannel时,SocketChannel会是连接就绪的状态
    isReadable() 当有数据来到通道时,通道会是可读就绪的状态。
    isWritable() 等待写数据的通道可以说是“写就绪”。
    
    1. select()
      当selector注册了channel以后,就可以调用selector的select方法,来获取已经就的通道key了。
    • int select() 阻塞到在你所注册的通道上至少有一个事件就绪
    • int select(long timeout) 阻塞到在你所注册的通道上至少有一个事件就绪,如果在timeout事件内没有就返回空
    • int selectNow() 不会阻塞,不管通道是否就绪都立即返回

    注意:当调用一次select以后,返回一个就绪事件,在这个时间内你没有处理事件,当再次调用select,如果此刻没有事件方法,就会返回0

    1. selectedKeys()
      返回所有准备就绪的通道集合。
    Set selectedKeys = selector.selectedKeys();
    Iterator keyIterator = selectedKeys.iterator();
    while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
            // a connection was established with a remote server.
        } else if (key.isReadable()) {
            // a channel is ready for reading
        } else if (key.isWritable()) {
            // a channel is ready for writing
        }
        keyIterator.remove();
    }
    

    如果处理完事件以后,需要remove移除掉。selectionKey.channel()转为你需要处理的事件类型,比如SocketChannel。

    1. wakeUp
      wakeUp 当对selector调用select阻塞了,让其它线程再去调用该selector的wakeUp方法即可返回。
      close 关闭selector,通道本身不会关闭。
    • 使用NIO做一个聊天室

    服务端代码:

    public class Server {
    
        private int port;
    
        private Selector selector;
    
        private ServerSocketChannel serverSocketChannel;
    
        private List<SocketChannel> clients = newArrayList();
    
        /**
         * 读buffer
         */
        private ByteBuffer rBuffer = ByteBuffer.allocate(1024);
    
        /**
         * 写buffer
         */
        private ByteBuffer wBuffer = ByteBuffer.allocate(1024);
    
        /**
         * charset, 方便string与buffer的转换
         */
        private Charset charset = Charset.forName("utf-8");
    
        public Server(int port) {
            this.port = port;
            init();
    
        }
    
        /**
         * 初始化
         */
        private void init() {
            try {
                selector = Selector.open();
                //初始化serverSocketChannel
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.socket().bind(new InetSocketAddress(port));
                //注册为非阻塞的
                serverSocketChannel.configureBlocking(false);
                //将通道注册到选择器上
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void listen() {
            while (true) {
                try {
                    selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        handle(key);
                        //移除已经处理过的key
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void handle(SelectionKey key) throws IOException {
            //无效的selectionKey不处理
            if (!key.isValid())
                return;
            if (key.isAcceptable()) {
                //当serverSocketChannel有接收事件
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                //接收连接进来的socketChanel
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                //注册到选择器上,读事件
                socketChannel.register(selector, SelectionKey.OP_READ);
                //添加到客户端列表
                clients.add(socketChannel);
    
                System.out.println("accept client success, channel = " + socketChannel);
            } else if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                int length = socketChannel.read(rBuffer);
                if (length > 0) {
                    //接收并处理信息
                    rBuffer.flip();
                    String message = charset.decode(rBuffer).toString();
                    System.out.println(socketChannel + ":" + message);
                    rBuffer.clear();
                    //转到其他socketChannel上去
                    dispatch(socketChannel, message);
                }
            }
        }
    
        private void dispatch(SocketChannel socketChannel, String message) {
            clients.stream()
                    .filter(channel -> channel != socketChannel)
                    .forEach(channel -> {
                        try {
                            //将数据放到写buffer上
                            wBuffer.put(charset.encode(message));
                            //反转
                            wBuffer.flip();
                            //写到channel中去
                            channel.write(wBuffer);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }finally {
                            //不管有没有写成功,都必须把buffer清空
                            wBuffer.clear();
                        }
                    });
        }
    
        public static void main(String[] args) {
            Server server = new Server(7789);
            server.listen();
        }
    }
    

    客户端代码:

    public class Client {
    
        private int port;
    
        private String host;
    
        private String name;
    
        private Selector selector;
    
        private SocketChannel socketChannel;
    
        private ByteBuffer rBuffer = ByteBuffer.allocate(1024);
    
        private ByteBuffer wBuffer = ByteBuffer.allocate(1024);
    
        private Charset charset = Charset.forName("utf-8");
    
        public Client(String host, int port, String name) {
            this.port = port;
            this.host = host;
            this.name = name;
            try {
                init();
                listen();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void init() throws IOException{
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress(host, port));
        }
    
        private void listen() throws IOException{
            while (true){
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if(!key.isValid()) continue;
                    handle(key);
                }
            }
        }
    
        private void handle(SelectionKey key) throws IOException{
            if(key.isConnectable()){
                SocketChannel channel = (SocketChannel)key.channel();
                if(!channel.isConnectionPending()) return;
                //channel是非阻塞的,为确保连接成功
                while (!channel.finishConnect()){}
                System.out.println("连接成功!!");
                key.interestOps(SelectionKey.OP_READ);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (true){
                            Scanner scanner = new Scanner(System.in);
                            while (scanner.hasNext()){
                                String message = name + ":" + scanner.next() ;
                                try {
                                    wBuffer.put(charset.encode(message));
                                    wBuffer.flip();
                                    channel.write(wBuffer);
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }finally {
                                    wBuffer.clear();
                                }
    
                            }
                        }
    
    
                    }
                }).start();
    
    
            }else if(key.isReadable()){
                SocketChannel channel = (SocketChannel)key.channel();
                try {
                    int length = channel.read(rBuffer);
                    if(length > 0){
                        rBuffer.flip();
                        System.out.println(charset.decode(rBuffer));
                    }
                    rBuffer.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }finally {
                    rBuffer.clear();
                }
    
            }
        }
    
        public static void main(String[] args) {
            Client client = new Client("localhost", 7789, "小蓝");
    
        }
    }
    
    • NIO进阶

    pipe

    定义
    两个线程之间的单向数据连接。有一个source管道和一个sink管道,数据写入到sink管道,然后从source管道被读取。

    用法

    1. 创建
      Pipe pile = Pipe.open();

    2. 写入
      Pipe.SinkChannel sinkChannel = pipe.sink(); sinkChannel.write(buf);

    3. 读取
      Pipe.SourceChannel sourceChannel = pipe.source(); sourceChannel.read(buff);

    相关文章

      网友评论

          本文标题:NIO学习

          本文链接:https://www.haomeiwen.com/subject/plxzuftx.html