美文网首页
IO、BIO、NIO、AIO、多路复用

IO、BIO、NIO、AIO、多路复用

作者: 请叫我平爷 | 来源:发表于2022-02-17 16:32 被阅读0次

    概念

    同步、异步

    • 数据拷贝阶段是否需要完全由操作系统处理
    • 同步:发起一个调用,被调用者未处理完之前,不返回
    • 异步:发起一个调用,立刻得到被调用者的的回应表示收到请求,但是被调用者的并没有返回结果,此时我们可以处理其他请求,被调用者通常依靠事件、回调等机制来通知调用者返回结果

    阻塞、非阻塞

    • 针对发起IO请求操作后,是否立刻由返回一个标志信息不让请求线程等待
    • 阻塞:发起一个请求,调用者一直等待结果返回,线程被挂起,无法从事其他任务
    • 非阻塞:发起一个请求,调用者不需要一直等待结果返回,可以去干别的事

    同步阻塞、同步非阻塞、异步非阻塞

    a给b发起一个请求:

    • 同步阻塞:a啥也不干等b返回,b只有当完成后返回给a
    • 同步非阻塞:a去干其他事情了,b完成后再返回给a
    • 异步非阻塞:b表示收到了,a去干其他事情了,b完成后再告诉a已经完成了
    image.png

    IO

    只要具有输入输出类型的交互系统都可以认为是I/O模型

    Input\Output
    磁盘I/O模型、网络I/O模型、内存I/O模型、Direct I/O、数据库I/O

    解决IO慢:

    1. SSD替代机械硬盘
    2. NIO替换BIO

    BIO

    Block IO:同步阻塞IO


    image.png
    • Acceptor:独立的线程负责监听客户端的连接
    • 每个线程监听每个任务,只有当前任务完成后才会去执行下一个任务。
      任务开始执行到执行完毕,线程这段时间除了等待啥都没做

    优:

    实现简单,一个线程一个任务

    缺:

    一个线程只能做一件事
    创建大量线程支持高并发,等待时间过长会影响系统性能,只能创建少量线程来支持。

    • 模拟5个client同时发送5次数据给server,server接到一个任务就分配一个线程,总共分配5个线程来接收
      • BIOServer
        public class BIOServer {
        public static void main(String[] args) {
            try {
                ServerSocket serverSocket = new ServerSocket(9000);
                new Thread(()->{
                    while (true){
                        try {
                            Socket socket = serverSocket.accept();
                            new Thread(()->{
                                try {
                                    int len;
                                    byte[] data = new byte[1024];
                                    System.out.println(socket);
                                    InputStream inputStream = socket.getInputStream();
                                    while ((len = inputStream.read(data)) != -1) {
                                        System.out.println(new String(data, 0, len));
                                    }
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }).start();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
        }
        
      • BIOClient
        public class BIOClient {
        public static void main(String[] args) {
            for (int index=0 ;index< 5 ; index++){
                new Thread(()->{
                    try{
                        Socket socket = new Socket("127.0.0.1",9000);
                        int i=0;
                        while (i<5){
                            try {
                                Date now = new Date();
                                String str = now + "" + i;
                                System.out.println(str);
                                socket.getOutputStream().write(str.getBytes());
                                Thread.sleep(2000);
                                i++;
                            }catch (Exception e){
                                e.printStackTrace();
                            }
                        }
                    } catch (Exception e){
                        e.printStackTrace();
                    }
                }).start();
            }
        }
        }
        
    image.png
    • 采用线程池和队列的方式
      -单机千级左右可用,面对十万、百万级不可行

    BIOServer

    public class BIOServer {
        public static void main(String[] args) {
            /**
             * int corePoolSize,
             * int maximumPoolSize,
             * long keepAliveTime,
             * TimeUnit unit,
             * BlockingQueue<Runnable> workQueue
             */
            ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1,3,0,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
    
            Thread acceptor = new Thread(()->{
                ServerSocket serverSocket = null;
                try {
                    serverSocket = new ServerSocket(9000);
                    int i=0;
                    while (true){
                        Socket socket = serverSocket.accept();
                        Thread thread = new Thread(()->{
                            int len;
                            byte[] bytes = new byte[1024];
                            try {
                                InputStream inputStream = socket.getInputStream();
                                while ((len=inputStream.read(bytes))!=-1){
                                    System.out.println(new String(bytes, 0, len));
                                }
                                Thread.sleep(2000L);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        });
                        threadPoolExecutor.execute( thread);
                        i++;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            acceptor.start();
        }
    }
    

    BIOClient

    public class BIOClient {
    
        public static void main(String[] args) {
            for (int i=0;i<100;i++){
                int finalI = i;
                new Thread(()->{
                    try {
                        Socket socket = new Socket("127.0.0.1",9000);
    
                        for (int j=0;j<3;j++) {
                            String str = new Date() + "第" + finalI + "个链接发送第" +j+ "个消息";
                            socket.getOutputStream().write(str.getBytes());
                            Thread.sleep(2000);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
    
            }
        }
    
    }
    

    NIO

    New I/O:同时支持阻塞与非阻塞,使用多路复用器机制
    No-Blocking I/O:同步非阻塞IO

    image.png
    • thread不断轮询每个任务的状态,看是否有任务完成,然后进行下一步操作
    • 比BIO抽象出了新的通道(Channel)作为输入输出的通道,提供了缓存(Buffer)支持
      • 读操作时,使用Buffer分配空间,将数据从Channel读入Buffer中
      • 写操作,将数据写入Buffer,将数据Buffer写入Channel中

    Channel:通道,跟IO中Stream(流)差不多一个等级

    • Channel与Stream区别
      1. Stream是单向的,InputStream、OutputStream
      2. Channel是双向的,既可以读也可以写。FileChannel(文件IO)、DatagramChannel(UDP)、SocketChannel(TCP)、ServerSocketChannel(sever和Client)
      3. Channel可以非阻塞的读写IO,Stream只能阻塞的读写
      4. Channel必须配合Buffer使用,无法绕开Buffer直接向Channel读写数据
    • SocketChannel:客户端发起TCP的Channel
    • ServerSocketChannel:服务端监听新的TCP连接,对每个新的TCP连接都会创建一个SocketChannel
    • DatagramChannel:通过UDP读写数据
    • FileChannel:从文件读写数据,数据可以之间从一个Channel传输到另一个Channel,不能跟Selector一起使用(与Selector一起使用的Channel必须为非阻塞)
      1. transferFrom
        RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
        FileChannel      fromChannel = fromFile.getChannel();
        RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
        FileChannel      toChannel = toFile.getChannel();
        long position = 0;
        long count = fromChannel.size();
        toChannel.transferFrom(position, count, fromChannel);
        
      2. transferTo
        RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
        FileChannel      fromChannel = fromFile.getChannel();
        RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
        FileChannel      toChannel = toFile.getChannel();
        long position = 0;
        long count = fromChannel.size();
        fromChannel.transferTo(position, count, toChannel);
        
      3. 文件NIO读
        public static void main(String[] args) {
        String fileName = "/Users/mi/java/learn/iOSApnsPushServer/src/main/webapp/index.jsp";
        try {
            RandomAccessFile aFile = new RandomAccessFile(fileName, "rw");
            FileChannel inChannel = aFile.getChannel();
            ByteBuffer buf = ByteBuffer.allocate(1024);
            int bytesRead = inChannel.read(buf);
            while (bytesRead != -1) {
                System.out.println("Read " + bytesRead);
                buf.flip();
                while(buf.hasRemaining()){
                    System.out.print((char) buf.get());
                }
                buf.clear();
                bytesRead = inChannel.read(buf);
            }
            aFile.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        }
        
      4. Scatter、Gather
        Scatter:分散,Channel将读取的数据写入多个buffer中
        Gather:聚集,多个Buffer写入一个Channel
        //scatter分散
        ByteBuffer header = ByteBuffer.allocate(128);
        ByteBuffer body   = ByteBuffer.allocate(1024);
        ByteBuffer[] bufferArray = { header, body };
        channel.read(bufferArray);
        
        
        //gather聚集
        ByteBuffer header = ByteBuffer.allocate(128);
        ByteBuffer body   = ByteBuffer.allocate(1024);
        //write data into buffers
        ByteBuffer[] bufferArray = { header, body };
        channel.write(bufferArray);
        

    Buffer:ByteBuffer(byte)、CharBuffer(char)、DoubleBuffer(double)、FloatBuffer(float)、IntBuffer(int)、LongBuffer(long)、ShortBuffer(short)、MappedByteBuffer、HeapByteBuffer、DirectByteBuffer、HeapByteBuffer在JVM的Heap(堆)上分配

    1. 本质上是一块可以读写的内存,这块内存被包装成NIO Buffer对象,并提供一组方法,用来方便访问该内存
    2. capacity:Buffer的固定大小就叫capacity,只能写byte、long、char、short、int、double、float类型,Buffer满了就需要清空才能继续写
    3. position:
      • 写的时候表示当前位置,初始的position为0,最大为capacity-1
      • 读的时候表示从特定位置读,从写切换为读,会重置为0
    4. limit:
      • 写模式下,最多能往Buffer中写多少数据,limit=capacity
      • 读模式,最多能读到多少数据,写切换为读,limit初始为position,能读到之前写入的所有数据
    5. 初始化
    ```java
    //初始化Buffer
    ByteBuffer buf = ByteBuffer.allocate(1024);
    CharBuffer buffer = CharBuffer.allocate(48);
    //写入到Buffer
    int bytesRead = inChannel.read(buf);
    //写入到Buffer
    buffer.put('a');
    //写切换到读,limit=position,position=0,读取所有写入的数据
    buf.flip();
    //从Buffer读数据
    buf.get()
    //从Buffer读数据
    int bytesWrite =  inChannel.write(buf);
    //重读所有数据
    Buffer.rewind();
     //清空Buffer,position=0,limit=capacity,还能读到之前的记录
    Buffer.clear()
    //清空Buffer,彻底清空
    Buffer.compact()
    //标记Buffer的position
    buffer.mark();
    //获取刚刚标记的position
    buffer.reset();
    ```
    
    1. euqals:只比较Buffer中剩余的元素(position到limit之间的元素)
      1. 相同的类型
      2. 剩下的个数相同
      3. 剩下的byte、char相同
    2. compareTo:
      1. 第一个不相等的元素小于另一个Buffer中对应的元素
      2. 所有元素都相等,但是其中一个Buffer中的元素先耗尽

    Selector

    1. 用单个线程处理多个Channel
    ```
    //创建Selector
    Selector selector = Selector.open();
    //注册Channel,channel必须为非阻塞状态
    channel.configureBlocking(false);
    SelectionKey key = channel.register(selector,SelectionKey.OP_READ);
    ```
    - Connet:channel成功连接
    - Accept:serverSocketChannel准备接收
    - Read:channel有数据可读
    - Write:channel等待写入数据
    

    零拷贝

    传统copy


    image.png

    零拷贝


    image.png
    • DMA:Direct Memory Access,直接存储器访问,能帮助CPU做大量的工作
      从地址空间复制到另外一个地址空间,cpu初始化这个传输动作,传输动作本身是有DMA控制器来实行和完成

    模仿tomcat

    public class NIOServer {
    
        public static final String SEPARATOR = "\r\n";
        public static final int BACK_LOG = 1024;
    
        public static void main(String[] args) {
            ServerSocketChannel serverSocketChannel = null;
            SocketChannel channel = null;
            try{
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(9000),BACK_LOG);
                ExecutorService executorService = Executors.newFixedThreadPool(50);
                System.out.println("服务器启动成功");
                for (;;){
                    channel = serverSocketChannel.accept();
                    System.out.println(channel.getRemoteAddress());
                    SocketChannel finalChannel = channel;
                    executorService.execute(()->{
                        try {
                            Thread.sleep(2000);
                            finalChannel.write(ByteBuffer.wrap(HttpRequest().getBytes(StandardCharsets.UTF_8)));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                }
            }
            catch (Exception e){
                e.printStackTrace();
            }
            finally {
                try {
                    channel.close();
                    serverSocketChannel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        public static String HttpRequest() {
            String str = "<h1>tomcat</h1>";
            StringBuilder builder = new StringBuilder();
            builder.append("HTTP/1.1 200 OK").append(SEPARATOR);
            builder.append("Connection: Close").append(SEPARATOR);
            builder.append("Content-Type: text/html;charset=utf-8").append(SEPARATOR);
            builder.append("Content-Length: " + str.length()).append(SEPARATOR);
            builder.append(SEPARATOR);
            builder.append(str);
            return builder.toString();
        }
    }
    

    AIO

    Asynchronous I/O:异步非阻塞I/O模型

    image.png

    当任务完成了,会通知线程来处理

    1. 进程向操作系统请求数据
    2. 操作系统把外部数据加载到内核的缓冲区
    3. 操作系统把内核的缓冲区拷贝到进程的缓冲区
    4. 进程获得数据完成自己的功能

    第2、3步是挂起等待状态,就是同步IO,反之就是异步IO,AIO

    AIOServer

    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class AIOServer {
        private ExecutorService executorService;          // 线程池
        private AsynchronousChannelGroup threadGroup;      // 通道组
        public AsynchronousServerSocketChannel asynServerSocketChannel;  // 服务器通道
        public void start(Integer port){
            try {
                // 1.创建一个缓存池
                executorService = Executors.newCachedThreadPool();
                // 2.创建通道组
                threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
                // 3.创建服务器通道
                asynServerSocketChannel = AsynchronousServerSocketChannel.open(threadGroup);
                // 4.进行绑定
                asynServerSocketChannel.bind(new InetSocketAddress(port));
                System.out.println("server start , port : " + port);
                // 5.等待客户端请求
                asynServerSocketChannel.accept(this, new AIOServerHandler());
                // 一直阻塞 不让服务器停止,真实环境是在tomcat下运行,所以不需要这行代码
                Thread.sleep(Integer.MAX_VALUE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            AIOServer server = new AIOServer();
            server.start(9000);
        }
    }
    

    AIOServerHandler

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.ExecutionException;
    public class AIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {
        private final Integer BUFFER_SIZE = 1024;
        @Override
        public void completed(AsynchronousSocketChannel asynSocketChannel, AIOServer attachment) {
            // 保证多个客户端都可以阻塞
            attachment.asynServerSocketChannel.accept(attachment, this);
            read(asynSocketChannel);
        }
        //读取数据
        private void read(final AsynchronousSocketChannel asynSocketChannel) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            asynSocketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer resultSize, ByteBuffer attachment) {
                    //进行读取之后,重置标识位
                    attachment.flip();
                    //获取读取的数据
                    String resultData = new String(attachment.array()).trim();
                    System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData);
                    String response = resultData + " = " + resultData;
                    write(asynSocketChannel, response);
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                }
            });
        }
        // 写入数据
        private void write(AsynchronousSocketChannel asynSocketChannel, String response) {
            try {
                // 把数据写入到缓冲区中
                ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
                buf.put(response.getBytes());
                buf.flip();
                // 在从缓冲区写入到通道中
                asynSocketChannel.write(buf).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void failed(Throwable exc, AIOServer attachment) {
            exc.printStackTrace();
        }
    }
    
    

    AIOClient

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.Random;
    
    public class AIOClient implements Runnable{
    
        private static Integer PORT = 9000;
        private static String IP_ADDRESS = "127.0.0.1";
        private AsynchronousSocketChannel asynSocketChannel ;
        public AIOClient() throws Exception {
            asynSocketChannel = AsynchronousSocketChannel.open();  // 打开通道
        }
        public void connect(){
            asynSocketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));  // 创建连接 和NIO一样
        }
        public void write(String request){
            try {
                asynSocketChannel.write(ByteBuffer.wrap(request.getBytes())).get();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                asynSocketChannel.read(byteBuffer).get();
                byteBuffer.flip();
                byte[] respByte = new byte[byteBuffer.remaining()];
                byteBuffer.get(respByte); // 将缓冲区的数据放入到 byte数组中
                System.out.println(new String(respByte,"utf-8").trim());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run() {
            while(true){
            }
        }
        public static void main(String[] args) throws Exception {
            for (int i = 0; i < 10; i++) {
                AIOClient myClient = new AIOClient();
                myClient.connect();
                new Thread(myClient, "myClient").start();
                String []operators = {"+","-","*","/"};
                Random random = new Random(System.currentTimeMillis());
                String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
                myClient.write(expression);
            }
        }
    }
    

    多路复用IO

    select

    image.png
    1. 使用copy_from_user从用户空间拷贝fd_set到内核空间
    2. 注册回调函数__pollwait
    3. 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
    4. 以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
    5. __pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
    6. poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
    7. 如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
    8. 把fd_set从内核空间拷贝到用户空间。
      1. 服务端接入者少
      2. 程序应具有兼容性
      3. select可移植性更好,几乎所有平台支持
      4. select对超时精确到微秒,poll是毫秒
    • 缺点
      1. 每次调用select函数时都要向内核传递监视对象信息,这意味着需要将用户态的fd集合列表从用户态拷贝到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效
      2. 每次调用select时内核都需要遍历(即轮询)传递进来的全部fd,浪费CPU时间;
      3. select在单个进程中能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

    poll

    • 与select非常相似,只是fd集合的方式不一样,poll使用pollfd,使poll支持的文件描述符集合限制大于select,select使用fd_set解决了问题3

    • 优点:

      1. poll使用pollfd结构而不是select的fd_set结构,没有最大连接数的限制,原因是它是基于链表来存储的
      2. poll() 不要求开发者计算最大文件描述符加一的大小;
      3. poll() 在应付大数目的文件描述符的时候速度更快,相比于select;
    • 缺点

      1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
      2. poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。
        与select同样,poll返回后,须要轮询pollfd来获取就绪的描述符。

    epoll

    epoll的高效就在于,当我们调用epoll_ctl往里塞入百万个句柄时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的句柄给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。
      而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已,如何能不高效。
      epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。
      对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
      对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
      对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

    • 优点

      1. 无需编写循环语句来监视发生状态变化的文件描述符,即调用对应于select函数的epoll_wait函数时无需每次传递监视对象信息;
      2. 调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表;
      3. 只向操作系统传递一次监视对象,监视范围或内容发生变化是只通知发生变化的事项;
        监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大;
      4. 效率提升,epoll不是轮询的方式,IO效率不会随FD数目增长而线性降低。这是由于在内核实现中epoll是根据每一个fd上面的callback函数实现的,而只有活跃可用的FD才会主动去调用callback函数,即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll;
      5. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递,即epoll使用mmap减少复制开销,epoll是通过内核于用户空间mmap同一块内存,避免了无畏的内存拷贝;
      6. 支持电平触发和边沿触发两种方式,理论上边缘触发的性能要更高一些,但是代码实现相当复杂;
    • 流程

      1. epoll_create()系统调用,返回一个句柄。
      2. epoll_ctl()系统调用,向epoll对象增、删、改,返回0成功,-1失败
      3. epoll_wait()系统调用,收集epoll监控中已经发送的事件

    综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点。

    1. select、poll的实现需要本身不断轮询全部的fd集合,直到设备就绪,期间可能要睡眠和唤醒屡次交替。而epoll其实也须要调用epoll_wait不断轮询就绪链表,期间也可能屡次睡眠和唤醒交替,可是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,可是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就好了,这节省了大量的CPU时间。这就是回调机制带来的性能提高。
    2. select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,而且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,并且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并非设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省很多的开销。
    3. 表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。
    4. select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善
    类型 select poll epoll
    操作方式 遍历 遍历 回调
    底层实现 数组 链表 红黑树
    IO效率 每次都进行线性遍历,O(n) 每次都进行线性遍历,O(n) 事件通知方式,当fd就绪,系统注册的回调函数被调用,fd放入readyList中,O(1)
    最大连接数 1024(X86)或2048(X64) 无上限 无上限
    fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后epoll_wait不拷贝,epoll通过内核和用户空间共享一块内存来实现的。

    对比

    image.png

    目前操作系统对AIO的支持并没有特别完善

    相关文章

      网友评论

          本文标题:IO、BIO、NIO、AIO、多路复用

          本文链接:https://www.haomeiwen.com/subject/hvlslrtx.html