美文网首页
Tomcat 源码分析 NIO (基于8.0.5)

Tomcat 源码分析 NIO (基于8.0.5)

作者: 爱吃鱼的KK | 来源:发表于2017-08-03 23:21 被阅读497次
    1. Tomcat NIO 概述

    Tomcat 8.x.x 默认的请求处理都是 NIO, 据说以前处理都是 BIO (PS: 两者的区别: 一个是从IO设备读取数据到内核内存, 再从内核内存copy到用户内存, 另一个是 通过 select 来轮训注册是事件, 而且数据已经从 IO 设备读取到了 内核内存中), 而NIO对比BIO最大的好处就是少一步从IO设备读取数据 + 程序可以用更少的线程处理更多的请求, 尤其是在开启 KeepAlive 的情况下

    2. Tomcat NIO 组件

    我们先来看一下在 NIO 模型下, Connector 的组件:

    1. Acceptor             : 监听指定的端口, 将接收到的 socket 封装成 NioChannel 丢给 Poller 线程来进行处理
    2. NioChannel           : SocketChannel 的一个包装类, 给 SocketChannel 增加了一个属性, 并且代理其做了一个方法
    3. PollerEvent          : 其的作用就是在 Poller.Selector 上异步注册 OP_READ 事件
    4. SynchronizedQueue    : Poller 每次运行都会执行里面的 PollerEvent 事件, 进行SocketChannel注册 selector
    5. Poller               : 从 SynchronizedQueue 里面 poll 出PollerEvent(在 Selector 上注册读数据的时间)事件, 并进行通过 selector.select 来轮训注册的读写事件
    6. SocketProcessor      : Tomcat 执行 Socket 请求处理的执行单元
    

    本质上就是一个线程在端口上等待接收请求, 开启几个 Poller 线程(每个Poller线程有一个归属的 Selector), 通过 round robin 的方式来分派 SocketChannel , 并且注册对应的 OP_READ 事件到其 Selector 上, 并且进行 SocketChannel 后续的读写处理

    3. Tomcat NIO 请求

    先来看一下下面一张 UML 图:


    Tomcat_nio.png

    整个流程就是开启一个Acceptor线程来接收请求, 2个Poller线程(PS: 每个线程管理一个 Selector) 来处理读写事件, 最终真正的逻辑处理交给 Executor 来处理

    4. Tomcat NIO 读取数据

    先来看一张HTTP协议的结构图


    image.png

    在Http头部中, 每行都是通过 /r/n 字符来进行分割, 而header 与 body 之间也是通过一个单独的一行(这一行里面只有 /r/n 字符)来进行分割;

    Tomcat NIO 读取数据主要在 InternalNioInputBuffer.parseRequestLine() 与 InternalNioInputBuffer.parseHeaders(); 这里对 HTTP header 数据的解析也是通过判断是否有单独一行数据是 /r/n 来进行判断, 不然就进行调用 InternalNioInputBuffer.fill(boolean timeout, boolean block) 来进行再次读取(为什么要再次读取呢? 主要还是因为TCP底层在发送数据包时 不一定一下子将数据发送过来), 其实这一步也就是 IM 服务中的粘包

    5. Tomcat NIO 写数据

    写数据是通过 org.apache.catalina.connector.Response.finishResponse() 来进行触发的(具体写入的步骤与 BIO 差不对, 可参考 Tomcat 源码分析 一次完整请求), 我这里来点与 BIO 不同的; 先看一下 Tomcat官方中的一张图:

    image.png
    (PS: 图片地址)

    其中指明了 在NIO模式下, Response 的写入是 Blocking 的, 而我们在通过 SocketChannel 进行写数据时有可能一次不能完全写完, 那Tomcat是这么做的呢? 直接看 NioBlockingSelector.write 方法

    try {
        while ( (!timedout) && buf.hasRemaining()) {                                    // 1. 检查数据是否写完, 写操作是否超时
            if (keycount > 0) { //only write if we were registered for a write
                int cnt = socket.write(buf); //write the data                           // 2. 进行写操作
                if (cnt == -1)                                                          // 3. 写操作失败, 直接报异常 (有可能对方已经关闭 socket)
                    throw new EOFException();
                written += cnt;                                                         // 4. 累加 已经写的数据总和
                if (cnt > 0) {                                                          // 5. 写数据成功, continue 再次写数据
                    time = System.currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
            }
            try {                                                                       // 6. 写入不成功 (cnt == 0)
                if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
                poller.add(att,SelectionKey.OP_WRITE,reference);                        // 7. 通过 BlockPoller 线程将 SocketChannel 的 OP_WRITE 事件 注册到 NioSelectorPool 中的 selector 上
                if (writeTimeout < 0) {                                                 // 8. CountDownLatch 进行不限时的等到 OP_WRITE 事件
                    att.awaitWriteLatch(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
                } else {
                    att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);            // 9. CountDownLatch 进行限时的等到 OP_WRITE 事件
                }
            } catch (InterruptedException ignore) {
                // Ignore
            }
            if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) {      // 10. 若  CountDownLatch 是被线程 interrupt 唤醒的, 将 keycount 置为 0 (CountDownLatch被  Interrupt 的标识就是程序能继续向下执行, 但里面的 statue > 0)
                //we got interrupted, but we haven't received notification from the poller.
                keycount = 0;                                                          // 11. keycount 变成 0, 则在第一次进入 loop 时不会接着写数据, 因为这时还没有真正的 OP_WRITE 事件过来
            }else {
                //latch countdown has happened
                keycount = 1;
                att.resetWriteLatch();                                                 // 12. OP_WRITE 事件过来了, 重置 CountDownLatch 里面的技术支持
            }
    
            if (writeTimeout > 0 && (keycount == 0))
                timedout = (System.currentTimeMillis() - time) >= writeTimeout;        // 13. 判断是否写超时
        } //while
        if (timedout)
            throw new SocketTimeoutException();                                        // 14. 若是写超时的话, 则直接抛异常
    } finally {
        poller.remove(att,SelectionKey.OP_WRITE);                                      // 15. Tomcat 写数据到客户端成功, 移除 SocketChannel 对应的 OP_WRITE 事件
        if (timedout && reference.key!=null) {
            poller.cancelKey(reference.key);
        }
        reference.key = null;
        keyReferenceStack.push(reference);
    }
    

    通过代码我们知道, 其实就是在 SocketChannel.write 数据的个数是0, 则将 SocketChannel的OP_WRITE事件注册到 Selector 上(这里的 selector 是通过 NioSelectorPool 获取的, 有单例, 也有对象池), 再通过一个 CountDownLatch 来进行阻塞, 直到 NioSelectorPool.selector 通知其有对应的 OP_WRITE 事件;
    问题来了, 这里怎么又有个 NioSelectorPool, 我们明明可以注册到 Poller 中的 selector 上, 干嘛还要注册到 NioSelectorPool.selector 上?
    补充知识:

    selector 内部有3 个SelectionKeys 集合
    1. publicKeys               : 所有注册的 SelectionKeys (PS: 包括部分取消的SelectionKeys)        
        (通过 selector.keys() 来获取)
    2. publicSelectedKeys       :通过底层select获取到的有触发的 SelectionKeys 的集合 
        (通过 selector.selectedKeys() 来获取)
    3. cancelledKeys            : SelectionKey.cancel 来触发加入这个集合中, 或调用 SocketChannel.close() 也行
    
    4. 调用 selector.select 或 selector.register 都会阻塞 publicKeys (通过 Synchronized 关键字, 见代码 [SelectorImpl](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b27/sun/nio/ch/SelectorImpl.java?av=f))
    

    上面的第4点其实已经说明 select 或 register 都会通过 Synchronized 来进行阻塞操作, 所以这里也就是增加 selector 来减少 对 selector 操作的并发度

    6. Tomcat NIO 与 BIO 对比之 KeepAlive
    开启 KeepAlive 功能下
    
    NIOEndPoint.SocketProcessor.doRun(SelectionKey key, KeyAttachment ka) 方法下的 finally 中
        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
        在处理好请求后, 只要再将 SocketChannel 再次注册到Selector上就可以
    
    JioEndPoint.SocketProcessor.doRun 方法下的 finally 中
        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
        在处理好请求后, 将 socket 封装出 SocketProcessor 来接着处理请求
    

    可以看出, BIO对比NIO 在KeepAlive 的情况下, 需要开启更多的线程处理 socket, 从而使得系统的压力更大(PS: Http 的 KeepAlive 是默认开启的, 就因为这个 KeepAlive 的开启从而使得 NIO 相对于 BIO 在同等硬件资源下 更能并发处理请求)

    7. 总结:

    Reactor 的线程模型其实大多数开源项目都是差不多的(主要区别在是否开启多个 Selector ), 而这里只是对 NIO 与 BIO 做了一个简单的对比, 随着代码的深入, 越发觉得 要正真掌握 NIO 是需要 深入理解 TCP/IP + Unix 网络编程(PS: 至于粘包拆包 + NIO CPU 100% 的bug 好像 Tomcat 没做对应的修复, 这两步其实在 Netty 里面已经做了, 可以参照 Netty 的代码获知)

    8. 参考

    Tomcat源码分析之:ServletOutputStream的实现
    Tomcat源码阅读之底层IO封装(1)InternalNioInputBuffer的分析
    Tomcat 7.0 原理与源码分析
    Tomcat 内核设计剖析
    Tomcat 架构解析

    相关文章

      网友评论

          本文标题:Tomcat 源码分析 NIO (基于8.0.5)

          本文链接:https://www.haomeiwen.com/subject/mpxslxtx.html