美文网首页
IO编程和NIO编程简介

IO编程和NIO编程简介

作者: 海晨忆 | 来源:发表于2018-02-11 20:18 被阅读16次

    个人博客:haichenyi.com。感谢关注

      传统的同步阻塞I/O通讯模型,导致的结果就是只要有一方处理数据缓慢,都会影响另外一方的处理性能。按照故障设计原则,一方的处理出现问题,不应该影响到另外一方才对。但是,在同步阻塞的模式下面,这样的情况是无法避免的,很难通过业务层去解决。既然同步无法避免,为了避免就产生了异步。Netty框架就一个完全异步非阻塞的I/O通讯方式

    同步阻塞式I/O编程

      简单的来说,传统同步阻塞的I/O通讯模式,服务器端处理的方式是,每当有一个新用户接入的时候,就new一个新的线程,一个线程只能处理一个客户端的连接,在高性能方面,并发高的情景下无法满足。伪代码如下:

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * @author 海晨忆
     * @date 2018/2/9
     * @desc
     */
    public class SocketServer {
      private int port = 8080;
      private Socket socket = null;
    
      public SocketServer(int port) {
        this.port = port;
      }
    
      public void connect() {
        ServerSocket server = null;
        try {
          server = new ServerSocket(port);
          while (true) {
            socket = server.accept();
            new Thread(new Runnable() {
              @Override
              public void run() {
                new TimerServerHandler(socket).run();
              }
            }).start();
          }
        } catch (IOException e) {
          e.printStackTrace();
        } finally {
          //释放资源
          if (server != null) {
            try {
              server.close();
            } catch (IOException e) {
              e.printStackTrace();
            }
            server = null;
          }
        }
      }
    }
    
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    
    /**
     * @author 海晨忆
     * @date 2018/2/9
     * @desc
     */
    public class TimerServerHandler implements Runnable {
      private Socket socket;
    
      public TimerServerHandler(Socket socket) {
        this.socket = socket;
      }
    
      @Override
      public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
          in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
          out = new PrintWriter(this.socket.getOutputStream(), true);
          String currentTime = null;
          String body = null;
          while (true) {
            body = in.readLine();
            if (body == null)
              break;
          }
        } catch (IOException e) {
          e.printStackTrace();
          //释放in,out,socket资源
        }
      }
    }
    
    

      上面这个就是最原始的服务端IO的代码,这里我就给出的是最简化的,当有新的客户端接入的时候,服务端是怎么处理线程的,可以看出,每当有新的客户端接入的时候,总是回新创建一个线程去服务这个新的客户端

    伪异步式编程

      后来慢慢演化出一个版本“伪异步”模型,新增加一个线程池或者消息队列,满足一个线程或者多个线程满足N个客户端,通过线程池可以灵活的调用线程资源。通过设置线程池的最大值,防止海量并发接入造成的线程耗尽,它的底层实现依然是同步阻塞模型,伪代码如下:

    import com.example.zwang.mysocket.server.TimerServerHandler;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * @author 海晨忆
     * @date 2018/2/9
     * @desc
     */
    public class SocketServer {
      private int port = 8080;
      private Socket socket = null;
    
      public SocketServer(int port) {
        this.port = port;
      }
    
      private void connect() {
        ServerSocket server = null;
        try {
          server = new ServerSocket(port);
          TimeServerHandlerExecutePool executePool = new TimeServerHandlerExecutePool(50, 1000);
          while (true) {
            socket = server.accept();
            executePool.execute(new TimerServerHandler(socket));
          }
        } catch (IOException e) {
          e.printStackTrace();
        }finally {
          //释放资源
        }
      }
    }
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 海晨忆
     * @date 2018/2/9
     * @desc
     */
    public class TimeServerHandlerExecutePool {
      private ExecutorService executor;
    
      public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
        executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize,
            120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
      }
    
      public void execute(Runnable task) {
        executor.execute(task);
      }
    }
    

      “伪异步”的代码和传统同步的唯一区别就是在于,首先先创建了一个时间服务处理类的线程池,当有新的客户端接入的时候,先将socket请求封装成task,然后调用线程池的execute方法执行,从而避免了每一个新请求创建一个新线程。由于线程池和消息队列都是有限的,因此,无论客户端的并发量多大,它都不会导致线程个数过于大,而造成的内存溢出。相对于传统的同步阻塞,是一种改良。

      但是他没有从更本上解决同步的问题,伪异步的问题在于,他还是有一方处理出现问题还是会影响到另一方。因为:

    • 当对socket的输入流进行读取操作的时候,它会一直阻塞直到一下三种方式发生:
       1. 有数据可读
       2. 可读数据已经读取完
       3. 发生空指针或者I/O异常。
      这意味者,当读取inputstream方处理速度缓慢(不管是什么原因造成的速度缓慢),另一方会一直同步阻塞,直到这一方把数据处理完.
    • 当调用outputstream的write方法写输出流的时候,它将会被阻塞,直到所有要发送的字节全部写入完毕,或者发生异常。学过TCP/IP相关知识的人都直到,当消息的接收方处理消息缓慢,不能及时的从TCP缓冲区读取数据,这将会导致发送方的TCP缓冲区的size一直减少,直到0.缓冲区为0,那么发消息的一方将无法将消息写入缓冲区,直到缓冲区的size大于0

        通过以上。我们了解到读和写的操作都是同步阻塞的,阻塞的时间取决于对方的I/O线程的处理速度和网络I/O的传送速度。从本质上面看,我们无法保证对方的处理速度和网络传送速度。如果,我们的程序依靠与对方的处理速度,那么,他的可靠性将会非常差。

    NIO编程

      官方叫法new I/O,也就是新的IO编程,更多的人喜欢称它为:Non-block IO即非阻塞IO。
      与Socket和serverSocket类对应,NIO提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现,这两种都支持阻塞式编程和非阻塞式编程。开发人员可以根据自己的需求选择合适的编程模式。一般低负载,低并发的应用程序选择同步阻塞的方式以降低编程的复杂度。高负载,高并发的不用想了,非阻塞就是为了解决这个问题的

    1. 缓冲区Buffer
        Buffer是一个对象,它包含一些写入或者读出的数据。再NIO中加入buffer对象,体现了新库和旧库的一个重要区别。在面向流的io中,可以直接把数据读取或者写入到stream对象中。在NIO库中,所有数据操作都是通过缓冲区处理的。
      缓冲区实质上是一个数组,通常是一个字节数组(ByteBuffer),基本数据类型除了boolean没有,其他都有,如ShortBuffer,CharBuffer等等
    2. 通道Channel
        Channel是一个通道,双向通道,网络数据都是通过Channel读取,写入的。是的,没错,Channel它既可以进行读操作,也可以进行写操作。而流只能是一个方向。只能读操作或者只能写操作,而channel是全双工,读写可以同时进行。channel可以分为两大类:网络读写的SelectableChannel和文件操作的FileChannel。我们前面提到的SocketChannel和ServerSocketChannel都是SelectableChannel的子类。
    3. 多路复用器Selector
        selector多路复用器,他是java NIO编程的基础,熟练的掌握selector对于NIO编程至关重要。多路复用器提供选择已经就绪的任务的能力。简单的讲就是他会不断的轮询注册的channel,如果一个Channel发生了读写操作,这个Chnnel就会处于就绪状态,会被selector轮询出来,通过SelectorKey获取就绪Channel集合,进行后续的IO操作。一个selector对应多个Channel

        由于原生NIO编码比较麻烦和复杂,我这里就给出了思路的伪代码。下一篇我们将用NIO中的Netty框架实现Socket通信,编码简单,一行代码解决烦人粘包、拆包问题。
    /**
       * 服务端nio过程的伪代码
       *
       * @param port 端口号
       * @throws IOException IOException
       */
      private void init(int port) throws IOException {
        //第一步:打开ServerSocketChannel,用于监听客户端连接,它是所有客户端连接的父管道
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        //第二步:监听绑定端口,设置连接模式为非阻塞模式,
        socketChannel.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"), port));
        socketChannel.configureBlocking(false);
        //第三步:创建Reactor线程,创建多路复用器,并启动线程。
        Selector selector = Selector.open();
        new Thread().start();
        //第四步:将ServerSocketChannel注册到Reactor线程的多路复用器上,监听accept事件
        SelectionKey key = socketChannel.register(selector, SelectionKey.OP_ACCEPT/*,ioHandler*/);
        //第五步:多路复用器在线程run方法的无线循环体内轮询准备就绪的key
        int num = selector.select();
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectionKeys.iterator();
        while (it.hasNext()) {
          SelectionKey next = it.next();
          //deal with io event...
        }
        //第六步:多路复用器检测到有新客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路
        SocketChannel channel = socketChannel.accept();
        //第七步:设置客户端为非阻塞模式
        channel.configureBlocking(false);
        channel.socket().setReuseAddress(true);
        //第八步:将新接入的客户端注册到reactor线程的多路复用器上,监听读操作,读取客户端发送的消息
        SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_ACCEPT/*,ioHandler*/);
        //第九步:异步读取客户端消息到缓冲区,
        /*int readNumber = channel.read("receivebuff");*/
        //第十步:对byteBuffer进行编解码,如果有半包信息指针reset,继续读取到后续的报文,将解码成功消息封装成task,投递到业务线程池,进行业务逻辑编排
        Object massage = null;
        while (buff.hasRemain()) {
          buff.mark();
          Object massage1 = decode(btyeBuffer);
          if (massage1 == null) {
            byteBuffer.reset();
            break;
          }
          massageList.add(massage1);
        }
        if (!byteBuffer.hasRemain()) {
          byteBuffer.clean();
        } else {
          byteBuffer.compact();
        }
        if (massageList != null && !massageList.isEmpty()) {
          for (Object massage3 : massageList){
            handlerTask(massage3);
          }
        }
        //第十一步:将POJO对象encode成ByteBuff,调用SocketChannel的异步write接口,将异步消息发送到客户端
        socketChannel.write(buffer);
      }
    

    结束!!!

    相关文章

      网友评论

          本文标题:IO编程和NIO编程简介

          本文链接:https://www.haomeiwen.com/subject/licybxtx.html