美文网首页
Java IO 演进之路

Java IO 演进之路

作者: 剑道_7ffc | 来源:发表于2020-03-22 14:44 被阅读0次

    必须明白的几个概念

    阻塞(Block)和非阻塞(Non-Block)

    阻塞和非阻塞是在缓冲区数据没有准备就绪的情况下处理方式
    阻塞:在没有就绪下,就一直等待哪里
    非阻塞:在没有就绪下,则直接返回
    阻塞和非阻塞是相对于io操作来讲
    举例如下图:从机器内存到jvm内存


    image.png

    同步(Synchronization)和异步(Asynchronous)

    同步和异步是处理io事件的采用方式
    同步:应用程序参与io的处理
    异步:io交给操作系统,应用程序只需要等待通知
    异步和同步是相对于时间点,异步指同一时间点做多个处理,同步指同一时间点做一个处理。

    读和写

    读指input,写指output,输入和输出是相对于内存来讲的

    BIO 与 NIO 对比

    BIO:面向流(乡村公路),阻塞 IO(多线程)
    NIO:面向缓冲(高速公路,多路复用技术),非阻塞IO(反应堆 Reactor),选择器(轮询机制)


    image.png

    阻塞和非阻塞

    NIO是发现缓冲数据没有准备好,直接返回,处理其他事情,因此可以一个线程处理多个输入和输出通道。
    BIO 是发现缓冲区数据没有准备好,则等待哪里,因此,可以一个线程处理一个输入和输出通道。

    Java AIO

    真正实现异步IO

    代码实现

    BIO

    1 服务端

    //同步阻塞IO模型
    public class BIOServer {
        ServerSocket server;
        public BIOServer(int port){
            try {
                server = new ServerSocket(port);
                System.out.println("BIO服务已启动,监听端口是:" + port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void listen() throws IOException{
            //循环监听
            while(true){
                //等待客户端连接,阻塞方法
                Socket client = server.accept();
    
                InputStream is = client.getInputStream();
                //网络客户端把数据发送到网卡,机器所得到的数据读到了JVM内中
                byte [] buff = new byte[1024];
                int len = is.read(buff);
                if(len > 0){
                    String msg = new String(buff,0,len);
                    System.out.println("收到" + msg);
                }
            }
        }   
        public static void main(String[] args) throws IOException {
            new BIOServer(8080).listen();
        }
    }
    

    2 客户端

    public class BIOClient {
        public static void main(String[] args) throws UnknownHostException, IOException {
            Socket client = new Socket("localhost", 8080);
    
            OutputStream os = client.getOutputStream();
    
            //生成一个随机的ID
            String name = UUID.randomUUID().toString();
    
            System.out.println("客户端发送数据:" + name);
            os.write(name.getBytes());
            os.close();
            client.close();
        }
    }
    

    NIO

    1 服务端

    /**
     * NIO的操作过于繁琐,于是才有了Netty
     * Netty就是对这一系列非常繁琐的操作进行了封装
     */
    public class NIOServerDemo {
    
        private int port = 8080;
    
        //轮询器 Selector 大堂经理
        private Selector selector;
        //缓冲区 Buffer 等候区
        private ByteBuffer buffer = ByteBuffer.allocate(1024);
    
        //初始化完毕
        public NIOServerDemo(int port){
            try {
                this.port = port;
    
                //远程服务器通道
                ServerSocketChannel server = ServerSocketChannel.open();
                //我得告诉地址
                server.bind(new InetSocketAddress(this.port));
                //BIO 升级版本 NIO,为了兼容BIO,NIO模型默认是采用阻塞式
                server.configureBlocking(false);
    
                //轮训器
                selector = Selector.open();
    
                //轮训器属于该通道
                server.register(selector, SelectionKey.OP_ACCEPT);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void listen(){
            System.out.println("listen on " + this.port + ".");
            try {
                //轮询主线程
                while (true){
                    //通道个数,当没有通道则阻塞,有则继续进行
                    selector.select();
                    //每次都拿到所有的号子
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    //不断地迭代,就叫轮询
                    //同步体现在这里,因为每次只能拿一个key,每次只能处理一种状态
                    while (iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        //每一个key代表一种状态
                        //数据就绪、数据可读、数据可写 等等等等
                        process(key);
                    }
                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        //具体办业务的方法,坐班柜员
        //在同一时间点,只能干一件事
        private void process(SelectionKey key) throws IOException {
            //针对于每一种状态给一个反应
            if(key.isAcceptable()){
                ServerSocketChannel server = (ServerSocketChannel)key.channel();
                //这个方法体现非阻塞,不管你数据有没有准备好
                //你给我一个状态和反馈
                SocketChannel channel = server.accept();
                //一定一定要记得设置为非阻塞
                channel.configureBlocking(false);
                //当数据准备就绪的时候,将状态改为可读
                channel.register(selector,SelectionKey.OP_READ);
            }
            else if(key.isReadable()){
                //key.channel 从多路复用器中拿到客户端的引用
                SocketChannel channel = (SocketChannel)key.channel();
                int len = channel.read(buffer);
                if(len > 0){
                    buffer.flip();
                    String content = new String(buffer.array(),0,len);
                    key = channel.register(selector,SelectionKey.OP_WRITE);
                    //在key上携带一个附件,一会再写出去
                    key.attach(content);
                    System.out.println("读取内容:" + content);
                }
            }
            else if(key.isWritable()){
                SocketChannel channel = (SocketChannel)key.channel();
    
                String content = (String)key.attachment();
                channel.write(ByteBuffer.wrap(("输出:" + content).getBytes()));
    
                channel.close();
            }
        }
        public static void main(String[] args) {
            new NIOServerDemo(8080).listen();
        }
    }
    

    2 客户端
    同BIO客户端

    AIO

    1 服务端

    public class AIOServer {
        private final int port;
        public static void main(String args[]) {
            int port = 8000;
            new AIOServer(port);
        }
        public AIOServer(int port) {
            this.port = port;
            listen();
        }
        private void listen() {
            try {
                ExecutorService executorService = Executors.newCachedThreadPool();
                AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
                //开门营业
                //工作线程,用来侦听回调的,事件响应的时候需要回调
                final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup);
                server.bind(new InetSocketAddress(port));
                System.out.println("服务已启动,监听端口" + port);
    
                //准备接受数据
                server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
                    final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                    //实现completed方法来回调
                    //由操作系统来触发
                    //回调有两个状态,成功
                    public void completed(AsynchronousSocketChannel result, Object attachment){
                        System.out.println("IO操作成功,开始获取数据");
                        try {
                            buffer.clear();
                            result.read(buffer).get();
                            buffer.flip();
                            result.write(buffer);
                            buffer.flip();
                        } catch (Exception e) {
                            System.out.println(e.toString());
                        } finally {
                            try {
                                result.close();
                                server.accept(null, this);
                            } catch (Exception e) {
                                System.out.println(e.toString());
                            }
                        }
    
                        System.out.println("操作完成");
                    }
    
                    @Override
                    //回调有两个状态,失败
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("IO操作是失败: " + exc);
                    }
                });
    
                try {
                    Thread.sleep(Integer.MAX_VALUE);
                } catch (InterruptedException ex) {
                    System.out.println(ex);
                }
            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }
    

    2 客户端

    public class AIOClient {
        private final AsynchronousSocketChannel client;
    
        public AIOClient() throws Exception{
            client = AsynchronousSocketChannel.open();
        }
    
        public void connect(String host,int port)throws Exception{
            client.connect(new InetSocketAddress(host,port),null,new CompletionHandler<Void,Void>() {
                @Override
                public void completed(Void result, Void attachment) {
                    try {
                        client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
                        System.out.println("已发送至服务器");
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
    
                @Override
                public void failed(Throwable exc, Void attachment) {
                    exc.printStackTrace();
                }
            });
            final ByteBuffer bb = ByteBuffer.allocate(1024);
            client.read(bb, null, new CompletionHandler<Integer,Object>(){
    
                        @Override
                        public void completed(Integer result, Object attachment) {
                            System.out.println("IO操作完成" + result);
                            System.out.println("获取反馈结果" + new String(bb.array()));
                        }
    
                        @Override
                        public void failed(Throwable exc, Object attachment) {
                            exc.printStackTrace();
                        }
                    }
            );
    
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
    
        }
        public static void main(String args[])throws Exception{
            new AIOClient().connect("localhost",8000);
        }
    }
    

    相关文章

      网友评论

          本文标题:Java IO 演进之路

          本文链接:https://www.haomeiwen.com/subject/jufvyhtx.html