AIO

作者: yongguang423 | 来源:发表于2018-09-17 07:32 被阅读18次

    Asynchronous IO: 异步非阻塞的编程方式
    与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。在JDK1.7中,这部分内容被称作NIO.2,主要在java.nio.channels包下增加了下面四个异步通道:
    AsynchronousSocketChannel
    AsynchronousServerSocketChannel
    AsynchronousFileChannel
    AsynchronousDatagramChannel
    异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O 请求都是由OS
    先完成了再通知服务器应用去启动线程进行处理。
    AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调
    用OS 参与并发操作,编程比较复杂,JDK7 开始支持。

    image.png
    package com.bjsxt.socket.aio;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.Scanner;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class AIOClient {
        
        private AsynchronousSocketChannel channel;
        
        public AIOClient(String host, int port){
            init(host, port);
        }
        
        private void init(String host, int port){
            try {
                // 开启通道
                channel = AsynchronousSocketChannel.open();
                // 发起请求,建立连接。
                channel.connect(new InetSocketAddress(host, port));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        public void write(String line){
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put(line.getBytes("UTF-8"));
                buffer.flip();
                channel.write(buffer);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        
        public void read(){
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                // read方法是异步方法,OS实现的。get方法是一个阻塞方法,会等待OS处理结束后再返回。
                channel.read(buffer).get();
                // channel.read(dst, attachment, handler);
                buffer.flip();
                System.out.println("from server : " + new String(buffer.array(), "UTF-8"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        
        public void doDestory(){
            if(null != channel){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static void main(String[] args) {
            AIOClient client = new AIOClient("localhost", 9999);
            try{
                System.out.print("enter message send to server > ");
                Scanner s = new Scanner(System.in);
                String line = s.nextLine();
                client.write(line);
                client.read();
            }finally{
                client.doDestory();
            }
        }
    
    }
    
    
    package com.bjsxt.socket.aio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.Scanner;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class AIOServer {
    
        // 线程池, 提高服务端效率。
        private ExecutorService service;
        // 线程组
        // private AsynchronousChannelGroup group;
        // 服务端通道, 针对服务器端定义的通道。
        private AsynchronousServerSocketChannel serverChannel;
        
        public AIOServer(int port){
            init(9999);
        }
        
        private void init(int port){
            try {
                System.out.println("server starting at port : " + port + " ...");
                // 定长线程池
                service = Executors.newFixedThreadPool(4);
                /* 使用线程组
                group = AsynchronousChannelGroup.withThreadPool(service);
                serverChannel = AsynchronousServerSocketChannel.open(group);
                */
                // 开启服务端通道, 通过静态方法创建的。
                serverChannel = AsynchronousServerSocketChannel.open();
                // 绑定监听端口, 服务器启动成功,但是未监听请求。
                serverChannel.bind(new InetSocketAddress(port));
                System.out.println("server started.");
                // 开始监听 
                // accept(T attachment, CompletionHandler<AsynchronousSocketChannel, ? super T>)
                // AIO开发中,监听是一个类似递归的监听操作。每次监听到客户端请求后,都需要处理逻辑开启下一次的监听。
                // 下一次的监听,需要服务器的资源继续支持。
                serverChannel.accept(this, new AIOServerHandler());
                try {
                    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) {
            new AIOServer(9999);
        }
    
        public ExecutorService getService() {
            return service;
        }
    
        public void setService(ExecutorService service) {
            this.service = service;
        }
    
        public AsynchronousServerSocketChannel getServerChannel() {
            return serverChannel;
        }
    
        public void setServerChannel(AsynchronousServerSocketChannel serverChannel) {
            this.serverChannel = serverChannel;
        }
        
    }
    
    
    package com.bjsxt.socket.aio;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.Scanner;
    import java.util.concurrent.ExecutionException;
    
    public class AIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {
    
        /**
         * 业务处理逻辑, 当请求到来后,监听成功,应该做什么。
         * 一定要实现的逻辑: 为下一次客户端请求开启监听。accept方法调用。
         * result参数 : 就是和客户端直接建立关联的通道。
         *  无论BIO、NIO、AIO中,一旦连接建立,两端是平等的。
         *  result中有通道中的所有相关数据。如:OS操作系统准备好的读取数据缓存,或等待返回数据的缓存。
         */
        @Override
        public void completed(AsynchronousSocketChannel result, AIOServer attachment) {
            // 处理下一次的客户端请求。类似递归逻辑
            attachment.getServerChannel().accept(attachment, this);
            doRead(result);
        }
    
        /**
         * 异常处理逻辑, 当服务端代码出现异常的时候,做什么事情。
         */
        @Override
        public void failed(Throwable exc, AIOServer attachment) {
            exc.printStackTrace();
        }
        
        /**
         * 真实项目中,服务器返回的结果应该是根据客户端的请求数据计算得到的。不是等待控制台输入的。
         * @param result
         */
        private void doWrite(AsynchronousSocketChannel result){
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                System.out.print("enter message send to client > ");
                Scanner s = new Scanner(System.in);
                String line = s.nextLine();
                buffer.put(line.getBytes("UTF-8"));
                // 重点:必须复位,必须复位,必须复位
                buffer.flip();
                // write方法是一个异步操作。具体实现由OS实现。 可以增加get方法,实现阻塞,等待OS的写操作结束。
                result.write(buffer);
                // result.write(buffer).get(); // 调用get代表服务端线程阻塞,等待写操作完成
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }/* catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }*/
        }
        
        private void doRead(final AsynchronousSocketChannel channel){
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            /*
             * 异步读操作, read(Buffer destination, A attachment, 
             *                    CompletionHandler<Integer, ? super A> handler)
             * destination - 目的地, 是处理客户端传递数据的中转缓存。 可以不使用。
             * attachment - 处理客户端传递数据的对象。 通常使用Buffer处理。
             * handler - 处理逻辑
             */
            channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    
                /**
                 * 业务逻辑,读取客户端传输数据
                 * attachment - 在completed方法执行的时候,OS已经将客户端请求的数据写入到Buffer中了。
                 *  但是未复位(flip)。 使用前一定要复位。
                 */
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    try {
                        System.out.println(attachment.capacity());
                        // 复位
                        attachment.flip();
                        System.out.println("from client : " + new String(attachment.array(), "UTF-8"));
                        doWrite(channel);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
    
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                }
            });
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:AIO

          本文链接:https://www.haomeiwen.com/subject/ufjhnftx.html