美文网首页
0、常见的几种IO模型

0、常见的几种IO模型

作者: chanyi | 来源:发表于2021-08-17 19:21 被阅读0次

1、操作系统内核

2、BIO模型

BIO(blocking I/O)是阻塞IO模型,每个客户端连接上之后都会启用一个新的线程来处理。

阻塞是指线程内部方法的阻塞,异步是指线程间的异步执行

(1)、Java实现单线程的BIO模型
# 单线程阻塞情况---server端
public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(8899));
        byte[] bytes =new byte[1024];
        while (true){
            System.out.println("等待连接...");
            Socket accept = serverSocket.accept();//阻塞方法
            System.out.println("连接成功");
            int read = accept.getInputStream().read(bytes);
            System.out.println("接受到消息-read:"+read);
            System.out.println("接受到消息-bytes:"+new String(bytes));
        }
    }
# 单线程阻塞情况---client端
public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1",8899));
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入内容...");
        while(true){
            String text = scanner.next();
            System.out.println("客户端发送消息:"+text);
            socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
        }
    }

如果只有单线程,则启动server之后,会阻塞在accept方法这里,等待客户端的连接,客户端连接成功之后,会阻塞在read方法这里等待接受消息,客户端成功发送消息之后,服务端接受到消息后打印出消息,然后循环继续阻塞在accept方法这里。如果当前客户端继续发送消息给服务端,服务端也是无法接受到消息,因为阻塞在了accept方法这里。所以单线程无法实现BIO

(2)、Java实现多线程的BIO模型
# 多线程阻塞情况---server端
public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(8899));
        byte[] bytes =new byte[1024];
        while (true){
            System.out.println("等待连接...");
            Socket accept = serverSocket.accept();//阻塞方法
            System.out.println("连接成功");
            new Thread(()->{
                try {
                    while (true){
                        int read = accept.getInputStream().read(bytes);
                        System.out.println("线程:"+Thread.currentThread().getName()+"--接受到消息-read:"+read);
                        System.out.println("线程:"+Thread.currentThread().getName()+"--接受到消息-bytes:"+
                                new String(bytes,0,read));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
# 多线程阻塞情况---client端
public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1",8899));
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入内容...");
        while(true){
            String text = scanner.next();
            System.out.println("客户端发送消息:"+text);
            socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
        }
    }

开辟多线程,accept阻塞在主线程,其他消息的read阻塞在各自的子线程中。


BIO模型
(3)、缺点

BIO的缺点就是不适合大规模的客户端访问,每一个客户端需要开辟一个新的线程去阻塞,浪费资源。

3、NIO模型--Select

NIONon-blocking IO,在BIO模型的基础上改进了开辟多个线程专门复杂读写的缺点
NIO模型有多种实现方式,select,poll,epoll

(1)、select的模型

select模型的结构图


Select结构图
(2)、select的Java实现

select模型的单线程实现

# select模型的单线程实现
# 服务端
public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9988));
        serverSocketChannel.configureBlocking(false);//设置为非阻塞的
        System.out.println("开始监听客户端...");
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();//阻塞方法,进行轮询,如果没有连接请求或者需要读取的请求,则会一直阻塞在这里
            Set<SelectionKey> keys = selector.selectedKeys();//轮询完成之后得到所有有标记的请求
            System.out.println("轮询得到的key集合长度为:" + keys.size());
            for (SelectionKey selectionKey : keys) {
                //处理完成之后,将对应的有标记key的socket从集合中删除,下次再轮询获取
                keys.remove(selectionKey);
                //对不同的key做不同的处理,key的可能情况有:accept read write connect
                handle(selectionKey);

            }
        }
    }

    static void handle(SelectionKey selectionKey) {
        //如果key是accept,表示是一个连接的请求
        if (selectionKey.isAcceptable()) {
            System.out.println("当前socket是连接请求...");
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel = null;
            try {
                socketChannel = serverSocketChannel.accept();
                System.out.println("接受连接");
                //设置为非阻塞
                socketChannel.configureBlocking(false);
                //设置当前的请求为read,因为已经连接上,所以下次肯定是要发送消息,所以服务端需要read发过来的信息
                socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }

        }

        //如果key是read,表示是一个写的请求,需要多写的内容进行读取
        if (selectionKey.isReadable()) {
            System.out.println("处理线程:" + Thread.currentThread().getName() + "--当前请求为read请求...");
            SocketChannel socketChannel = null;
            //拿到对应的socket
            socketChannel = (SocketChannel) selectionKey.channel();
            //读取socket中发送来的数据
            ByteBuffer buffer = ByteBuffer.allocate(512);
            buffer.clear();
            int len = 0;
            try {
                len = socketChannel.read(buffer);
                //打印出socket中发送过来的信息
                if (len != -1) {
                    System.out.println("接受到的消息:" + new String(buffer.array(), 0, len));
                }
                //写消息给客户端,说明已经接受到消息
                ByteBuffer byteBufferWrite = ByteBuffer.wrap("你好客户端,请求已经收到!".getBytes(StandardCharsets.UTF_8));
                socketChannel.write(byteBufferWrite);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
            }
        }
    }

# 客户端
    public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1",9988));
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入内容...");
        while(true){
            String text = scanner.next();
            System.out.println("客户端发送消息:"+text);
            socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
        }
    }
(3)、select的缺点
  • 1、最大并发数的限制
    因为一个进程所能打开的fd(文件描述符)是有限制的,可以通过FD_SETSIZE来设置,默认是1024或者2048,因此在linux系统中select模型的最大并发就被限制到10242048了(windows系统并没有限制)(poll模型解决了此问题)
  • 2、效率低
    每次进行select调用都会进行线性轮训全部的fd集合,这样就会导致效率线性下降。fd集合越大,效率越低。(poll模型任然采用轮训的方式,问题依然存在)

select模型的时间复杂度为O(n)poll模型的复杂度O(n)epoll模型的复杂度为O(1)

  • 3、内核和用户空间内存copy问题
    select在解决将fd消息传递给用户空间时,采用了内存copy的方式,这样处理效率比较低。(poll同样也是使用内存copy的方式,问题依然存在)

3、IO模型--poll

poll在select的基础上改进了fd的存储集合,由原来的上限被限制的集合改为没有上限的列表。但是轮询和内存copy的问题并没有实际解决。

4、IO模型--epoll

(1)、epoll模型的基本信息
(2)、epoll模型的特点
  • 1、所支持的文件描述符的上限是整个系统最大可以打开的文件数(select模型是进程最大打开的文件数)
    1G内存的机器上,最大可以打开10w左右
  • 2、每个fd文件描述符上都有callback函数,只有活跃的socket才会主动的调用callback函数,其他的idle状态socket不会调用。采用的是通知机制
    使用红黑树来存储fd文件,使用链表来存储事件
  • 3、通过内核与用户空间的mmap共用同一块内存,减少内存copy的消耗
(3)、epoll模型基本函数

3个API

#建立一个红黑树,用来存储fd,size表示红黑树的大小,最新版已经去掉,按照资源自动分配
int epoll_create(int size);

#对fd的红黑树进行增删改的操作。
#op参数表示动作类型,有三种:EPOLL_CTL_ADD,EPOLL_CTL_MOD,EPOLL_CTL_DEL
#event告诉内核需要监听的事件类型
#-EPOLLIN:表示对应的文件描述符可读
#-EPOLLOUT:表示对应的文件描述符可写
#-EPOLLPRI:表示对应的文件描述符有紧急数据可读(外带数据)
#-EPOLLERR:表示对应的文件描述符发生错误
#-EPOLLHUP:表示对应的文件描述符挂断
#-EPOLLET:表示将EPOLL设置为边缘触发(只要状态变化了才会通知获取,如果没有读完且没有新事件,则不会通知,是一种高速处理模式,相对是的电平出发)
int epoll_ctl(int epfd,int op,int fd,struct epoll_event event);

#等待时间的产生,类似于select模型的select()
#events表示内存得到的事件的集合(全部是就绪的event)
#maxEvents表示每次能处理的最大事件数
#timeout表示超时时间,设置为-1表示阻塞,0则会立即返回
#返回的int表示事件个数
int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);
(4)、nettyepoll的封装

netty使用两个groupbossgroupworkgroup),bossgroup只负责客户端的连接,workgroup负责消息的处理,workgroup在所有的通道的监听器队列最后面加上自己的处理器,也是通过回调的方式,如果有消息过来,则会自动执行回调方法里的逻辑。
netty的使用

#服务端
public class NettyServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new MyChannelInboundHandler());
                    }
                });

        try {
            ChannelFuture future = serverBootstrap.bind(9999);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class MyChannelInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buffer = (ByteBuf)msg;
        System.out.println("接受到消息:"+buffer.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(msg);
//        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("发生异常:"+cause.getCause().toString());
        ctx.close();
    }
}

#客户端
public class NettyClient {
    public static void main(String[] args) {
        new NettyClient().start();
    }
    private void start(){
        EventLoopGroup clientWorks = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(clientWorks)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        System.out.println("初始化通道...");
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });

        try {
            System.out.println("连接成功");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9999).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class ClientHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道已经建立...");
        final ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好".getBytes()));
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("消息发送成功");
            }
        });
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //读取服务器发回的消息
        try {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("服务端发来消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

4、AIO模型

AIO(Asynchronous IO)是一种异步非阻塞的模型,程序实现一个回调函数,交给操作系统,如果有请求连接上来,操作系统会自动执行这个回调函数。

(1)、模型图

执行流程图:


image.png
(2)、Java实现

单线程实现AIO

#服务器端
public static void main(String[] args) throws IOException {
        final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel
                .open()
                .bind(new InetSocketAddress(7788));
        System.out.println("服务器已经启动...");
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                serverSocketChannel.accept(null,this);
                try {
                    System.out.println("客户端:"+client.getRemoteAddress()+"--已经连接");
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    System.out.println("等待接受消息...");
                    client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            System.out.println("客户端发来消息--");
                            attachment.flip();
                            //打印出从客户端读取的消息
                            try {
                                System.out.println("客户端"+client.getRemoteAddress()+"发来消息:"
                                        +new String(attachment.array(),0,result));
                            }catch (Exception e){
                                System.out.println("获取客户端地址错误");
                            }
                            client.write(ByteBuffer.wrap("消息已收到".getBytes(StandardCharsets.UTF_8)));
                            //每次读取都会将事件从queue中取出来,所以这里需要重新放进去,以便继续通信,不然只能收到一次客户端的消息
                            client.read(attachment,attachment,this);
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            System.out.println("处理消息失败");
                        }
                    });
                } catch (IOException e) {
                    System.out.println("出现异常:"+e.getMessage());
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("连接失败");
            }
        });
        //让程序执行下去。不要终止
        while (true){
            try {
                CountDownLatch latch = new CountDownLatch(1);
                latch.await();
            }catch (Exception e){
                System.out.println("await exception");
            }
        }
    }

#客户端
public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1",7788));
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入内容...");
        while(true){
            String text = scanner.next();
            System.out.println("客户端发送消息:"+text);
            socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
        }
    }

多线程实现AIO

#服务端代码
public static void main(String[] args) throws IOException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);

        final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel
                .open(asynchronousChannelGroup)
                .bind(new InetSocketAddress(9090));
        System.out.println("等待客户端连接...");
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                serverSocketChannel.accept(null,this);

                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        attachment.flip();
                        System.out.println("接受到客户端的消息:"+new String(attachment.array(),0,result));
                        client.write(ByteBuffer.wrap("消息已接收到".getBytes(StandardCharsets.UTF_8)));
                        client.read(attachment,attachment,this);
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        System.out.println("接受消息处理失败");
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("连接失败");
            }
        });

        CountDownLatch latch = new CountDownLatch(1);
        latch.await();
    }


参考资料
1、(填坑系列) 用aio写server与client进行通信的坑
2、

相关文章

  • 0、常见的几种IO模型

    1、操作系统内核 2、BIO模型 BIO(blocking I/O)是阻塞IO模型,每个客户端连接上之后都会启用一...

  • 【转】IO模型及select、poll、epoll和kqueue

    【转】IO模型及select、poll、epoll和kqueue的区别 (一)首先,介绍几种常见的I/O模型及其区...

  • 网络IO模型

    网络IO的模型大致包括下面几种 同步模型(synchronous IO)阻塞IO(bloking IO)非阻塞IO...

  • 什么是BIO,NIO与AIO

    计算机中常见的IO模型主要分为几种BIO,NIO和AIO。 操作系统的IO操作包括读写文件,Socket操作等。C...

  • 几种IO模型

    周日午后,刚刚放下手里的电话,正在给刚刚的面试者写评价。刚刚写到『对Linux的基本IO模型理解不深』这句的时候,...

  • 2.五种IO模型

    0.IO介绍1.阻塞IO模型2.非阻塞IO模型3.IO多路复用模型4.信号驱动IO模型5.异步IO模型6.五种IO...

  • 深入浅出网络IO

    IO作为网络通信中最重要的部分,面试中经常会问到;本文将从计算机组成基础讲起,围绕几种常见的IO模型,介绍其原理和...

  • 常见IO模型

    阻塞和非阻塞: 同步和异步: 其实异步还可以分为两种:带通知的和不带通知的。前面说的那种属于带通知的。有些传菜员干...

  • 高性能IO模型(摘选)

    高性能IO模型浅析 服务器端编程经常需要构造高性能的IO模型,常见的IO模型有四种: (1)同步阻塞IO(Bloc...

  • IO多路复用机制详解

    高性能IO模型浅析 服务器端编程经常需要构造高性能的IO模型,常见的IO模型有四种: (1)同步阻塞IO(Bloc...

网友评论

      本文标题:0、常见的几种IO模型

      本文链接:https://www.haomeiwen.com/subject/obcjbltx.html