美文网首页
linux的底层支持以及几种io对比

linux的底层支持以及几种io对比

作者: 无聊之园 | 来源:发表于2019-05-12 10:01 被阅读0次

    1. nio的底层系统的支持

    liunx的io模型:(只是大概一个感性的认识,liunx的网络原理不深究。)

    堵塞io:数据准备好,并且复制到应用内存,才返回。期间一直堵塞。
    非堵塞io:io命令并不堵塞,数据没准备好,则返回一个标识错误,轮询检查数据是否准备好并复制到应用内存。
    i/o复用模型:一个select或poll轮询检查所有的socket io是否准备好数据,用一个select处理了所有socket io,liunx的epoll性能更高,一个select检查的socket io没有限制,基于事件驱动,而不是轮询扫描,数据准备好则回调方法。
    信号驱动io模型:通过发送信号的方式,socket io发送信号,数据准备好发送信号回调。
    异步io:当数据准备并且复制到应用内存之后,内核发送通知给应用可以进行io操作了。

    nio使用的就是io多路复用原理
    linux针对多路复用采用的select、poll等都存在缺陷,一个select轮询的socket数量有限制,效率低。之后采用epoll,epoll的优点有:轮询的socket数量没有限制,效率高:基于socket的callback回调,所以不会把性能浪费在非活跃socket上。epoll和内核mmap共享同一块内存区域,减少了一次复制过程。

    nio的缺点:针对文件系统的处理方法能力有点不足。

    2.bio

    bio的缺点:

    1. 线程个数和客户连接数是1对1关系。(可以用线程池解决)
    2. read和write操作都是堵塞的,数据没准备好,则线程一直堵塞,浪费资源。

    3. nio

    new io或者no block io。
    nio基于的主要的对象是:Buffer、Channel、Selector
    Buffer:buffer可以开辟堆内存,堆内存就是普通的java堆,堆外内存则是直接在java堆之外,开辟和回收代价大,但是进行比如socket数据交换的时候少了一次java堆到操作系统的内存复制。
    Channel:channel是一个双向操作的通道。
    Select:多路复用select,select轮询注册到select上的channel,如果channel有状态变更,用户再去处理这个channel。
    关键代码:

    服务端:

    public MultiplexerTimeServer(int port) {
            try {
                selector = Selector.open();
                servChannel = ServerSocketChannel.open();
                servChannel.configureBlocking(false);
                servChannel.socket().bind(new InetSocketAddress(port), 1024);
                            // select轮询捕捉servChannel的accept状态
                servChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("The time server is start in port : " + port);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
    public void run() {
            while (!stop) {
                try {
    // select轮询捕捉状态变更,如果没有channel状态发生变更,则一直堵塞,直到传入的1000毫秒超时了,返回捕捉数0。
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
      // 把这个selectKey移除,不移除selectedKeys中永远存在这个selectkey
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null)
                                    key.channel().close();
                            }
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
    
            // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
            if (selector != null)
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }
    
        private void handleInput(SelectionKey key) throws IOException {
    
            if (key.isValid()) {
                // 处理新接入的请求消息
                if (key.isAcceptable()) {
                    // Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    // socketchannel也注册到select中去,select捕捉其read状态变更
                    sc.register(selector, SelectionKey.OP_READ);
                }
                if (key.isReadable()) {
                    // Read the data
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        System.out.println("The time server receive order : "
                                + body);
                        String currentTime = "QUERY TIME ORDER"
                                .equalsIgnoreCase(body) ? new java.util.Date(
                                System.currentTimeMillis()).toString()
                                : "BAD ORDER";
                        doWrite(sc, currentTime);
                    } else if (readBytes < 0) {
                        // 对端链路关闭
                        key.cancel();
                        sc.close();
                    } else
                        ; // 读到0字节,忽略
                }
            }
        }
    
        private void doWrite(SocketChannel channel, String response)
                throws IOException {
            if (response != null && response.trim().length() > 0) {
                byte[] bytes = response.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                channel.write(writeBuffer);
            }
        }
    

    客户端

    public class TimeClientHandle implements Runnable {
    
        private String host;
        private int port;
    
        private Selector selector;
        private SocketChannel socketChannel;
    
        private volatile boolean stop;
    
        public TimeClientHandle(String host, int port) {
            this.host = host == null ? "127.0.0.1" : host;
            this.port = port;
            try {
                selector = Selector.open();
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
        /*
         * (non-Javadoc)
         *
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            while (!stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null)
                                    key.channel().close();
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
    
            // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
            if (selector != null)
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
        }
    
        private void handleInput(SelectionKey key) throws IOException {
    
            if (key.isValid()) {
                // 判断是否连接成功
                SocketChannel sc = (SocketChannel) key.channel();
                if (key.isConnectable()) {
                    if (sc.finishConnect()) {
                        sc.register(selector, SelectionKey.OP_READ);
                        doWrite(sc);
                    } else
                        System.exit(1);// 连接失败,进程退出
                }
                if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        System.out.println("Now is : " + body);
                        this.stop = true;
                    } else if (readBytes < 0) {
                        // 对端链路关闭
                        key.cancel();
                        sc.close();
                    } else
                        ; // 读到0字节,忽略
                }
            }
    
        }
    
        private void doConnect() throws IOException {
            // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    
        private void doWrite(SocketChannel sc) throws IOException {
            byte[] req = "QUERY TIME ORDER".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining())
                System.out.println("Send order 2 server succeed.");
        }
    
    }
    

    nio总结:代码其实和啰嗦繁琐,而且很容易出问题,比如例子中的nio会有粘包拆包现象,所以nio一般不直接使用。

    4.Aio

    nio 2引入的概念。真正的异步非堵塞io,基于事件驱动的,而不是nio一样轮询select。

    服务端

    public class AsyncTimeServerHandler implements Runnable {
    
        private int port;
    
        CountDownLatch latch;
        AsynchronousServerSocketChannel asynchronousServerSocketChannel;
    
        public AsyncTimeServerHandler(int port) {
        this.port = port;
        try {
                // 这是nio包下的类
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel
                .open();
            asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
        }
    
        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            // countDownlatch只是为了不让线程结束
        latch = new CountDownLatch(1);
        doAccept();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        }
    
        public void doAccept() {
            // 传入一个accept的处理类
        asynchronousServerSocketChannel.accept(this,
            new AcceptCompletionHandler());
        }
    
    }
    
    public class AcceptCompletionHandler implements
        CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
      // 数据准备好并且复制到应用内存之后,回调这个方法
        @Override
        public void completed(AsynchronousSocketChannel result,
            AsyncTimeServerHandler attachment) {
        attachment.asynchronousServerSocketChannel.accept(attachment, this);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        result.read(buffer, buffer, new ReadCompletionHandler(result));
        }
    
        @Override
        public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.latch.countDown();
        }
    
    }
    

    客户端

    public class AsyncTimeClientHandler implements
        CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
    
        private AsynchronousSocketChannel client;
        private String host;
        private int port;
        private CountDownLatch latch;
    
        public AsyncTimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            client = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        }
    
        @Override
        public void run() {
    
        latch = new CountDownLatch(1);
        client.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        }
    
        @Override
        public void completed(Void result, AsyncTimeClientHandler attachment) {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        client.write(writeBuffer, writeBuffer,
            new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    client.write(buffer, buffer, this);
                } else {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    client.read(
                        readBuffer,
                        readBuffer,
                        new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result,
                            ByteBuffer buffer) {
                            buffer.flip();
                            byte[] bytes = new byte[buffer
                                .remaining()];
                            buffer.get(bytes);
                            String body;
                            try {
                            body = new String(bytes,
                                "UTF-8");
                            System.out.println("Now is : "
                                + body);
                            latch.countDown();
                            } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                            }
                        }
    
                        @Override
                        public void failed(Throwable exc,
                            ByteBuffer attachment) {
                            try {
                            client.close();
                            latch.countDown();
                            } catch (IOException e) {
                            // ingnore on close
                            }
                        }
                        });
                }
                }
    
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    client.close();
                    latch.countDown();
                } catch (IOException e) {
                    // ingnore on close
                }
                }
            });
        }
    
        @Override
        public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        exc.printStackTrace();
        try {
            client.close();
            latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
        }
    
    }
    

    那么为什么netty不使用aio而使用nio呢?netty作者说,在unix系统上aio不会更快比nio,netty已经有一个稳定的nio的封装了。

    Not faster than NIO (epoll) on unix systems (which is true)
    There is no daragram suppport
    Unnecessary threading model (too much abstraction without usage)
    

    几种io对比:

    image.png

    相关文章

      网友评论

          本文标题:linux的底层支持以及几种io对比

          本文链接:https://www.haomeiwen.com/subject/hzvqaqtx.html