美文网首页
【网络编程】NIO编程

【网络编程】NIO编程

作者: 程就人生 | 来源:发表于2023-02-22 20:11 被阅读0次

    NIO是jdk1.4引入的java.nio包,它提供了高速的、面向块的IO。通过定义包含数据的类,以及通过块的形式处理这些数据。NIO类库包含缓冲区Buffer、多路复用选择器Selector、通道Channel等新的抽象,可以构建多路复用、同步非阻塞的IO程序,同时提供了更接近操作系统底层高性能的数据操作方式。

    缓冲区Buffer,包含一些要写入或者要读出的数据。在面向流的IO中,可以将数据直接写入或者将数据直接读到Stream对象中。缓冲区提供了对数据的结构化访问以及维护读写位置等信息。

    缓冲区Buffer是一个数组,通常是一个字节数组ByteBuffer,还有其他类型的数组字符缓冲区CharBuffer、短整型缓冲区ShortBuffer、整型缓冲区IntBuffer、长整形缓冲区LongBuffer、浮点型整型区FloatBuffer、双精度浮点型缓冲区。每一种Buffer的类都是Buffer接口的一个子实例。所以它们有完全一样的操作,只是操作的数据类型不一样。

    通道Channel,Channel好比自来水管,网络数据通过Channel读取和写入。通道与流的不同之处在于,通道是双向的,而流是单向的,流只能在一个方向上移动,一个流必须是InputStream或者OutputStream的子类,通道可以用于读写或者两者同时进行。通道Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。Channel可以分为两大类,一类用于网络对象,一类用于文件操作。

    多路复用选择器Selector,Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,通过SelectionKey可以获得就绪Channel的集合,进行后续的IO操作。

    一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。只要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

    下面来看服务器端代码:

    package com.test.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * NIO服务器端
     * @author 程就人生
     * @Date
     */
    public class HelloServer {
    
      public static void main( String[] args ){
        int port = 8080;
        // 多路复用服务类
        MultiplexerHelloServer helloServer = new MultiplexerHelloServer(port);
            new Thread(helloServer,"多路复用服务类").start();
        }
    }
    
    class MultiplexerHelloServer implements Runnable{
    
      private Selector selector;
    
      private ServerSocketChannel serverChannel;
    
      private volatile boolean stop;
    
      /**
       * 初始化多路复用,绑定监听端口
       * @param port
       */
      MultiplexerHelloServer(int port){
        try {
          // 初始化多路复用器,创建Selector
          selector = Selector.open();
          // 打开ServerSocketChannel
          serverChannel = ServerSocketChannel.open();
          // 设置为非堵塞模式
          serverChannel.configureBlocking(false);
          // 绑定监听端口
          serverChannel.socket().bind(new InetSocketAddress(port), 1024);
          // 将ServerSocketChannel注册到Selector上去,监听accept事件
          serverChannel.register(selector, SelectionKey.OP_ACCEPT);
          System.out.println("服务器端已启动,启动端口为:" + port);
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    
      public void stop(){
        this.stop = true;
      }
    
      public void run() {
        while(!stop){
          try {
            SelectionKey key = null;
            // 每隔一秒被唤醒一次
            selector.select(1000);
            // 获取就绪状态的SelectionKey
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            // 对就绪状态的SelectionKey进行迭代
            Iterator<SelectionKey> it = selectedKeys.iterator();        
            while(it.hasNext()){
              key = it.next();
              it.remove();
              try{
                // 对网络事件进行操作(连接和读写操作)
                handleInput(key);
              }catch(Exception e){
                if(key != null){
                  key.cancel();
                  if(key.channel() != null){
                    try {
                      key.channel().close();
                    } catch (IOException e1) {
                      e1.printStackTrace();
                    }
                  }
                }
              }          
            }
          } catch (IOException e) {
            e.printStackTrace();
          }      
        }  
        // 多路复用器关闭后,所有注册在上面的channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if(selector != null){
          try {
            selector.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      }
    
      private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
          // 处理新接入的客户端请求信息
          if(key.isAcceptable()){
            // 接入新的连接
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel sc = ssc.accept();
            // 设置为异步非阻塞
            sc.configureBlocking(false);
            // 监听读操作
            sc.register(selector, SelectionKey.OP_READ);
          }
          // 处理客户端发来的信息,读取操作
          if(key.isReadable()){
            SocketChannel sc = (SocketChannel) key.channel();
            // 开辟一个1KB的缓冲区
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes = sc.read(readBuffer);
            // 读取到了字节
            if(readBytes > 0){
              readBuffer.flip();
              byte[] bytes = new byte[readBuffer.remaining()];
              readBuffer.get(bytes);
              // 对字节码进行解码
              String body = new String(bytes, "UTF-8");
              System.out.println("服务器端收到的:" + body);
              // 回写客户端
              doWrite(sc);        
            }
          }
        }
      }
    
      // 回写客户端
      private void doWrite(SocketChannel sc) throws IOException{
        byte[] bytes = "服务器端的响应来了".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        sc.write(writeBuffer);
      }
    }
    

    在40-56行,在构造方法中对资源进行初始化。创建多路复用选择器Selector、ServerSocketChannel,对Channel和TCP参数进行配置。

    在63行代码,对网络事件进行轮询监听;在67行代码中,每隔1s唤醒一次,监听多路复用选择器中是否有就绪的SelectionKey,如果有则进行遍历。

    在105行代码中,在handleInput方法中,对SelectionKey进行判断。判断SelectionKey目前所处的状态,是接入的新连接,还是处于网络读状态。如果是新连接,则监听网络读操作。如果是网络读操作,在通过doWrite方法回写客户端。

    客户端代码:

    package com.test.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    /**
     * NIO客户端
     * @author 程就人生
     * @Date
     */
    public class HelloClient {
    
      public static void main( String[] args ){
        int port = 8080;
        new Thread(new HelloClientHandle("127.0.0.1", port)).start();    
      }
    }
    
    /**
     * 客户端处理器
     * @author 程就人生
     * @Date
     */
    class HelloClientHandle implements Runnable{
    
      private Selector selector;
    
      private SocketChannel socketChannel;
    
      private volatile boolean stop;
    
      private String host;
    
      private int port;
    
      public HelloClientHandle(String host, int port) {
        try {
          this.host = host;
          this.port = port;
          // 创建多路复用选择器
          selector = Selector.open();
          // 打开SocketChannel
          socketChannel = SocketChannel.open();
          // 设置为非阻塞
          socketChannel.configureBlocking(false);      
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    
      public void run() {
        try {
          // 连接服务器端判断
          if(!socketChannel.connect(new InetSocketAddress(host,port))){        
            // 将socketChannel注册到多路复用器,并监听连接操作
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
        while(!stop){
          try {
            selector.select(1000);
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();
            SelectionKey key = null;
            while(it.hasNext()){
              key = it.next();
              it.remove();
              handleInput(key);
            }
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
        if(selector != null){
          try {
            selector.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }    
      }
    
      private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
          SocketChannel sc = (SocketChannel) key.channel();
          if(key.isConnectable()){
            // 完成连接
            if(sc.finishConnect()){
              // 监听读时间
              sc.register(selector, SelectionKey.OP_READ);
              // 给服务器端发送消息
              doWrite(sc);
            }else{
              // 退出
              System.exit(1);
            }
          }
          // 是否为可读的
          if(key.isReadable()){
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes =sc.read(readBuffer);
            // 读到了字节
            if(readBytes > 0){
              readBuffer.flip();
              byte[] bytes = new byte[readBuffer.remaining()];
              readBuffer.get(bytes);
              String body = new String(bytes, "utf-8");
              System.out.println("客户端收到的信息是:" + body);
              this.stop = true;
    
              // 没读到字节
            } else if(readBytes < 0){
              // 取消
              key.cancel();
              // 关闭连接
              sc.close();
            }
          }
        }    
      }
    
      /**
       * 网络写操作
       * @param sc
       * @throws IOException
       */
      private void doWrite(SocketChannel sc) throws IOException{
        byte[] req = "来自客户端的消息".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining()){
          System.out.println("已发送给服务器端~!");
        }
      }
    }
    

    客户端的连接步骤:先创建多路复用选择器,打开SocketChannel,绑定本地端地址,设置SocketChannel为非阻塞,异步连接服务器。将SocketChannel注册到多路选择器中,注册监听事件。

    SocketChannel连接服务器连接成功后,对SelectionKey进行轮询监听,每隔10s唤醒一次。在97行,连接成功后,监听网络读事件,并给服务器端发送消息。

    在第106行,坚挺到网络读事件后,将字节读出,并打印出来。如果读取完毕,则关闭通道,关闭连接。

    服务器端运行结果 客户端运行结果

    客户端发起的连接操作是异步的,通过在多路复用器注册OP_CONNECT等待后续结果,不需要之前那样被同步阻塞。SocketChannel的读写操作都是异步的。如果没有可读写的数据不会同步等待。

    以上便是来自java.nio包的非阻塞服务器端、客户端编码的简单演示。

    相关文章

      网友评论

          本文标题:【网络编程】NIO编程

          本文链接:https://www.haomeiwen.com/subject/jmbmkdtx.html