美文网首页
Tomcat学习笔记之NIO处理分析(一)

Tomcat学习笔记之NIO处理分析(一)

作者: 夏目手札 | 来源:发表于2019-05-07 21:50 被阅读0次

    前言

    在前面[Tomcat学习笔记之启动分析(Connector)(七)]一文中,介绍了Connector容器的初始化与启动,这里以NioEndpoint为例,详细分析一下请求处理流程。

    组件结构图

    先来看一下整个Connector组件的结构图:


    Connector组件的结构图

    在之前的文章中,已经介绍了Acceptor、Poller、Worker 等核心组件的初始化过程。下面就这些核心组件一个一个来看。
    备注:这个图是Tomcat7.0版本的,所以Acceptor还在NioEndpoint中,而在9.0版本,Acceptor已经单独提出去了。

    NioEndpoint介绍

    • 主要属性
        /**
         * 线程安全的非阻塞selector池
         */
        private NioSelectorPool selectorPool = new NioSelectorPool();
        /**
         * Server socket "pointer".
         */
        private volatile ServerSocketChannel serverSock = null;
        private volatile CountDownLatch stopLatch = null;
        /**
         * PollerEvent缓存
         */
        private SynchronizedStack<PollerEvent> eventCache;
        /**
         * NioChannel缓存,每个NioChannel持有一部分Bytebuffer(一般2个,SSL有4个),
         */
        private SynchronizedStack<NioChannel> nioChannels;
    /**
         * poller线程的默认优先级
         */
        private int pollerThreadPriority = Thread.NORM_PRIORITY;
    /**
         * Poller线程数,最多2个
         */
        private int pollerThreadCount = Math.min(2, Runtime.getRuntime().availableProcessors());
    /**
         * selector.select()超时时间
         */
        private long selectorTimeout = 1000;
     /**
         * The socket pollers.
         */
        private Poller[] pollers = null;
    

    其他方法介绍后面会说明。

    Acceptor接受请求

    Acceptor在7.0版本属于NioEndpoint内部类,9.0版本引入了Nio2Endpoit,所以Acceptor被提出来了。

    public void run() {
    
            int errorDelay = 0;
    
            // 循环,知道收到shutdown命令
            while (endpoint.isRunning()) {
    
                // 如果endpoint暂停,循环
                while (endpoint.isPaused() && endpoint.isRunning()) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
    
                if (!endpoint.isRunning()) {
                    break;
                }
                state = AcceptorState.RUNNING;
    
                try {
                    //如果达到最大连接,等待
                    endpoint.countUpOrAwaitConnection();
    
                    // Endpoint might have been paused while waiting for latch
                    // If that is the case, don't accept new connections
                    if (endpoint.isPaused()) {
                        continue;
                    }
    
                    U socket = null;
                    try {
                        //Acceptor获取socket
                        socket = endpoint.serverSocketAccept();
                    } catch (Exception ioe) {
                        // We didn't get a socket
                        endpoint.countDownConnection();
                        if (endpoint.isRunning()) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;
    
                    // Configure the socket
                    if (endpoint.isRunning() && !endpoint.isPaused()) {
                        //设置socket,注册到pollerevent队列中,如果失败,关闭scoket
                        if (!endpoint.setSocketOptions(socket)) {
                            endpoint.closeSocket(socket);
                        }
                    } else {
                        endpoint.destroySocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    String msg = sm.getString("endpoint.accept.fail");
                    // APR specific.
                    // Could push this down but not sure it is worth the trouble.
                    if (t instanceof Error) {
                        Error e = (Error) t;
                        if (e.getError() == 233) {
                            // Not an error on HP-UX so log as a warning
                            // so it can be filtered out on that platform
                            // See bug 50273
                            log.warn(msg, t);
                        } else {
                            log.error(msg, t);
                        }
                    } else {
                            log.error(msg, t);
                    }
                }
            }
            state = AcceptorState.ENDED;
        }
    protected boolean setSocketOptions(SocketChannel socket) {
            // Process the connection
            try {
                //1. socket设置为非阻塞
                socket.configureBlocking(false);
                Socket sock = socket.socket();
                //2. 设置Socket参数值(从server.xml的Connector节点上获取参数值)比如Socket发送、接收的缓存大小、心跳检测等
                socketProperties.setProperties(sock);
                //3. 从NioChannel的缓存栈中取出一个NioChannel,NioChannel是SocketChannel的一个的包装类
                NioChannel channel = nioChannels.pop();
                //4. 缓存队列中没有则新建一个NioChannel,并将SocketChannel关联到从缓存队列中获取的NioChannel上来
                if (channel == null) {
                    SocketBufferHandler bufhandler = new SocketBufferHandler(
                            socketProperties.getAppReadBufSize(),
                            socketProperties.getAppWriteBufSize(),
                            socketProperties.getDirectBuffer());
                    if (isSSLEnabled()) {
                        channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                    } else {
                        channel = new NioChannel(socket, bufhandler);
                    }
                } else {
                    channel.setIOChannel(socket);
                    channel.reset();
                }
                //5. 注册到Poller中
                getPoller0().register(channel);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                try {
                    log.error(sm.getString("endpoint.socketOptionsError"), t);
                } catch (Throwable tt) {
                    ExceptionUtils.handleThrowable(tt);
                }
                // Tell to close the socket
                return false;
            }
            return true;
        }
    

    主要流程如下:

    • 获取请求;
    • 将socket设置为非阻塞的;
    • 从NioChannel的缓存栈中获取NioChannel(socket和buf的包装类),没有则新建一个。
    • 注册到Poller中。

    事件注册

          public void register(final NioChannel socket) {
                //1. 设置关联的poller
                socket.setPoller(this);
                //2. NioChannel的包装类,增加了一些控制,读写超时时间之类的
                NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, NioEndpoint.this);
                socket.setSocketWrapper(socketWrapper);
                socketWrapper.setPoller(this);
                socketWrapper.setReadTimeout(getConnectionTimeout());
                socketWrapper.setWriteTimeout(getConnectionTimeout());
                socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
                socketWrapper.setSecure(isSSLEnabled());
                //3. 从pollerevent缓存栈中获取缓存,如果没有新建
                PollerEvent r = eventCache.pop();
                socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
                if (r == null) {
                    r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
                } else {
                    r.reset(socket, socketWrapper, OP_REGISTER);
                }
                //4. 添加至pollerevent缓存栈
                addEvent(r);
            }
    

    主要流程:

    • 设置关联的poller;
    • NioChannel进一步包装;
    • 从缓存中获取或者新建一个pollerevent;
    • 放入pollerevent缓存栈中。

    PollerEvent流程处理

    上面说到NioChannel和NioSocketWrapper会被包装到PollerEvent,然后添加到PollerEvent队列中去,我们在这里看一下PollerEvent会做些什么:

            public void run() {
                //如果socket第一次注册到selector中,完成对socket读事件的注册
                if (interestOps == OP_REGISTER) {
                    try {
                        socket.getIOChannel().register(
                                socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                    } catch (Exception x) {
                        log.error(sm.getString("endpoint.nio.registerFail"), x);
                    }
                } else {
                    //寻找selector中的key,如果key为空,连接数减一,并且socketWrapper.close置位true;否则更新更新socket所感兴趣的事件
                    final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                    try {
                        if (key == null) {
                            // The key was cancelled (e.g. due to socket closure)
                            // and removed from the selector while it was being
                            // processed. Count down the connections at this point
                            // since it won't have been counted down when the socket
                            // closed.
                            socket.socketWrapper.getEndpoint().countDownConnection();
                            ((NioSocketWrapper) socket.socketWrapper).closed = true;
                        } else {
                            final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                            if (socketWrapper != null) {
                                // We are registering the key to start with, reset the fairness counter.
                                int ops = key.interestOps() | interestOps;
                                socketWrapper.interestOps(ops);
                                key.interestOps(ops);
                            } else {
                                socket.getPoller().cancelledKey(key);
                            }
                        }
                    } catch (CancelledKeyException ckx) {
                        try {
                            socket.getPoller().cancelledKey(key);
                        } catch (Exception ignore) {
                        }
                    }
                }
            }
    

    PollerEvent主要用来更新selector所感兴趣的事件。

    总结

    Connector初步处理请求的流程如下:


    初步流程图

    接下来看Poller的流程处理。

    相关文章

      网友评论

          本文标题:Tomcat学习笔记之NIO处理分析(一)

          本文链接:https://www.haomeiwen.com/subject/abhrnqtx.html