服务端启动整体流程包含:准备工作 + 发布服务。
准备工作
RpcConfigs
读取并加载全局配置- 使用 SPI 机制动态安装第三方模块(例如 SOFATracer、FaultTolerance 等)
- 添加优雅停机的关闭钩子线程
- 配置各种 Config(例如 RegistryConfig、ServerConfig、ProviderConfig 等)
发布服务
- 使用 SPI 获取
ProviderBootstrap
实例(发布服务辅助类),默认是 DefaultProviderBootstrap 实例- 使用
ProviderBootstrap
执行真正的服务发布操作2.1. 检查参数
2.2. 构造请求处理链ProviderProxyInvoker
2.3. 使用 SPI 创建并初始化配置的多个Registry
实例(支持多注册中心)
2.4. 使用 SPI 创建并初始化Server
实例,默认是 BoltServer
2.5. 将 2.2 中构建的请求处理链ProviderProxyInvoker
实例注册到 2.4 中创建好的Server
实例中
2.6. 启动Server
,对于默认的 BoltServer 来讲,这里会启动 Netty 客户端
2.7. 创建 provider 配置变化监听器 ProviderAttributeListener(包括方法级和接口级的配置)并注册到ProviderConfig
中
2.8. 获取 2.3 中初始化好的注册中心Registry
,初始化并启动Registry
,最后注册服务到Registry
为什么要在 2.3 中提前进行
Registry
的创建和初始化,而不是在 2.8 中再进行操作?因为如果 2.3 中直接抛错的话,就不会在执行之后的操作了,实现了快速失败。
一、准备工作
1.1、RpcConfigs 读取并加载全局配置
public class RpcConfigs {
// 全部配置容器
private final static ConcurrentMap<String, Object> CFG = new ConcurrentHashMap<>();
// 配置变化监听器, key为属性key
private final static ConcurrentMap<String, List<RpcConfigListener>> CFG_LISTENER = new ConcurrentHashMap<>();
static {
// 加载配置文件
init();
}
/**
* 配置文件的优先级从高到低:
* 1. System.getProperties()
* 2. 高order的sofa-rpc/rpc-config.json文件
* 3. 低order的sofa-rpc/rpc-config.json文件
* 4. 高order的META-INF/sofa-rpc/rpc-config.json文件
* 5. 低order的META-INF/sofa-rpc/rpc-config.json文件
* 6. rpc-config-default.json
*/
private static void init() {
// 1. 读取解析设置默认的全局配置文件 rpc-config-default.json
String json = FileUtils.file2String(RpcConfigs.class, "rpc-config-default.json", "UTF-8");
Map map = JSON.parseObject(json, Map.class);
CFG.putAll(map);
// 2. 读取解析设置自定义的全局配置文件
loadCustom("sofa-rpc/rpc-config.json");
loadCustom("META-INF/sofa-rpc/rpc-config.json");
// 3. 读取设置系统属性:注意部分属性可能被覆盖为字符串
CFG.putAll(new HashMap(System.getProperties()));
}
/**
* 加载自定义配置文件
*/
private static void loadCustom(String fileName) throws IOException {
ClassLoader classLoader = ClassLoaderUtils.getClassLoader(RpcConfigs.class);
Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fileName) : ClassLoader.getSystemResources(fileName);
if (urls != null) { // 可能存在多个文件
List<CfgFile> allFile = new ArrayList<CfgFile>();
while (urls.hasMoreElements()) {
// 读取每一个文件
URL url = urls.nextElement();
InputStreamReader input = new InputStreamReader(url.openStream(), "utf-8");
BufferedReader reader = new BufferedReader(input);
StringBuilder context = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
context.append(line).append("\n");
}
Map map = JSON.parseObject(context.toString(), Map.class);
// 当前配置文件的 order,order 越大优先级越高,即越后加载,即会覆盖前边文件的同名配置
Integer order = (Integer) map.get(RpcOptions.RPC_CFG_ORDER);
allFile.add(new CfgFile(url, order == null ? 0 : order, map));
}
Collections.sort(allFile, new OrderedComparator<CfgFile>()); // 从小到大排下序
for (CfgFile file : allFile) {
CFG.putAll(file.getMap()); // 顺序加载,越大越后加载
}
}
}
/**
* Put value.
* @param key the key
* @param newValue the new value
*/
public static void putValue(String key, Object newValue) {
Object oldValue = CFG.get(key);
if (changed(oldValue, newValue)) {
// 1. 设置属性
CFG.put(key, newValue);
// 2. 获取属性key的值变化监听器,执行监听器逻辑
List<RpcConfigListener> rpcConfigListeners = CFG_LISTENER.get(key);
if (CommonUtils.isNotEmpty(rpcConfigListeners)) {
for (RpcConfigListener rpcConfigListener : rpcConfigListeners) {
rpcConfigListener.onChange(oldValue, newValue);
}
}
}
}
/**
* 订阅配置变化
* @param key 属性key
* @param listener 配置监听器
*/
public static synchronized void subscribe(String key, RpcConfigListener listener) {
List<RpcConfigListener> listeners = CFG_LISTENER.get(key);
if (listeners == null) {
listeners = new ArrayList<RpcConfigListener>();
CFG_LISTENER.put(key, listeners);
}
listeners.add(listener);
}
/**
* 取消订阅配置变化
* @param key 属性key
* @param listener 配置监听器
*/
public static synchronized void unSubscribe(String key, RpcConfigListener listener) {
List<RpcConfigListener> listeners = CFG_LISTENER.get(key);
if (listeners != null) {
listeners.remove(listener);
if (listeners.size() == 0) {
CFG_LISTENER.remove(key);
}
}
}
/**
* 值是否发生变化
* @param oldObj 旧值
* @param newObj 新值
*/
protected static boolean changed(Object oldObj, Object newObj) {
return oldObj == null ? newObj != null : !oldObj.equals(newObj);
}
}
注意点:
- 配置文件的优先级从高到低:(自定义配置文件的配置order由
rpc.config.order
来指定,该配置直接配置在配置文件中,参考rpc-config-default.json
)
- System.getProperties()
- 高order的sofa-rpc/rpc-config.json文件
- 低order的sofa-rpc/rpc-config.json文件
- 高order的META-INF/sofa-rpc/rpc-config.json文件
- 低order的META-INF/sofa-rpc/rpc-config.json文件
- rpc-config-default.json
- 加载配置文件的执行时机:
(1)static 块的执行时机 - 所在类被初始化的时候:https://blog.csdn.net/berber78/article/details/46472789
当执行RpcConfigs
类的 static 方法时,RpcConfigs
类会被初始化,此时会执行其 static 块,进而进行配置文件的加载。
(2)在com.alipay.sofa.rpc.log.LoggerFactory#getLogger
执行的时候,先执行其如下 static 方法:private static String implClass = RpcConfigs.getStringValue(RpcOptions.LOGGER_IMPL);
这里执行了
RpcConfigs
类的 static 方法 getStringValue,所以此时会做配置文件的加载操作。
(3)各种 Config 的顶级父级抽象类 AbstractIdConfig 的 static 块如下static { RpcRuntimeContext.now(); }
RpcRuntimeContext 会执行
com.alipay.sofa.rpc.log.LoggerFactory#getLogger
,接下来的 1.2 和 1.3 流程是在 RpcRuntimeContext 的 static 中执行的。
1.2、使用 SPI 机制动态安装第三方模块
public class RpcRuntimeContext {
static {
put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);
// 初始化一些上下文
initContext();
// 初始化其它模块
ModuleFactory.installModules();
// 增加jvm关闭事件
if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
destroy(false);
}
}, "SOFA-RPC-ShutdownHook"));
}
}
}
模块的安装代码在 ModuleFactory 中,实现流程如下:
public class ModuleFactory {
// 已加载的模块
static final ConcurrentMap<String, Module> INSTALLED_MODULES = new ConcurrentHashMap<String, Module>();
// 是否需要加载指定 moduleName 的模块:具体见 ModuleFactoryTest
static boolean needLoad(String moduleLoadList, String moduleName) {
String[] activatedModules = StringUtils.splitWithCommaOrSemicolon(moduleLoadList);
boolean match = false;
for (String activatedModule : activatedModules) {
if (StringUtils.ALL.equals(activatedModule)) {
match = true;
} else if (activatedModule.equals(moduleName)) {
match = true;
} else if (match && (activatedModule.equals("!" + moduleName)
|| activatedModule.equals("-" + moduleName))) {
match = false;
break;
}
}
return match;
}
// 加载全部模块
public static void installModules() {
ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);
// 默认配置为 *
String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);
for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {
String moduleName = o.getKey();
Module module = o.getValue().getExtInstance();
// 全局配置文件配置
if (needLoad(moduleLoadList, moduleName)) {
// Module 的实现本身也可以设置可以加载的条件
if (module.needLoad()) {
// 执行 module 的加载操作
module.install();
// 添加到已加载 module 列表
INSTALLED_MODULES.put(moduleName, module);
}
}
}
}
// 卸载全部模块
public static void uninstallModules() {
for (Map.Entry<String, Module> o : INSTALLED_MODULES.entrySet()) {
String moduleName = o.getKey();
o.getValue().uninstall();
INSTALLED_MODULES.remove(moduleName);
}
}
// 卸载模块
public static void uninstallModule(String moduleName) {
Module module = INSTALLED_MODULES.get(moduleName);
if (module != null) {
module.uninstall();
INSTALLED_MODULES.remove(moduleName);
}
}
}
各种具体的模块实现在后续分析中进行分析
1.3、添加优雅停机的关闭钩子线程
优雅停机会在《SOFARPC 源码分析 - 优雅停机的设计与实现》中分析
1.4、配置各种 Config
二、发布服务
2.1、使用 SPI 获取 ProviderBootstrap 实例
image.png
Bootstrap 有两类,ProviderBootstrap 和 ConsumerBootstrap。二者均是 SPI 可扩展抽象类。
sofa = DefaultProviderBootstrap
dubbo = DubboProviderBootstrap
bolt = BoltProviderBootstrap
h2c = Http2ClearTextProviderBootstrap
rest = RestProviderBootstrap
======================= ProviderConfig =======================
public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}
======================= Bootstraps =======================
public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
String bootstrap = providerConfig.getBootstrap();
if (StringUtils.isEmpty(bootstrap)) {
// 默认 sofa,即 DefaultProviderBootstrap
bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
providerConfig.setBootstrap(bootstrap);
}
// spi 获取 ProviderBootstrap 实现类
ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
.getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
return (ProviderBootstrap<T>) providerBootstrap;
}
======================= ProviderBootstrap =======================
/**
* 发布服务的包装类,模板模式
*/
@Extensible(singleton = false)
public abstract class ProviderBootstrap<T> {
// 服务发布者配置
protected final ProviderConfig<T> providerConfig;
// 发布一个服务
public abstract void export();
// 反发布一个服务
public abstract void unExport();
}
======================= DefaultProviderBootstrap =======================
@Extension("sofa")
public class DefaultProviderBootstrap<T> extends ProviderBootstrap<T> {
// 服务端Invoker对象
protected transient Invoker providerProxyInvoker;
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
Thread.sleep(providerConfig.getDelay());
doExport();
}
});
thread.start();
} else {
doExport();
}
}
private void doExport() {
/**
* 1. 检查参数
* a. 检查注入的 ref 是否接口实现类
* b. 加载接口实现类
* c. 检查 server 是否为空
* c. 方法黑白名单的设置
*/
checkParameters();
// 2. 构造请求处理链
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 3. 如果配置为需要注册,则 SPI 创建并初始化 Registry 实例
if (providerConfig.isRegister()) {
for (RegistryConfig registryConfig : providerConfig.getRegistry()) {
RegistryFactory.getRegistry(registryConfig);
}
}
for (ServerConfig serverConfig : providerConfig.getServer()) {
// 4. SPI创建并初始化Server实例
Server server = serverConfig.buildIfAbsent();
// 5. 注册最底层的业务逻辑处理器 - 请求处理链
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
// 6. 启动 Server,对于默认的 BoltServer 来讲,这里会启动 Netty 客户端
server.start();
}
}
// 7. Provider配置发生变化监听器,如果配置发生变化,需要重新 export
providerConfig.setConfigListener(new ProviderAttributeListener());
// 8. 注册到注册中心
register();
}
}
2.2、构造请求处理链 ProviderProxyInvoker
2.3、使用 SPI 创建并初始化配置的多个 Registry 实例
见后续的《SOFARPC 源码解析 - 注册中心的设计与实现》
2.4、使用 SPI 创建并初始化 Server 实例 + 注册请求处理链 ProviderProxyInvoker 到 Server 实例 + 启动 Server
2.5、创建 provider 配置变化监听器 ProviderAttributeListener 并注册到 ProviderConfig 中
见后续的《SOFARPC 源码解析 - 配置监听器的设计与实现》
2.6、初始化并启动 Registry + 注册服务到 Registry
protected void register() {
if (providerConfig.isRegister()) {
for (RegistryConfig registryConfig : providerConfig.getRegistry()) {
// 如果已经实例化了注册中心,此处直接获取,否则使用SPI创建Registry实例
Registry registry = RegistryFactory.getRegistry(registryConfig);
// 初始化Registry,对于Zookeeper来讲,会创建ZkClient客户端
registry.init();
// 启动Registry
registry.start();
// 注册provider到Registry
registry.register(providerConfig);
}
}
}
详细源码分析见后续的《SOFARPC 源码解析 - 注册中心的设计与实现》
网友评论