美文网首页
Java 多线程NIO学习

Java 多线程NIO学习

作者: gnocuohz | 来源:发表于2019-04-14 17:31 被阅读0次

    IO模型

    1. 阻塞IO
      如果数据没有准备就绪,就一直等待,直到数据准备就绪;整个进程会被阻塞。
    2. 非阻塞IO
      需不断询问内核是否已经准备好数据,非阻塞虽然不用等待但是一直占用CPU。
    3. 多路复用IO NIO
      多路复用IO,会有一个线程不断地去轮询多个socket的状态,当socket有读写事件的时候才会调用IO读写操作。
      用一个线程管理多个socket,是通过selector.select()查询每个通道是否有事件到达,如果没有事件到达,则会一直阻塞在那里,因此也会带来线程阻塞问题。
    4. 信号驱动IO模型
      在信号驱动IO模型中,当用户发起一个IO请求操作时,会给对应的socket注册一个信号函数,线程会继续执行,当数据准备就绪的时候会给线程发送一个信号,线程接受到信号时,会在信号函数中进行IO操作。
      非阻塞IO、多路复用IO、信号驱动IO都不会造成IO操作的第一步,查看数据是否准备就绪而带来的线程阻塞,但是在第二步,对数据进行拷贝都会使线程阻塞。
    5. 异步IO jdk7AIO
      异步IO是最理想的IO模型,当线程发出一个IO请求操作时,接着就去做自己的事情了,内核去查看数据是否准备就绪和准备就绪后对数据的拷贝,拷贝完以后内核会给线程发送一个通知说整个IO操作已经完成了,数据可以直接使用了。
      同步的IO操作在第二个阶段,对数据的拷贝阶段,都会造成线程的阻塞,异步IO则不会。

    异步IO在IO操作的两个阶段,都不会使线程阻塞。
    Java 的 I/O 依赖于操作系统的实现。

    Java NIO的工作原理

    1. 由一个专门的线程(Selector)来处理所有的IO事件,并负责分发。
    2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
    3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。

    三大基本组件

    Channel

    1. FileChannel, 从文件中读写数据。
    2. DatagramChannel,通过UDP读写网络中的数据。
    3. SocketChannel,通过TCP读写网络中的数据。
    4. ServerSocketChannel,可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel。

    Java NIO 的通道类似流,但又有些不同:

    1. 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
    2. 通道可以异步地读写。
    3. 通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。

    Buffer

    关键的Buffer实现
    ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer

    Buffer两种模式、三个属性:


    capacity
    作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。

    position
    当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1.
    当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0. 当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。

    limit
    在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity。
    当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)

    参考链接:Buffer原理 https://www.cnblogs.com/chenpi/p/6475510.html

    Selector

    Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

    监听四种事件

    1. SelectionKey.OP_CONNECT
    2. SelectionKey.OP_ACCEPT
    3. SelectionKey.OP_READ
    4. SelectionKey.OP_WRITE

    select()方法
    select()阻塞到至少有一个通道在你注册的事件上就绪了。
    select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
    selectedKeys()方法
    调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。

    参考链接:操作系统层面分析Selector原理 http://zhhphappy.iteye.com/blog/2032893

    NIO实现

    服务端

    public class NIOServerSocket {
     
        //存储SelectionKey的队列
        private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>();
        private static Selector selector = null;
     
        //添加SelectionKey到队列
        public static void addWriteQueue(SelectionKey key){
            synchronized (writeQueue) {
                writeQueue.add(key);
                //唤醒主线程
                selector.wakeup();
            }
        }
     
        public static void main(String[] args) throws IOException {
     
            // 1.创建ServerSocketChannel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 2.绑定端口
            serverSocketChannel.bind(new InetSocketAddress(60000));
            // 3.设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 4.创建通道选择器
            selector = Selector.open();
            /*
             * 5.注册事件类型
             *
             *  sel:通道选择器
             *  ops:事件类型 ==>SelectionKey:包装类,包含事件类型和通道本身。四个常量类型表示四种事件类型
             *  SelectionKey.OP_ACCEPT 获取报文      SelectionKey.OP_CONNECT 连接
             *  SelectionKey.OP_READ 读           SelectionKey.OP_WRITE 写
             */
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                System.out.println("服务器端:正在监听60000端口");
                // 6.获取可用I/O通道,获得有多少可用的通道
                int num = selector.select();
                if (num > 0) { // 判断是否存在可用的通道
                    // 获得所有的keys
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    // 使用iterator遍历所有的keys
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    // 迭代遍历当前I/O通道
                    while (iterator.hasNext()) {
                        // 获得当前key
                        SelectionKey key = iterator.next();
                        // 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。
                        iterator.remove();
                        // 判断事件类型,做对应的处理
                        if (key.isAcceptable()) {
                            ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                            SocketChannel socketChannel = ssChannel.accept();
     
                            System.out.println("处理请求:"+ socketChannel.getRemoteAddress());
                            // 获取客户端的数据
                            // 设置非阻塞状态
                            socketChannel.configureBlocking(false);
                            // 注册到selector(通道选择器)
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            System.out.println("读事件");
                            //取消读事件的监控
                            key.cancel();
                            //调用读操作工具类
                            NIOHandler.read(key);
                        } else if (key.isWritable()) {
                            System.out.println("写事件");
                            //取消读事件的监控
                            key.cancel();
                            //调用写操作工具类
                            NIOHandler.write(key);
                        }
                    }
                }else{
                    synchronized (writeQueue) {
                        while(writeQueue.size() > 0){
                            SelectionKey key = writeQueue.remove(0);
                            //注册写事件
                            SocketChannel channel = (SocketChannel) key.channel();
                            Object attachment = key.attachment();
                            channel.register(selector, SelectionKey.OP_WRITE,attachment);
                        }
                    }
                }
            }
        }
     
    }
    

    消息处理

    public class NIOHandler {
     
        //构造线程池
        private static ExecutorService executorService  = Executors.newFixedThreadPool(10);
     
        public static void read(final SelectionKey key){
            //获得线程并执行
            executorService.submit(new Runnable() {
     
                @Override
                public void run() {
                    try {
                        SocketChannel readChannel = (SocketChannel) key.channel();
                        // I/O读数据操作
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        int len = 0;
                        while (true) {
                            buffer.clear();
                            len = readChannel.read(buffer);
                            if (len == -1) break;
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                baos.write(buffer.get());
                            }
                        }
                        System.out.println("服务器端接收到的数据:"+ new String(baos.toByteArray()));
                        //将数据添加到key中
                        key.attach(baos);
                        //将注册写操作添加到队列中
                        NIOServerSocket.addWriteQueue(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
     
        public static void write(final SelectionKey key) {
            //拿到线程并执行
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 写操作
                        SocketChannel writeChannel = (SocketChannel) key.channel();
                        //拿到客户端传递的数据
                        ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();
                        System.out.println("客户端发送来的数据:"+new String(attachment.toByteArray()));
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        String message = "你好,我是服务器!!";
                        buffer.put(message.getBytes());
                        buffer.flip();
                        writeChannel.write(buffer);
                        writeChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    客户端

    public class NIOClientSocket {
     
        public static void main(String[] args) throws IOException {
            //使用线程模拟用户 并发访问
            for (int i = 0; i < 1; i++) {
                new Thread(){
                    public void run() {
                        try {
                            //1.创建SocketChannel
                            SocketChannel socketChannel=SocketChannel.open();
                            //2.连接服务器
                            socketChannel.connect(new InetSocketAddress("localhost",60000));
                            //写数据
                            String msg="我是客户端"+Thread.currentThread().getId();
                            ByteBuffer buffer=ByteBuffer.allocate(1024);
                            buffer.put(msg.getBytes());
                            buffer.flip();
                            socketChannel.write(buffer);
                            socketChannel.shutdownOutput();
                            //读数据
                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
                            int len = 0;
                            while (true) {
                                buffer.clear();
                                len = socketChannel.read(buffer);
                                if (len == -1)
                                    break;
                                buffer.flip();
                                while (buffer.hasRemaining()) {
                                    bos.write(buffer.get());
                                }
                            }
                            System.out.println("客户端收到:"+new String(bos.toByteArray()));
                            socketChannel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    };
                }.start();
            }
        }
    }
    

    多线程NIO Tips

    1. 示例代码仅供学习参考。对于一个已经被监听到的事件,处理前先取消事件(SelectionKey .cancel())监控。否则selector.selectedKeys()会一直获取到该事件,但该方法比较粗暴,并且后续register会产生多个SelectionKey。推荐使用selectionKey.interestOps()改变感兴趣事件。
    2. Selector.select()和Channel.register()需同步。
    3. 当Channel设置为非阻塞(Channel.configureBlocking(false))时,SocketChannel.read 没读到数据也会返回,返回参数等于0。
    4. OP_WRITE事件,写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理现场就会一直占用着CPU资源。参考下面的第二个链接。
    5. 粘包问题。

    参考链接:SocketChannel.read https://blog.csdn.net/cao478208248/article/details/41648359
    参考链接:NIO坑 https://www.jianshu.com/p/1af407c043cb

    相关文章

      网友评论

          本文标题:Java 多线程NIO学习

          本文链接:https://www.haomeiwen.com/subject/tweswqtx.html