美文网首页Java 杂谈JavavaJJava
Tomcat 启动及请求处理过程

Tomcat 启动及请求处理过程

作者: iHelin | 来源:发表于2019-07-24 12:35 被阅读0次

    本文使用Spring Boot提供的嵌入式Tomcat为例来分析Tomcat的启动过程、线程池管理及请求过程。

    org.apache.tomcat.util.net.NioEndpoint#startInternal开始说起:

        @Override
        public void startInternal() throws Exception {
    
            if (!running) {
                running = true;
                paused = false;
    
                processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getProcessorCache());
                eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                socketProperties.getEventCache());
                nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getBufferPool());
    
                // Create worker collection
                if ( getExecutor() == null ) {
                    createExecutor();
                }
    
                initializeConnectionLatch();
    
                // Start poller threads
                pollers = new Poller[getPollerThreadCount()];
                for (int i=0; i<pollers.length; i++) {
                    pollers[i] = new Poller();
                    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                    pollerThread.setPriority(threadPriority);
                    pollerThread.setDaemon(true);
                    pollerThread.start();
                }
    
                startAcceptorThreads();
            }
        }
    

    这里我们关注下创建Executor的代码,通过条件判断是否存在Executor,不存在则执行createExecutor()方法。

        public void createExecutor() {
            internalExecutor = true;
            TaskQueue taskqueue = new TaskQueue();
            TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
            executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
            taskqueue.setParent( (ThreadPoolExecutor) executor);
        }
    

    TaskThreadFactory是一个ThreadFactory的实现,里面定义了工作线程的线程名前缀和线程优先级,然后实例化ThreadPoolExecutor,这里的ThreadPoolExecutor其实是Tomcat自己的实现,全称为org.apache.tomcat.util.threads.ThreadPoolExecutor,其继承自juc的java.util.concurrent.ThreadPoolExecutor,我们可以理解为tomcat的ThreadPoolExecutor对juc提供的ThreadPoolExecutor进行了一层包装,最终返回一个Executor实例,这里创建Executor的工作就结束了。
    值得注意的是在new ThreadPoolExecutor的时候就会创建核心线程,因为这个是通过TaskThreadFactory来创建线程的,所以我们可以稍微关注下它里面创建线程的方法:

        @Override
        public Thread newThread(Runnable r) {
            TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
            t.setDaemon(daemon);
            t.setPriority(threadPriority);
    
            // Set the context class loader of newly created threads to be the class
            // loader that loaded this factory. This avoids retaining references to
            // web application class loaders and similar.
            if (Constants.IS_SECURITY_ENABLED) {
                PrivilegedAction<Void> pa = new PrivilegedSetTccl(
                        t, getClass().getClassLoader());
                AccessController.doPrivileged(pa);
            } else {
                t.setContextClassLoader(getClass().getClassLoader());
            }
    
            return t;
        }
    

    可以看到这里已经指定了线程的名称,形如http-nio-8080-exec-3
    回到startInternal方法,接着便是创建pollers并启动(线程),pollers的大小默认为2和处理器核数的最小值,创建Poller的代码如下:

            public Poller() throws IOException {
                this.selector = Selector.open();
            }
    

    这里说明下Poller实现了Runnable接口,且是NioEndpoint的一个内部类。

        /**
         * Poller class.
         */
        public class Poller implements Runnable {
            ...
        }
    

    startInternal方法的介绍就告一段落,接着来看看Pollerrun方法:

            @Override
            public void run() {
                // Loop until destroy() is called
                while (true) {
    
                    boolean hasEvents = false;
    
                    try {
                        if (!close) {
                            hasEvents = events();
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                keyCount = selector.selectNow();
                            } else {
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        if (close) {
                            events();
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("",x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    if ( keyCount == 0 ) hasEvents = (hasEvents | events());
    
                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            iterator.remove();
                            processKey(sk, attachment);
                        }
                    }//while
    
                    //process timeouts
                    timeout(keyCount,hasEvents);
                }//while
    
                getStopLatch().countDown();
            }
    

    主要的逻辑是通过while循环去遍历SelectionKey,如果有请求过来,则会交给processKey方法处理。

            protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
                try {
                    if ( close ) {
                        cancelledKey(sk);
                    } else if ( sk.isValid() && attachment != null ) {
                        if (sk.isReadable() || sk.isWritable() ) {
                            if ( attachment.getSendfileData() != null ) {
                                processSendfile(sk,attachment, false);
                            } else {
                                unreg(sk, attachment, sk.readyOps());
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (!closeSocket && sk.isWritable()) {
                                    if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    cancelledKey(sk);
                                }
                            }
                        }
                    } else {
                        //invalid key
                        cancelledKey(sk);
                    }
                } catch ( CancelledKeyException ckx ) {
                    cancelledKey(sk);
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error("",t);
                }
            }
    

    这里对nio的SelectionKey实例进行判断,发现sk.isReadable()true,最后交给processSocket方法。

        public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                SocketEvent event, boolean dispatch) {
            try {
                if (socketWrapper == null) {
                    return false;
                }
                SocketProcessorBase<S> sc = processorCache.pop();
                if (sc == null) {
                    sc = createSocketProcessor(socketWrapper, event);
                } else {
                    sc.reset(socketWrapper, event);
                }
                Executor executor = getExecutor();
                if (dispatch && executor != null) {
                    executor.execute(sc);
                } else {
                    sc.run();
                }
            } catch (RejectedExecutionException ree) {
                getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
                return false;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                // This means we got an OOM or similar creating a thread, or that
                // the pool and its queue are full
                getLog().error(sm.getString("endpoint.process.fail"), t);
                return false;
            }
            return true;
        }
    

    这里通过getExecutor方法获取之前创建的Executor实例,然后执行execute方法提交请求任务。至此,一个完整的请求便会经过Tomcat到Servlet,并最终进入对应的handler对请求进行业务处理。

    以上。

    相关文章

      网友评论

        本文标题:Tomcat 启动及请求处理过程

        本文链接:https://www.haomeiwen.com/subject/wwotrctx.html