美文网首页
IO、BIO、NIO、AIO、多路复用

IO、BIO、NIO、AIO、多路复用

作者: 请叫我平爷 | 来源:发表于2022-02-17 16:32 被阅读0次

概念

同步、异步

  • 数据拷贝阶段是否需要完全由操作系统处理
  • 同步:发起一个调用,被调用者未处理完之前,不返回
  • 异步:发起一个调用,立刻得到被调用者的的回应表示收到请求,但是被调用者的并没有返回结果,此时我们可以处理其他请求,被调用者通常依靠事件、回调等机制来通知调用者返回结果

阻塞、非阻塞

  • 针对发起IO请求操作后,是否立刻由返回一个标志信息不让请求线程等待
  • 阻塞:发起一个请求,调用者一直等待结果返回,线程被挂起,无法从事其他任务
  • 非阻塞:发起一个请求,调用者不需要一直等待结果返回,可以去干别的事

同步阻塞、同步非阻塞、异步非阻塞

a给b发起一个请求:

  • 同步阻塞:a啥也不干等b返回,b只有当完成后返回给a
  • 同步非阻塞:a去干其他事情了,b完成后再返回给a
  • 异步非阻塞:b表示收到了,a去干其他事情了,b完成后再告诉a已经完成了
image.png

IO

只要具有输入输出类型的交互系统都可以认为是I/O模型

Input\Output
磁盘I/O模型、网络I/O模型、内存I/O模型、Direct I/O、数据库I/O

解决IO慢:

  1. SSD替代机械硬盘
  2. NIO替换BIO

BIO

Block IO:同步阻塞IO


image.png
  • Acceptor:独立的线程负责监听客户端的连接
  • 每个线程监听每个任务,只有当前任务完成后才会去执行下一个任务。
    任务开始执行到执行完毕,线程这段时间除了等待啥都没做

优:

实现简单,一个线程一个任务

缺:

一个线程只能做一件事
创建大量线程支持高并发,等待时间过长会影响系统性能,只能创建少量线程来支持。

  • 模拟5个client同时发送5次数据给server,server接到一个任务就分配一个线程,总共分配5个线程来接收
    • BIOServer
      public class BIOServer {
      public static void main(String[] args) {
          try {
              ServerSocket serverSocket = new ServerSocket(9000);
              new Thread(()->{
                  while (true){
                      try {
                          Socket socket = serverSocket.accept();
                          new Thread(()->{
                              try {
                                  int len;
                                  byte[] data = new byte[1024];
                                  System.out.println(socket);
                                  InputStream inputStream = socket.getInputStream();
                                  while ((len = inputStream.read(data)) != -1) {
                                      System.out.println(new String(data, 0, len));
                                  }
                              } catch (IOException e) {
                                  e.printStackTrace();
                              }
                          }).start();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }).start();
          }
          catch (Exception e){
              e.printStackTrace();
          }
      }
      }
      
    • BIOClient
      public class BIOClient {
      public static void main(String[] args) {
          for (int index=0 ;index< 5 ; index++){
              new Thread(()->{
                  try{
                      Socket socket = new Socket("127.0.0.1",9000);
                      int i=0;
                      while (i<5){
                          try {
                              Date now = new Date();
                              String str = now + "" + i;
                              System.out.println(str);
                              socket.getOutputStream().write(str.getBytes());
                              Thread.sleep(2000);
                              i++;
                          }catch (Exception e){
                              e.printStackTrace();
                          }
                      }
                  } catch (Exception e){
                      e.printStackTrace();
                  }
              }).start();
          }
      }
      }
      
image.png
  • 采用线程池和队列的方式
    -单机千级左右可用,面对十万、百万级不可行

BIOServer

public class BIOServer {
    public static void main(String[] args) {
        /**
         * int corePoolSize,
         * int maximumPoolSize,
         * long keepAliveTime,
         * TimeUnit unit,
         * BlockingQueue<Runnable> workQueue
         */
        ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1,3,0,TimeUnit.SECONDS,new LinkedBlockingQueue<>());

        Thread acceptor = new Thread(()->{
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(9000);
                int i=0;
                while (true){
                    Socket socket = serverSocket.accept();
                    Thread thread = new Thread(()->{
                        int len;
                        byte[] bytes = new byte[1024];
                        try {
                            InputStream inputStream = socket.getInputStream();
                            while ((len=inputStream.read(bytes))!=-1){
                                System.out.println(new String(bytes, 0, len));
                            }
                            Thread.sleep(2000L);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
                    threadPoolExecutor.execute( thread);
                    i++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        acceptor.start();
    }
}

BIOClient

public class BIOClient {

    public static void main(String[] args) {
        for (int i=0;i<100;i++){
            int finalI = i;
            new Thread(()->{
                try {
                    Socket socket = new Socket("127.0.0.1",9000);

                    for (int j=0;j<3;j++) {
                        String str = new Date() + "第" + finalI + "个链接发送第" +j+ "个消息";
                        socket.getOutputStream().write(str.getBytes());
                        Thread.sleep(2000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();

        }
    }

}

NIO

New I/O:同时支持阻塞与非阻塞,使用多路复用器机制
No-Blocking I/O:同步非阻塞IO

image.png
  • thread不断轮询每个任务的状态,看是否有任务完成,然后进行下一步操作
  • 比BIO抽象出了新的通道(Channel)作为输入输出的通道,提供了缓存(Buffer)支持
    • 读操作时,使用Buffer分配空间,将数据从Channel读入Buffer中
    • 写操作,将数据写入Buffer,将数据Buffer写入Channel中

Channel:通道,跟IO中Stream(流)差不多一个等级

  • Channel与Stream区别
    1. Stream是单向的,InputStream、OutputStream
    2. Channel是双向的,既可以读也可以写。FileChannel(文件IO)、DatagramChannel(UDP)、SocketChannel(TCP)、ServerSocketChannel(sever和Client)
    3. Channel可以非阻塞的读写IO,Stream只能阻塞的读写
    4. Channel必须配合Buffer使用,无法绕开Buffer直接向Channel读写数据
  • SocketChannel:客户端发起TCP的Channel
  • ServerSocketChannel:服务端监听新的TCP连接,对每个新的TCP连接都会创建一个SocketChannel
  • DatagramChannel:通过UDP读写数据
  • FileChannel:从文件读写数据,数据可以之间从一个Channel传输到另一个Channel,不能跟Selector一起使用(与Selector一起使用的Channel必须为非阻塞)
    1. transferFrom
      RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
      FileChannel      fromChannel = fromFile.getChannel();
      RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
      FileChannel      toChannel = toFile.getChannel();
      long position = 0;
      long count = fromChannel.size();
      toChannel.transferFrom(position, count, fromChannel);
      
    2. transferTo
      RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
      FileChannel      fromChannel = fromFile.getChannel();
      RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
      FileChannel      toChannel = toFile.getChannel();
      long position = 0;
      long count = fromChannel.size();
      fromChannel.transferTo(position, count, toChannel);
      
    3. 文件NIO读
      public static void main(String[] args) {
      String fileName = "/Users/mi/java/learn/iOSApnsPushServer/src/main/webapp/index.jsp";
      try {
          RandomAccessFile aFile = new RandomAccessFile(fileName, "rw");
          FileChannel inChannel = aFile.getChannel();
          ByteBuffer buf = ByteBuffer.allocate(1024);
          int bytesRead = inChannel.read(buf);
          while (bytesRead != -1) {
              System.out.println("Read " + bytesRead);
              buf.flip();
              while(buf.hasRemaining()){
                  System.out.print((char) buf.get());
              }
              buf.clear();
              bytesRead = inChannel.read(buf);
          }
          aFile.close();
      } catch (FileNotFoundException e) {
          e.printStackTrace();
      } catch (IOException e) {
          e.printStackTrace();
      }
      }
      
    4. Scatter、Gather
      Scatter:分散,Channel将读取的数据写入多个buffer中
      Gather:聚集,多个Buffer写入一个Channel
      //scatter分散
      ByteBuffer header = ByteBuffer.allocate(128);
      ByteBuffer body   = ByteBuffer.allocate(1024);
      ByteBuffer[] bufferArray = { header, body };
      channel.read(bufferArray);
      
      
      //gather聚集
      ByteBuffer header = ByteBuffer.allocate(128);
      ByteBuffer body   = ByteBuffer.allocate(1024);
      //write data into buffers
      ByteBuffer[] bufferArray = { header, body };
      channel.write(bufferArray);
      

Buffer:ByteBuffer(byte)、CharBuffer(char)、DoubleBuffer(double)、FloatBuffer(float)、IntBuffer(int)、LongBuffer(long)、ShortBuffer(short)、MappedByteBuffer、HeapByteBuffer、DirectByteBuffer、HeapByteBuffer在JVM的Heap(堆)上分配

  1. 本质上是一块可以读写的内存,这块内存被包装成NIO Buffer对象,并提供一组方法,用来方便访问该内存
  2. capacity:Buffer的固定大小就叫capacity,只能写byte、long、char、short、int、double、float类型,Buffer满了就需要清空才能继续写
  3. position:
    • 写的时候表示当前位置,初始的position为0,最大为capacity-1
    • 读的时候表示从特定位置读,从写切换为读,会重置为0
  4. limit:
    • 写模式下,最多能往Buffer中写多少数据,limit=capacity
    • 读模式,最多能读到多少数据,写切换为读,limit初始为position,能读到之前写入的所有数据
  5. 初始化
```java
//初始化Buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
CharBuffer buffer = CharBuffer.allocate(48);
//写入到Buffer
int bytesRead = inChannel.read(buf);
//写入到Buffer
buffer.put('a');
//写切换到读,limit=position,position=0,读取所有写入的数据
buf.flip();
//从Buffer读数据
buf.get()
//从Buffer读数据
int bytesWrite =  inChannel.write(buf);
//重读所有数据
Buffer.rewind();
 //清空Buffer,position=0,limit=capacity,还能读到之前的记录
Buffer.clear()
//清空Buffer,彻底清空
Buffer.compact()
//标记Buffer的position
buffer.mark();
//获取刚刚标记的position
buffer.reset();
```
  1. euqals:只比较Buffer中剩余的元素(position到limit之间的元素)
    1. 相同的类型
    2. 剩下的个数相同
    3. 剩下的byte、char相同
  2. compareTo:
    1. 第一个不相等的元素小于另一个Buffer中对应的元素
    2. 所有元素都相等,但是其中一个Buffer中的元素先耗尽

Selector

  1. 用单个线程处理多个Channel
```
//创建Selector
Selector selector = Selector.open();
//注册Channel,channel必须为非阻塞状态
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,SelectionKey.OP_READ);
```
- Connet:channel成功连接
- Accept:serverSocketChannel准备接收
- Read:channel有数据可读
- Write:channel等待写入数据

零拷贝

传统copy


image.png

零拷贝


image.png
  • DMA:Direct Memory Access,直接存储器访问,能帮助CPU做大量的工作
    从地址空间复制到另外一个地址空间,cpu初始化这个传输动作,传输动作本身是有DMA控制器来实行和完成

模仿tomcat

public class NIOServer {

    public static final String SEPARATOR = "\r\n";
    public static final int BACK_LOG = 1024;

    public static void main(String[] args) {
        ServerSocketChannel serverSocketChannel = null;
        SocketChannel channel = null;
        try{
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(9000),BACK_LOG);
            ExecutorService executorService = Executors.newFixedThreadPool(50);
            System.out.println("服务器启动成功");
            for (;;){
                channel = serverSocketChannel.accept();
                System.out.println(channel.getRemoteAddress());
                SocketChannel finalChannel = channel;
                executorService.execute(()->{
                    try {
                        Thread.sleep(2000);
                        finalChannel.write(ByteBuffer.wrap(HttpRequest().getBytes(StandardCharsets.UTF_8)));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try {
                channel.close();
                serverSocketChannel.close();
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static String HttpRequest() {
        String str = "<h1>tomcat</h1>";
        StringBuilder builder = new StringBuilder();
        builder.append("HTTP/1.1 200 OK").append(SEPARATOR);
        builder.append("Connection: Close").append(SEPARATOR);
        builder.append("Content-Type: text/html;charset=utf-8").append(SEPARATOR);
        builder.append("Content-Length: " + str.length()).append(SEPARATOR);
        builder.append(SEPARATOR);
        builder.append(str);
        return builder.toString();
    }
}

AIO

Asynchronous I/O:异步非阻塞I/O模型

image.png

当任务完成了,会通知线程来处理

  1. 进程向操作系统请求数据
  2. 操作系统把外部数据加载到内核的缓冲区
  3. 操作系统把内核的缓冲区拷贝到进程的缓冲区
  4. 进程获得数据完成自己的功能

第2、3步是挂起等待状态,就是同步IO,反之就是异步IO,AIO

AIOServer

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AIOServer {
    private ExecutorService executorService;          // 线程池
    private AsynchronousChannelGroup threadGroup;      // 通道组
    public AsynchronousServerSocketChannel asynServerSocketChannel;  // 服务器通道
    public void start(Integer port){
        try {
            // 1.创建一个缓存池
            executorService = Executors.newCachedThreadPool();
            // 2.创建通道组
            threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            // 3.创建服务器通道
            asynServerSocketChannel = AsynchronousServerSocketChannel.open(threadGroup);
            // 4.进行绑定
            asynServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("server start , port : " + port);
            // 5.等待客户端请求
            asynServerSocketChannel.accept(this, new AIOServerHandler());
            // 一直阻塞 不让服务器停止,真实环境是在tomcat下运行,所以不需要这行代码
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        AIOServer server = new AIOServer();
        server.start(9000);
    }
}

AIOServerHandler

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
public class AIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {
    private final Integer BUFFER_SIZE = 1024;
    @Override
    public void completed(AsynchronousSocketChannel asynSocketChannel, AIOServer attachment) {
        // 保证多个客户端都可以阻塞
        attachment.asynServerSocketChannel.accept(attachment, this);
        read(asynSocketChannel);
    }
    //读取数据
    private void read(final AsynchronousSocketChannel asynSocketChannel) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        asynSocketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer resultSize, ByteBuffer attachment) {
                //进行读取之后,重置标识位
                attachment.flip();
                //获取读取的数据
                String resultData = new String(attachment.array()).trim();
                System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData);
                String response = resultData + " = " + resultData;
                write(asynSocketChannel, response);
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }
    // 写入数据
    private void write(AsynchronousSocketChannel asynSocketChannel, String response) {
        try {
            // 把数据写入到缓冲区中
            ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
            buf.put(response.getBytes());
            buf.flip();
            // 在从缓冲区写入到通道中
            asynSocketChannel.write(buf).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc, AIOServer attachment) {
        exc.printStackTrace();
    }
}

AIOClient

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Random;

public class AIOClient implements Runnable{

    private static Integer PORT = 9000;
    private static String IP_ADDRESS = "127.0.0.1";
    private AsynchronousSocketChannel asynSocketChannel ;
    public AIOClient() throws Exception {
        asynSocketChannel = AsynchronousSocketChannel.open();  // 打开通道
    }
    public void connect(){
        asynSocketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));  // 创建连接 和NIO一样
    }
    public void write(String request){
        try {
            asynSocketChannel.write(ByteBuffer.wrap(request.getBytes())).get();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            asynSocketChannel.read(byteBuffer).get();
            byteBuffer.flip();
            byte[] respByte = new byte[byteBuffer.remaining()];
            byteBuffer.get(respByte); // 将缓冲区的数据放入到 byte数组中
            System.out.println(new String(respByte,"utf-8").trim());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        while(true){
        }
    }
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            AIOClient myClient = new AIOClient();
            myClient.connect();
            new Thread(myClient, "myClient").start();
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            myClient.write(expression);
        }
    }
}

多路复用IO

select

image.png
  1. 使用copy_from_user从用户空间拷贝fd_set到内核空间
  2. 注册回调函数__pollwait
  3. 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
  4. 以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
  5. __pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
  6. poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
  7. 如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
  8. 把fd_set从内核空间拷贝到用户空间。
    1. 服务端接入者少
    2. 程序应具有兼容性
    3. select可移植性更好,几乎所有平台支持
    4. select对超时精确到微秒,poll是毫秒
  • 缺点
    1. 每次调用select函数时都要向内核传递监视对象信息,这意味着需要将用户态的fd集合列表从用户态拷贝到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效
    2. 每次调用select时内核都需要遍历(即轮询)传递进来的全部fd,浪费CPU时间;
    3. select在单个进程中能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

poll

  • 与select非常相似,只是fd集合的方式不一样,poll使用pollfd,使poll支持的文件描述符集合限制大于select,select使用fd_set解决了问题3

  • 优点:

    1. poll使用pollfd结构而不是select的fd_set结构,没有最大连接数的限制,原因是它是基于链表来存储的
    2. poll() 不要求开发者计算最大文件描述符加一的大小;
    3. poll() 在应付大数目的文件描述符的时候速度更快,相比于select;
  • 缺点

    1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
    2. poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。
      与select同样,poll返回后,须要轮询pollfd来获取就绪的描述符。

epoll

epoll的高效就在于,当我们调用epoll_ctl往里塞入百万个句柄时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的句柄给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。
  而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已,如何能不高效。
  epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。
  对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
  对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
  对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

  • 优点

    1. 无需编写循环语句来监视发生状态变化的文件描述符,即调用对应于select函数的epoll_wait函数时无需每次传递监视对象信息;
    2. 调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表;
    3. 只向操作系统传递一次监视对象,监视范围或内容发生变化是只通知发生变化的事项;
      监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大;
    4. 效率提升,epoll不是轮询的方式,IO效率不会随FD数目增长而线性降低。这是由于在内核实现中epoll是根据每一个fd上面的callback函数实现的,而只有活跃可用的FD才会主动去调用callback函数,即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll;
    5. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递,即epoll使用mmap减少复制开销,epoll是通过内核于用户空间mmap同一块内存,避免了无畏的内存拷贝;
    6. 支持电平触发和边沿触发两种方式,理论上边缘触发的性能要更高一些,但是代码实现相当复杂;
  • 流程

    1. epoll_create()系统调用,返回一个句柄。
    2. epoll_ctl()系统调用,向epoll对象增、删、改,返回0成功,-1失败
    3. epoll_wait()系统调用,收集epoll监控中已经发送的事件

综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点。

  1. select、poll的实现需要本身不断轮询全部的fd集合,直到设备就绪,期间可能要睡眠和唤醒屡次交替。而epoll其实也须要调用epoll_wait不断轮询就绪链表,期间也可能屡次睡眠和唤醒交替,可是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,可是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就好了,这节省了大量的CPU时间。这就是回调机制带来的性能提高。
  2. select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,而且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,并且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并非设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省很多的开销。
  3. 表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。
  4. select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善
类型 select poll epoll
操作方式 遍历 遍历 回调
底层实现 数组 链表 红黑树
IO效率 每次都进行线性遍历,O(n) 每次都进行线性遍历,O(n) 事件通知方式,当fd就绪,系统注册的回调函数被调用,fd放入readyList中,O(1)
最大连接数 1024(X86)或2048(X64) 无上限 无上限
fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后epoll_wait不拷贝,epoll通过内核和用户空间共享一块内存来实现的。

对比

image.png

目前操作系统对AIO的支持并没有特别完善

相关文章

网友评论

      本文标题:IO、BIO、NIO、AIO、多路复用

      本文链接:https://www.haomeiwen.com/subject/hvlslrtx.html