Tomcat Source Code Analysis -- 6

Author: sschrodinger | Published 2019-03-08 09:44

    sschrodinger

    2019/01/09


    References


    • 《深入剖析 Tomcat》 (How Tomcat Works), based on Tomcat 4.x
    • 《Tomcat 架构解析》, by Liu Guangrui
    • 《大话设计模式》, by Cheng Jie
    • Tomcat 8.5.x source code

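    Tomcat core components


    Tomcat uses two core components to handle incoming socket connections: Connector and Container. The Connector handles connections from outside and passes them to the Container for processing; the result is finally sent back to the client through the Connector.

    [Figure: core components]

    A Service component manages multiple Connectors and one Container.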


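    Connector


    When a new socket arrives, the Connector parses its content into an HTTP request and creates the request and response objects.

    Every Connector processes socket data with the following structure.

    [Figure: Coyote structure]

    Here, ProtocolHandler and Adapter are both objects held directly by the Connector.

    In Tomcat 8, Connector is a class that extends LifecycleMBeanBase. An excerpt of the source: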

    public class Connector extends LifecycleMBeanBase  {
    
        public Connector() {
            this(null);
        }
    
        public Connector(String protocol) {
            setProtocol(protocol);
            // Instantiate protocol handler
            ProtocolHandler p = null;
            try {
                Class<?> clazz = Class.forName(protocolHandlerClassName);
                p = (ProtocolHandler) clazz.getConstructor().newInstance();
            } catch (Exception e) {
                log.error(sm.getString(
                        "coyoteConnector.protocolHandlerInstantiationFailed"), e);
            } finally {
                this.protocolHandler = p;
            }
    
            if (Globals.STRICT_SERVLET_COMPLIANCE) {
                uriCharset = StandardCharsets.ISO_8859_1;
            } else {
                uriCharset = StandardCharsets.UTF_8;
            }
        }
        
        protected String protocolHandlerClassName =
            "org.apache.coyote.http11.Http11NioProtocol";
        
        protected Service service = null;
        
        protected final ProtocolHandler protocolHandler;
        
        protected Adapter adapter = null;
        
        public Service getService() {
            return this.service;
        }
        
        @Override
        protected void initInternal() throws LifecycleException {
    
            super.initInternal();
    
            // Initialize adapter
            adapter = new CoyoteAdapter(this);
            protocolHandler.setAdapter(adapter);
    
            // Make sure parseBodyMethodsSet has a default
            if (null == parseBodyMethodsSet) {
                setParseBodyMethods(getParseBodyMethods());
            }
    
            if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
                throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                        getProtocolHandlerClassName()));
            }
            if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                    protocolHandler instanceof AbstractHttp11JsseProtocol) {
                AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                        (AbstractHttp11JsseProtocol<?>) protocolHandler;
                if (jsseProtocolHandler.isSSLEnabled() &&
                        jsseProtocolHandler.getSslImplementationName() == null) {
                    // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
                    jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
                }
            }
    
            try {
                protocolHandler.init();
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
            }
        }
    
    
        /**
         * Begin processing requests via this Connector.
         *
         * @exception LifecycleException if a fatal startup error occurs
         */
        @Override
        protected void startInternal() throws LifecycleException {
    
            // Validate settings before starting
            if (getPort() < 0) {
                throw new LifecycleException(sm.getString(
                        "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
            }
    
            setState(LifecycleState.STARTING);
    
            try {
                protocolHandler.start();
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
            }
        }
    
    
        /**
         * Terminate processing requests via this Connector.
         *
         * @exception LifecycleException if a fatal shutdown error occurs
         */
        @Override
        protected void stopInternal() throws LifecycleException {
    
            setState(LifecycleState.STOPPING);
    
            try {
                protocolHandler.stop();
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
            }
        }
    
    
        @Override
        protected void destroyInternal() throws LifecycleException {
            try {
                protocolHandler.destroy();
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerDestroyFailed"), e);
            }
    
            if (getService() != null) {
                getService().removeConnector(this);
            }
    
            super.destroyInternal();
        }
    }
    

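    When server.xml does not specify otherwise, Connector creates an org.apache.coyote.http11.Http11NioProtocol protocol handler and an org.apache.catalina.connector.CoyoteAdapter adapter. The adapter communicates with the container, while the protocol handler processes incoming socket connections according to the protocol.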
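
    The constructor shown earlier picks the protocol handler by class name and instantiates it reflectively. A minimal sketch of that pattern follows; DemoHandler, DemoNioHandler and HandlerLoader are hypothetical names made up for this illustration and are not Tomcat classes.

```java
// Hypothetical stand-in for Tomcat's ProtocolHandler; not the real interface.
interface DemoHandler {
    String name();
}

// Hypothetical default implementation, playing the role of Http11NioProtocol.
class DemoNioHandler implements DemoHandler {
    public DemoNioHandler() { }

    @Override
    public String name() {
        return "nio";
    }
}

class HandlerLoader {
    // Class used when the caller names none (an assumption of this sketch).
    static final String DEFAULT_CLASS = DemoNioHandler.class.getName();

    // Loads the class by name and calls its public no-arg constructor.
    // Returns null on failure, much like the excerpt leaves protocolHandler null.
    static DemoHandler load(String className) {
        String target = (className == null) ? DEFAULT_CLASS : className;
        try {
            Class<?> clazz = Class.forName(target);
            return (DemoHandler) clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            System.err.println("could not instantiate " + target + ": " + e);
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(load(null).name());
        System.out.println(load("no.such.Handler"));
    }
}
```

    Returning null instead of throwing keeps construction from failing outright, at the cost of a null check later; whether that trade-off is desirable depends on the caller.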

    The ProtocolHandler interface is shown below:

    public interface ProtocolHandler {
    
        public void setAdapter(Adapter adapter);
        public Adapter getAdapter();
    
        public Executor getExecutor();
    
        public void init() throws Exception;
    
        public void start() throws Exception;
    
        public void pause() throws Exception;
    
        public void resume() throws Exception;
    
        public void stop() throws Exception;
    
        public void destroy() throws Exception;
    
        public void closeServerSocketGraceful();
    
        public boolean isAprRequired();
    
        public boolean isSendfileSupported();
    
        public void addSslHostConfig(SSLHostConfig sslHostConfig);
        public SSLHostConfig[] findSslHostConfigs();
    
        public void addUpgradeProtocol(UpgradeProtocol upgradeProtocol);
        public UpgradeProtocol[] findUpgradeProtocols();
    }
    

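    A ProtocolHandler holds an Adapter object, which it uses to communicate with the container.

    Depending on the application-layer protocol (HTTP or AJP) and the I/O model used underneath (NIO, NIO2 or APR), there are different ProtocolHandler implementations, such as Http11NioProtocol, Http11Nio2Protocol and AjpNioProtocol. HTTP/2 is not a separate ProtocolHandler in Tomcat 8.5; it is plugged in through the UpgradeProtocol mechanism that appears in the interface above.

    AbstractProtocol, the direct implementation of ProtocolHandler, holds an AbstractEndpoint and a Handler, which handle the low-level socket work (listening on the socket) and the application-layer work (parsing the application protocol) respectively. Part of the AbstractProtocol code: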

    public abstract class AbstractProtocol<S> implements ProtocolHandler,
            MBeanRegistration {
        
        private final AbstractEndpoint<S> endpoint;
        private Handler<S> handler;
        
        public AbstractProtocol(AbstractEndpoint<S> endpoint) {
            this.endpoint = endpoint;
            setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
            setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
        }
        
        protected Adapter adapter;
        @Override
        public void setAdapter(Adapter adapter) { this.adapter = adapter; }
        @Override
        public Adapter getAdapter() { return adapter; }
        
        protected abstract Processor createProcessor();
        
        @Override
        public void init() throws Exception {
            if (getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
            }
    
            if (oname == null) {
                // Component not pre-registered so register it
                oname = createObjectName();
                if (oname != null) {
                    Registry.getRegistry(null, null).registerComponent(this, oname, null);
                }
            }
    
            if (this.domain != null) {
                rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
                Registry.getRegistry(null, null).registerComponent(
                        getHandler().getGlobal(), rgOname, null);
            }
    
            String endpointName = getName();
            endpoint.setName(endpointName.substring(1, endpointName.length()-1));
            endpoint.setDomain(domain);
    
            endpoint.init();
        }
    
    
        @Override
        public void start() throws Exception {
            if (getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
            }
    
            endpoint.start();
    
            // Start async timeout thread
            asyncTimeout = new AsyncTimeout();
            Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
            int priority = endpoint.getThreadPriority();
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                priority = Thread.NORM_PRIORITY;
            }
            timeoutThread.setPriority(priority);
            timeoutThread.setDaemon(true);
            timeoutThread.start();
        }
    
    
        @Override
        public void pause() throws Exception {
            if (getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.pause", getName()));
            }
    
            endpoint.pause();
        }
    
    
        @Override
        public void resume() throws Exception {
            if(getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.resume", getName()));
            }
    
            endpoint.resume();
        }
    
    
        @Override
        public void stop() throws Exception {
            if(getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
            }
    
            if (asyncTimeout != null) {
                asyncTimeout.stop();
            }
    
            endpoint.stop();
        }
    
    
        @Override
        public void destroy() throws Exception {
            if(getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
            }
    
            try {
                endpoint.destroy();
            } finally {
                if (oname != null) {
                    if (mserver == null) {
                        Registry.getRegistry(null, null).unregisterComponent(oname);
                    } else {
                        // Possibly registered with a different MBeanServer
                        try {
                            mserver.unregisterMBean(oname);
                        } catch (MBeanRegistrationException | InstanceNotFoundException e) {
                            getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
                                    oname, mserver));
                        }
                    }
                }
    
                if (rgOname != null) {
                    Registry.getRegistry(null, null).unregisterComponent(rgOname);
                }
            }
        }
    
    }
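
    The excerpt above is mostly delegation: each lifecycle method of the protocol logs and then forwards to the endpoint. The sketch below isolates that shape. DemoEndpoint, DemoProtocol and LifecycleDemo are hypothetical classes written for this illustration; JMX registration and the async timeout thread are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical endpoint: only records which lifecycle method was called.
class DemoEndpoint {
    private final List<String> log;
    DemoEndpoint(List<String> log) { this.log = log; }
    void init()    { log.add("endpoint.init"); }
    void start()   { log.add("endpoint.start"); }
    void stop()    { log.add("endpoint.stop"); }
    void destroy() { log.add("endpoint.destroy"); }
}

// Hypothetical protocol: every lifecycle call is forwarded to the endpoint.
class DemoProtocol {
    private final DemoEndpoint endpoint;
    private final List<String> log;

    DemoProtocol(DemoEndpoint endpoint, List<String> log) {
        this.endpoint = endpoint;
        this.log = log;
    }

    void init()    { log.add("protocol.init");    endpoint.init(); }
    void start()   { log.add("protocol.start");   endpoint.start(); }
    void stop()    { log.add("protocol.stop");    endpoint.stop(); }
    void destroy() { log.add("protocol.destroy"); endpoint.destroy(); }
}

class LifecycleDemo {
    // Drives one full lifecycle and returns the recorded call order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        DemoProtocol protocol = new DemoProtocol(new DemoEndpoint(log), log);
        protocol.init();
        protocol.start();
        protocol.stop();
        protocol.destroy();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```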
    

    When AbstractProtocol starts, it starts the Endpoint, which operates on the sockets as follows.


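    The Endpoint runs several threads in parallel, each running an AbstractEndpoint.Acceptor instance. Each Acceptor listens for connections on the port and keeps looping for as long as the Endpoint is running.

    The code related to Endpoint startup and the Acceptor is as follows: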

    public abstract class AbstractEndpoint<S> {
    
        //...
    
        protected Acceptor[] acceptors;
        
        public abstract void bind() throws Exception;
        public abstract void startInternal() throws Exception;
        
        public void init() throws Exception {
            if (bindOnInit) {
                bind();
                bindState = BindState.BOUND_ON_INIT;
            }
            //...
        }
        
        public final void start() throws Exception {
            if (bindState == BindState.UNBOUND) {
                bind();
                bindState = BindState.BOUND_ON_START;
            }
            startInternal();
        }
        
        protected final void startAcceptorThreads() {
            int count = getAcceptorThreadCount();
            acceptors = new Acceptor[count];
    
            for (int i = 0; i < count; i++) {
                acceptors[i] = createAcceptor();
                String threadName = getName() + "-Acceptor-" + i;
                acceptors[i].setThreadName(threadName);
                Thread t = new Thread(acceptors[i], threadName);
                t.setPriority(getAcceptorThreadPriority());
                t.setDaemon(getDaemon());
                t.start();
            }
        }
        
        protected abstract Acceptor createAcceptor();
        
        public abstract static class Acceptor implements Runnable {
            public enum AcceptorState {
                NEW, RUNNING, PAUSED, ENDED
            }
    
            protected volatile AcceptorState state = AcceptorState.NEW;
            public final AcceptorState getState() {
                return state;
            }
    
            private String threadName;
            protected final void setThreadName(final String threadName) {
                this.threadName = threadName;
            }
            protected final String getThreadName() {
                return threadName;
            }
        }
    }
    
    public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
    
        //...
        
        private volatile ServerSocketChannel serverSock = null;
        
        @Override
        public void bind() throws Exception {
    
            if (!getUseInheritedChannel()) {
                serverSock = ServerSocketChannel.open();
                socketProperties.setProperties(serverSock.socket());
                InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
                serverSock.socket().bind(addr,getAcceptCount());
            } else {
                Channel ic = System.inheritedChannel();
                if (ic instanceof ServerSocketChannel) {
                    serverSock = (ServerSocketChannel) ic;
                }
                if (serverSock == null) {
                    throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
                }
            }
            serverSock.configureBlocking(true); //mimic APR behavior
    
            // Initialize thread count defaults for acceptor, poller
            if (acceptorThreadCount == 0) {
                // FIXME: Doesn't seem to work that well with multiple accept threads
                acceptorThreadCount = 1;
            }
            if (pollerThreadCount <= 0) {
                //minimum one poller thread
                pollerThreadCount = 1;
            }
            setStopLatch(new CountDownLatch(pollerThreadCount));
    
            // Initialize SSL if needed
            initialiseSsl();
    
            selectorPool.open();
        }
        
        @Override
        public void startInternal() throws Exception {
    
            if (!running) {
                running = true;
                paused = false;
    
                processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getProcessorCache());
                eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                socketProperties.getEventCache());
                nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getBufferPool());
    
                // Create worker collection
                if ( getExecutor() == null ) {
                    createExecutor();
                }
    
                initializeConnectionLatch();
    
                // Start poller threads
                pollers = new Poller[getPollerThreadCount()];
                for (int i=0; i<pollers.length; i++) {
                    pollers[i] = new Poller();
                    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                    pollerThread.setPriority(threadPriority);
                    pollerThread.setDaemon(true);
                    pollerThread.start();
                }
    
                startAcceptorThreads();
            }
        }
        
        protected class Acceptor extends AbstractEndpoint.Acceptor {
    
            @Override
            public void run() {
    
                int errorDelay = 0;
    
                while (running) {
    
                    // Loop if endpoint is paused
                    //...
                    state = AcceptorState.RUNNING;
    
                    try {
                        //if we have reached max connections, wait
                        countUpOrAwaitConnection();
    
                        SocketChannel socket = null;
                        try {
                            // Accept the next incoming connection from the server
                            // socket
                            socket = serverSock.accept();
                        } catch (IOException ioe) {
                            // We didn't get a socket
                            countDownConnection();
                            //... (necessary handling omitted)
                        }
                        // Successful accept, reset the error delay
                        errorDelay = 0;
    
                        // Configure the socket
                        if (running && !paused) {
                            // setSocketOptions() will hand the socket off to
                            // an appropriate processor if successful
                            if (!setSocketOptions(socket)) {
                                closeSocket(socket);
                            }
                        } else {
                            closeSocket(socket);
                        }
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        log.error(sm.getString("endpoint.accept.fail"), t);
                    }
                }
                state = AcceptorState.ENDED;
            }
            //...
        }
    
        
    }
    

    In the code above, the functions are called in this order:

    graph LR
    AbstractEndpoint.start-->NioEndpoint.startInternal
    NioEndpoint.startInternal-->NioEndpoint.Acceptor.run
    

    In NioEndpoint, bind() binds the ServerSocketChannel (serverSock) to the configured port and puts it in blocking mode; see NioEndpoint.bind(). All Acceptor threads therefore block inside serverSock.accept().
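
    The blocking-accept behaviour can be tried in isolation with plain java.nio. The sketch below is an illustration, not Tomcat code: BlockingAcceptDemo and its method are hypothetical, it binds to an ephemeral loopback port, and it uses a single acceptor thread.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingAcceptDemo {

    // Accepts `expected` loopback connections on a dedicated acceptor thread
    // and returns how many were actually accepted.
    static int acceptConnections(int expected) {
        AtomicInteger accepted = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(expected);
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(true); // accept() blocks until a client connects

            Thread acceptor = new Thread(() -> {
                while (done.getCount() > 0) {
                    try (SocketChannel client = server.accept()) {
                        accepted.incrementAndGet();
                        done.countDown();
                    } catch (IOException e) {
                        return; // server channel was closed
                    }
                }
            }, "demo-Acceptor-0");
            acceptor.setDaemon(true);
            acceptor.start();

            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            for (int i = 0; i < expected; i++) {
                SocketChannel.open(address).close(); // connect, then hang up
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return accepted.get();
    }

    public static void main(String[] args) {
        System.out.println("accepted=" + acceptConnections(3));
    }
}
```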

    In NioEndpoint.Acceptor.run(), apart from the necessary bookkeeping, the most important call is setSocketOptions(socket): besides setting the socket options, it hands the socket on for processing.

    The code of setSocketOptions is as follows:

    protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
    
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket,bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }
    

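    NioEndpoint uses Pollers to manage sockets. The Poller objects are created when the NioEndpoint starts (see startInternal above); one NioEndpoint holds an array of Pollers, and each Poller has its own Selector. setSocketOptions picks a Poller in round-robin fashion through getPoller0 and registers the socket with it.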
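
    The body of getPoller0 is not part of the excerpts here, so the following is only a sketch of round-robin selection under one assumption: an atomic counter taken modulo the array length. RoundRobinPicker is a hypothetical class name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector over a fixed array of items.
class RoundRobinPicker<T> {
    private final T[] items;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinPicker(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("need at least one item");
        }
        this.items = items.clone();
    }

    // Thread-safe: each caller gets the next slot in turn.
    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), items.length);
        return items[index];
    }

    public static void main(String[] args) {
        RoundRobinPicker<String> pollers =
                new RoundRobinPicker<>(new String[] {"poller-0", "poller-1"});
        for (int i = 0; i < 4; i++) {
            System.out.println(pollers.next());
        }
    }
}
```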

    Part of the Poller code is as follows:

    public class Poller implements Runnable {
    
        private Selector selector;
        private final SynchronizedQueue<PollerEvent> events =
                    new SynchronizedQueue<>();
    
        private volatile boolean close = false;
    
        public Poller() throws IOException {
            this.selector = Selector.open();
        }
    
        public Selector getSelector() { return selector;}
    
    
        /*
         * Registers a newly accepted socket with this poller.
         */
        public void register(final NioChannel socket) {
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            ka.setReadTimeout(getConnectionTimeout());
            ka.setWriteTimeout(getConnectionTimeout());
            PollerEvent r = eventCache.pop();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }
    
        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
    
                boolean hasEvents = false;
                try {
                    if (!close) {
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            //if we are here, means we have other stuff to do
                            //do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    //...
                } catch (Throwable x) {
                    //...
                }
                //either we timed out or we woke up, //...
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        processKey(sk, attachment);
                    }
                }//while
    
                //process timeouts
                timeout(keyCount,hasEvents);
            }//while
    
            getStopLatch().countDown();
        }
    
        protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            try {
                if ( close ) {
                    cancelledKey(sk);
                } else if ( sk.isValid() && attachment != null ) {
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
                            unreg(sk, attachment, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            if (sk.isReadable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk);
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk);
                }
            } catch ( CancelledKeyException ckx ) {
                //...
            } catch (Throwable t) {
                //...
            }
        }
    }
    

    The core methods are register and run. register registers the socket with the Poller by queueing an OP_REGISTER event; run is driven by these events and calls processKey(sk, attachment) for every ready connection.
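
    The register/run split boils down to this: other threads only enqueue work and wake the selector, and the poller thread itself performs the registration and the select loop. The sketch below shows that idea with a java.nio Pipe instead of a network socket. MiniPoller is a hypothetical class, far simpler than Tomcat's Poller (no timeouts, no write events, no sendfile).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class MiniPoller implements Runnable {
    private final Selector selector;
    private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
    private final StringBuffer received = new StringBuffer();
    private final CountDownLatch firstRead = new CountDownLatch(1);
    private volatile boolean closed = false;

    MiniPoller() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called from any thread: queue the channel and wake the selector.
    void register(SelectableChannel channel) {
        pending.add(channel);
        selector.wakeup();
    }

    void close() {
        closed = true;
        selector.wakeup();
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        try {
            while (!closed) {
                // 1. Apply queued registrations on the poller thread.
                SelectableChannel channel;
                while ((channel = pending.poll()) != null) {
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                }
                // 2. Wait for readiness, a wakeup, or the timeout.
                selector.select(1000);
                // 3. Handle every ready key.
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        buffer.clear();
                        int n = ((ReadableByteChannel) key.channel()).read(buffer);
                        if (n < 0) {
                            key.cancel();
                            continue;
                        }
                        buffer.flip();
                        received.append(StandardCharsets.UTF_8.decode(buffer));
                        firstRead.countDown();
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try { selector.close(); } catch (IOException ignored) { }
        }
    }

    // Registers a pipe, writes "ping" into it and returns what the poller read.
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            MiniPoller poller = new MiniPoller();
            Thread thread = new Thread(poller, "demo-Poller");
            thread.setDaemon(true);
            thread.start();
            poller.register(pipe.source());
            pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
            poller.firstRead.await(5, TimeUnit.SECONDS);
            poller.close();
            thread.join(2000);
            pipe.sink().close();
            pipe.source().close();
            return poller.received.toString();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

    Doing the registration on the poller thread avoids contending with a select call that is already in progress, which is the reason for the event queue plus wakeup.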

    When processKey reaches the isReadable branch, that is, the following snippet:

    if (sk.isReadable()) {
        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
            closeSocket = true;
        }
    }
    

    Inside processSocket, the run method of a SocketProcessorBase is invoked, which calls doRun; doRun in turn ends up calling the Endpoint's Handler. The flow is as follows:

    graph LR
    NioEndpoint.Poller.processKey-->SocketProcessorBase.run
    SocketProcessorBase.run-->SocketProcessor.doRun
    SocketProcessor.doRun-->AbstractEndpoint.Handler
    

    That is the whole process of handling socket listening.
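
    The hand-off from run to doRun in the flow above is a template method: the base class fixes the outer steps and the subclass supplies the actual work. A minimal sketch follows, with hypothetical classes (DemoSocket, DemoProcessorBase, DemoSocketProcessor, TemplateRunDemo). Locking on the socket object inside run is an assumption of this sketch, and the handler is reduced to a plain function.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical connection object; stands in for a socket wrapper.
class DemoSocket {
    final String data;
    volatile String result;
    DemoSocket(String data) { this.data = data; }
}

// Template method: run() fixes the locking, subclasses fill in doRun().
abstract class DemoProcessorBase implements Runnable {
    protected final DemoSocket socket;

    DemoProcessorBase(DemoSocket socket) { this.socket = socket; }

    @Override
    public final void run() {
        synchronized (socket) { // at most one processor per socket at a time
            doRun();
        }
    }

    protected abstract void doRun();
}

// Concrete processor: doRun() delegates to a handler function.
class DemoSocketProcessor extends DemoProcessorBase {
    private final Function<String, String> handler;

    DemoSocketProcessor(DemoSocket socket, Function<String, String> handler) {
        super(socket);
        this.handler = handler;
    }

    @Override
    protected void doRun() {
        socket.result = handler.apply(socket.data);
    }
}

class TemplateRunDemo {
    // Submits one processor to a worker pool and returns the handler's result.
    static String dispatch(String data) {
        DemoSocket socket = new DemoSocket(data);
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            workers.submit(new DemoSocketProcessor(socket, d -> "handled:" + d))
                   .get(5, TimeUnit.SECONDS);
            return socket.result;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("GET /"));
    }
}
```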

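    In fact, what NioEndpoint calls is the Handler implementation from AbstractProtocol (ConnectionHandler), because the constructor of the AbstractProtocol subclass assigns that Handler to the endpoint, as in the following snippet: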

    public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
        super(endpoint);
        setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
        setHandler(cHandler);
        getEndpoint().setHandler(cHandler);
    }
    

    The Handler calls the Processor's process method, which eventually calls Http11Processor.service to decode the data read from the socket and populate the Request and Response.

    The decoding done by service looks like this:

    @Override
    public SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException {
        //...
    
        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null
                /* && ... (remaining loop conditions omitted in this excerpt) */) {
            try {
                // Phase 1: parse the request line
                if (!inputBuffer.parseRequestLine(keptAlive)) {
                    if (inputBuffer.getParsingRequestLinePhase() == -1) {
                        return SocketState.UPGRADING;
                    } else if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }
    
                if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    //...
                } else {
                    keptAlive = true;
                    request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                    // Phase 2: parse the headers
                    if (!inputBuffer.parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!disableUploadTimeout) {
                        socketWrapper.setReadTimeout(connectionUploadTimeout);
                    }
                }
            } catch (IOException e) {
                //...
                break;
            } catch (Throwable t) {
                //...
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
    
            // Has an upgrade been requested?
            //...
    
            // Phase 3: prepare the request
            if (!getErrorState().isError()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    prepareRequest();
                } catch (Throwable t) {
                    //...
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    //...
                }
            }
    
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }
    
            // Phase 4: hand the request to the adapter
            if (!getErrorState().isError()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    getAdapter().service(request, response);
                    if(keepAlive && !getErrorState().isError() && !isAsync() &&
                            statusDropsConnection(response.getStatus())) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                } catch (InterruptedIOException e) {
                    setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                } catch (HeadersTooLargeException e) {
                    log.error(sm.getString("http11processor.request.process"), e);
                    // The response should not have been committed but check it
                    // anyway to be safe
                    if (response.isCommitted()) {
                        setErrorState(ErrorState.CLOSE_NOW, e);
                    } else {
                        response.reset();
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, e);
                        response.setHeader("Connection", "close"); // TODO: Remove
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }
        }
    }
    

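    The Processor parses the socket data in three phases: parsing the HTTP request line, parsing the HTTP headers, and preparing the request information. In a fourth phase it calls the Adapter's service method, handing the parsed data over to the container.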
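
    To make the four phases concrete, here is a toy parser that follows the same order on an in-memory string. MiniHttpParser is hypothetical and heavily simplified compared with Http11Processor: no incremental reads, no limits, no body handling, and the "adapter" is just a formatted string.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class MiniHttpParser {
    String method;
    String uri;
    String protocol;
    boolean keepAlive;
    final Map<String, String> headers = new LinkedHashMap<>();

    // Phase 1: "METHOD URI VERSION".
    boolean parseRequestLine(String line) {
        String[] parts = line.split(" ");
        if (parts.length != 3) {
            return false;
        }
        method = parts[0];
        uri = parts[1];
        protocol = parts[2];
        return true;
    }

    // Phase 2: "Name: value" lines up to the first empty line.
    boolean parseHeaders(String[] lines, int from) {
        for (int i = from; i < lines.length; i++) {
            if (lines[i].isEmpty()) {
                return true; // end of the header section
            }
            int colon = lines[i].indexOf(':');
            if (colon <= 0) {
                return false; // malformed header line
            }
            String name = lines[i].substring(0, colon).trim().toLowerCase(Locale.ROOT);
            headers.put(name, lines[i].substring(colon + 1).trim());
        }
        return false; // no blank line seen: the request is incomplete
    }

    // Phase 3: derive request properties from the parsed headers.
    void prepareRequest() {
        String connection = headers.getOrDefault("connection", "");
        if ("HTTP/1.1".equals(protocol)) {
            keepAlive = !connection.equalsIgnoreCase("close");
        } else {
            keepAlive = connection.equalsIgnoreCase("keep-alive");
        }
    }

    // Phase 4 stand-in: "service" the request by describing it, or answer 400.
    static String handle(String raw) {
        MiniHttpParser parser = new MiniHttpParser();
        String[] lines = raw.split("\r\n", -1); // -1 keeps the trailing blank line
        if (!parser.parseRequestLine(lines[0]) || !parser.parseHeaders(lines, 1)) {
            return "400";
        }
        parser.prepareRequest();
        return parser.method + " " + parser.uri
                + " host=" + parser.headers.get("host")
                + " keepAlive=" + parser.keepAlive;
    }

    public static void main(String[] args) {
        System.out.println(handle(
                "GET /index.html HTTP/1.1\r\nHost: example\r\nConnection: close\r\n\r\n"));
    }
}
```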

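    The default implementation, CoyoteAdapter, implements the Adapter interface and hands the request and response to the container by calling invoke on the first Valve of the container's Pipeline. A code fragment: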

    @Override
    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
                throws Exception {
        //...
        try {
            // Parse and set Catalina and configuration specific
            // request parameters
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(
                        connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                       request, response);
            }
            //...
        }//...
    }
    

    Container


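    Container uses the chain-of-responsibility design pattern to process requests sent by the Connector. The chain is maintained by a Pipeline: each container maintains one Pipeline, and each Pipeline maintains several Valve objects that process the request (through the Valve's invoke method).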

    An example:

    [Figure: chain-of-responsibility pattern]

    Note

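    • Every Container adds some default Valves to the Pipeline it holds in order to handle essential work. For example, StandardWrapperValve, the default Valve held by the Wrapper container, implements the filtering mechanism that makes Filters available in servlet programming.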
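
    The Pipeline and Valve arrangement described in this section can be sketched in a few lines. DemoValve, NamedValve, DemoPipeline and PipelineDemo are hypothetical and much simpler than Tomcat's Valve and Pipeline interfaces; in particular, the request and response are replaced by a list that records which valves ran.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified valve: does its work, then calls the next valve.
abstract class DemoValve {
    private DemoValve next;
    DemoValve getNext() { return next; }
    void setNext(DemoValve next) { this.next = next; }
    abstract void invoke(List<String> trace);
}

// Records its name, then passes control down the chain.
class NamedValve extends DemoValve {
    private final String name;

    NamedValve(String name) { this.name = name; }

    @Override
    void invoke(List<String> trace) {
        trace.add(name);
        if (getNext() != null) {
            getNext().invoke(trace);
        }
    }
}

// Keeps added valves in order and always ends the chain with the basic valve.
class DemoPipeline {
    private DemoValve first;
    private final DemoValve basic;

    DemoPipeline(DemoValve basic) { this.basic = basic; }

    void addValve(DemoValve valve) {
        if (first == null) {
            first = valve;
        } else {
            DemoValve current = first;
            while (current.getNext() != basic) {
                current = current.getNext();
            }
            current.setNext(valve);
        }
        valve.setNext(basic);
    }

    DemoValve getFirst() {
        return (first != null) ? first : basic;
    }
}

class PipelineDemo {
    // Builds auth -> log -> basic and returns the order in which they ran.
    static List<String> run() {
        DemoPipeline pipeline = new DemoPipeline(new NamedValve("basic"));
        pipeline.addValve(new NamedValve("auth"));
        pipeline.addValve(new NamedValve("log"));
        List<String> trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Because each valve decides whether to call the next one, a valve can also stop the chain early, for example to reject a request.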
