美文网首页
I/O-NIO-多路复用器

I/O-NIO-多路复用器

作者: 麦大大吃不胖 | 来源:发表于2020-11-30 12:27 被阅读0次

    by shihang.mai

    1. 概述

    通过一个系统调用,获取多个IO状态,叫多路复用器。在Linux下多路复用器都是同步模型。(只要程序自己读写IO,那么IO模型就是同步的

    多路复用器
    只关注IO:不关注从IO读写完之后的事情
    
    同步:app自己R/W
    异步:kernel完成R/W 只有win:iocp
    阻塞:blocking
    非阻塞:non-blocking
    
    linux以及成熟的框架netty:同步非阻塞 同步阻塞
    

    2. select poll

    select poll

    系统调用

    1. 调用socket()返回一个fd
    2. 调用bind(fd,9090),将fd和端口绑定起来
    3. 调用listen(),监听端口
    4. 调用select(fds)查找有状态的IO
    5. 调用recv(有状态的fd)

    其实无论NIO SELECT POLL都需要遍历所有的IO询问状态,只不过:

    • 普通NIO:这个遍历成本在用户态切换内核态

    • select poll :这个遍历过程只触发了一次系统调用,用户态内核态的切换,过程中,把fds传递给内核,内核重新根据用户这次调用传过来的fds,遍历,修改状态

    2.1 弊端

    1. 每次重复传递fds
    2. 每次内核被调用之后,针对这次调用,触发一次遍历fds全量的复杂度

    3. epoll

    3.1 概述

    epoll

    现象调用过程

    1. 首先,执行无论BIO NOI SELECT poll都有的socket->bind->listen,如listen得到fd4->socket连接
    2. 调用epoll_create,创建fd6(叫epfd)->红黑树
    3. 调用epoll_ctl(fd6,add,fd4,accept),把fd4加入到fd6红黑树中
    4. 调用epoll_wait等待一个链表,这时链表没数据
    5. 当客户端通过网卡发送消息时,会在fd4的buffer中写数据,并且做一个延伸处理(原来的中断中加入延伸逻辑),将fd4在fd6中查找,并改变状态复制到链表中,这时链表含有fd4。
    6. 这时调用epoll_wait的话直接得到有状态的IO。

    系统调用过程

    1. 调用socket()返回一个fd,例如fd1
    2. 调用bind(fd1,9090),将fd和端口绑定起来
    3. 调用listen(),监听端口
    4. 调用epoll_create(),产生一个红黑树,当然也会产生一个fd代表这个红黑树,例如fd2
    5. 调用epoll_ctl(fd2,add,fd1,accept),意思是在红黑树添加fd1,并且关注的是accept事件
    6. 调用epoll_wait(),它等待的是一个链表
    7. 当有一个连接过来,即是fd1的accept事件,就会将fd1转移到链表中,epoll_wait拿到了后accept产生一个fd3,调用epoll_ctl(fd2,add,fd3,recv)重新加入到红黑树
    8. 当这个链接发消息过来,就会将这个fd3移动到链表里
    9. 那么epoll_wait拿到的都是有状态的IO

    优势:

    1. epoll直接调用epoll_ctl增量加入新的fd,解决重复传入fds问题,
    2. 在内核中做了将有状态的IO直接copy到链表,调用epoll_wait直接拿到有状态的IO,解决了遍历fds问题

    3.2 举例

    public class SocketMultiplexingSingleThreadv1 {
    
        private ServerSocketChannel server = null;
        private Selector selector = null;
        int port = 9090;
    
        public void initServer() {
            try {
                //这个server相当于listen状态的fd4
                server = ServerSocketChannel.open();
                //设置non-blocking
                server.configureBlocking(false);
                //绑定端口
                server.bind(new InetSocketAddress(port));
                /*
                 * 创建多路复用器 优先使用epoll
                 * select poll:创建一个数组用来存放fds
                 * epoll:调用epoll_create==>fd3
                 */
                selector = Selector.open();
                /*
                 * select poll:把fd4放入数组
                 * epoll:调用epoll_ctl(fd3,add,fd4,epoll_in) 将fd4放入 上面epoll_create得到的内存空间fd3(实际运行时在selector.select时才放进去,懒加载),注册accept事件
                 */
                server.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void start() {
            initServer();
            System.out.println("服务器启动了。。。。。");
            try {
                while (true) {
                    /*
                     * select poll:查看数组中有的fds
                     * epoll:查看fd3 红黑树中的fds
                     */
                    Set<SelectionKey> keys = selector.keys();
                    System.out.println(keys.size()+"   size");
                    /*
                     * select poll:传入fds,查找有状态的IO
                     * epoll:epoll_wait
                     */
                    while (selector.select(500) > 0) {
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iter = selectionKeys.iterator();
                        //无论基于那种多路复用器,得到有状态的IO后,都要自行R/W
                        while (iter.hasNext()) {
                            SelectionKey key = iter.next();
                            iter.remove();
                            //是否需要accept。服务端未accept前,客户端建立连接并发消息,就会进到这里
                            if (key.isAcceptable()) {
                                acceptHandler(key);
                                //是否可读。即是否已经分配进程处理socket连接
                            } else if (key.isReadable()) {
                                readHandler(key);
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void acceptHandler(SelectionKey key) {
            try {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                //accept得到client
                SocketChannel client = ssc.accept();
                //设置client non-blocking 为了读不阻塞
                client.configureBlocking(false);
                //创建缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(8192);
                /*
                 *注册当前fd对应的read buffer
                 */
                client.register(selector, SelectionKey.OP_READ, buffer);
                System.out.println("-------------------------------------------");
                System.out.println("新客户端:" + client.getRemoteAddress());
                System.out.println("-------------------------------------------");
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void readHandler(SelectionKey key) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.clear();
            int read = 0;
            try {
                while (true) {
                    read = client.read(buffer);
                    if (read > 0) {
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            client.write(buffer);
                        }
                        buffer.clear();
                    } else if (read == 0) {
                        break;
                    } else {
                        client.close();
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
            service.start();
        }
    }
    

    这个红黑树观察的文件描述符是有限的,属性max_user_watch

    4. epoll单线程/多线程

    场景:服务端接收到客户端数据,写回给客户端

    单线程_多线程epoll

    4.1 单线程

    1. 主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler读取后直接write

    2. 主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler里将当前fd.register selector OP_WRITE,那么再查找有状态的IO时,就会触发writeHandler(),再去write

    4.2 多线程

    1. 主线程通过selector查找有状态的IO,然后new thread()去执行readHandler(),readHandler里将当前fd.register selector OP_WRITE,但这个操作需要时间,当抛出线程,立刻返回,再一次通过selector查找有状态的IO,仍然会有当前这个fd。所有会重复调起readHandler()
    2. 当1中注册了写事件,而写事件是看send-q有没空位,所以当主线程再次通过selector查找有状态的IO,会重复new thread()调起writeHandler

    解决重复调用办法:在调用readHandler()和writeHandler()前加入key.cancle(epoll_ctl(del))

    4.2.1 多线程弊端:

    1. 考虑资源利用,充分利用cpu核数。

    2. 考虑有一个fd执行耗时长,在一个线程里会阻塞后续的fd的处理

      但是会重复register和cancle系统调用。

    4.3 解决

    • 当有N个fd有R/w处理时,将N个FD分组,每一个组一个selector,将一个selector压到一个线程上
    • 最好的线程数量时cpu个数或者cpu个数*2
    • 其实单看一个线程,里面只有一个selector,有一部分fd,它们在自己的cpu上执行,代表会有多个selector并行,并且线程内部线性的,最终是并行的fd被处理
    • 那么我也可以拿出一个线程的selector只关注accept,然后将接受的fd分配给其他线程selector

    相关文章

      网友评论

          本文标题:I/O-NIO-多路复用器

          本文链接:https://www.haomeiwen.com/subject/wdkoiktx.html