美文网首页
tomcat源码分析

tomcat源码分析

作者: 简书徐小耳 | 来源:发表于2018-12-14 22:36 被阅读0次

Bootstrap的main方法就是tomcat的启动入口,其中的daemon就是Bootstrap类的实例

public static void main(String args[]) {

        synchronized (daemonLock) {
            if (daemon == null) {
              首先new一个对象,该对象会触发Bootstrap的类初始化(即执行static代码块和静态变量)
不过这个main方法就是在Bootstrap类中执行的所以会在main方法执行前执行初始化操作
设置catalina.home和catalina.base的文件目录path
                Bootstrap bootstrap = new Bootstrap();
                try {
执行初始化方法,初始化三个类加载器,并实例化Catalina
                    bootstrap.init();
                } catch (Throwable t) {
                    handleThrowable(t);
                    t.printStackTrace();
                    return;
                }
                daemon = bootstrap;
            } else {
        设置线程上下文
                Thread.currentThread().setContextClassLoader(daemon.catalinaLoader);
            }
        }

        try {
一般启动的时候我们会执行start
            String command = "start";
            if (args.length > 0) {
                command = args[args.length - 1];
            }

            if (command.equals("startd")) {
                args[args.length - 1] = "start";
                daemon.load(args);
                daemon.start();
            } else if (command.equals("stopd")) {
                args[args.length - 1] = "stop";
                daemon.stop();
            } else if (command.equals("start")) {
这个await标识后期是用来辅助tomcat进行退出的,即设置catalina类的await
                daemon.setAwait(true);
最终还是调用catalina类的load方法
                daemon.load(args);
最终还是调用catalina类的start方法
                daemon.start();
                if (null == daemon.getServer()) {
                    System.exit(1);
                }
            } else if (command.equals("stop")) {
                daemon.stopServer(args);
            } else if (command.equals("configtest")) {
                daemon.load(args);
                if (null == daemon.getServer()) {
                    System.exit(1);
                }
                System.exit(0);
            } else {
                log.warn("Bootstrap: command \"" + command + "\" does not exist.");
            }
        } catch (Throwable t) {
            // Unwrap the Exception for clearer error reporting
            if (t instanceof InvocationTargetException &&
                    t.getCause() != null) {
                t = t.getCause();
            }
            handleThrowable(t);
            t.printStackTrace();
            System.exit(1);
        }

    }

Bootstrap的初始化操作 就是设置catalina.home和catalina.base的文件目录path

  static {
       获取当前项目的位置
        String userDir = System.getProperty("user.dir");

获取我们home文件夹
        String home = System.getProperty(Globals.CATALINA_HOME_PROP);
        File homeFile = null;
todo getCanonicalPath会将文件路径解析为与操作系统相关的唯一的规范形式的字符串,而getAbsolutePath并不会
        if (home != null) {
根据home路径创建File对象(只是路径的抽象,并不会在磁盘上创建文件)
            File f = new File(home);
            try {
                homeFile = f.getCanonicalFile();
            } catch (IOException ioe) {
                homeFile = f.getAbsoluteFile();
            }
        }
如果homeFile ==null
        if (homeFile == null) {
         
检查当前的目录是否是tomcat的bin目录
            File bootstrapJar = new File(userDir, "bootstrap.jar");
 如果是的话则尝试获取bin的上一级目录(即与home在同一个目录)作为home的文件
            if (bootstrapJar.exists()) {
                File f = new File(userDir, "..");
                try {
                    homeFile = f.getCanonicalFile();
                } catch (IOException ioe) {
                    homeFile = f.getAbsoluteFile();
                }
            }
        }
如果homeFile 还是为null
        if (homeFile == null) {
            // Second fall-back. Use current directory
我们采用当前目录作为homeFile
            File f = new File(userDir);
            try {
                homeFile = f.getCanonicalFile();
            } catch (IOException ioe) {
                homeFile = f.getAbsoluteFile();
            }
        }

        catalinaHomeFile = homeFile;
把catalina.home的目录设置为homeFile的path
        System.setProperty(
                Globals.CATALINA_HOME_PROP, catalinaHomeFile.getPath());

获取catalina.base 如果没有就使用catalinaHomeFile
        String base = System.getProperty(Globals.CATALINA_BASE_PROP);
        if (base == null) {
            catalinaBaseFile = catalinaHomeFile;
        } else {
            File baseFile = new File(base);
            try {
                baseFile = baseFile.getCanonicalFile();
            } catch (IOException ioe) {
                baseFile = baseFile.getAbsoluteFile();
            }
            catalinaBaseFile = baseFile;
        }
设置catalina.base
        System.setProperty(
                Globals.CATALINA_BASE_PROP, catalinaBaseFile.getPath());
    }

bootstrap的init方法

 public void init() throws Exception {
初始化commonClassLoader,catalinaLoader,sharedClassLoader
        initClassLoaders();
把catalinaLoader设置为线程上下文的classloader,一般情况下catalinaLoader会加载class专门给tomcat用
但是默认配置下这边catalinaLoader与commonClassLoader和sharedClassLoader是同一个
        Thread.currentThread().setContextClassLoader(catalinaLoader);
预加载一些class 避免AccessControlException
        SecurityClassLoad.securityClassLoad(catalinaLoader);

获取Catalina类的实例并调用setParentLoader方法
        if (log.isDebugEnabled())
            log.debug("Loading startup class");
不会触发类的初始化
        Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
        Object startupInstance = startupClass.getConstructor().newInstance();

        // Set the shared extensions class loader
准备调用Catalina的setParentClassLoader,即设置sharedLoader为catalina的parentClassLoader
        if (log.isDebugEnabled())
            log.debug("Setting startup class properties");
        String methodName = "setParentClassLoader";
        Class<?> paramTypes[] = new Class[1];
        paramTypes[0] = Class.forName("java.lang.ClassLoader");
        Object paramValues[] = new Object[1];
        paramValues[0] = sharedLoader;
        Method method =
            startupInstance.getClass().getMethod(methodName, paramTypes);
        method.invoke(startupInstance, paramValues);

        catalinaDaemon = startupInstance;

    }

catalina的load方法

  public void load() {
loaded代表是否加载过
        if (loaded) {
            return;
        }
        loaded = true;

        long t1 = System.nanoTime();
获取并创建temp目录
        initDirs();

        // Before digester - it may be needed
//设置naming相关的系统属性,这里的naming指的是JNDI
        initNaming();

      创建并执行我们的Digester,主要是解析我们的xml
        Digester digester = createStartDigester();

        InputSource inputSource = null;
        InputStream inputStream = null;
        File file = null;
        try {
            try {
创建server.xml的file
                file = configFile();
                inputStream = new FileInputStream(file);
即将该url转化成唯一标识
                inputSource = new InputSource(file.toURI().toURL().toString());
            } catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("catalina.configFail", file), e);
                }
            }
            if (inputStream == null) {
                try {
如果是未获取到inputStream  我们再以相对路径的方式去获取
                    inputStream = getClass().getClassLoader()
                        .getResourceAsStream(getConfigFile());
                    inputSource = new InputSource
                        (getClass().getClassLoader()
                         .getResource(getConfigFile()).toString());
                } catch (Exception e) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("catalina.configFail",
                                getConfigFile()), e);
                    }
                }
            }

            // This should be included in catalina.jar
            // Alternative: don't bother with xml, just create it manually.
如果还没有xml 那么这个时候我们去寻找内置的xml
            if (inputStream == null) {
                try {
                    inputStream = getClass().getClassLoader()
                            .getResourceAsStream("server-embed.xml");
                    inputSource = new InputSource
                    (getClass().getClassLoader()
                            .getResource("server-embed.xml").toString());
                } catch (Exception e) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("catalina.configFail",
                                "server-embed.xml"), e);
                    }
                }
            }

如果inputStream 和inputSource  都没有 那么  则打印日志直接返回
            if (inputStream == null || inputSource == null) {
                if  (file == null) {
                    log.warn(sm.getString("catalina.configFail",
                            getConfigFile() + "] or [server-embed.xml]"));
                } else {
                    log.warn(sm.getString("catalina.configFail",
                            file.getAbsolutePath()));
                    if (file.exists() && !file.canRead()) {
                        log.warn("Permissions incorrect, read permission is not allowed on the file.");
                    }
                }
                return;
            }
准备 让digester 按照规则解析xml
            try {
                inputSource.setByteStream(inputStream);
                digester.push(this);
                digester.parse(inputSource);
            } catch (SAXParseException spe) {
                log.warn("Catalina.start using " + getConfigFile() + ": " +
                        spe.getMessage());
                return;
            } catch (Exception e) {
                log.warn("Catalina.start using " + getConfigFile() + ": " , e);
                return;
            }
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    // Ignore
                }
            }
        }
设置server的catalina属性,其中server在解析xml的时候已经创建了
        getServer().setCatalina(this);
设置catalinahome和catalinabase的属性
        getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
        getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());

        // Stream redirection
SystemLogHandler调用startCapture 会使用CaptureLog里面的printStream,该
stream不会自动flush 且因为是线程独有的 所以不会被阻塞,当结束后调用stopCapture 把之前使用
System.out打印的内容都返回 再采用其他方式一次性打印出来
        initStreams();

        // Start the new server
        try {
初始化server,注意这边的init是LifecycleBase的init方法(final方法,内部再回调子类的initInternal)
            getServer().init();
        } catch (LifecycleException e) {
            if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
                throw new java.lang.Error(e);
            } else {
                log.error("Catalina.start", e);
            }
        }

        long t2 = System.nanoTime();
        if(log.isInfoEnabled()) {
            log.info("Initialization processed in " + ((t2 - t1) / 1000000) + " ms");
        }
    }

LifecycleBase的init


 public final synchronized void init() throws LifecycleException {
        if (!state.equals(LifecycleState.NEW)) {
            invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
        }

        try {
设置state为INITIALIZING同时触发对应的监听事件
            setStateInternal(LifecycleState.INITIALIZING, null, false);
初始化真正的逻辑
            initInternal();
设置state为INITIALIZED同时触发对应的监听事件
            setStateInternal(LifecycleState.INITIALIZED, null, false);
        } catch (Throwable t) {
            handleSubClassException(t, "lifecycleBase.initFail", toString());
        }
    }

StandardServer的initInternal

  protected void initInternal() throws LifecycleException {
这边生成MBeanServer,即方便后期使用JMX查看tomcat的一些信息
ObjectName 是MBean在MBeanServer中的唯一标示
        super.initInternal();


注册一个StringCache的Mbean
        onameStringCache = register(new StringCache(), "type=StringCache");

       注册一个MBeanFactory 
        MBeanFactory factory = new MBeanFactory();
        factory.setContainer(this);
        onameMBeanFactory = register(factory, "type=MBeanFactory");

 globalNamingResources 执行初始化 跟StandardServer相似只是initInternal不太一样,
其内部主要是注册一些Mbean到自己的MBeanServer中
        globalNamingResources.init();

        // Populate the extension validator with JARs from common and shared
从我们shared(以及它的父加载器common,这边排除了系统类加载器)的classloader加载的jar包中检测是否有manifest文件 如果有就包装成对象加入集合
        if (getCatalina() != null) {
我们在bootstrap的init中设置了catalina的parentClassLoader是sharedLoader
            ClassLoader cl = getCatalina().getParentClassLoader();
            // Walk the class loader hierarchy. Stop at the system class loader.
            // This will add the shared (if present) and common class loaders
            while (cl != null && cl != ClassLoader.getSystemClassLoader()) {
                if (cl instanceof URLClassLoader) {
                    URL[] urls = ((URLClassLoader) cl).getURLs();
                    for (URL url : urls) {
                        if (url.getProtocol().equals("file")) {
                            try {
                                File f = new File (url.toURI());
                                if (f.isFile() &&
                                        f.getName().endsWith(".jar")) {
                                    ExtensionValidator.addSystemResource(f);
                                }
                            } catch (URISyntaxException e) {
                                // Ignore
                            } catch (IOException e) {
                                // Ignore
                            }
                        }
                    }
                }
                cl = cl.getParent();
            }
        }
我们的services内容在解析xml的时候就已经放入了,每个service包含connector和container(engine)
        for (int i = 0; i < services.length; i++) {
这边是Service的初始化
            services[i].init();
        }
    }

JMX的相关方法

 /**
     * Obtains the MBean server from the registry and registers this
     * component under its object name, unless an object name is already
     * present.
     *
     * @throws LifecycleException declared by the lifecycle contract of this
     *                            method
     */
    @Override
    protected void initInternal() throws LifecycleException {
        // A non-null oname means registration has already happened via
        // preRegister(), so there is nothing left to do here.
        if (oname != null) {
            return;
        }

        mserver = Registry.getRegistry(null, null).getMBeanServer();
        oname = register(this, getObjectNameKeyProperties());
    }

StandardService的initInternal 初始化connector和engine

   protected void initInternal() throws LifecycleException {
  JMX的相关操作
        super.initInternal();
 container   容器的初始化,注意这边只是初始化了顶级容器engine ,子容器还未初始化
        if (engine != null) {
初始化Realm 如果没有就设置一个NullRealm,默认创建LockOutRealm
阻止别人暴力破解user密码,其设置密码错误锁定时长等,内部包含UserDatabaseRealm
其是存储用户登录的权限认定
            engine.init();
        }

初始化线程池 我们的tomcat的 connector 可以配置这个属性
        for (Executor executor : findExecutors()) {
            if (executor instanceof JmxEnabled) {
                ((JmxEnabled) executor).setDomain(getDomain());
            }
这个init只是注册一个MbeanServer和Mbean
            executor.init();
        }

这个init只是注册一个MbeanServer和Mbean
        mapperListener.init();

我们在上面已经初始化好了container,那么这边我就初始化connector
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                connector.init();
            }
        }
    }

Connector的初始化

  protected void initInternal() throws LifecycleException {
JMX相关
        super.initInternal();
protocol 是在server.xml中配置在了connector里面的元素
        if (protocolHandler == null) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
        }

        // Initialize adapter
CoyoteAdapter 主要是生成HttpServletRequest和HttpServletResponse
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);

        // Make sure parseBodyMethodsSet has a default
设置一个parseBodyMethodsSet (解析请求体的方法),默认是POST
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }
如果是APR协议但是APR监听者却不可用则抛出异常
        if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                    getProtocolHandlerClassName()));
        }
如果APR监听者可用且需要设置SSL且协议是Http11JsseProtocol 则增加SSL
        if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                protocolHandler instanceof AbstractHttp11JsseProtocol) {
            AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                    (AbstractHttp11JsseProtocol<?>) protocolHandler;
            if (jsseProtocolHandler.isSSLEnabled() &&
                    jsseProtocolHandler.getSslImplementationName() == null) {
                // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
                jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
            }
        }

        try {
协议的初始化
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
        }
    }

protocolHandler(协议)的初始化

    @Override
    public void init() throws Exception {
      先配置升级协议再初始化
        for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
            configureUpgradeProtocol(upgradeProtocol);
        }
真正初始化
        super.init();
    }
  public void init() throws Exception {
打印port的offset
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
            logPortOffset();
        }
如果ObjectName 为null 则创建一个 并注册到对应的MBeanServer
        if (oname == null) {
            // Component not pre-registered so register it
            oname = createObjectName();
            if (oname != null) {
                Registry.getRegistry(null, null).registerComponent(this, oname, null);
            }
        }
给domain也创建对应的MBean
        if (this.domain != null) {
            rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
            Registry.getRegistry(null, null).registerComponent(
                    getHandler().getGlobal(), rgOname, null);
        }
domain 只是做为JMX注册使用
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));
        endpoint.setDomain(domain);
首先选择对应的bind,创建对应的监听端口的socket,然后选择一个shared的selector,设置pollerThreadCount(对应netty的worker专门去处理一系列socket的事件),acceptorThreadCount(对应netty的boss)
        endpoint.init();
    }

catalina的start方法

 public void start() {
 如果server不存在 重新调用load
        if (getServer() == null) {
            load();
        }
 如果还是为空直接返回
        if (getServer() == null) {
            log.fatal("Cannot start server. Server instance is not configured.");
            return;
        }

        long t1 = System.nanoTime();

   启动server,观察者模式会启动所有的子类
        try {
            getServer().start();
        } catch (LifecycleException e) {
            log.fatal(sm.getString("catalina.serverStartFail"), e);
            try {
                getServer().destroy();
            } catch (LifecycleException e1) {
                log.debug("destroy() failed for failed Server ", e1);
            }
            return;
        }

        long t2 = System.nanoTime();
        if(log.isInfoEnabled()) {
            log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
        }

注册关闭时候的钩子,可以设置我们想注册的钩子
        if (useShutdownHook) {
            if (shutdownHook == null) {
                shutdownHook = new CatalinaShutdownHook();
            }
            Runtime.getRuntime().addShutdownHook(shutdownHook);

            // If JULI is being used, disable JULI's shutdown hook since
            // shutdown hooks run in parallel and log messages may be lost
            // if JULI's hook completes before the CatalinaShutdownHook()
            LogManager logManager = LogManager.getLogManager();
            if (logManager instanceof ClassLoaderLogManager) {
                ((ClassLoaderLogManager) logManager).setUseShutdownHook(
                        false);
            }
        }
  这边就是让tomcat进程不关闭
        if (await) {
如果是-2 说明不需要启动 直接关闭
这边根据我们的端口的开放选择不同的关闭方式,如果端口是-1
代表是内置的tomcat,这个时候无法通过socket监听,只能通过线程不停的轮询一个标识
如果不是-1 则可以开启一个socket 监听命令 如果是关闭 就跳出循环 执行stop
            await();
关闭服务,如果存在钩子 先删除钩子 再关闭,防止重复关闭
            stop();
        }
    }

server的start方法

    protected void startInternal() throws LifecycleException {
  触发监听事件和设置state
        fireLifecycleEvent(CONFIGURE_START_EVENT, null);
        setState(LifecycleState.STARTING);
启动globalNamingResources
这边  触发监听事件和设置state
        globalNamingResources.start();

  启动所有的service
        synchronized (servicesLock) {
            for (int i = 0; i < services.length; i++) {
                services[i].start();
            }
        }
    }

service的start方法

  protected void startInternal() throws LifecycleException {
    设置state
        if(log.isInfoEnabled())
            log.info(sm.getString("standardService.start.name", this.name));
        setState(LifecycleState.STARTING);
  同步启动engine
        // Start our defined Container first
        if (engine != null) {
            synchronized (engine) {
                engine.start();
            }
        }
  同步启动executor
        synchronized (executors) {
            for (Executor executor: executors) {
                executor.start();
            }
        }
启动mapperListener
        mapperListener.start();

      启动非失败的connector
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                // If it has already failed, don't try and start it
                if (connector.getState() != LifecycleState.FAILED) {
                    connector.start();
                }
            }
        }
    }

engine的start,即调用ContainerBase的startInternal

 protected synchronized void startInternal() throws LifecycleException {

        // Start our subordinate components, if any
        logger = null;
        getLogger();
        Cluster cluster = getClusterInternal();
        if (cluster instanceof Lifecycle) {
            ((Lifecycle) cluster).start();
        }
        Realm realm = getRealmInternal();
        if (realm instanceof Lifecycle) {
            ((Lifecycle) realm).start();
        }

        // Start our child containers, if any
        Container children[] = findChildren();
        List<Future<Void>> results = new ArrayList<>();
        for (int i = 0; i < children.length; i++) {
            results.add(startStopExecutor.submit(new StartChild(children[i])));
        }

        MultiThrowable multiThrowable = null;

        for (Future<Void> result : results) {
            try {
等待子容器启动成功
                result.get();
            } catch (Throwable e) {
                log.error(sm.getString("containerBase.threadedStartFailed"), e);
                if (multiThrowable == null) {
                    multiThrowable = new MultiThrowable();
                }
                multiThrowable.add(e);
            }

        }
        if (multiThrowable != null) {
            throw new LifecycleException(sm.getString("containerBase.threadedStartFailed"),
                    multiThrowable.getThrowable());
        }


四个基础阀门放在各自容器管道的最后
其他的valve 可以执行一些其他操作 比如打印日志之类的
        if (pipeline instanceof Lifecycle) {
            ((Lifecycle) pipeline).start();
        }

    设置状态
        setState(LifecycleState.STARTING);

        // Start our thread
启动线程,主要是执行ContainerBackgroundProcessor,执行backGroundProcessor
        threadStart();
    }

connector的start方法

    protected void startInternal() throws LifecycleException {

    查看端口,小于0则抛出错误
        if (getPortWithOffset() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset())));
        }

        setState(LifecycleState.STARTING);

        try {
启动协议
            protocolHandler.start();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
        }
    }

protocolHandler的start

    public void start() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
            logPortOffset();
        }
 启动endpoint,主要是绑定端口,并启动一些监听链接的请求
        endpoint.start();

开启异步timeout的线程
检测超时的请求,并将该请求再转发到工作线程池处理
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        int priority = endpoint.getThreadPriority();
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        timeoutThread.setPriority(priority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }
  如果该endpoint还未绑定 则开始绑定,不过已经在endpoint的init的时候绑定了
   /**
    * Starts the endpoint. If the endpoint is still in the UNBOUND state it
    * is bound first and the bind state is recorded as BOUND_ON_START; the
    * internal start logic then runs in every case.
    *
    * @throws Exception propagated from bindWithCleanup() or startInternal()
    */
   public final void start() throws Exception {
        final boolean stillUnbound = (bindState == BindState.UNBOUND);
        if (stillUnbound) {
            bindWithCleanup();
            // Remember that the bind was performed as part of start.
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }
endpoint的start方法
  public void startInternal() throws Exception {
如果还未运行则开始运行
        if (!running) {
            running = true;
            paused = false;
    三个缓存用同步栈保存
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());

            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());

            // Create worker collection
创建真正执行请求的线程
            if ( getExecutor() == null ) {
                createExecutor();
            }
  初始化最大连接数的限制(connection latch),可以在server.xml配置
            initializeConnectionLatch();

            开启pollers线程 我们通过acceptor线程 把socket注册到poller上面
poller 内部持有一个selector 轮询发生的事件然后将事件交给executor处理
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }
创建acceptor 该线程遇到请求tomcat的socket 将其注册到poller上
            startAcceptorThreads();
        }
    }



  • 通过下图可知acceptor 单线程即可


    image.png

accept线程启动和run方法

    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new ArrayList<>(count);
  获取acceptor的数量,创建并启动
        for (int i = 0; i < count; i++) {
            Acceptor<U> acceptor = new Acceptor<>(this);
            String threadName = getName() + "-Acceptor-" + i;
            acceptor.setThreadName(threadName);
            acceptors.add(acceptor);
            Thread t = new Thread(acceptor, threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }
- acceptor的run方法
 public void run() {

        int errorDelay = 0;

    一直循环除非我们接收到关闭命令
        while (endpoint.isRunning()) {

    如果已经暂停且仍在运行就沉睡一会
            while (endpoint.isPaused() && endpoint.isRunning()) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
  如果不在运行了 直接退出
            if (!endpoint.isRunning()) {
                break;
            }
设置为Running
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
检测下 如果我们的connection已经达到最大值就等待,一般到这一步说明我们已经准备去获取一个connection
从这也可以看出一个acceptor会占着一个connection,那么如果我们启动太多的acceptor 也不会太好
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
有可能在等待connection的时候 已经暂停了 所以需要检查下
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
            
获取请求
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                如果发生异常就释放connection
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
这个标识是当发生异常的时候让线程sleep的
                errorDelay = 0;

            
配置socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
          
setSocketOptions方法 会将socket送给一个合适的processor
                    if (!endpoint.setSocketOptions(socket)) {
如果失败了就关闭socket
                        endpoint.closeSocket(socket);
                    }
                } else {
如果endpoint不是isRunning或者isPaused 则直接关闭socket
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                // APR specific.
                // Could push this down but not sure it is worth the trouble.
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        // Not an error on HP-UX so log as a warning
                        // so it can be filtered out on that platform
                        // See bug 50273
                        log.warn(msg, t);
                    } else {
                        log.error(msg, t);
                    }
                } else {
                        log.error(msg, t);
                }
            }
        }
        state = AcceptorState.ENDED;
    }

设置socket
  protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
    设置该channel为非阻塞
            socket.configureBlocking(false);
            Socket sock = socket.socket();
给socket设置一些属性比如缓冲区大小,tcpNoDelay,soTimeOut
            socketProperties.setProperties(sock);
从同步栈中获取NioChannel的对象,同步栈的底层其实是数组
            NioChannel channel = nioChannels.pop();
如果为空则根据条件创建NioChannel
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
如果已经存在对象 则设置下socket并清除buf
                channel.setIOChannel(socket);
                channel.reset();
            }
将该channel注册到poller线程,由它去轮询该socket的事件
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

  public void register(final NioChannel socket) {
将NioChannel 和poller绑定
            socket.setPoller(this);
把NioChannel NioEndpoint和poller包装成对象  NioSocketWrapper 
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
绑定NioChannel 和NioSocketWrapper 
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
设置后期该channel的timeout和keepalive ssl
            ka.setReadTimeout(getConnectionTimeout());
            ka.setWriteTimeout(getConnectionTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
从缓存获取PollerEvent ,PollerEvent 是一个runnable
            PollerEvent r = eventCache.pop();
代表对read事件感兴趣
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
如果PollerEvent为空 就new一个
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
添加event
            addEvent(r);
        }
 private void addEvent(PollerEvent event) {
            events.offer(event);
如果wakeupCounter=-1 则唤醒selector
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }
poller的run方法
 public void run() {

            while (true) {

                boolean hasEvents = false;

                try {
如果未close
                    if (!close) {
检测我们之前注册的event,然后调用所有event的run方法该方法会根据注册到selector上或者
检测我们selectedKey是否过期 以及注册该channel感兴趣的事情
调用完了就清除PollerEvent的属性
                        hasEvents = events();
如果wakeupCounter大于0,说明有event加入进来
                        if (wakeupCounter.getAndSet(-1) > 0) {
          非阻塞查询
                            keyCount = selector.selectNow();
                        } else {
   阻塞查询查询
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
如果关闭 
                    if (close) {
                        events();
清理selector和key
                        timeout(0, false);
                        try {
关闭selector
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error("",x);
                    continue;
                }
                //either we timed out or we woke up, process events first
我们超时或者醒来 我们先处理events
                if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
遍历可以读取的keys 分派任何的激活的的event
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
处理事件的入口
                        processKey(sk, attachment);
                    }
                }//while

                //process timeouts
                timeout(keyCount,hasEvents);
            }//while
  poller 释放
            getStopLatch().countDown();
        }

pollerEvent的run方法

   public void run() {
如果是注册事件 先注册
            if (interestOps == OP_REGISTER) {
                try {
                    socket.getIOChannel().register(
                            socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                } catch (Exception x) {
                    log.error(sm.getString("endpoint.nio.registerFail"), x);
                }
            } else {
否则我们去selector寻找一个SelectionKey
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
如果Key==null 我们就释放该请求
                    if (key == null) {
                        // The key was cancelled (e.g. due to socket closure)
                        // and removed from the selector while it was being
                        // processed. Count down the connections at this point
                        // since it won't have been counted down when the socket
                        // closed.
                        socket.socketWrapper.getEndpoint().countDownConnection();
                        ((NioSocketWrapper) socket.socketWrapper).closed = true;
                    } else {
注册感兴趣的事
                        final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                        if (socketWrapper != null) {
                            //we are registering the key to start with, reset the fairness counter.
                            int ops = key.interestOps() | interestOps;
                            socketWrapper.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            socket.getPoller().cancelledKey(key);
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key);
                    } catch (Exception ignore) {}
                }
            }
        }

处理请求的地方

   protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            try {
如果close 就cancel
                if ( close ) {
                    cancelledKey(sk);
                } else if ( sk.isValid() && attachment != null ) {
如果可读或者可写就继续处理 否则cancelledKey
                    if (sk.isReadable() || sk.isWritable() ) {
如果是文件上传 调用processSendfile
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
  取消注册的事件
                            unreg(sk, attachment, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
下面就是先读后写
                            if (sk.isReadable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk);
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
        }
processSocket的参数说明:dispatch表示是否交给线程池(Executor)执行,为false时直接在当前线程执行;event表示触发的事件(读或写);socketWrapper封装了socket等连接信息
  public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
从缓存获取Processor
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
获取线程池
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

最终由SocketProcessorBase的run方法调用下面的doRun方法来执行
(下面展示的是doRun方法,而不是run方法本身)

 protected void doRun() {
            NioChannel socket = socketWrapper.getSocket();
            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

            try {
                int handshake = -1;

                try {
                    if (key != null) {
是否需要握手
                        if (socket.isHandshakeComplete()) {
                            // No TLS handshaking required. Let the handler
                            // process this socket / event combination.
                            handshake = 0;
                        } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                                event == SocketEvent.ERROR) {
                            // Unable to complete the TLS handshake. Treat it as
                            // if the handshake failed.
                            handshake = -1;
                        } else {
开始握手
                            handshake = socket.handshake(key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            event = SocketEvent.OPEN_READ;
                        }
                    }
                } catch (IOException x) {
                    handshake = -1;
                    if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
                } catch (CancelledKeyException ckx) {
                    handshake = -1;
                }
handshake =0 才能出来event
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    // Process the request from this socket
                    if (event == null) {
他就是处理的地方,最终调用了CoyoteAdapter的service方法进行处理
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        close(socket, key);
                    }
                } else if (handshake == -1 ) {
                    close(socket, key);
                } else if (handshake == SelectionKey.OP_READ){
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE){
                    socketWrapper.registerWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key);
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error("", t);
                socket.getPoller().cancelledKey(key);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && !paused) {
                    processorCache.push(this);
                }
            }
        }
    }

相关文章

网友评论

      本文标题:tomcat源码分析

      本文链接:https://www.haomeiwen.com/subject/nuupqqtx.html