美文网首页
I/O-NIO-多路复用器

I/O-NIO-多路复用器

作者: 麦大大吃不胖 | 来源:发表于2020-11-30 12:27 被阅读0次

by shihang.mai

1. 概述

通过一个系统调用,获取多个IO状态,叫多路复用器。在Linux下多路复用器都是同步模型。(只要程序自己读写IO,那么IO模型就是同步的

多路复用器
只关注IO:不关注从IO读写完之后的事情

同步:app自己R/W
异步:kernel完成R/W 只有win:iocp
阻塞:blocking
非阻塞:non-blocking

linux以及成熟的框架netty:同步非阻塞 同步阻塞

2. select poll

select poll

系统调用

  1. 调用socket()返回一个fd
  2. 调用bind(fd,9090),将fd和端口绑定起来
  3. 调用listen(),监听端口
  4. 调用select(fds)查找有状态的IO
  5. 调用recv(有状态的fd)

其实无论NIO SELECT POLL都需要遍历所有的IO询问状态,只不过:

  • 普通NIO:这个遍历成本在用户态切换内核态

  • select poll :这个遍历过程只触发了一次系统调用,用户态内核态的切换,过程中,把fds传递给内核,内核重新根据用户这次调用传过来的fds,遍历,修改状态

2.1 弊端

  1. 每次重复传递fds
  2. 每次内核被调用之后,针对这次调用,触发一次遍历fds全量的复杂度

3. epoll

3.1 概述

epoll

现象调用过程

  1. 首先,执行无论BIO NOI SELECT poll都有的socket->bind->listen,如listen得到fd4->socket连接
  2. 调用epoll_create,创建fd6(叫epfd)->红黑树
  3. 调用epoll_ctl(fd6,add,fd4,accept),把fd4加入到fd6红黑树中
  4. 调用epoll_wait等待一个链表,这时链表没数据
  5. 当客户端通过网卡发送消息时,会在fd4的buffer中写数据,并且做一个延伸处理(原来的中断中加入延伸逻辑),将fd4在fd6中查找,并改变状态复制到链表中,这时链表含有fd4。
  6. 这时调用epoll_wait的话直接得到有状态的IO。

系统调用过程

  1. 调用socket()返回一个fd,例如fd1
  2. 调用bind(fd1,9090),将fd和端口绑定起来
  3. 调用listen(),监听端口
  4. 调用epoll_create(),产生一个红黑树,当然也会产生一个fd代表这个红黑树,例如fd2
  5. 调用epoll_ctl(fd2,add,fd1,accept),意思是在红黑树添加fd1,并且关注的是accept事件
  6. 调用epoll_wait(),它等待的是一个链表
  7. 当有一个连接过来,即是fd1的accept事件,就会将fd1转移到链表中,epoll_wait拿到了后accept产生一个fd3,调用epoll_ctl(fd2,add,fd3,recv)重新加入到红黑树
  8. 当这个链接发消息过来,就会将这个fd3移动到链表里
  9. 那么epoll_wait拿到的都是有状态的IO

优势:

  1. epoll直接调用epoll_ctl增量加入新的fd,解决重复传入fds问题,
  2. 在内核中做了将有状态的IO直接copy到链表,调用epoll_wait直接拿到有状态的IO,解决了遍历fds问题

3.2 举例

public class SocketMultiplexingSingleThreadv1 {

    private ServerSocketChannel server = null;
    private Selector selector = null;
    int port = 9090;

    public void initServer() {
        try {
            //这个server相当于listen状态的fd4
            server = ServerSocketChannel.open();
            //设置non-blocking
            server.configureBlocking(false);
            //绑定端口
            server.bind(new InetSocketAddress(port));
            /*
             * 创建多路复用器 优先使用epoll
             * select poll:创建一个数组用来存放fds
             * epoll:调用epoll_create==>fd3
             */
            selector = Selector.open();
            /*
             * select poll:把fd4放入数组
             * epoll:调用epoll_ctl(fd3,add,fd4,epoll_in) 将fd4放入 上面epoll_create得到的内存空间fd3(实际运行时在selector.select时才放进去,懒加载),注册accept事件
             */
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("服务器启动了。。。。。");
        try {
            while (true) {
                /*
                 * select poll:查看数组中有的fds
                 * epoll:查看fd3 红黑树中的fds
                 */
                Set<SelectionKey> keys = selector.keys();
                System.out.println(keys.size()+"   size");
                /*
                 * select poll:传入fds,查找有状态的IO
                 * epoll:epoll_wait
                 */
                while (selector.select(500) > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    //无论基于那种多路复用器,得到有状态的IO后,都要自行R/W
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        //是否需要accept。服务端未accept前,客户端建立连接并发消息,就会进到这里
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                            //是否可读。即是否已经分配进程处理socket连接
                        } else if (key.isReadable()) {
                            readHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            //accept得到client
            SocketChannel client = ssc.accept();
            //设置client non-blocking 为了读不阻塞
            client.configureBlocking(false);
            //创建缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            /*
             *注册当前fd对应的read buffer
             */
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
        service.start();
    }
}

这个红黑树观察的文件描述符是有限的,属性max_user_watch

4. epoll单线程/多线程

场景:服务端接收到客户端数据,写回给客户端

单线程_多线程epoll

4.1 单线程

  1. 主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler读取后直接write

  2. 主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler里将当前fd.register selector OP_WRITE,那么再查找有状态的IO时,就会触发writeHandler(),再去write

4.2 多线程

  1. 主线程通过selector查找有状态的IO,然后new thread()去执行readHandler(),readHandler里将当前fd.register selector OP_WRITE,但这个操作需要时间,当抛出线程,立刻返回,再一次通过selector查找有状态的IO,仍然会有当前这个fd。所有会重复调起readHandler()
  2. 当1中注册了写事件,而写事件是看send-q有没空位,所以当主线程再次通过selector查找有状态的IO,会重复new thread()调起writeHandler

解决重复调用办法:在调用readHandler()和writeHandler()前加入key.cancle(epoll_ctl(del))

4.2.1 多线程弊端:

  1. 考虑资源利用,充分利用cpu核数。

  2. 考虑有一个fd执行耗时长,在一个线程里会阻塞后续的fd的处理

    但是会重复register和cancle系统调用。

4.3 解决

  • 当有N个fd有R/w处理时,将N个FD分组,每一个组一个selector,将一个selector压到一个线程上
  • 最好的线程数量时cpu个数或者cpu个数*2
  • 其实单看一个线程,里面只有一个selector,有一部分fd,它们在自己的cpu上执行,代表会有多个selector并行,并且线程内部线性的,最终是并行的fd被处理
  • 那么我也可以拿出一个线程的selector只关注accept,然后将接受的fd分配给其他线程selector

相关文章

  • I/O-NIO-多路复用器

    多路复用器 by shihang.mai 概述 通过一个系统调用,获取多个IO状态,叫多路复用器。在Linux下多...

  • NIO学习-1 概述

    服务器实现模式为一个请求一个线程,但客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时...

  • 多路复用器

    第三方路由介绍 多路复用器,只需要实现ServeHTTP方法即可实现, net/http包中的ServeMux提供...

  • 为什么Netty使用NIO而不是AIO?

    NIO模型同步非阻塞服务器实现模式为一个请求一个线程,但客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询...

  • Screen极简指南

    Screen极简指南 Terminal Multiplex 终端多路复用器主要特性: Persistent 可以持...

  • Java多线程:Linux多路复用,Java NIO与Netty

    JVM的多路复用器实现原理 Linux 2.5以前:select/poll Linux 2.6以后: epoll ...

  • 面试官:说说Netty断开连接的原理

    多路复用器(Selector) 接收到OP_READ事件:处理OP_READ事件: NioSocketChanne...

  • Tmux --man文档笔记

    名称 -- 终端多路复用器 语法 tmux [-2CDluvV] [-c shell-command] [-f f...

  • I/O-NIO-普通

    NIO by shihang.mai SocketNIO 一个线程里,设置accept非阻塞, 然后每接收一个客户...

  • Netty 线程模型

    Reactor模型 Netty中的Reactor模型主要由多路复用器(Acceptor)、事件分发器(Dispat...

网友评论

      本文标题:I/O-NIO-多路复用器

      本文链接:https://www.haomeiwen.com/subject/wdkoiktx.html