美文网首页
BIO、NIO、AIO笔记

BIO、NIO、AIO笔记

作者: EmonH | 来源:发表于2020-03-26 06:49 被阅读0次

    同步:任务一的完成需要依赖任务二,只有等待任务二完成,任务一才算完成。
    异步:任务一会通知任务二完成什么任务,但是两个任务是互不等待,都会进行。任务二完成之后会告诉任务一。
    阻塞:CPU停下来等待一个慢的操作完成才继续后面的工作。
    非阻塞:CPU遇到这个慢的操作会先去执行其他的命令,等慢的动作完成之后在处理慢操作对应的命令。
    接下来我们说说同步阻塞,同步非阻塞和异步非阻塞
    之前看过一位大牛的博客,他举了个例子来解释三个概念,我觉得收益匪浅。小时候妈妈让去烧水,然后自己拿着水壶去了,在烧水的过程中一直等水烧开,这个就是同步阻塞。后来发现烧水需要很长时间,便在烧水的过程中去干别的事,时不时的来看看水是不是烧开了,这个模型就是同步非阻塞。再后来水壶有了烧开水之后发声的功能,那么烧水的时候,我可以不用时不时的去查看,只要听到声音了就知道水烧开了,这个模型就是异步非阻塞。接下来我们用代码看看下三种模型的具体实现。

    BIO:同步阻塞

    数据的读取写入必须阻塞在一个线程内等待其完成,在java中这样的模型简单容易理解,每次来一个请求,服务器都会开启一个线程去处理,当在连接数小于1000时,可以让每一个连接专注于自己的 I/O,不用过多考虑系统的过载、限流等问题。在搭配线程池的使用,可以很好的解决服务端连接异常的问题。但是当连接数达到万级别之后,线程之间切换带来请求处理慢的问题逐渐体现。
    服务端

    public class BioServer {
        final static int PROT = 7788;
        public static void main(String[] args) throws IOException {
            ServerSocket server = null;
            server = new ServerSocket(PROT);
            System.out.println(" server start .. ");
            while(true) {
                //进行阻塞
                socket = server.accept();
                //新建一个线程执行客户端的任务
                HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
                executorPool.execute(new ServerHandler(socket));
            }
        }
    }
    class HandlerExecutorPool {
        private ExecutorService executor;
        public HandlerExecutorPool(int maxPoolSize, int queueSize){
            this.executor = new ThreadPoolExecutor(
                    Runtime.getRuntime().availableProcessors(),
                    maxPoolSize,
                    120L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(queueSize));
        }
    
        public void execute(Runnable task){
            this.executor.execute(task);
        }
    }
    class ServerHandler implements Runnable {
        private Socket socket;
    
        public ServerHandler(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            BufferedReader in = null;
            PrintWriter out = null;
            try {
                in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
                out = new PrintWriter(this.socket.getOutputStream(), true);
                String body = null;
                while (true) {
                    body = in.readLine();
                    if (body == null) break;
                    System.out.println("Server :" + body);
                    out.println("服务器端回送响的应数据.");
                }
            }catch (Exception e){
    
            }
        }
    }
    

    客户端

    public class Client implements Runnable {
        final static String ADDRESS = "127.0.0.1";
        final static int PORT = 8088;
        public static void main(String[] args) throws IOException {
            new Thread(new Client()).start();
        }
    
        @Override
        public void run() {
            Socket socket = null;
            BufferedReader in = null;
            PrintWriter out = null;
            try {
                socket = new Socket(ADDRESS, PORT);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                out = new PrintWriter(socket.getOutputStream(), true);
                //向服务器端发送数据
                while (true) {
                    out.println("接收到客户端的请求数据...");
                    out.println("接收到客户端的请求数据1111...");
                    String response = in.readLine();
                    System.out.println("Client: " + response);
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    NIO:同步非阻塞

    jdk1.7以后引入了NIO的变成模式。首先有三个概念需要了解。

    buffer缓存区:NIO是将所有数据都用到缓冲区数组中,

    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 7788);//创建连接的地址
            SocketChannel sc = null;//声明连接通道
            ByteBuffer buf = ByteBuffer.allocate(1024);//建立缓冲区
                sc = SocketChannel.open();//打开通道
                sc.connect(address);//进行连接
                while(true){
                    //定义一个字节数组,然后使用系统录入功能:
                    byte[] bytes = new byte[1024];
                    System.in.read(bytes);
                    buf.put(bytes);//把数据放到缓冲区中
                    buf.flip();//对缓冲区进行复位
                    sc.write(buf);//写出数据
                    buf.clear();//清空缓冲区数据
                }
                ···
    

    Channel 通道

    NIO支持网络数据从Channel中读取,Channel是区别与传统的输入输出流的,传统输入输出流只支持单向数据流动,而Channel同时支持读取和写入,有多种状态位可以被识别。

    Selector 多路复用选择器

    NIO模型中一个连接就是一个Channel,所有的Channel都注册在Selector 中,Selector多路复用器选择器轮询查看Channel的状态位,当Channel发生读写操作时。便处于就绪状态,selector多路选择复用器会将所有处于就绪状态的Channel轮询出来,以继续后面的io操作,一个Selector可以负责上万级别的Channel,没有上限,这也是JDK使用epoll代替了传统的selector实现。
    服务端代码

    public class NioServer implements Runnable {
        private Selector seletor;
        //2 建立缓冲区
        private ByteBuffer readBuf = ByteBuffer.allocate(1024);
        //3
        private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    
        public NioServer(int port) {
            try {
                //1 打开多路复用器
                this.seletor = Selector.open();
                //2 打开服务器通道
                ServerSocketChannel ssc = ServerSocketChannel.open();
                //3 设置服务器通道为非阻塞模式
                ssc.configureBlocking(false);
                //4 绑定地址
                ssc.bind(new InetSocketAddress(port));
                //5 把服务器通道注册到多路复用器上,并且监听阻塞事件
                ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
                System.out.println("Server start, port :" + port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    //1 必须要让多路复用器开始监听
                    this.seletor.select();
                    //2 返回多路复用器已经选择的结果集
                    Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                    //3 进行遍历
                    while (keys.hasNext()) {
                        //4 获取一个选择的元素
                        SelectionKey key = keys.next();
                        //5 直接从容器中移除就可以了
                        keys.remove();
                        //6 如果是有效的
                        if (key.isValid()) {
                            //7 如果为阻塞状态
                            if (key.isAcceptable()) {
                                this.accept(key);
                            }
                            //8 如果为可读状态
                            if (key.isReadable()) {
                                this.read(key);
                            }
                            //9 写数据
                            if (key.isWritable()) {
                                this.write(key); //ssc
                            }
                        }
    
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        private void write(SelectionKey key) throws ClosedChannelException {
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            ssc.register(this.seletor, SelectionKey.OP_WRITE);
        }
    
        private void read(SelectionKey key) {
            try {
                //1 清空缓冲区旧的数据
                this.readBuf.clear();
                //2 获取之前注册的socket通道对象
                SocketChannel sc = (SocketChannel) key.channel();
                //3 读取数据
                int count = sc.read(this.readBuf);
                //4 如果没有数据
                if(count == -1){
                    key.channel().close();
                    key.cancel();
                    return;
                }
                //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
                this.readBuf.flip();
                //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
                byte[] bytes = new byte[this.readBuf.remaining()];
                //7 接收缓冲区数据
                this.readBuf.get(bytes);
                //8 打印结果
                String body = new String(bytes).trim();
                System.out.println("Server : " + body);
    
                // 9..可以写回给客户端数据
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private void accept(SelectionKey key) {
            try {
                //1 获取服务通道
                ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
                //2 执行阻塞方法
                SocketChannel sc = ssc.accept();
                //3 设置阻塞模式
                sc.configureBlocking(false);
                //4 注册到多路复用器上,并设置读取标识
                sc.register(this.seletor, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            new Thread(new NioServer(8088)).start();;
        }
    }
    

    客户端

    class NioClient{
        private static String DEFAULT_HOST = "127.0.0.1";
        private static int DEFAULT_PORT = 8088;
        private static ClientHandle clientHandle;
        public static void start(){
            start(DEFAULT_HOST,DEFAULT_PORT);
        }
        public static synchronized void start(String ip,int port){
            if(clientHandle!=null)
                clientHandle.stop();
            clientHandle = new ClientHandle(ip,port);
            new Thread(clientHandle,"Server").start();
        }
        //向服务器发送消息
        public static boolean sendMsg(String msg) throws Exception{
            if(msg.equals("q")) return false;
            clientHandle.sendMsg(msg);
            return true;
        }
        public static void main(String[] args){
            start();
        }
    }
    class ClientHandle implements Runnable{
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile boolean started;
    
        public ClientHandle(String ip,int port) {
            this.host = ip;
            this.port = port;
            try{
                //创建选择器
                selector = Selector.open();
                //打开监听通道
                socketChannel = SocketChannel.open();
                //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
                socketChannel.configureBlocking(false);//开启非阻塞模式
                started = true;
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        public void stop(){
            started = false;
        }
        @Override
        public void run() {
            try{
                doConnect();
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
            //循环遍历selector
            while(started){
                try{
                    //无论是否有读写事件发生,selector每隔1s被唤醒一次
                    selector.select(1000);
                    //阻塞,只有当至少一个注册的事件发生的时候才会继续.
    //              selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    while(it.hasNext()){
                        key = it.next();
                        it.remove();
                        try{
                            handleInput(key);
                        }catch(Exception e){
                            if(key != null){
                                key.cancel();
                                if(key.channel() != null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }catch(Exception e){
                    e.printStackTrace();
                    System.exit(1);
                }
            }
            //selector关闭后会自动释放里面管理的资源
            if(selector != null)
                try{
                    selector.close();
                }catch (Exception e) {
                    e.printStackTrace();
                }
        }
        private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                SocketChannel sc = (SocketChannel) key.channel();
                if(key.isConnectable()){
                    if(sc.finishConnect());
                    else System.exit(1);
                }
                //读消息
                if(key.isReadable()){
                    //创建ByteBuffer,并开辟一个1M的缓冲区
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //读取请求码流,返回读取到的字节数
                    int readBytes = sc.read(buffer);
                    //读取到字节,对字节进行编解码
                    if(readBytes>0){
                        //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                        buffer.flip();
                        //根据缓冲区可读字节数创建字节数组
                        byte[] bytes = new byte[buffer.remaining()];
                        //将缓冲区可读字节数组复制到新建的数组中
                        buffer.get(bytes);
                        String result = new String(bytes,"UTF-8");
                        System.out.println("客户端收到消息:" + result);
                    }
                    //没有读取到字节 忽略
    //              else if(readBytes==0);
                    //链路已经关闭,释放资源
                    else if(readBytes<0){
                        key.cancel();
                        sc.close();
                    }
                }
            }
        }
        //异步发送消息
        private void doWrite(SocketChannel channel,String request) throws IOException{
            //将消息编码为字节数组
            byte[] bytes = request.getBytes();
            //根据数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //发送缓冲区的字节数组
            channel.write(writeBuffer);
            //****此处不含处理“写半包”的代码
        }
        private void doConnect() throws IOException{
            if(socketChannel.connect(new InetSocketAddress(host,port)));
            else socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
        public void sendMsg(String msg) throws Exception{
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel, msg);
        }
    }
    

    AIO:异步非阻塞

    AIO基于事件和回调机制,不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。
    服务端

    public class AioServer {
        public static void main(String[] args) {
            // AIO线程复用版
            Thread sThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    AsynchronousChannelGroup group = null;
                    try {
                        group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
                        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
                        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
                            @Override
                            public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
                                server.accept(null, this); // 接收下一个请求
                                try {
                                    Future<Integer> f = result.write(Charset.defaultCharset().encode("你好,世界"));
                                    f.get();
                                    System.out.println("服务端发送时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                                    result.close();
                                } catch (InterruptedException | ExecutionException | IOException e) {
                                    e.printStackTrace();
                                }
                            }
    
                            @Override
                            public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
                            }
                        });
                        group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                    } catch (IOException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            sThread.start();
        }
    }
    

    客户端

    class AioClient {
        public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
            // Socket 客户端
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            Future<Void> future = client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
            future.get();
            ByteBuffer buffer = ByteBuffer.allocate(100);
            client.read(buffer, null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer result, Void attachment) {
                    System.out.println("客户端打印:" + new String(buffer.array()));
                }
    
                @Override
                public void failed(Throwable exc, Void attachment) {
                    exc.printStackTrace();
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(10 * 1000);
        }
    }
    

    相关文章

      网友评论

          本文标题:BIO、NIO、AIO笔记

          本文链接:https://www.haomeiwen.com/subject/qttfuhtx.html