美文网首页
从0基础的I/O到Netty的入门(2)

从0基础的I/O到Netty的入门(2)

作者: 扁圆柱体 | 来源:发表于2018-06-26 16:54 被阅读15次

阻塞和非阻塞的几点区别

在开始了解NIO之前,先看阻塞和非阻塞IO的几点区别

缓冲区

NIO中有一个缓冲区(以下统一称buffer)的概念,不论是读信息还是写信息,都是在buffer上进行操作;相比之下,BIO使用的是流(以下统一称stream)的概念,stream必须分读和写两种,通过装饰器模式生成stream后,数据直接写入stream或从stream中读取。

所以NIO是面向buffer的,而BIO是面向stream的;但socket总是面向stream的。

通道

NIO使用通道(以下统一称channel)来建立客户端和服务端之间的联系;BIO依然是stream(也就是说BIO没有区分管道和数据的不同)。Channel是双向的,既可以写也可以读,而stream还是必须分读或者写。

这里特别注意,由于channel是双向的,后面注册到多路复用器时,客户端channel(也就是SocketChannel对象)是作为READ类型注册,但其实也可以写操作。

Channel的类继承结构比较复杂,如下图所示。其中

  1. InterruptibleChannel是一个能够异步的关闭和捕获的channel(接口);
  2. SelectableChannel是一个能够被多路复用器选择的channel(抽象类);
  3. ServerSocketChannel是一个基于stream的监听socket的可被选择的channel;
  4. SocketChannel是一个基于stream的连接socket的可被选择的channel。

注:#3是一个服务于监听socket的channel,#4是一个服务于连接socket的channel。

Channel的类继承结构

整体机制

个人理解,非阻塞IO的核心,是采用了一种和BIO完全不同的机制。服务端生成一个监听socket,以及客户端生成一个连接socket,都会新new一个线程(因为只需要一个线程即可,所以不用线程池),这个线程将当前的channel注册到一个地方。通过客户端或服务端的该线程(注意是独立的)的轮询,检查注册的socket,哪个socket准备好,比如可以读到数据或者可以连接,就做相应的操作。

这样做的好处是服务端或客户端都不会产生阻塞,由独立线程注册和处理监听事件,而原来的进程就不会一直等待,可以继续其它的操作,等待ready再由独立线程执行相关操作。

独立线程中的注册点称为多路复用器(以下统一称Selector),通过轮询注册其上的SelectionKey,查看当前连接是否可用,并做相关操作。所以Selector是NIO的核心。

NIO - Server端的实现

以下通过代码分析NIO对于同样业务逻辑的实现

Server的创建

try {
    // selector是一个多路复用器实例
    selector = Selector.open();
    // servChannel是一个ServerSocketChannel对象,代表server通道
    servChannel = ServerSocketChannel.open();
    servChannel.configureBlocking(false);
    // 服务器channel启动一个socket绑定端口
    servChannel.socket().bind(new InetSocketAddress(port), 1024);
    // 这个服务端socket通道注册到selector上,监听可接受连接事件(这步最为关键!!!)
    servChannel.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
    e.printStackTrace();
    System.exit(1);
}

Server端的Selector的多路选择算法

这是一个单独的线程,实际是一个Selector的运行内容。其核心是获取选择器上所有的注册的key,看key的状态并做相应的操作。

while (!stop) {
    try {
        // 循环前休眠1秒
        selector.select(1000);
        // 获取所有注册到该selector上的key
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectedKeys.iterator();
        SelectionKey key = null;
        // 轮询所有的key
        while (it.hasNext()) {
            key = it.next();
            it.remove();
            try {
                // 根据key的类型做不同的处理
                handleInput(key);
            } catch (Exception e) {
                // ...
            }
        }
    } catch (Throwable t) {
        t.printStackTrace();
    }
}

以下为handleInput的处理算法。

private void handleInput(SelectionKey key) throws IOException {
    if (key.isValid()) {
        // 该方法测试这个key的channel是否已经准备好接收新的socket连接
        if (key.isAcceptable()) {
            // 接受新的连接,注意此处key的channel一定是一个服务端socket通道
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            // 接受后会返回当前和server通信的客户端socket通道
            // 注意此处的channel也是服务端角度获取的客户端的socket,会从此channel获得客户端的request
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
            // 将这个客户端socket通道注册到selector上,并监听可读事件
            sc.register(selector, SelectionKey.OP_READ);
        }
        // 该方法测试这个key的channel是否已经可读
        if (key.isReadable()) {
            // 读取数据,注意此处key的channel是从服务端角度获取的客户端的socket
            // 从下面的代码可以看出,从这个通道读取的内容,是客户端发给服务端的request
            // 注意此处一定要理解清楚!!!
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes = sc.read(readBuffer);
            // 读取结果分3种情况,只有大于0表示读取到信息
            if (readBytes > 0) {
                // 缓冲区的limit设为0,相当于一次性读所有的内容
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, "UTF-8");
                // 转码后获取request,即body的内容
                System.out.println("The time server receive order : " + body);
                String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
                // doWrite(sc, currentTime);
                // doWrite方法用于将结果作为response返回给客户端,直接给出该方法内容
                // doWrite method start
                if (currentTime != null && currentTime.trim().length() > 0) {
                    byte[] bytes = currentTime.getBytes();
                    // 声明buffer并放入信息
                    ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                    writeBuffer.put(bytes);
                    // 设置limit为0,相当于一次性写所有的内容
                    writeBuffer.flip();
                    // 注意SocketChannel是既可以读,也可以写的
                    sc.write(writeBuffer);
                }
                // doWrite method end
            } else if (readBytes < 0) {
                // 对端链路关闭
                key.cancel();
                sc.close();
            } else
                ; // 读到0字节,忽略
        }
    }
}

NIO - Client端的实现

Client的创建

try {
    // 创建多路复用器
    selector = Selector.open();
    // 创建SocketChannel
    socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    // 这个socket还没有连接,所以不需要注册到selector上
} catch (IOException e) {
    e.printStackTrace();
    System.exit(1);
}

Client连接Server

try {
    // doConnect(); 以下直接给出
    // doConnect method start
    // 尝试连接服务端,如果成功返回true
    if (socketChannel.connect(new InetSocketAddress(host, port))) {
        // 将这个客户端socket通道注册到selector上,并监听可读事件
        socketChannel.register(selector, SelectionKey.OP_READ);
        // doWrite(socketChannel); 以下直接给出
        // doWrite method start
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        // 构造request,并写入缓冲区
        socketChannel.write(writeBuffer);
        if (!writeBuffer.hasRemaining()){
            System.out.println("Send order 2 server succeed.");
        }
        // doWrite method end
    } else {
        // 没有连接成功,并不是连接失败,只是没有获得握手信息
        // 将这个客户端socket通道注册到selector上,并监听连接事件,当服务器应答后,即可继续操作
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }
    // doConnect method end
} catch (IOException e) {
    e.printStackTrace();
    System.exit(1);
}

Client端的Selector的多路选择算法

在连接之后,该线程中的selector采用和server同样的策略,轮询SelectionKey,并在handleInput中进行处理,如下。

private void handleInput(SelectionKey key) throws IOException {
    if (key.isValid()) {
        // 同server端的检查
        SocketChannel sc = (SocketChannel) key.channel();
        // 此处即对应connect方法返回false的情况(监听connect事件)
        if (key.isConnectable()) {
            // true表示握手过程完成
            if (sc.finishConnect()) {
                // 重新注册,监听读事件
                sc.register(selector, SelectionKey.OP_READ);
                doWrite(sc);
            } else
                System.exit(1);// 连接失败,进程退出
        }
        if (key.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            // 通过读缓冲区,获取服务端的response,同样是3中情况,大于0为读到信息
            int readBytes = sc.read(readBuffer);
            if (readBytes > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, "UTF-8");
                System.out.println("Now is : " + body);
                // 此处会通知selector的轮询结束
                this.stop = true;
            } else if (readBytes < 0) {
                // 对端链路关闭
                key.cancel();
                sc.close();
            } else
                ; // 读到0字节,忽略
        }
    }
}

【未完待续,下一部分开始Netty的实现及针对4.x的改造】

相关文章

网友评论

      本文标题:从0基础的I/O到Netty的入门(2)

      本文链接:https://www.haomeiwen.com/subject/dscmyftx.html