美文网首页
【网络编程】AIO编程

【网络编程】AIO编程

作者: 程就人生 | 来源:发表于2023-02-24 12:30 被阅读0次

    JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0。同时,Java正式提供了异步文件IO操作,同时提供给了与UNIX网络编程事件驱动IO对应的AIO。

    NIO2.0引入了新的异步通道的概念,提供了异步文件通道和异步套接字通道的实现。异步通道以两种方式获取操作结果。

    • 通过java.util.concurrent.Future来来获取异步操作的结果;

    • 在执行异步操作的时候传入一个java.nio.channels。

    NIO2.0的异步套接字是真正的异步非阻塞IO,对应UNIX网络编程中的事件驱动IO,即AIO。它不需要通过多路复用选择器Selector对注册的通道进行轮询操作即可实现异步读写,简化了NIO的编程模型。服务器代码:

    package com.test.aio;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * AIO编程服务器端
     * @author 程就人生
     * @Date
     */
    public class HelloServer {
    
      public static void main( String[] args ){
        int port = 8080;
        AsyncHelloServerHandler helloServer = new  AsyncHelloServerHandler(port);
        new Thread(helloServer,"AIO").start();
      }
    }
    
    /**
     * AIO编程handler处理类
     * @author 程就人生
     * @Date
     */
    class AsyncHelloServerHandler implements Runnable{
      
      private int port;
      
      CountDownLatch latch;
      
      AsynchronousServerSocketChannel asynchronousServerSocketChannel;
    
      public AsyncHelloServerHandler(int port) {
        this.port = port;
        try {
          // 创建一个异步的服务端通道
          asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
          // 绑定监听端口
          asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
          System.out.println("AIO编程,服务器端已启动,启动端口号为:" + port);
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    
      public void run() {
        // 完成一组正在执行的操作之前,允许当前的线程一直阻塞
        latch = new CountDownLatch(1);
        // 接收客户端的连接
        doAccept();
        try{
          latch.await();
        }catch(InterruptedException e){
          e.printStackTrace();
        }
      }
    
      private void doAccept() {
        asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
      }  
    }
    /**
     * 接收新的客户端连接
     * @author 程就人生
     * @Date
     */
    class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncHelloServerHandler>{
    
      public void completed(AsynchronousSocketChannel result, AsyncHelloServerHandler attachment) {
        attachment.asynchronousServerSocketChannel.accept(attachment, this);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        result.read(buffer, buffer, new ReadCompletionHandler(result));
      }
    
      public void failed(Throwable exc, AsyncHelloServerHandler attachment) {
        attachment.latch.countDown();
      }
      
    }
    /**
     * 接收通知回调处理的handler
     * @author 程就人生
     * @Date
     */
    class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer>{
      
      private AsynchronousSocketChannel channel;  
    
      public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
      }
    
      public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try {
          String req = new String(body, "utf-8");
          System.out.println("服务器端收到的:" + req);
          // 应答
          doWrite();
        } catch (UnsupportedEncodingException e) {
          e.printStackTrace();
        }
      }
    
      private void doWrite() {
        byte[] bytes = "服务器端的反馈消息".getBytes();
        // 发送缓冲区
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>(){
    
          public void completed(Integer result, ByteBuffer buffer) {
            // 如果没有发送完,继续发送
            if(buffer.hasRemaining()){
              channel.write(buffer, buffer, this);
            }
          }
    
          public void failed(Throwable exc, ByteBuffer buffer) {
            try {
              channel.close();
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        });
      }
    
      public void failed(Throwable exc, ByteBuffer attachment) {
        try {
          this.channel.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }  
    }
    

    在AsyncHelloServerHandler 的构造方法中,首先创建一个异步的服务器端通道AsynchronousServerSocketChannel,然后调用bind方法绑定监听端口。

    在线程run方法中,初始化CountDownLatch对象,完成一组正在执行的操作之前,允许当前的线程一直阻塞,防止服务器端执行完退出,实际项目中不需要。

    使用doAccept方法接收客户端的连接,因为是异步操作,在这里传递一个CompletionHandler类型的handler实例接收accept操作成功的通知消息。在AcceptCompletionHandler 类中可以接收新加入的客户端连接。在77行预分配1KB的缓冲区。78行调用read进行异步读操作。

    在118行先进行flip操作,为后续从缓冲区读取数据做准备。根据缓冲区的可读字节数创建byte数据组,通过new String对字节数组进行编码,并打印出来。最后通过doWrite回写客户端。

    客户端代码:

    package com.test.aio;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * AIO编程客户端
     * @author 程就人生
     * @Date
     */
    public class HelloClient {
    
      public static void main( String[] args ){
        int port = 8080;
        new Thread(new AsyncHelloHandler("127.0.0.1",port)).start();
      }
    }
    
    class AsyncHelloHandler implements CompletionHandler<Void, AsyncHelloHandler>, Runnable{
      
      private String host;
      
      private int port;
      
      private AsynchronousSocketChannel client;
      
      private CountDownLatch latch;
    
      public AsyncHelloHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
          client = AsynchronousSocketChannel.open();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    
      public void run() {
        latch = new CountDownLatch(1);
        client.connect(new InetSocketAddress(host, port), this, this);
        try {
          latch.await();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        try {
          client.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    
      public void completed(Void result, AsyncHelloHandler attachment) {
        byte[] req = "客户端的消息".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        client.write(writeBuffer,writeBuffer, new CompletionHandler<Integer, ByteBuffer>(){
    
          public void completed(Integer result, ByteBuffer buffer) {
            if(buffer.hasRemaining()){
              client.write(buffer, buffer, this);
            }else{
              ByteBuffer readBuffer = ByteBuffer.allocate(1024);
              client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>(){
    
                public void completed(Integer result, ByteBuffer buffer) {
                  buffer.flip();
                  byte[] bytes = new byte[buffer.remaining()];
                  buffer.get(bytes);
                  try {
                    String body = new String(bytes, "utf-8");
                    System.out.println("客户端读取到的:" + body);
                    latch.countDown();
                    System.out.println("客户端操作完毕,关闭连接");
                  } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                  }
                }
    
                public void failed(Throwable exc, ByteBuffer buffer) {
                  try {
                    client.close();
                    latch.countDown();
                  } catch (IOException e) {
                    e.printStackTrace();
                  }
                }
                
              });
            }
          }
    
          public void failed(Throwable exc, ByteBuffer attachment) {
            try {
              client.close();
              latch.countDown();
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
          
        });
      }
    
      public void failed(Throwable exc, AsyncHelloHandler attachment) {
        try {
          client.close();
          latch.countDown();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }  
    }
    

    在20行,客户端通过一个独立的线程创建异步客户端处理handler。在AsyncHelloHandler中,使用了大量匿名内部类。第38行,通过AsynchronousSocketChannel的open方法创建了一个新的AsynchronousSocketChannel对象。

    在第45行,创建CountDownLatch进行等待,防止异步操咋没有执行完线程就退出了。在第46行,通过connect方法发起异步操作。

    在第73行,completed异步连接成功之后的方法回调,创建请求消息体,对bytebuffer进行flip,并发给服务器端。发送给服务器端后,接收服务器端的反馈,并进行打印,最后调用latch的countDown()方法,关闭客户端连接。

    服务器端执行结果:

    image.png

    客户端执行结果:

    image.png

    以上便是来自java.nio包的非阻塞异步服务器端、客户端编码的简单演示。

    相关文章

      网友评论

          本文标题:【网络编程】AIO编程

          本文链接:https://www.haomeiwen.com/subject/yjlxldtx.html