美文网首页Tomcat
结合 tomcat 处理流程分析 Filter 的执行

结合 tomcat 处理流程分析 Filter 的执行

作者: 晴天哥_王志 | 来源:发表于2022-08-10 23:29 被阅读0次

    缘起

    • 大约3-4年前看过 Tomcat 的源码,那时候主要聚焦在 Tomcat 加载 war 包的过程;一个星期前的周末在研究拦截器 Filter 的过程中发现可以整合Tomcat的执行流程来一并分析,所以就有了这篇文章。

    • 文章的输出很大程度取决于日常的积累,时机到了就会有相应的输出,基本上没有其他更快的路径了。

    Tomcat 线程模型

    • Acceptor线程:全局唯一,负责接受请求,并将请求放入Poller线程的事件队列。Accetpr线程在分发事件的时候,采用的Round Robin的方式来分发的

    • Poller线程:官方的建议是每个处理器配一个,但不要超过两个,由于现在几乎都是多核处理器,所以一般来说都是两个。每个Poller线程各自维护一个事件队列(无上限),它的职责是从事件队列里面拿出socket,往自己的selector上注册,然后等待selector选择读写事件,并交给SocketProcessor线程去实际处理请求。

    • SocketProcessor线程池:tomcat的默认配置是250(参见server.xml里面的maxThreads),它是实际的工作线程,用于处理请求。

    Tomcat 执行流程

    tomcat 执行流程

    Tomcat 监听过程

    public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
    
        private NioSelectorPool selectorPool = new NioSelectorPool();
        private ServerSocketChannel serverSock = null;
        private Executor executor = null;
    
        public final void start() throws Exception {
            if (bindState == BindState.UNBOUND) {
                bind();
                bindState = BindState.BOUND_ON_START;
            }
            startInternal();
        }
    
        @Override
        public void bind() throws Exception {
            // 创建 serverSock 执行绑定
            serverSock = ServerSocketChannel.open();
    
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
            serverSock.socket().bind(addr,getAcceptCount());
            serverSock.configureBlocking(true); 
    
            if (acceptorThreadCount == 0) {
                acceptorThreadCount = 1;
            }
            if (pollerThreadCount <= 0) {
                pollerThreadCount = 1;
            }
            setStopLatch(new CountDownLatch(pollerThreadCount));
            initialiseSsl();
            selectorPool.open();
        }
    
    
        @Override
        public void startInternal() throws Exception {
            if (!running) {
                running = true;
                paused = false;
    
                processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getProcessorCache());
                eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                socketProperties.getEventCache());
                nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getBufferPool());
    
                // 创建 Poller 执行任务的线程
                if ( getExecutor() == null ) {
                    createExecutor();
                }
    
                initializeConnectionLatch();
    
                // 开启 Poller 相关的线程
                pollers = new Poller[getPollerThreadCount()];
                for (int i=0; i<pollers.length; i++) {
                    pollers[i] = new Poller();
                    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                    pollerThread.setPriority(threadPriority);
                    pollerThread.setDaemon(true);
                    pollerThread.start();
                }
    
                // 启动Acceptor线程
                startAcceptorThreads();
            }
        }
    
        protected final void startAcceptorThreads() {
            int count = getAcceptorThreadCount();
            acceptors = new Acceptor[count];
    
            for (int i = 0; i < count; i++) {
                acceptors[i] = createAcceptor();
                String threadName = getName() + "-Acceptor-" + i;
                acceptors[i].setThreadName(threadName);
                Thread t = new Thread(acceptors[i], threadName);
                t.setPriority(getAcceptorThreadPriority());
                t.setDaemon(getDaemon());
                t.start();
            }
        }
    
    
        public void createExecutor() {
            internalExecutor = true;
            TaskQueue taskqueue = new TaskQueue();
            TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
            executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
            taskqueue.setParent( (ThreadPoolExecutor) executor);
        }
    }
    
    • Tomcat 监听过程负责创建监听 socket 并进行 bind 操作,参见bind()方法。
    • Tomcat 负责创建Poller对象并绑定对应的Poller线程。
    • Tomcat 负责创建Acceptor对象并绑定对应的Acceptor线程。
    • Tomcat 负责创建SocketProcessor执行的线程池对象 Executor。

    Tomcat 连接处理过程

    public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
    
        private ServerSocketChannel serverSock = null;
    
        protected class Acceptor extends AbstractEndpoint.Acceptor {
            @Override
            public void run() {
                while (running) {
                    // 省略相关代码
                    state = AcceptorState.RUNNING;
                    try {
                        SocketChannel socket = null;
                        try {
                            // 获取新的链接
                            socket = serverSock.accept();
                        } catch (IOException ioe) {
                        }
    
                        if (running && !paused) {
                            // 通过setSocketOptions设置新连接的socket
                            if (!setSocketOptions(socket)) {
                                closeSocket(socket);
                            }
                        } 
                    } catch (Throwable t) {
                    }
                }
                state = AcceptorState.ENDED;
            }
        }
    
        protected boolean setSocketOptions(SocketChannel socket) {
            try {
                socket.configureBlocking(false);
                Socket sock = socket.socket();
                socketProperties.setProperties(sock);
    
                NioChannel channel = nioChannels.pop();
                if (channel == null) {
                    SocketBufferHandler bufhandler = new SocketBufferHandler(
                            socketProperties.getAppReadBufSize(),
                            socketProperties.getAppWriteBufSize(),
                            socketProperties.getDirectBuffer());
                    if (isSSLEnabled()) {
                        channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                    } else {
                        channel = new NioChannel(socket, bufhandler);
                    }
                } else {
                    channel.setIOChannel(socket);
                    channel.reset();
                }
                // 注册对应的 Poller 当中
                getPoller0().register(channel);
            } catch (Throwable t) {
                return false;
            }
            return true;
        }
    
        public Poller getPoller0() {
            int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
            return pollers[idx];
        }
    }
    
    • Acceptor线程接受请求,从nioChannels里拿出NioChannel对象。从Poller 队列中轮询拿到一个 Poller 后关联 NioChannel。
    • setSocketOptions的getPoller0().register(channel)负责注册 accept 的NioChannel到Poller中。

    Tomcat Socket 注册过程

    public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
    
        public class Poller implements Runnable {
    
            private Selector selector;
            private final SynchronizedQueue<PollerEvent> events =
                    new SynchronizedQueue<>();
            private volatile boolean close = false;
            private long nextExpiration = 0;
            private AtomicLong wakeupCounter = new AtomicLong(0);
            private volatile int keyCount = 0;
    
            public Poller() throws IOException {
                this.selector = Selector.open();
            }
    
            public void register(final NioChannel socket) {
                socket.setPoller(this);
                // 包装 socket 成 NioSocketWrapper对象进行处理
                NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
                socket.setSocketWrapper(ka);
                ka.setPoller(this);
                ka.setReadTimeout(getSocketProperties().getSoTimeout());
                ka.setWriteTimeout(getSocketProperties().getSoTimeout());
                ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
                ka.setSecure(isSSLEnabled());
                ka.setReadTimeout(getConnectionTimeout());
                ka.setWriteTimeout(getConnectionTimeout());
                PollerEvent r = eventCache.pop();
    
                // 注意对应的事件
                ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
                if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
                else r.reset(socket,ka,OP_REGISTER);
                addEvent(r);
            }
    
            private void addEvent(PollerEvent event) {
                events.offer(event);
                if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
            }
    
            public boolean events() {
                boolean result = false;
                PollerEvent pe = null;
                while ( (pe = events.poll()) != null ) {
                    result = true;
                    try {
                        pe.run();
                        pe.reset();
                        if (running && !paused) {
                            eventCache.push(pe);
                        }
                    } catch ( Throwable x ) {
                    }
                }
    
                return result;
            }
    
            public void run() {
                while (true) {
                    boolean hasEvents = false;
                    try {
                        if (!close) {
                            hasEvents = events();
                        }
                    } catch (Throwable x) {
                    }
                    // 省略代码
                }
            }
        }
    
    
        public static class PollerEvent implements Runnable {
    
            private NioChannel socket;
            private int interestOps;
            private NioSocketWrapper socketWrapper;
    
            public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
                reset(ch, w, intOps);
            }
    
            @Override
            public void run() {
                if (interestOps == OP_REGISTER) {
                    try {
                        socket.getIOChannel().register(
                                socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                    } catch (Exception x) {
                    }
                } 
            }
        }
    }
    
    • Poller实现NioChannel的register过程通过PollerEvent的创建和提交来实现。
    • Poller的 run 方法中会调用PollerEvent的 run 方法,进而实现NioChannel的注册。

    Tomcat Socket 任务处理过程

    public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
        public class Poller implements Runnable {
            @Override
            public void run() {
                while (true) {
                    // 省略代码
                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
    
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
    
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            iterator.remove();
                            // 处理连接请求
                            processKey(sk, attachment);
                        }
                    }
                    timeout(keyCount,hasEvents);
                }
            }
    
            protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
                try {
                    if ( close ) {
                        cancelledKey(sk);
                    } else if ( sk.isValid() && attachment != null ) {
                        if (sk.isReadable() || sk.isWritable() ) {
                            if ( attachment.getSendfileData() != null ) {
                                processSendfile(sk,attachment, false);
                            } else {
                                unreg(sk, attachment, sk.readyOps());
                                boolean closeSocket = false;
                                if (sk.isReadable()) {
                                    // 处理可读事件
                                    if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                            }
                        }
                    } 
                }  catch (Throwable t) {
                }
            }
        }
    
        public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                SocketEvent event, boolean dispatch) {
            try {
                if (socketWrapper == null) {
                    return false;
                }
                // SocketProcessor对象
                SocketProcessorBase<S> sc = processorCache.pop();
                if (sc == null) {
                    sc = createSocketProcessor(socketWrapper, event);
                } else {
                    sc.reset(socketWrapper, event);
                }
                Executor executor = getExecutor();
                if (dispatch && executor != null) {
                    // 执行对应的连接请求
                    executor.execute(sc);
                } else {
                    sc.run();
                }
            } catch (RejectedExecutionException ree) {
                return false;
            } catch (Throwable t) {
                return false;
            }
            return true;
        }
    }
    
    • Poller注册NioChannel后通过 select 方法返回待处理的NioSocketWrapper对象,进而包装成SocketProcessor后提交Executor 进行处理。

    Tomcat 请求处理过程

    public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
    
       protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
    
            public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
                super(socketWrapper, event);
            }
    
            @Override
            protected void doRun() {
                NioChannel socket = socketWrapper.getSocket();
                SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
    
                try {
                    int handshake = -1;
    
                    // 省略代码
    
                    if (handshake == 0) {
                        SocketState state = SocketState.OPEN;
                        // 通过ConnectionHandler 处理请求
                        if (event == null) {
                            state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                        } else {
                            state = getHandler().process(socketWrapper, event);
                        }
                        if (state == SocketState.CLOSED) {
                            close(socket, key);
                        }
                    } 
                } catch (Throwable t) {
                } finally {
                }
            }
        }
    }
    
    • SocketProcessor作为封装任务内部仅执行ConnectionHandler的 proces 方法。

    ConnectionHandler处理过程

    public abstract class AbstractProtocol<S> implements ProtocolHandler,
            MBeanRegistration {
    
        protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
    
            private final AbstractProtocol<S> proto;
            private final RequestGroupInfo global = new RequestGroupInfo();
            private final AtomicLong registerCount = new AtomicLong(0);
            private final Map<S,Processor> connections = new ConcurrentHashMap<>();
            private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
    
            @Override
            public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
                    // 省略相关代码
                    if (processor == null) {
                        processor = recycledProcessors.pop();
                    }
                    if (processor == null) {
                        processor = getProtocol().createProcessor();
                        register(processor);
                    }
    
                    do {
                        // 执行Http11Processor的 process 方法
                        state = processor.process(wrapper, status);
                    } while ( state == SocketState.UPGRADING);
    
                    return state;
                } catch (Throwable e) {
                } 
            }
        }
    }
    
    public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
        @Override
        protected Processor createProcessor() {
            // 创建Http11Processor对象
            Http11Processor processor = new Http11Processor(getMaxHttpHeaderSize(),
                    getAllowHostHeaderMismatch(), getRejectIllegalHeaderName(), getEndpoint(),
                    getMaxTrailerSize(), allowedTrailerHeaders, getMaxExtensionSize(),
                    getMaxSwallowSize(), httpUpgradeProtocols, getSendReasonPhrase());
            // 省略相关代码
            return processor;
        }
    }
    
    public class Http11Processor extends AbstractProcessor {
    
        public Http11Processor(int maxHttpHeaderSize, boolean allowHostHeaderMismatch,
                boolean rejectIllegalHeaderName, AbstractEndpoint<?> endpoint, int maxTrailerSize,
                Set<String> allowedTrailerHeaders, int maxExtensionSize, int maxSwallowSize,
                Map<String,UpgradeProtocol> httpUpgradeProtocols, boolean sendReasonPhrase) {
    
            super(endpoint);
            // 省略相关代码
        }
    }
    
    public abstract class AbstractProcessor extends AbstractProcessorLight implements ActionHook {
        protected Adapter adapter;
        protected final AsyncStateMachine asyncStateMachine;
        private volatile long asyncTimeout = -1;
        protected final AbstractEndpoint<?> endpoint;
        protected final Request request;
        protected final Response response;
        protected volatile SocketWrapperBase<?> socketWrapper = null;
        protected volatile SSLSupport sslSupport;
        private ErrorState errorState = ErrorState.NONE;
    
        public AbstractProcessor(AbstractEndpoint<?> endpoint) {
            this(endpoint, new Request(), new Response());
        }
    
        protected AbstractProcessor(AbstractEndpoint<?> endpoint, Request coyoteRequest,
                Response coyoteResponse) {
            this.endpoint = endpoint;
            asyncStateMachine = new AsyncStateMachine(this);
            request = coyoteRequest;
            response = coyoteResponse;
            response.setHook(this);
            request.setResponse(response);
            request.setHook(this);
        }
    }
    
    • ConnectionHandler通过Http11NioProtocol的createProcessor方法创建 Processor对象。通过recycledProcessors实现processor的重用性。
    • Http11Processor作为processor 对象,内部包含Request和Response对象。
    • Http11Processor的process方法负责处理请求逻辑。
    • Http11Processor的重用意味着每次请来请求都要重置内部Request和Response对象。

    Http11Processor 处理过程

    public abstract class AbstractProcessorLight implements Processor {
    
        private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
        protected Adapter adapter;
        // Http11Processor内部包含Request 和 Response对象
        protected final Request request;
        protected final Response response;
    
        @Override
        public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
                throws IOException {
    
            SocketState state = SocketState.CLOSED;
            Iterator<DispatchType> dispatches = null;
            do {
                if (dispatches != null) {
                } else if (status == SocketEvent.OPEN_READ){
                    // 执行对应的逻辑
                    state = service(socketWrapper);
                } 
                // 省略相关代码
            } while (state == SocketState.ASYNC_END ||
                    dispatches != null && state != SocketState.CLOSED);
    
            return state;
        }
    }
    
    public abstract class AbstractProcessor 
           extends AbstractProcessorLight implements ActionHook {
    
        protected Adapter adapter;
        private volatile long asyncTimeout = -1;
        protected final AbstractEndpoint<?> endpoint;
        protected final Request request;
        protected final Response response;
        protected volatile SocketWrapperBase<?> socketWrapper = null;
    }
    
    public class Http11Processor extends AbstractProcessor {
    
        @Override
        public SocketState service(SocketWrapperBase<?> socketWrapper)
            throws IOException {
            // 省略相关代码
            while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                    sendfileState == SendfileState.DONE && !endpoint.isPaused()) {
                try {
                    // 解析请求
                    if (!inputBuffer.parseRequestLine(keptAlive)) {
                        if (inputBuffer.getParsingRequestLinePhase() == -1) {
                            return SocketState.UPGRADING;
                        } else if (handleIncompleteRequestLineRead()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                }
    
                // 解析 Request 对象
                if (!getErrorState().isError()) {
                    try {
                        prepareRequest();
                    } catch (Throwable t) {
                    }
                }
    
                if (!getErrorState().isError()) {
                    try {
                        getAdapter().service(request, response);
                    } catch (Throwable t) {
                    }
                }
    
                sendfileState = processSendfile(socketWrapper);
            }
        }
    }
    
    • Http11Processor对象能够被重用,每次处理请求都会重新初始化Request对象。
    • Http11Processor处理每次请求通过inputBuffer.parseRequestLine和prepareRequest来重新初始化Request对象。
    • Http11Processor的 service 内部调用CoyoteAdapter的service方法。

    CoyoteAdapter 执行流程

    org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
          // 5、StandardWrapperValve
          at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
          // 4、StandardContextValve
          at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
          at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
          // 3、StandardHostValve
          at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
          at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
          // 2、StandardEngineValve
          at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
          // 1、CoyoteAdapter
          at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
          at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
          at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
          at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
          at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
          at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
          - locked <0x19b6> (a org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
          at java.lang.Thread.run(Thread.java:748)
    
    • CoyoteAdapter的执行流程按照 Tomcat 内部对象的经典顺序进行执行,见调用栈注释。
    • 整个执行过程最终调用的是StandardWrapperValve的 invoke 方法。

    Tomcat Filter职责链

    final class StandardWrapperValve
        extends ValveBase {
    
        public StandardWrapperValve() {
            super(true);
        }
    
        @Override
        public final void invoke(Request request, Response response)
            throws IOException, ServletException {
    
            Servlet servlet = null;
    
            try {
                if (!unavailable) {
                    // wrapper是StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[].StandardWrapper
                    // 创建一个 servlet 对象
                    servlet = wrapper.allocate();
                }
            } catch (Throwable e) {
            }
    
            // 创建过滤器职责链
            ApplicationFilterChain filterChain =
                    ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
    
            try {
                if ((servlet != null) && (filterChain != null)) {
                    if (context.getSwallowOutput()) {
                        // 省略代码
                    } else {
                        if (request.isAsyncDispatching()) {
                            request.getAsyncContextInternal().doInternalDispatch();
                        } else {
                            // 通过职责链开始执行
                            filterChain.doFilter
                                (request.getRequest(), response.getResponse());
                        }
                    }
                }
            } catch (Throwable e) {
            }
    
            // 释放职责链,职责链的释放表示所有的filter 设置为 null
            if (filterChain != null) {
                filterChain.release();
            }
    
            // 回收 servlet 对象
            try {
                if (servlet != null) {
                    wrapper.deallocate(servlet);
                }
            } catch (Throwable e) {
            }
        }
    
        public HttpServletResponse getResponse() {
            if (facade == null) {
                facade = new ResponseFacade(this);
            }
            if (applicationResponse == null) {
                applicationResponse = facade;
            }
            return applicationResponse;
        }
    
        public HttpServletRequest getRequest() {
            if (facade == null) {
                facade = new RequestFacade(this);
            }
            if (applicationRequest == null) {
                applicationRequest = facade;
            }
            return applicationRequest;
        }
    }
    
    • StandardWrapperValve负责创建经典的 servlet 对象,通过wrapper.allocate创建,内部实现了 servlet 的单例模式。
    • StandardWrapperValve负责创建 FilterChain 对象,通过createFilterChain方法。
    • filterChain.doFilter方法中的request.getRequest()和response.getResponse()负责将 request 对象转换成RequestFacade对象,将 response 对象转换成ResponseFacade对象。

    Tomcat FilterChain创建和释放

    public final class ApplicationFilterChain implements FilterChain {
    
        private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
        private int pos = 0;
        private int n = 0;
        public static final int INCREMENT = 10;
        private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
        private Servlet servlet = null;
    }
    
    public final class ApplicationFilterFactory {
        public static ApplicationFilterChain createFilterChain(ServletRequest request,
                Wrapper wrapper, Servlet servlet) {
    
            ApplicationFilterChain filterChain = null;
    
            if (request instanceof Request) {
                Request req = (Request) request;
                if (Globals.IS_SECURITY_ENABLED) {
                    filterChain = new ApplicationFilterChain();
                } else {
                    filterChain = (ApplicationFilterChain) req.getFilterChain();
                    if (filterChain == null) {
                        filterChain = new ApplicationFilterChain();
                        req.setFilterChain(filterChain);
                    }
                }
            } else {
                filterChain = new ApplicationFilterChain();
            }
    
            // 设置职责链的filterChain内部的 sevlet 对象
            filterChain.setServlet(servlet);
            filterChain.setServletSupportsAsync(wrapper.isAsyncSupported());
    
            StandardContext context = (StandardContext) wrapper.getParent();
            FilterMap filterMaps[] = context.findFilterMaps();
    
            if ((filterMaps == null) || (filterMaps.length == 0))
                return (filterChain);
    
            DispatcherType dispatcher =
                    (DispatcherType) request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
    
            String requestPath = null;
            Object attribute = request.getAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR);
            if (attribute != null){
                requestPath = attribute.toString();
            }
    
            String servletName = wrapper.getName();
    
            // Add the relevant path-mapped filters to this filter chain
            for (int i = 0; i < filterMaps.length; i++) {
                if (!matchDispatcher(filterMaps[i] ,dispatcher)) {
                    continue;
                }
                if (!matchFiltersURL(filterMaps[i], requestPath))
                    continue;
                ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
                    context.findFilterConfig(filterMaps[i].getFilterName());
                if (filterConfig == null) {
                    // FIXME - log configuration problem
                    continue;
                }
                filterChain.addFilter(filterConfig);
            }
    
            // Add filters that match on servlet name second
            for (int i = 0; i < filterMaps.length; i++) {
                if (!matchDispatcher(filterMaps[i] ,dispatcher)) {
                    continue;
                }
                if (!matchFiltersServlet(filterMaps[i], servletName))
                    continue;
                ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
                    context.findFilterConfig(filterMaps[i].getFilterName());
                if (filterConfig == null) {
                    continue;
                }
                filterChain.addFilter(filterConfig);
            }
    
            return filterChain;
        }
    
        void addFilter(ApplicationFilterConfig filterConfig) {
    
            for(ApplicationFilterConfig filter:filters)
                if(filter==filterConfig)
                    return;
    
            if (n == filters.length) {
                ApplicationFilterConfig[] newFilters =
                    new ApplicationFilterConfig[n + INCREMENT];
                System.arraycopy(filters, 0, newFilters, 0, n);
                filters = newFilters;
            }
            filters[n++] = filterConfig;
        }
    
    
        void release() {
            for (int i = 0; i < n; i++) {
                filters[i] = null;
            }
            n = 0;
            pos = 0;
            servlet = null;
            servletSupportsAsync = false;
        }
    }
    
    • FilterChain内部包含ApplicationFilterConfig数组保存Filter 对象。
    • FilterChain每次请求开始的时候进行创建,结束的时候负责回收(参考release方法)。

    Tomcat FilterChain的执行

    public final class ApplicationFilterChain implements FilterChain {
    
        private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
        private int pos = 0;
        private int n = 0;
        public static final int INCREMENT = 10;
        private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];
        private Servlet servlet = null;
    
        @Override
        public void doFilter(ServletRequest request, ServletResponse response)
            throws IOException, ServletException {
    
            if( Globals.IS_SECURITY_ENABLED ) {
                // 省略相关代码
            } else {
                internalDoFilter(request,response);
            }
    
        private void internalDoFilter(ServletRequest request,
                                      ServletResponse response)
            throws IOException, ServletException {
    
            // 执行职责链
            if (pos < n) {
                ApplicationFilterConfig filterConfig = filters[pos++];
                try {
                    Filter filter = filterConfig.getFilter();
                    if( Globals.IS_SECURITY_ENABLED ) {
                       // 省略代码
                    } else {
                        filter.doFilter(request, response, this);
                    }
                } catch (Throwable e) {
                }
                return;
            }
    
            // 执行 servlet 本身的服务
            try {
                if ((request instanceof HttpServletRequest) &&
                        (response instanceof HttpServletResponse) &&
                        Globals.IS_SECURITY_ENABLED ) {
                   // 省略相关代码
                } else {
                    // 执行 serlvet 的服务,默认是DispatcherServlet对象
                    servlet.service(request, response);
                }
            } catch (Throwable e) {
            } finally {
            }
        }
     }
    
    • 职责链的执行有两种视角,一种是以上帝视角外部调用 for 循环执行,一种是每个 Filter 自驱执行并触发下一个 Filter,FilterChain选择的是后者。
    • FilterChain的执行内部通过 pos 和 n 的位置标记判断是否执行结束。
    public class TimeFilter implements Filter {
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
            System.out.println("time filter init");
        }
    
        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            System.out.println("time filter start");
            long startTime = System.currentTimeMillis();
            // filter内部需要调用filterChain继续执行下一个Filter
            filterChain.doFilter(servletRequest, servletResponse);
    
            long endTime = System.currentTimeMillis();
            System.out.println("time filter consume " + (endTime - startTime) + " ms");
            System.out.println("time filter end");
        }
    }
    
    • Tomcat 的 Filter 是通过 filter 对象内部执行filterChain.doFilter来驱动下一个Filter对象的执行。

    Tomcat Servlet 的创建

    public class StandardWrapper extends ContainerBase
        implements ServletConfig, Wrapper, NotificationEmitter {
    
        @Override
        public Servlet allocate() throws ServletException {
            boolean newInstance = false;
    
            // singleThreadModel = false
            if (!singleThreadModel) {
                // instance = DispatcherServlet
                if (instance == null || !instanceInitialized) {
                    synchronized (this) {
                        if (instance == null) {
                            try {
                                instance = loadServlet();
                                newInstance = true;
                                if (!singleThreadModel) {
                                    countAllocated.incrementAndGet();
                                }
                            } catch (ServletException e) {
                                throw e;
                            } catch (Throwable e) {
                            }
                        }
                        if (!instanceInitialized) {
                            initServlet(instance);
                        }
                    }
                }
    
                if (singleThreadModel) {
                    if (newInstance) {
                        synchronized (instancePool) {
                            instancePool.push(instance);
                            nInstances++;
                        }
                    }
                } else {
                    if (!newInstance) {
                        countAllocated.incrementAndGet();
                    }
                    // 最终返回的单例
                    return instance;
                }
            }
    
            synchronized (instancePool) {
                while (countAllocated.get() >= nInstances) {
                    if (nInstances < maxInstances) {
                        try {
                            instancePool.push(loadServlet());
                            nInstances++;
                        } catch (ServletException e) {
                        } catch (Throwable e) {
                        }
                    } else {
                        try {
                            instancePool.wait();
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }
                }
                countAllocated.incrementAndGet();
                return instancePool.pop();
            }
        }
    }
    
    • Servlet属于单例实例。

    Spring的DispatcherServlet

    public abstract class HttpServlet extends GenericServlet {
    
        private static final long serialVersionUID = 1L;
        private static final String METHOD_DELETE = "DELETE";
        private static final String METHOD_HEAD = "HEAD";
        private static final String METHOD_GET = "GET";
        private static final String METHOD_OPTIONS = "OPTIONS";
        private static final String METHOD_POST = "POST";
        private static final String METHOD_PUT = "PUT";
        private static final String METHOD_TRACE = "TRACE";
        private static final String HEADER_IFMODSINCE = "If-Modified-Since";
        private static final String HEADER_LASTMOD = "Last-Modified";
        private static final String LSTRING_FILE =
            "javax.servlet.http.LocalStrings";
        private static final ResourceBundle lStrings =
            ResourceBundle.getBundle(LSTRING_FILE);
    
        public HttpServlet() {
        }
    
        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException
        {
        }
    
        protected void doHead(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        }
    
        protected void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        }
    
        protected void doPut(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        }
    
        protected void doDelete(HttpServletRequest req,
                                HttpServletResponse resp)
            throws ServletException, IOException {
        }
    
    
        @Override
        public void service(ServletRequest req, ServletResponse res)
            throws ServletException, IOException {
    
            HttpServletRequest  request;  // RequestFacade
            HttpServletResponse response;  // ResponseFacade
    
            try {
                request = (HttpServletRequest) req;
                response = (HttpServletResponse) res;
            } catch (ClassCastException e) {
                throw new ServletException("non-HTTP request or response");
            }
            service(request, response);
        }
    }
    
    
    public abstract class HttpServlet extends GenericServlet {
    
        protected void service(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
            String method = req.getMethod();
    
            if (method.equals(METHOD_GET)) {
                long lastModified = getLastModified(req);
                if (lastModified == -1) {
                    doGet(req, resp);
                } else {
                    long ifModifiedSince;
                    try {
                        ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
                    } catch (IllegalArgumentException iae) {
                        ifModifiedSince = -1;
                    }
                    if (ifModifiedSince < (lastModified / 1000 * 1000)) {
                        maybeSetLastModified(resp, lastModified);
                        doGet(req, resp);
                    } else {
                        resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                    }
                }
            } else if (method.equals(METHOD_HEAD)) {
                long lastModified = getLastModified(req);
                maybeSetLastModified(resp, lastModified);
                doHead(req, resp);
            } else if (method.equals(METHOD_POST)) {
                doPost(req, resp);
            } else if (method.equals(METHOD_PUT)) {
                doPut(req, resp);
            } else if (method.equals(METHOD_DELETE)) {
                doDelete(req, resp);
            } else if (method.equals(METHOD_OPTIONS)) {
                doOptions(req,resp);
            } else if (method.equals(METHOD_TRACE)) {
                doTrace(req,resp);
            } else {
            }
        }
    }
    
    • DispatcherServlet内部通过 method 区分不同的逻辑执行。
    public class DispatcherServlet extends FrameworkServlet {
    
        protected void doService(HttpServletRequest request, HttpServletResponse response) throws Exception {
            
            // 省略代码
            try {
                this.doDispatch(request, response);
            } finally {
            }
        }
    
        protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
            HttpServletRequest processedRequest = request;
            HandlerExecutionChain mappedHandler = null;
            boolean multipartRequestParsed = false;
            WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
    
            try {
                try {
                    ModelAndView mv = null;
                    Object dispatchException = null;
    
                    try {
                        processedRequest = this.checkMultipart(request);
                        multipartRequestParsed = processedRequest != request;
                        mappedHandler = this.getHandler(processedRequest);
    
                        HandlerAdapter ha = this.getHandlerAdapter(mappedHandler.getHandler());
                        String method = request.getMethod();
                        // ha = RequestMappingHandlerAdapter,对应的执行对象
                        mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
                        if (asyncManager.isConcurrentHandlingStarted()) {
                            return;
                        }
    
                        this.applyDefaultViewName(processedRequest, mv);
                        mappedHandler.applyPostHandle(processedRequest, response, mv);
                    } catch (Exception var20) {
                    } catch (Throwable var21) {
                    }
    
                    this.processDispatchResult(processedRequest, response, mappedHandler, mv, (Exception)dispatchException);
                } catch (Exception var22) {
                } catch (Throwable var23) {
                }
            } finally {
            }
        }
    }
    
    • DispatcherServlet的doDispatch通过对应的mappedHandler执行对应的逻辑
    public class InvocableHandlerMethod extends HandlerMethod {
    
        private Object[] getMethodArgumentValues(NativeWebRequest request, ModelAndViewContainer mavContainer,
                Object... providedArgs) throws Exception {
    
            MethodParameter[] parameters = getMethodParameters();
            Object[] args = new Object[parameters.length];
            for (int i = 0; i < parameters.length; i++) {
                MethodParameter parameter = parameters[i];
                parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
                args[i] = resolveProvidedArgument(parameter, providedArgs);
                if (args[i] != null) {
                    continue;
                }
                if (this.argumentResolvers.supportsParameter(parameter)) {
                    try {
                        args[i] = this.argumentResolvers.resolveArgument(
                                parameter, mavContainer, request, this.dataBinderFactory);
                        continue;
                    }
                    catch (Exception ex) {
                    }
                }
            }
            return args;
        }
    }
    
    public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentResolver {
    
        private final List<HandlerMethodArgumentResolver> argumentResolvers =
                new LinkedList<HandlerMethodArgumentResolver>();
        private final Map<MethodParameter, HandlerMethodArgumentResolver> argumentResolverCache =
                new ConcurrentHashMap<MethodParameter, HandlerMethodArgumentResolver>(256);
    
    
        private HandlerMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
            HandlerMethodArgumentResolver result = this.argumentResolverCache.get(parameter);
            if (result == null) {
                for (HandlerMethodArgumentResolver methodArgumentResolver : this.argumentResolvers) {
                    if (methodArgumentResolver.supportsParameter(parameter)) {
                        result = methodArgumentResolver;
                        this.argumentResolverCache.put(parameter, result);
                        break;
                    }
                }
            }
            return result;
        }
    }
    
    • DispatcherServlet通过InvocableHandlerMethod类实现不同协议对应的请求参数的解析。

    参考

    相关文章

      网友评论

        本文标题:结合 tomcat 处理流程分析 Filter 的执行

        本文链接:https://www.haomeiwen.com/subject/ddwbwrtx.html