Server

作者: stayiwithime | 来源:发表于2021-03-29 10:06 被阅读0次
server and service

Server

类图

server

继承关系图

StandardServer

源码阅读

构造方法

 /**
  * Creates the server: sets up the global naming resources and, when naming
  * is enabled, registers a naming context listener as a lifecycle listener.
  */
 public StandardServer() {
        super();

        // The global naming resources get this server as their container.
        globalNamingResources = new NamingResourcesImpl();
        globalNamingResources.setContainer(this);

        // namingContextListener is final: it is assigned exactly once on
        // whichever path is taken below.
        if (!isUseNaming()) {
            namingContextListener = null;
            return;
        }
        namingContextListener = new NamingContextListener();
        addLifecycleListener(namingContextListener);
    }

 /**
  * Tells whether naming support is enabled.
  *
  * @return {@code false} only when the {@code catalina.useNaming} system
  *         property is exactly {@code "false"}; {@code true} in every other
  *         case, including when the property is not set
  */
 private boolean isUseNaming() {
        // Read through System.getProperty, i.e. a JVM system property.
        final String configured = System.getProperty("catalina.useNaming");
        return !"false".equals(configured);
    }
  • 初始化globalNamingResources
  • 根据系统属性catalina.useNaming(默认为true)决定是否添加NamingContextListener监听器
   /** Keeps the registered property change listeners and dispatches events to them. */
   final PropertyChangeSupport support = new PropertyChangeSupport(this);

   /**
    * Registers a property change listener on this server.
    *
    * @param listener the listener to add
    */
   public void addPropertyChangeListener(PropertyChangeListener listener) {
        this.support.addPropertyChangeListener(listener);
    }

   /**
    * Unregisters a property change listener from this server.
    *
    * @param listener the listener to remove
    */
    public void removePropertyChangeListener(PropertyChangeListener listener) {
        this.support.removePropertyChangeListener(listener);
    }
  • 通过PropertyChangeSupport注册/移除属性变化监听器,属性被修改时会通知这些监听器

实现Server接口的方法及相关属性

   /** Global naming resources held by this server. */
   private NamingResourcesImpl globalNamingResources = null;

   /**
    * @return the global naming resources currently held by this server
    */
   public NamingResourcesImpl getGlobalNamingResources() {
        return globalNamingResources;
    }

   /**
    * Replaces the global naming resources, makes this server their
    * container, and reports the change as a "globalNamingResources"
    * property change.
    *
    * @param globalNamingResources the new global naming resources
    */
   public void setGlobalNamingResources(NamingResourcesImpl globalNamingResources) {
        final NamingResourcesImpl previous = this.globalNamingResources;

        this.globalNamingResources = globalNamingResources;
        this.globalNamingResources.setContainer(this);

        support.firePropertyChange("globalNamingResources", previous, this.globalNamingResources);
    }

    /** Global naming context of this server; {@code null} until one is set. */
    private javax.naming.Context globalNamingContext = null;

    /**
     * @return the global naming context, or {@code null} if none has been set
     */
    public javax.naming.Context getGlobalNamingContext() {
        return globalNamingContext;
    }

    /**
     * Stores the global naming context. Unlike the other setters shown here,
     * no property change event is fired.
     *
     * @param globalNamingContext the new global naming context
     */
    public void setGlobalNamingContext(javax.naming.Context globalNamingContext) {
        this.globalNamingContext = globalNamingContext;
    }

/** Naming context listener; assigned once by the constructor and may be {@code null}. */
private final NamingContextListener namingContextListener;
  • 全局资源组件的设置,在构造方法中有过初始化
  • support.firePropertyChange()是PropertyChangeSupport提供的方法,用于在相关属性被修改时通知已注册的监听器
 /** Configured port for the shutdown listener; defaults to 8005. */
 private int port = 8005;

    /**
     * @return the configured port, without the port offset applied
     */
    public int getPort() {
        return port;
    }

    /**
     * @param port the port to store; values of -1 and -2 are treated
     *             specially by {@code await()}
     */
    public void setPort(int port) {
        this.port = port;
    }

    /** Offset added to positive port values; never negative. */
    private int portOffset = 0;

    /**
     * @return the configured port offset
     */
    public int getPortOffset() {
        return this.portOffset;
    }

    /**
     * Stores the port offset.
     *
     * @param portOffset the new offset; must not be negative
     * @throws IllegalArgumentException if {@code portOffset} is negative
     */
    public void setPortOffset(int portOffset) {
        if (portOffset >= 0) {
            this.portOffset = portOffset;
            return;
        }
        throw new IllegalArgumentException(
                sm.getString("standardServer.portOffset.invalid", Integer.valueOf(portOffset)));
    }

    /**
     * Computes the effective port.
     *
     * @return the configured port plus the port offset when the port is
     *         positive; otherwise the configured port unchanged, because
     *         non-positive ports have special meanings
     */
    public int getPortWithOffset() {
        final int basePort = getPort();
        return basePort > 0 ? basePort + getPortOffset() : basePort;
    }

    /** Host name or address the shutdown socket binds to; defaults to "localhost". */
    private String address = "localhost";

    /**
     * @return the address used when opening the shutdown socket
     */
    public String getAddress() {
        return address;
    }

    /**
     * @param address the address to use when opening the shutdown socket
     */
    public void setAddress(String address) {
        this.address = address;
    }

    /** Random source created lazily by {@code await()} when it needs one. */
    private Random random = null;

    /** Command string that {@code await()} compares incoming input against. */
    private String shutdown = "SHUTDOWN";

    /**
     * @return the shutdown command string
     */
    public String getShutdown() {
        return shutdown;
    }

    /**
     * @param shutdown the new shutdown command string
     */
    public void setShutdown(String shutdown) {
        this.shutdown = shutdown;
    }
  • 监听的目标端口,用来接收shutdown 命令
/** Explicitly configured parent class loader; {@code null} when not set. */
private ClassLoader parentClassLoader = null;

     /**
      * Resolves the parent class loader.
      *
      * @return the explicitly configured loader if there is one; otherwise
      *         the loader reported by the associated Catalina instance if
      *         one is set; otherwise the system class loader
      */
     public ClassLoader getParentClassLoader() {
        if (parentClassLoader != null) {
            return parentClassLoader;
        }
        return (catalina != null)
                ? catalina.getParentClassLoader()
                : ClassLoader.getSystemClassLoader();
    }

    /**
     * Stores the parent class loader and reports the change as a
     * "parentClassLoader" property change.
     *
     * @param parent the new parent class loader
     */
    public void setParentClassLoader(ClassLoader parent) {
        final ClassLoader previous = this.parentClassLoader;
        this.parentClassLoader = parent;
        support.firePropertyChange("parentClassLoader", previous, this.parentClassLoader);
    }
  • 类加载器
  • 属性监听器监听类加载器的变化
/** Catalina instance associated with this server; {@code null} when not set. */
private Catalina catalina = null;

    /**
     * @return the associated Catalina instance, or {@code null}
     */
    public Catalina getCatalina() {
        return this.catalina;
    }

    /**
     * @param catalina the Catalina instance to associate with this server
     */
    public void setCatalina(Catalina catalina) {
        this.catalina = catalina;
    }
    
    /** Catalina base directory; lazily defaulted from the home directory. */
    private File catalinaBase = null;

    /**
     * Returns the base directory. When none has been set, the current home
     * directory is stored as the base and returned (which may still be
     * {@code null}).
     *
     * @return the base directory, falling back to the home directory
     */
    public File getCatalinaBase() {
        if (catalinaBase == null) {
            catalinaBase = getCatalinaHome();
        }
        return catalinaBase;
    }

    /**
     * @param catalinaBase the base directory to store
     */
    public void setCatalinaBase(File catalinaBase) {
        this.catalinaBase = catalinaBase;
    }
    /** Catalina home directory; {@code null} until set. */
    private File catalinaHome = null;

    /**
     * @return the home directory, or {@code null} if none has been set
     */
    public File getCatalinaHome() {
        return this.catalinaHome;
    }

    /**
     * @param catalinaHome the home directory to store
     */
    public void setCatalinaHome(File catalinaHome) {
        this.catalinaHome = catalinaHome;
    }
  • catalina :处理服务相关的start,stop命令
  • 设置catalina的相关文件地址
/** Configured utility thread count; values <= 0 are relative to the CPU count. */
protected int utilityThreads = 2;

    /** Daemon flag passed to the thread factory when the utility executor is created. */
    protected boolean utilityThreadsAsDaemon = false;

    /** The utility executor itself; created by {@code reconfigureUtilityExecutor}. */
    private ScheduledThreadPoolExecutor utilityExecutor = null;

    /** Wrapper around {@link #utilityExecutor}, created together with it. */
    private ScheduledExecutorService utilityExecutorWrapper = null;

    /**
     * @return the configured (not the effective) utility thread count
     */
    public int getUtilityThreads() {
        return this.utilityThreads;
    }

    /**
     * Updates the utility thread count. A request that would lower the
     * effective thread count is ignored. If the executor already exists
     * and the configured value changed, its pool is resized.
     *
     * @param utilityThreads the new configured thread count
     */
    public void setUtilityThreads(int utilityThreads) {
        // Snapshot the field once so both checks below see the same old value.
        final int previous = this.utilityThreads;

        final boolean wouldShrink =
                getUtilityThreadsInternal(utilityThreads) < getUtilityThreadsInternal(previous);
        if (wouldShrink) {
            return;
        }

        this.utilityThreads = utilityThreads;

        final boolean changed = previous != utilityThreads;
        if (changed && utilityExecutor != null) {
            reconfigureUtilityExecutor(getUtilityThreadsInternal(utilityThreads));
        }
    }

/**
 * Converts a configured thread count into the effective one.
 *
 * @param utilityThreads the configured value
 * @return the value itself when it is positive; otherwise the number of
 *         available processors plus the (non-positive) value, but never
 *         less than 2
 */
private static int getUtilityThreadsInternal(int utilityThreads) {
        if (utilityThreads > 0) {
            return utilityThreads;
        }
        // Non-positive values are an adjustment relative to the CPU count.
        final int relative = Runtime.getRuntime().availableProcessors() + utilityThreads;
        return Math.max(relative, 2);
    }

/**
 * Resizes the utility executor, creating it first if it does not exist yet.
 *
 * @param threads the core pool size to apply
 */
private synchronized void reconfigureUtilityExecutor(int threads) {
        // ScheduledThreadPoolExecutor ignores MaximumPoolSize, so the core
        // pool size is the only knob to adjust on an existing executor.
        if (utilityExecutor != null) {
            utilityExecutor.setCorePoolSize(threads);
            return;
        }

        // First use: build the executor and its wrapper.
        final ScheduledThreadPoolExecutor created = new ScheduledThreadPoolExecutor(
                threads,
                new TaskThreadFactory("Catalina-utility-", utilityThreadsAsDaemon, Thread.MIN_PRIORITY));
        created.setKeepAliveTime(10, TimeUnit.SECONDS);
        created.setRemoveOnCancelPolicy(true);
        created.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

        utilityExecutor = created;
        utilityExecutorWrapper =
                new org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor(utilityExecutor);
    }
  • 设置Server的实用线程数
  • 该线程池用来执行周期性监听器
    private Service services[] = new Service[0];
    private final Object servicesLock = new Object();
    public void addService(Service service) {

        service.setServer(this);

        synchronized (servicesLock) {
            Service results[] = new Service[services.length + 1];
            System.arraycopy(services, 0, results, 0, services.length);
            results[services.length] = service;
            services = results;

            if (getState().isAvailable()) {
                try {
                    service.start();
                } catch (LifecycleException e) {
                    // Ignore
                }
            }

            // Report this property change to interested listeners
            support.firePropertyChange("service", null, service);
        }

    }
    /**
     * Looks up a service by name.
     *
     * @param name the service name to look for
     * @return the first service whose name equals {@code name}, or
     *         {@code null} if {@code name} is {@code null} or nothing matches
     */
    public Service findService(String name) {
        Service match = null;
        if (name != null) {
            synchronized (servicesLock) {
                final Service[] current = services;
                for (int i = 0; i < current.length && match == null; i++) {
                    if (name.equals(current[i].getName())) {
                        match = current[i];
                    }
                }
            }
        }
        return match;
    }
    /**
     * Returns the services currently defined on this server.
     *
     * <p>A copy of the internal array is returned, so callers cannot change
     * the server's service list by writing into the result; use
     * {@code addService} / {@code removeService} for that. No lock is taken
     * here: the internal array is only ever replaced, never modified in
     * place, so the array read by this method is not written to while it is
     * being copied.
     *
     * @return a new array holding the current services; empty if there are none
     */
    public Service[] findServices() {
        return services.clone();
    }
    /**
     * Removes a service from this server. Does nothing if the given
     * instance is not registered (identity comparison). Otherwise the
     * service is stopped, dropped from a fresh copy of the services array,
     * and the removal is announced through a "service" property change.
     *
     * @param service the service to remove
     */
    public void removeService(Service service) {

        synchronized (servicesLock) {
            // Locate the service by reference.
            int index = 0;
            while (index < services.length && services[index] != service) {
                index++;
            }
            if (index == services.length) {
                return;
            }

            try {
                services[index].stop();
            } catch (LifecycleException e) {
                // Ignore
            }

            // Copy everything except the removed slot into a new array.
            final int tailLength = services.length - index - 1;
            final Service[] remaining = new Service[services.length - 1];
            System.arraycopy(services, 0, remaining, 0, index);
            System.arraycopy(services, index + 1, remaining, index, tailLength);
            services = remaining;

            // Tell interested listeners that a service was removed.
            support.firePropertyChange("service", service, null);
        }

    }
  • 管理Service
  • addService():
    1.通过servicesLock 来保证线程安全,且采用写时复制的方式(不会影响到已经读取的线程的处理)来处理新增Service
    2.并执行service的start方法
    3.触发service修改的属性变化监听器
  • findService(String name)和findServices() 就是获取service的方法
  • removeService():
    1.获取锁,保证线程安全
    2.获取到指定要移除的service,并执行其stop方法
    3.复制到新的数组中(写时复制的方式)
    4.触发service修改的属性变化监听器
 // Thread currently blocked inside await(); set on entry to a wait loop and
 // cleared in the matching finally block.
 private volatile Thread awaitThread = null;
    // Server socket await() accepts shutdown connections on; null when not listening.
    private volatile ServerSocket awaitSocket = null;
    // Flag checked by the await() loops; set to true by stopAwait().
    private volatile boolean stopAwait = false;
    /**
     * Blocks the calling thread until a shutdown is requested.
     *
     * Behaviour depends on the value of getPortWithOffset():
     * -2 returns immediately; -1 sleeps in 10 second steps until the
     * stopAwait flag is set; any other value opens a server socket on that
     * port (bound to the configured address) and loops accepting
     * connections until the received text equals the shutdown command,
     * the stopAwait flag is set, or accepting fails with an IOException.
     * If the server socket cannot be opened, the error is logged and the
     * method returns.
     */
    public void await() {
        // Negative values - don't wait on port - tomcat is embedded or we just don't like ports
        if (getPortWithOffset() == -2) {
            // undocumented yet - for embedding apps that are around, alive.
            return;
        }
        if (getPortWithOffset() == -1) {
            try {
                awaitThread = Thread.currentThread();
                // No socket in this mode: poll the flag, waking at least every 10 seconds.
                while(!stopAwait) {
                    try {
                        Thread.sleep( 10000 );
                    } catch( InterruptedException ex ) {
                        // continue and check the flag
                    }
                }
            } finally {
                awaitThread = null;
            }
            return;
        }

        // Set up a server socket to wait on
        try {
            // Backlog of 1; bound to the configured address only.
            awaitSocket = new ServerSocket(getPortWithOffset(), 1,
                    InetAddress.getByName(address));
        } catch (IOException e) {
            log.error(sm.getString("standardServer.awaitSocket.fail", address,
                    String.valueOf(getPortWithOffset()), String.valueOf(getPort()),
                    String.valueOf(getPortOffset())), e);
            return;
        }

        try {
            awaitThread = Thread.currentThread();

            // Loop waiting for a connection and a valid command
            while (!stopAwait) {
                // Work on a local copy: stopAwait() may null the field concurrently.
                ServerSocket serverSocket = awaitSocket;
                if (serverSocket == null) {
                    break;
                }

                // Wait for the next connection
                Socket socket = null;
                StringBuilder command = new StringBuilder();
                try {
                    InputStream stream;
                    // Only used to report how long accept() ran if it times out.
                    long acceptStartTime = System.currentTimeMillis();
                    try {
                        socket = serverSocket.accept();
                        socket.setSoTimeout(10 * 1000);  // Ten seconds
                        stream = socket.getInputStream();
                    } catch (SocketTimeoutException ste) {
                        // This should never happen but bug 56684 suggests that
                        // it does.
                        log.warn(sm.getString("standardServer.accept.timeout",
                                Long.valueOf(System.currentTimeMillis() - acceptStartTime)), ste);
                        continue;
                    } catch (AccessControlException ace) {
                        log.warn(sm.getString("standardServer.accept.security"), ace);
                        continue;
                    } catch (IOException e) {
                        if (stopAwait) {
                            // Wait was aborted with socket.close()
                            break;
                        }
                        log.error(sm.getString("standardServer.accept.error"), e);
                        break;
                    }

                    // Read a set of characters from the socket
                    int expected = 1024; // Cut off to avoid DoS attack
                    // Only entered when the shutdown command is longer than 1024 characters.
                    // NOTE(review): random.nextInt() % 1024 can be negative in Java, so a
                    // single step may lower 'expected'; the loop simply repeats until
                    // expected >= shutdown.length().
                    while (expected < shutdown.length()) {
                        if (random == null)
                            random = new Random();
                        expected += (random.nextInt() % 1024);
                    }
                    // Read one byte at a time, at most 'expected' characters.
                    while (expected > 0) {
                        int ch = -1;
                        try {
                            ch = stream.read();
                        } catch (IOException e) {
                            log.warn(sm.getString("standardServer.accept.readError"), e);
                            // Treat a read failure like end of stream.
                            ch = -1;
                        }
                        // Control character or EOF (-1) terminates loop
                        if (ch < 32 || ch == 127) {
                            break;
                        }
                        command.append((char) ch);
                        expected--;
                    }
                } finally {
                    // Close the socket now that we are done with it
                    try {
                        if (socket != null) {
                            socket.close();
                        }
                    } catch (IOException e) {
                        // Ignore
                    }
                }

                // Match against our command string
                boolean match = command.toString().equals(shutdown);
                if (match) {
                    log.info(sm.getString("standardServer.shutdownViaPort"));
                    break;
                } else
                    // Anything else is logged and the loop accepts the next connection.
                    log.warn(sm.getString("standardServer.invalidShutdownCommand", command.toString()));
            }
        } finally {
            // Capture the socket before clearing the fields so it can still be closed.
            ServerSocket serverSocket = awaitSocket;
            awaitThread = null;
            awaitSocket = null;

            // Close the server socket and return
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    // Ignore
                }
            }
        }
    }
  • 该方法主要是保持主线程一直在运行,除非收到shutdown命令
  • 或者是Server的生命周期结束执行了stop()方法

继承自LifecycleMBeanBase的相关方法(生命周期以及MBean相关)

 /**
  * Initialization phase: sets up and registers the utility executor,
  * registers a StringCache and an MBeanFactory, initializes the global
  * naming resources, hands JAR files found on the class loader hierarchy
  * to the ExtensionValidator, and finally initializes every service.
  *
  * @throws LifecycleException if the parent initialization, the naming
  *         resources or a service fails to initialize
  */
 protected void initInternal() throws LifecycleException {

        super.initInternal();

        // Create (or resize) the utility executor, then register it.
        reconfigureUtilityExecutor(getUtilityThreadsInternal(utilityThreads));
        register(utilityExecutor, "type=UtilityExecutor");

        // The cache is global: with several Servers in one JVM (possible
        // when embedding) the same cache is registered under several names.
        onameStringCache = register(new StringCache(), "type=StringCache");

        // Register the MBeanFactory, with this server as its container.
        final MBeanFactory mbeanFactory = new MBeanFactory();
        mbeanFactory.setContainer(this);
        onameMBeanFactory = register(mbeanFactory, "type=MBeanFactory");

        // Initialize the global naming resources.
        globalNamingResources.init();

        // Feed the extension validator with JARs from the class loaders
        // between Catalina's parent loader and the system class loader
        // (the shared, if present, and common loaders).
        if (getCatalina() != null) {
            for (ClassLoader loader = getCatalina().getParentClassLoader();
                    loader != null && loader != ClassLoader.getSystemClassLoader();
                    loader = loader.getParent()) {
                if (!(loader instanceof URLClassLoader)) {
                    continue;
                }
                for (URL location : ((URLClassLoader) loader).getURLs()) {
                    if (!location.getProtocol().equals("file")) {
                        continue;
                    }
                    try {
                        final File candidate = new File(location.toURI());
                        final boolean isJarFile =
                                candidate.isFile() && candidate.getName().endsWith(".jar");
                        if (isJarFile) {
                            ExtensionValidator.addSystemResource(candidate);
                        }
                    } catch (URISyntaxException | IOException e) {
                        // Ignore
                    }
                }
            }
        }

        // Initialize the services defined on this server.
        for (Service current : services) {
            current.init();
        }
    }
  • 初始化阶段
    1.初始化utilityExecutor(用于处理周期性监听器)
    2.注册全局字符串缓存
    3.创建MBeanFactory并注册到MBeanServer中
    4.全局资源组件初始化
    5.沿ClassLoader层级向上遍历(到系统类加载器为止),把其中URLClassLoader里file协议的jar文件添加到ExtensionValidator
    6.Services初始化
/**
 * Start phase: fires the configure-start event, moves to STARTING, starts
 * the global naming resources and every service, and, when a positive
 * periodic event delay is configured, schedules the monitor task that
 * keeps the periodic lifecycle event running.
 *
 * @throws LifecycleException if the naming resources or a service fail to start
 */
protected void startInternal() throws LifecycleException {

        fireLifecycleEvent(CONFIGURE_START_EVENT, null);
        setState(LifecycleState.STARTING);

        globalNamingResources.start();

        // Services are started while holding the services lock.
        synchronized (servicesLock) {
            for (Service current : services) {
                current.start();
            }
        }

        if (periodicEventDelay <= 0) {
            return;
        }
        // Runs immediately, then again 60 seconds after each run completes.
        monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
                this::startPeriodicLifecycleEvent, 0, 60, TimeUnit.SECONDS);
    }

/**
 * Makes sure the periodic lifecycle event task is scheduled. Does nothing
 * while a previously scheduled task is still live. If the previous task
 * has completed, its outcome is fetched so a failure can be logged, and
 * the task is scheduled again at a fixed rate of periodicEventDelay seconds.
 */
protected void startPeriodicLifecycleEvent() {
        // A scheduled task that is not done yet needs no attention.
        if (periodicLifecycleEventFuture != null && !periodicLifecycleEventFuture.isDone()) {
            return;
        }

        if (periodicLifecycleEventFuture != null) {
            // The task ended, so there was an error executing it: get it and log it.
            try {
                periodicLifecycleEventFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                log.error(sm.getString("standardServer.periodicEventError"), e);
            }
        }

        periodicLifecycleEventFuture = getUtilityExecutor().scheduleAtFixedRate(
                () -> fireLifecycleEvent(Lifecycle.PERIODIC_EVENT, null),
                periodicEventDelay,
                periodicEventDelay,
                TimeUnit.SECONDS);
    }
  • startInternal()
    1.触发监听器(configure_start)
    2.全局资源组件start()
    3.start()Server下的Services
    4.执行周期性监听器
 /**
  * Stop phase: moves to STOPPING, cancels the monitor and periodic event
  * tasks, fires the configure-stop event, stops every service and the
  * global naming resources, and finally releases a thread blocked in
  * {@code await()}.
  *
  * @throws LifecycleException if a service or the naming resources fail to stop
  */
 protected void stopInternal() throws LifecycleException {

        setState(LifecycleState.STOPPING);

        // The monitor task is cancelled with interruption allowed ...
        if (monitorFuture != null) {
            monitorFuture.cancel(true);
            monitorFuture = null;
        }
        // ... while the periodic event task is cancelled without interrupting it.
        if (periodicLifecycleEventFuture != null) {
            periodicLifecycleEventFuture.cancel(false);
            periodicLifecycleEventFuture = null;
        }

        fireLifecycleEvent(CONFIGURE_STOP_EVENT, null);

        // Stop the services defined on this server.
        for (Service current : services) {
            current.stop();
        }

        globalNamingResources.stop();

        stopAwait();
    }

/**
 * Asks a thread blocked in {@code await()} to return. Sets the stop flag;
 * if a thread is currently waiting, closes the await socket (if any),
 * interrupts the thread and waits up to one second for it to finish.
 */
public void stopAwait() {
        stopAwait = true;

        final Thread waiter = awaitThread;
        if (waiter == null) {
            // Nobody is waiting; the flag alone is enough.
            return;
        }

        final ServerSocket listening = awaitSocket;
        if (listening != null) {
            awaitSocket = null;
            try {
                listening.close();
            } catch (IOException e) {
                // Ignored
            }
        }

        waiter.interrupt();
        try {
            waiter.join(1000);
        } catch (InterruptedException e) {
            // Ignored
        }
    }
  • stop阶段
    1.关闭周期性阶段的监听器的执行
    2.执行对应生命周期状态的监听器
    3.Services.stop()
    4.全局资源组件stop()
    5.stopAwait():让主线程退出await()中的循环等待,从而结束服务,详见await()方法
/**
 * Destroy phase: destroys every service and the global naming resources,
 * unregisters the MBeanFactory and StringCache, shuts down and unregisters
 * the utility executor if one exists, then runs the parent destroy logic.
 *
 * @throws LifecycleException if a service, the naming resources or the
 *         parent destroy step fails
 */
protected void destroyInternal() throws LifecycleException {
        // Destroy the services defined on this server.
        for (Service current : services) {
            current.destroy();
        }

        globalNamingResources.destroy();

        // Unregister what the init phase registered.
        unregister(onameMBeanFactory);
        unregister(onameStringCache);

        final boolean hasExecutor = utilityExecutor != null;
        if (hasExecutor) {
            utilityExecutor.shutdownNow();
            unregister("type=UtilityExecutor");
            utilityExecutor = null;
        }

        super.destroyInternal();
    }
  • 销毁阶段
    1.services的销毁
    2.全局名称资源的销毁
    3.注销MBeanServer中的相关资源
    4.关闭线程池等
    5.执行父类销毁方法
监听器

启动过程中会创建监听器,这些监听器是通过server.xml配置的

相关文章

网友评论

      本文标题:Server

      本文链接:https://www.haomeiwen.com/subject/crtthltx.html