美文网首页
SOFARPC 源码分析4 - 服务端启动流程

SOFARPC 源码分析4 - 服务端启动流程

作者: 原水寒 | 来源:发表于2018-11-24 15:46 被阅读49次

    服务端启动整体流程包含:准备工作 + 发布服务。

    准备工作

    1. RpcConfigs 读取并加载全局配置
    2. 使用 SPI 机制动态安装第三方模块(例如 SOFATracer、FaultTolerance 等)
    3. 添加优雅停机的关闭钩子线程
    4. 配置各种 Config(例如 RegistryConfig、ServerConfig、ProviderConfig 等)

    发布服务

    1. 使用 SPI 获取 ProviderBootstrap 实例(发布服务辅助类),默认是 DefaultProviderBootstrap 实例
    2. 使用 ProviderBootstrap 执行真正的服务发布操作

    2.1. 检查参数
    2.2. 构造请求处理链 ProviderProxyInvoker
    2.3. 使用 SPI 创建并初始化配置的多个 Registry 实例(支持多注册中心)
    2.4. 使用 SPI 创建并初始化 Server 实例,默认是 BoltServer
    2.5. 将 2.2 中构建的请求处理链 ProviderProxyInvoker 实例注册到 2.4 中创建好的 Server 实例中
    2.6. 启动 Server,对于默认的 BoltServer 来讲,这里会启动 Netty 客户端
    2.7. 创建 provider 配置变化监听器 ProviderAttributeListener(包括方法级和接口级的配置)并注册到 ProviderConfig
    2.8. 获取 2.3 中初始化好的注册中心 Registry,初始化并启动 Registry,最后注册服务到 Registry

    为什么要在 2.3 中提前进行 Registry 的创建和初始化,而不是在 2.8 中再进行操作?因为如果 2.3 中直接抛错的话,就不会在执行之后的操作了,实现了快速失败。

    一、准备工作

    1.1、RpcConfigs 读取并加载全局配置

    public class RpcConfigs {
        // 全部配置容器
        private final static ConcurrentMap<String, Object> CFG = new ConcurrentHashMap<>();
        // 配置变化监听器, key为属性key
        private final static ConcurrentMap<String, List<RpcConfigListener>> CFG_LISTENER = new ConcurrentHashMap<>();
    
        static {
            // 加载配置文件
            init();
        }
    
        /**
         * 配置文件的优先级从高到低:
         * 1. System.getProperties()
         * 2. 高order的sofa-rpc/rpc-config.json文件
         * 3. 低order的sofa-rpc/rpc-config.json文件
         * 4. 高order的META-INF/sofa-rpc/rpc-config.json文件
         * 5. 低order的META-INF/sofa-rpc/rpc-config.json文件
         * 6. rpc-config-default.json
         */
        private static void init() {
            // 1. 读取解析设置默认的全局配置文件 rpc-config-default.json 
            String json = FileUtils.file2String(RpcConfigs.class, "rpc-config-default.json", "UTF-8");
            Map map = JSON.parseObject(json, Map.class);
            CFG.putAll(map);
    
            // 2. 读取解析设置自定义的全局配置文件
            loadCustom("sofa-rpc/rpc-config.json");
            loadCustom("META-INF/sofa-rpc/rpc-config.json");
    
            // 3. 读取设置系统属性:注意部分属性可能被覆盖为字符串
            CFG.putAll(new HashMap(System.getProperties())); 
        }
    
        /**
         * 加载自定义配置文件
         */
        private static void loadCustom(String fileName) throws IOException {
            ClassLoader classLoader = ClassLoaderUtils.getClassLoader(RpcConfigs.class);
            Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fileName) : ClassLoader.getSystemResources(fileName);
            if (urls != null) { // 可能存在多个文件
                List<CfgFile> allFile = new ArrayList<CfgFile>();
                while (urls.hasMoreElements()) {
                    // 读取每一个文件
                    URL url = urls.nextElement();
                    InputStreamReader input = new InputStreamReader(url.openStream(), "utf-8");
                    BufferedReader reader = new BufferedReader(input);
                    StringBuilder context = new StringBuilder();
                    String line;
                    while ((line = reader.readLine()) != null) {
                        context.append(line).append("\n");
                    }
                    Map map = JSON.parseObject(context.toString(), Map.class);
                    // 当前配置文件的 order,order 越大优先级越高,即越后加载,即会覆盖前边文件的同名配置
                    Integer order = (Integer) map.get(RpcOptions.RPC_CFG_ORDER);
                    allFile.add(new CfgFile(url, order == null ? 0 : order, map));
                }
                Collections.sort(allFile, new OrderedComparator<CfgFile>()); // 从小到大排下序
                for (CfgFile file : allFile) {
                    CFG.putAll(file.getMap()); // 顺序加载,越大越后加载
                }
            }
        }
    
        /**
         * Put value.
         * @param key      the key
         * @param newValue the new value
         */
        public static void putValue(String key, Object newValue) {
            Object oldValue = CFG.get(key);
            if (changed(oldValue, newValue)) {
                // 1. 设置属性
                CFG.put(key, newValue);
                // 2. 获取属性key的值变化监听器,执行监听器逻辑
                List<RpcConfigListener> rpcConfigListeners = CFG_LISTENER.get(key);
                if (CommonUtils.isNotEmpty(rpcConfigListeners)) {
                    for (RpcConfigListener rpcConfigListener : rpcConfigListeners) {
                        rpcConfigListener.onChange(oldValue, newValue);
                    }
                }
            }
        }
    
        /**
         * 订阅配置变化
         * @param key      属性key
         * @param listener 配置监听器
         */
        public static synchronized void subscribe(String key, RpcConfigListener listener) {
            List<RpcConfigListener> listeners = CFG_LISTENER.get(key);
            if (listeners == null) {
                listeners = new ArrayList<RpcConfigListener>();
                CFG_LISTENER.put(key, listeners);
            }
            listeners.add(listener);
        }
    
        /**
         * 取消订阅配置变化
         * @param key      属性key
         * @param listener 配置监听器
         */
        public static synchronized void unSubscribe(String key, RpcConfigListener listener) {
            List<RpcConfigListener> listeners = CFG_LISTENER.get(key);
            if (listeners != null) {
                listeners.remove(listener);
                if (listeners.size() == 0) {
                    CFG_LISTENER.remove(key);
                }
            }
        }
    
        /**
         * 值是否发生变化
         * @param oldObj 旧值
         * @param newObj 新值
         */
        protected static boolean changed(Object oldObj, Object newObj) {
            return oldObj == null ? newObj != null : !oldObj.equals(newObj);
        }
    }
    

    注意点:

    1. 配置文件的优先级从高到低:(自定义配置文件的配置order由 rpc.config.order 来指定,该配置直接配置在配置文件中,参考 rpc-config-default.json
    • System.getProperties()
    • 高order的sofa-rpc/rpc-config.json文件
    • 低order的sofa-rpc/rpc-config.json文件
    • 高order的META-INF/sofa-rpc/rpc-config.json文件
    • 低order的META-INF/sofa-rpc/rpc-config.json文件
    • rpc-config-default.json
    1. 加载配置文件的执行时机:

    (1)static 块的执行时机 - 所在类被初始化的时候:https://blog.csdn.net/berber78/article/details/46472789
    当执行 RpcConfigs 类的 static 方法时,RpcConfigs 类会被初始化,此时会执行其 static 块,进而进行配置文件的加载。
    (2)在 com.alipay.sofa.rpc.log.LoggerFactory#getLogger 执行的时候,先执行其如下 static 方法:

    private static String implClass = RpcConfigs.getStringValue(RpcOptions.LOGGER_IMPL);
    

    这里执行了 RpcConfigs 类的 static 方法 getStringValue,所以此时会做配置文件的加载操作。
    (3)各种 Config 的顶级父级抽象类 AbstractIdConfig 的 static 块如下

       static {
           RpcRuntimeContext.now();
       }
    

    RpcRuntimeContext 会执行 com.alipay.sofa.rpc.log.LoggerFactory#getLogger,接下来的 1.2 和 1.3 流程是在 RpcRuntimeContext 的 static 中执行的。

    1.2、使用 SPI 机制动态安装第三方模块

    public class RpcRuntimeContext {
        static {
            put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);
            // 初始化一些上下文
            initContext();
            // 初始化其它模块
            ModuleFactory.installModules();
            // 增加jvm关闭事件
            if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {
                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        destroy(false);
                    }
                }, "SOFA-RPC-ShutdownHook"));
            }
        }
    }
    

    模块的安装代码在 ModuleFactory 中,实现流程如下:

    public class ModuleFactory {
        // 已加载的模块
        static final ConcurrentMap<String, Module> INSTALLED_MODULES = new ConcurrentHashMap<String, Module>();
    
        // 是否需要加载指定 moduleName 的模块:具体见 ModuleFactoryTest
        static boolean needLoad(String moduleLoadList, String moduleName) {
            String[] activatedModules = StringUtils.splitWithCommaOrSemicolon(moduleLoadList);
            boolean match = false;
            for (String activatedModule : activatedModules) {
                if (StringUtils.ALL.equals(activatedModule)) {
                    match = true;
                } else if (activatedModule.equals(moduleName)) {
                    match = true;
                } else if (match && (activatedModule.equals("!" + moduleName)
                        || activatedModule.equals("-" + moduleName))) {
                    match = false;
                    break;
                }
            }
            return match;
        }
    
        // 加载全部模块
        public static void installModules() {
            ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);
            // 默认配置为 *
            String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);
            for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {
                String moduleName = o.getKey();
                Module module = o.getValue().getExtInstance();
                // 全局配置文件配置
                if (needLoad(moduleLoadList, moduleName)) {
                    // Module 的实现本身也可以设置可以加载的条件
                    if (module.needLoad()) {
                        // 执行 module 的加载操作
                        module.install();
                        // 添加到已加载 module 列表
                        INSTALLED_MODULES.put(moduleName, module);
                    }
                }
            }
        }
    
        // 卸载全部模块
        public static void uninstallModules() {
            for (Map.Entry<String, Module> o : INSTALLED_MODULES.entrySet()) {
                String moduleName = o.getKey();
                o.getValue().uninstall();
                INSTALLED_MODULES.remove(moduleName);
            }
        }
    
        // 卸载模块
        public static void uninstallModule(String moduleName) {
            Module module = INSTALLED_MODULES.get(moduleName);
            if (module != null) {
                module.uninstall();
                INSTALLED_MODULES.remove(moduleName);
            }
        }
    }
    

    各种具体的模块实现在后续分析中进行分析

    1.3、添加优雅停机的关闭钩子线程

    优雅停机会在《SOFARPC 源码分析 - 优雅停机的设计与实现》中分析

    1.4、配置各种 Config

    SOFARPC 源码分析3 - 各种 Config 配置类

    二、发布服务

    2.1、使用 SPI 获取 ProviderBootstrap 实例

    image.png

    Bootstrap 有两类,ProviderBootstrap 和 ConsumerBootstrap。二者均是 SPI 可扩展抽象类。

    sofa = DefaultProviderBootstrap
    dubbo = DubboProviderBootstrap
    bolt = BoltProviderBootstrap
    h2c = Http2ClearTextProviderBootstrap
    rest = RestProviderBootstrap

    ======================= ProviderConfig =======================
        public synchronized void export() {
            if (providerBootstrap == null) {
                providerBootstrap = Bootstraps.from(this);
            }
            providerBootstrap.export();
        }
    
    ======================= Bootstraps =======================
        public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
            String bootstrap = providerConfig.getBootstrap();
            if (StringUtils.isEmpty(bootstrap)) {
                // 默认 sofa,即 DefaultProviderBootstrap
                bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
                providerConfig.setBootstrap(bootstrap);
            }
            // spi 获取 ProviderBootstrap 实现类
            ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
                .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
            return (ProviderBootstrap<T>) providerBootstrap;
        }
    
    ======================= ProviderBootstrap =======================
    /**
     * 发布服务的包装类,模板模式
     */
    @Extensible(singleton = false)
    public abstract class ProviderBootstrap<T> {
        // 服务发布者配置
        protected final ProviderConfig<T> providerConfig;
    
        // 发布一个服务
        public abstract void export();
        // 反发布一个服务
        public abstract void unExport();
    }
    
    ======================= DefaultProviderBootstrap =======================
    @Extension("sofa")
    public class DefaultProviderBootstrap<T> extends ProviderBootstrap<T> {
        // 服务端Invoker对象
        protected transient Invoker providerProxyInvoker;
    
        @Override
        public void export() {
            if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
                Thread thread = factory.newThread(new Runnable() {
                    @Override
                    public void run() {
                        Thread.sleep(providerConfig.getDelay());
                        doExport();
                    }
                });
                thread.start();
            } else {
                doExport();
            }
        }
    
        private void doExport() {
            /**
             * 1. 检查参数
             * a. 检查注入的 ref 是否接口实现类
             * b. 加载接口实现类
             * c. 检查 server 是否为空
             * c. 方法黑白名单的设置
             */
            checkParameters();
    
            // 2. 构造请求处理链
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
            // 3. 如果配置为需要注册,则 SPI 创建并初始化 Registry 实例
            if (providerConfig.isRegister()) {
                for (RegistryConfig registryConfig : providerConfig.getRegistry()) {
                    RegistryFactory.getRegistry(registryConfig);
                }
            }
            
            for (ServerConfig serverConfig : providerConfig.getServer()) {
                // 4. SPI创建并初始化Server实例
                Server server = serverConfig.buildIfAbsent();
                // 5. 注册最底层的业务逻辑处理器 - 请求处理链
                server.registerProcessor(providerConfig, providerProxyInvoker);
                if (serverConfig.isAutoStart()) {
                    // 6. 启动 Server,对于默认的 BoltServer 来讲,这里会启动 Netty 客户端
                    server.start();
                }
            }
    
            // 7. Provider配置发生变化监听器,如果配置发生变化,需要重新 export
            providerConfig.setConfigListener(new ProviderAttributeListener());
            // 8. 注册到注册中心
            register();
        }
    }
    

    2.2、构造请求处理链 ProviderProxyInvoker

    SOFARPC 源码分析5 - 服务端调用链的设计与实现

    2.3、使用 SPI 创建并初始化配置的多个 Registry 实例

    见后续的《SOFARPC 源码解析 - 注册中心的设计与实现》

    2.4、使用 SPI 创建并初始化 Server 实例 + 注册请求处理链 ProviderProxyInvoker 到 Server 实例 + 启动 Server

    SOFARPC 源码分析6 - 通信层的设计与实现

    2.5、创建 provider 配置变化监听器 ProviderAttributeListener 并注册到 ProviderConfig 中

    见后续的《SOFARPC 源码解析 - 配置监听器的设计与实现》

    2.6、初始化并启动 Registry + 注册服务到 Registry

        protected void register() {
            if (providerConfig.isRegister()) {
                for (RegistryConfig registryConfig : providerConfig.getRegistry()) {
                    // 如果已经实例化了注册中心,此处直接获取,否则使用SPI创建Registry实例
                    Registry registry = RegistryFactory.getRegistry(registryConfig);
                    // 初始化Registry,对于Zookeeper来讲,会创建ZkClient客户端
                    registry.init();
                    // 启动Registry
                    registry.start();
                    // 注册provider到Registry
                    registry.register(providerConfig);
                }
            }
        }
    

    详细源码分析见后续的《SOFARPC 源码解析 - 注册中心的设计与实现》

    相关文章

      网友评论

          本文标题:SOFARPC 源码分析4 - 服务端启动流程

          本文链接:https://www.haomeiwen.com/subject/ttuvfqtx.html